Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * receivelog.c - receive WAL files using the streaming
4 : * replication protocol.
5 : *
6 : * Author: Magnus Hagander <magnus@hagander.net>
7 : *
8 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_basebackup/receivelog.c
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/logging.h"
23 : #include "libpq-fe.h"
24 : #include "receivelog.h"
25 : #include "streamutil.h"
26 :
27 : /* currently open WAL file */
28 : static Walfile *walfile = NULL;
29 : static bool reportFlushPosition = false;
30 : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
31 :
32 : static bool still_sending = true; /* feedback still needs to be sent? */
33 :
34 : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
35 : XLogRecPtr *stoppos);
36 : static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
37 : static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
38 : char **buffer);
39 : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
40 : int len, XLogRecPtr blockpos, TimestampTz *last_status);
41 : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
42 : XLogRecPtr *blockpos);
43 : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
44 : XLogRecPtr blockpos, XLogRecPtr *stoppos);
45 : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
46 : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
47 : TimestampTz last_status);
48 :
49 : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
50 : uint32 *timeline);
51 :
52 : static bool
53 38 : mark_file_as_archived(StreamCtl *stream, const char *fname)
54 : {
55 : Walfile *f;
56 : static char tmppath[MAXPGPATH];
57 :
58 38 : snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59 : fname);
60 :
61 38 : f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
62 : NULL, 0);
63 38 : if (f == NULL)
64 : {
65 0 : pg_log_error("could not create archive status file \"%s\": %s",
66 : tmppath, GetLastWalMethodError(stream->walmethod));
67 0 : return false;
68 : }
69 :
70 38 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
71 : {
72 0 : pg_log_error("could not close archive status file \"%s\": %s",
73 : tmppath, GetLastWalMethodError(stream->walmethod));
74 0 : return false;
75 : }
76 :
77 38 : return true;
78 : }
79 :
80 : /*
81 : * Open a new WAL file in the specified directory.
82 : *
83 : * Returns true if OK; on failure, returns false after printing an error msg.
84 : * On success, 'walfile' is set to the opened WAL file.
85 : *
86 : * The file will be padded to 16Mb with zeroes.
87 : */
88 : static bool
89 282 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 : {
91 : Walfile *f;
92 : char *fn;
93 : ssize_t size;
94 : XLogSegNo segno;
95 : char walfile_name[MAXPGPATH];
96 :
97 282 : XLByteToSeg(startpoint, segno, WalSegSz);
98 282 : XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
99 :
100 : /* Note that this considers the compression used if necessary */
101 282 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
102 : walfile_name,
103 282 : stream->partial_suffix);
104 :
105 : /*
106 : * When streaming to files, if an existing file exists we verify that it's
107 : * either empty (just created), or a complete WalSegSz segment (in which
108 : * case it has been created and padded). Anything else indicates a corrupt
109 : * file. Compressed files have no need for padding, so just ignore this
110 : * case.
111 : *
112 : * When streaming to tar, no file with this name will exist before, so we
113 : * never have to verify a size.
114 : */
115 550 : if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
116 268 : stream->walmethod->ops->existsfile(stream->walmethod, fn))
117 : {
118 0 : size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
119 0 : if (size < 0)
120 : {
121 0 : pg_log_error("could not get size of write-ahead log file \"%s\": %s",
122 : fn, GetLastWalMethodError(stream->walmethod));
123 0 : pg_free(fn);
124 0 : return false;
125 : }
126 0 : if (size == WalSegSz)
127 : {
128 : /* Already padded file. Open it for use */
129 0 : f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
130 0 : if (f == NULL)
131 : {
132 0 : pg_log_error("could not open existing write-ahead log file \"%s\": %s",
133 : fn, GetLastWalMethodError(stream->walmethod));
134 0 : pg_free(fn);
135 0 : return false;
136 : }
137 :
138 : /* fsync file in case of a previous crash */
139 0 : if (stream->walmethod->ops->sync(f) != 0)
140 : {
141 0 : pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
142 : fn, GetLastWalMethodError(stream->walmethod));
143 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
144 0 : exit(1);
145 : }
146 :
147 0 : walfile = f;
148 0 : pg_free(fn);
149 0 : return true;
150 : }
151 0 : if (size != 0)
152 : {
153 : /* if write didn't set errno, assume problem is no disk space */
154 0 : if (errno == 0)
155 0 : errno = ENOSPC;
156 0 : pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157 : "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158 : size),
159 : fn, size, WalSegSz);
160 0 : pg_free(fn);
161 0 : return false;
162 : }
163 : /* File existed and was empty, so fall through and open */
164 : }
165 :
166 : /* No file existed, so create one */
167 :
168 282 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
169 : walfile_name,
170 282 : stream->partial_suffix,
171 : WalSegSz);
172 282 : if (f == NULL)
173 : {
174 0 : pg_log_error("could not open write-ahead log file \"%s\": %s",
175 : fn, GetLastWalMethodError(stream->walmethod));
176 0 : pg_free(fn);
177 0 : return false;
178 : }
179 :
180 282 : pg_free(fn);
181 282 : walfile = f;
182 282 : return true;
183 : }
184 :
185 : /*
186 : * Close the current WAL file (if open), and rename it to the correct
187 : * filename if it's complete. On failure, prints an error message to stderr
188 : * and returns false, otherwise returns true.
189 : */
190 : static bool
191 318 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
192 : {
193 : char *fn;
194 : pgoff_t currpos;
195 : int r;
196 : char walfile_name[MAXPGPATH];
197 :
198 318 : if (walfile == NULL)
199 36 : return true;
200 :
201 282 : strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
202 282 : currpos = walfile->currpos;
203 :
204 : /* Note that this considers the compression used if necessary */
205 282 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
206 : walfile_name,
207 282 : stream->partial_suffix);
208 :
209 282 : if (stream->partial_suffix)
210 : {
211 24 : if (currpos == WalSegSz)
212 12 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
213 : else
214 : {
215 12 : pg_log_info("not renaming \"%s\", segment is not complete", fn);
216 12 : r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
217 : }
218 : }
219 : else
220 258 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
221 :
222 282 : walfile = NULL;
223 :
224 282 : if (r != 0)
225 : {
226 0 : pg_log_error("could not close file \"%s\": %s",
227 : fn, GetLastWalMethodError(stream->walmethod));
228 :
229 0 : pg_free(fn);
230 0 : return false;
231 : }
232 :
233 282 : pg_free(fn);
234 :
235 : /*
236 : * Mark file as archived if requested by the caller - pg_basebackup needs
237 : * to do so as files can otherwise get archived again after promotion of a
238 : * new node. This is in line with walreceiver.c always doing a
239 : * XLogArchiveForceDone() after a complete segment.
240 : */
241 282 : if (currpos == WalSegSz && stream->mark_done)
242 : {
243 : /* writes error message if failed */
244 34 : if (!mark_file_as_archived(stream, walfile_name))
245 0 : return false;
246 : }
247 :
248 282 : lastFlushPosition = pos;
249 282 : return true;
250 : }
251 :
252 :
253 : /*
254 : * Check if a timeline history file exists.
255 : */
256 : static bool
257 276 : existsTimeLineHistoryFile(StreamCtl *stream)
258 : {
259 : char histfname[MAXFNAMELEN];
260 :
261 : /*
262 : * Timeline 1 never has a history file. We treat that as if it existed,
263 : * since we never need to stream it.
264 : */
265 276 : if (stream->timeline == 1)
266 270 : return true;
267 :
268 6 : TLHistoryFileName(histfname, stream->timeline);
269 :
270 6 : return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
271 : }
272 :
273 : static bool
274 6 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
275 : {
276 6 : int size = strlen(content);
277 : char histfname[MAXFNAMELEN];
278 : Walfile *f;
279 :
280 : /*
281 : * Check that the server's idea of how timeline history files should be
282 : * named matches ours.
283 : */
284 6 : TLHistoryFileName(histfname, stream->timeline);
285 6 : if (strcmp(histfname, filename) != 0)
286 : {
287 0 : pg_log_error("server reported unexpected history file name for timeline %u: %s",
288 : stream->timeline, filename);
289 0 : return false;
290 : }
291 :
292 6 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
293 : histfname, ".tmp", 0);
294 6 : if (f == NULL)
295 : {
296 0 : pg_log_error("could not create timeline history file \"%s\": %s",
297 : histfname, GetLastWalMethodError(stream->walmethod));
298 0 : return false;
299 : }
300 :
301 6 : if ((int) stream->walmethod->ops->write(f, content, size) != size)
302 : {
303 0 : pg_log_error("could not write timeline history file \"%s\": %s",
304 : histfname, GetLastWalMethodError(stream->walmethod));
305 :
306 : /*
307 : * If we fail to make the file, delete it to release disk space
308 : */
309 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
310 :
311 0 : return false;
312 : }
313 :
314 6 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
315 : {
316 0 : pg_log_error("could not close file \"%s\": %s",
317 : histfname, GetLastWalMethodError(stream->walmethod));
318 0 : return false;
319 : }
320 :
321 : /* Maintain archive_status, check close_walfile() for details. */
322 6 : if (stream->mark_done)
323 : {
324 : /* writes error message if failed */
325 4 : if (!mark_file_as_archived(stream, histfname))
326 0 : return false;
327 : }
328 :
329 6 : return true;
330 : }
331 :
332 : /*
333 : * Send a Standby Status Update message to server.
334 : */
335 : static bool
336 272 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
337 : {
338 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339 272 : int len = 0;
340 :
341 272 : replybuf[len] = 'r';
342 272 : len += 1;
343 272 : fe_sendint64(blockpos, &replybuf[len]); /* write */
344 272 : len += 8;
345 272 : if (reportFlushPosition)
346 264 : fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
347 : else
348 8 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
349 272 : len += 8;
350 272 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
351 272 : len += 8;
352 272 : fe_sendint64(now, &replybuf[len]); /* sendTime */
353 272 : len += 8;
354 272 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
355 272 : len += 1;
356 :
357 272 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
358 : {
359 0 : pg_log_error("could not send feedback packet: %s",
360 : PQerrorMessage(conn));
361 0 : return false;
362 : }
363 :
364 272 : return true;
365 : }
366 :
367 : /*
368 : * Check that the server version we're connected to is supported by
369 : * ReceiveXlogStream().
370 : *
371 : * If it's not, an error message is printed to stderr, and false is returned.
372 : */
373 : bool
374 590 : CheckServerVersionForStreaming(PGconn *conn)
375 : {
376 : int minServerMajor,
377 : maxServerMajor;
378 : int serverMajor;
379 :
380 : /*
381 : * The message format used in streaming replication changed in 9.3, so we
382 : * cannot stream from older servers. And we don't support servers newer
383 : * than the client; it might work, but we don't know, so err on the safe
384 : * side.
385 : */
386 590 : minServerMajor = 903;
387 590 : maxServerMajor = PG_VERSION_NUM / 100;
388 590 : serverMajor = PQserverVersion(conn) / 100;
389 590 : if (serverMajor < minServerMajor)
390 : {
391 0 : const char *serverver = PQparameterStatus(conn, "server_version");
392 :
393 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394 : serverver ? serverver : "'unknown'",
395 : "9.3");
396 0 : return false;
397 : }
398 590 : else if (serverMajor > maxServerMajor)
399 : {
400 0 : const char *serverver = PQparameterStatus(conn, "server_version");
401 :
402 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403 : serverver ? serverver : "'unknown'",
404 : PG_VERSION);
405 0 : return false;
406 : }
407 590 : return true;
408 : }
409 :
410 : /*
411 : * Receive a log stream starting at the specified position.
412 : *
413 : * Individual parameters are passed through the StreamCtl structure.
414 : *
415 : * If sysidentifier is specified, validate that both the system
416 : * identifier and the timeline matches the specified ones
417 : * (by sending an extra IDENTIFY_SYSTEM command)
418 : *
419 : * All received segments will be written to the directory
420 : * specified by basedir. This will also fetch any missing timeline history
421 : * files.
422 : *
423 : * The stream_stop callback will be called every time data
424 : * is received, and whenever a segment is completed. If it returns
425 : * true, the streaming will stop and the function
426 : * return. As long as it returns false, streaming will continue
427 : * indefinitely.
428 : *
429 : * If stream_stop() checks for external input, stop_socket should be set to
430 : * the FD it checks. This will allow such input to be detected promptly
431 : * rather than after standby_message_timeout (which might be indefinite).
432 : * Note that signals will interrupt waits for input as well, but that is
433 : * race-y since a signal received while busy won't interrupt the wait.
434 : *
435 : * standby_message_timeout controls how often we send a message
436 : * back to the primary letting it know our progress, in milliseconds.
437 : * Zero means no messages are sent.
438 : * This message will only contain the write location, and never
439 : * flush or replay.
440 : *
441 : * If 'partial_suffix' is not NULL, files are initially created with the
442 : * given suffix, and the suffix is removed once the file is finished. That
443 : * allows you to tell the difference between partial and completed files,
444 : * so that you can continue later where you left.
445 : *
446 : * If 'synchronous' is true, the received WAL is flushed as soon as written,
447 : * otherwise only when the WAL file is closed.
448 : *
449 : * Note: The WAL location *must* be at a log segment start!
450 : */
451 : bool
452 274 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
453 : {
454 : char query[128];
455 : char slotcmd[128];
456 : PGresult *res;
457 : XLogRecPtr stoppos;
458 :
459 : /*
460 : * The caller should've checked the server version already, but doesn't do
461 : * any harm to check it here too.
462 : */
463 274 : if (!CheckServerVersionForStreaming(conn))
464 0 : return false;
465 :
466 : /*
467 : * Decide whether we want to report the flush position. If we report the
468 : * flush position, the primary will know what WAL we'll possibly
469 : * re-request, and it can then remove older WAL safely. We must always do
470 : * that when we are using slots.
471 : *
472 : * Reporting the flush position makes one eligible as a synchronous
473 : * replica. People shouldn't include generic names in
474 : * synchronous_standby_names, but we've protected them against it so far,
475 : * so let's continue to do so unless specifically requested.
476 : */
477 274 : if (stream->replication_slot != NULL)
478 : {
479 264 : reportFlushPosition = true;
480 264 : sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
481 : }
482 : else
483 : {
484 10 : if (stream->synchronous)
485 2 : reportFlushPosition = true;
486 : else
487 8 : reportFlushPosition = false;
488 10 : slotcmd[0] = 0;
489 : }
490 :
491 274 : if (stream->sysidentifier != NULL)
492 : {
493 274 : char *sysidentifier = NULL;
494 : TimeLineID servertli;
495 :
496 : /*
497 : * Get the server system identifier and timeline, and validate them.
498 : */
499 274 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
500 : {
501 0 : pg_free(sysidentifier);
502 0 : return false;
503 : }
504 :
505 274 : if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506 : {
507 0 : pg_log_error("system identifier does not match between base backup and streaming connection");
508 0 : pg_free(sysidentifier);
509 0 : return false;
510 : }
511 274 : pg_free(sysidentifier);
512 :
513 274 : if (stream->timeline > servertli)
514 : {
515 0 : pg_log_error("starting timeline %u is not present in the server",
516 : stream->timeline);
517 0 : return false;
518 : }
519 : }
520 :
521 : /*
522 : * initialize flush position to starting point, it's the caller's
523 : * responsibility that that's sane.
524 : */
525 274 : lastFlushPosition = stream->startpos;
526 :
527 : while (1)
528 : {
529 : /*
530 : * Fetch the timeline history file for this timeline, if we don't have
531 : * it already. When streaming log to tar, this will always return
532 : * false, as we are never streaming into an existing file and
533 : * therefore there can be no pre-existing timeline history file.
534 : */
535 276 : if (!existsTimeLineHistoryFile(stream))
536 : {
537 6 : snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
538 6 : res = PQexec(conn, query);
539 6 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
540 : {
541 : /* FIXME: we might send it ok, but get an error */
542 0 : pg_log_error("could not send replication command \"%s\": %s",
543 : "TIMELINE_HISTORY", PQresultErrorMessage(res));
544 0 : PQclear(res);
545 0 : return false;
546 : }
547 :
548 : /*
549 : * The response to TIMELINE_HISTORY is a single row result set
550 : * with two fields: filename and content
551 : */
552 6 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
553 : {
554 0 : pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555 : PQntuples(res), PQnfields(res), 1, 2);
556 : }
557 :
558 : /* Write the history file to disk */
559 6 : writeTimeLineHistoryFile(stream,
560 : PQgetvalue(res, 0, 0),
561 : PQgetvalue(res, 0, 1));
562 :
563 6 : PQclear(res);
564 : }
565 :
566 : /*
567 : * Before we start streaming from the requested location, check if the
568 : * callback tells us to stop here.
569 : */
570 276 : if (stream->stream_stop(stream->startpos, stream->timeline, false))
571 0 : return true;
572 :
573 : /* Initiate the replication stream at specified location */
574 276 : snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575 : slotcmd,
576 276 : LSN_FORMAT_ARGS(stream->startpos),
577 : stream->timeline);
578 276 : res = PQexec(conn, query);
579 276 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
580 : {
581 4 : pg_log_error("could not send replication command \"%s\": %s",
582 : "START_REPLICATION", PQresultErrorMessage(res));
583 4 : PQclear(res);
584 4 : return false;
585 : }
586 272 : PQclear(res);
587 :
588 : /* Stream the WAL */
589 272 : res = HandleCopyStream(conn, stream, &stoppos);
590 272 : if (res == NULL)
591 0 : goto error;
592 :
593 : /*
594 : * Streaming finished.
595 : *
596 : * There are two possible reasons for that: a controlled shutdown, or
597 : * we reached the end of the current timeline. In case of
598 : * end-of-timeline, the server sends a result set after Copy has
599 : * finished, containing information about the next timeline. Read
600 : * that, and restart streaming from the next timeline. In case of
601 : * controlled shutdown, stop here.
602 : */
603 272 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
604 : {
605 : /*
606 : * End-of-timeline. Read the next timeline's ID and starting
607 : * position. Usually, the starting position will match the end of
608 : * the previous timeline, but there are corner cases like if the
609 : * server had sent us half of a WAL record, when it was promoted.
610 : * The new timeline will begin at the end of the last complete
611 : * record in that case, overlapping the partial WAL record on the
612 : * old timeline.
613 : */
614 : uint32 newtimeline;
615 : bool parsed;
616 :
617 2 : parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
618 2 : PQclear(res);
619 2 : if (!parsed)
620 0 : goto error;
621 :
622 : /* Sanity check the values the server gave us */
623 2 : if (newtimeline <= stream->timeline)
624 : {
625 0 : pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626 : newtimeline, stream->timeline);
627 0 : goto error;
628 : }
629 2 : if (stream->startpos > stoppos)
630 : {
631 0 : pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
632 : stream->timeline, LSN_FORMAT_ARGS(stoppos),
633 : newtimeline, LSN_FORMAT_ARGS(stream->startpos));
634 0 : goto error;
635 : }
636 :
637 : /* Read the final result, which should be CommandComplete. */
638 2 : res = PQgetResult(conn);
639 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
640 : {
641 0 : pg_log_error("unexpected termination of replication stream: %s",
642 : PQresultErrorMessage(res));
643 0 : PQclear(res);
644 0 : goto error;
645 : }
646 2 : PQclear(res);
647 :
648 : /*
649 : * Loop back to start streaming from the new timeline. Always
650 : * start streaming at the beginning of a segment.
651 : */
652 2 : stream->timeline = newtimeline;
653 2 : stream->startpos = stream->startpos -
654 2 : XLogSegmentOffset(stream->startpos, WalSegSz);
655 2 : continue;
656 : }
657 270 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
658 : {
659 268 : PQclear(res);
660 :
661 : /*
662 : * End of replication (ie. controlled shut down of the server).
663 : *
664 : * Check if the callback thinks it's OK to stop here. If not,
665 : * complain.
666 : */
667 268 : if (stream->stream_stop(stoppos, stream->timeline, false))
668 268 : return true;
669 : else
670 : {
671 0 : pg_log_error("replication stream was terminated before stop point");
672 0 : goto error;
673 : }
674 : }
675 : else
676 : {
677 : /* Server returned an error. */
678 2 : pg_log_error("unexpected termination of replication stream: %s",
679 : PQresultErrorMessage(res));
680 2 : PQclear(res);
681 2 : goto error;
682 : }
683 : }
684 :
685 2 : error:
686 2 : if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
687 0 : pg_log_error("could not close file \"%s\": %s",
688 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
689 2 : walfile = NULL;
690 2 : return false;
691 : }
692 :
693 : /*
694 : * Helper function to parse the result set returned by server after streaming
695 : * has finished. On failure, prints an error to stderr and returns false.
696 : */
697 : static bool
698 2 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
699 : {
700 : uint32 startpos_xlogid,
701 : startpos_xrecoff;
702 :
703 : /*----------
704 : * The result set consists of one row and two columns, e.g:
705 : *
706 : * next_tli | next_tli_startpos
707 : * ----------+-------------------
708 : * 4 | 0/9949AE0
709 : *
710 : * next_tli is the timeline ID of the next timeline after the one that
711 : * just finished streaming. next_tli_startpos is the WAL location where
712 : * the server switched to it.
713 : *----------
714 : */
715 2 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
716 : {
717 0 : pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
718 : PQntuples(res), PQnfields(res), 1, 2);
719 0 : return false;
720 : }
721 :
722 2 : *timeline = atoi(PQgetvalue(res, 0, 0));
723 2 : if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724 : &startpos_xrecoff) != 2)
725 : {
726 0 : pg_log_error("could not parse next timeline's starting point \"%s\"",
727 : PQgetvalue(res, 0, 1));
728 0 : return false;
729 : }
730 2 : *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731 :
732 2 : return true;
733 : }
734 :
735 : /*
736 : * The main loop of ReceiveXlogStream. Handles the COPY stream after
737 : * initiating streaming with the START_REPLICATION command.
738 : *
739 : * If the COPY ends (not necessarily successfully) due a message from the
740 : * server, returns a PGresult and sets *stoppos to the last byte written.
741 : * On any other sort of error, returns NULL.
742 : */
743 : static PGresult *
744 272 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
745 : XLogRecPtr *stoppos)
746 : {
747 272 : char *copybuf = NULL;
748 272 : TimestampTz last_status = -1;
749 272 : XLogRecPtr blockpos = stream->startpos;
750 :
751 272 : still_sending = true;
752 :
753 : while (1)
754 3706 : {
755 : int r;
756 : TimestampTz now;
757 : long sleeptime;
758 :
759 : /*
760 : * Check if we should continue streaming, or abort at this point.
761 : */
762 3978 : if (!CheckCopyStreamStop(conn, stream, blockpos))
763 0 : goto error;
764 :
765 3978 : now = feGetCurrentTimestamp();
766 :
767 : /*
768 : * If synchronous option is true, issue sync command as soon as there
769 : * are WAL data which has not been flushed yet.
770 : */
771 3978 : if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
772 : {
773 0 : if (stream->walmethod->ops->sync(walfile) != 0)
774 0 : pg_fatal("could not fsync file \"%s\": %s",
775 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
776 0 : lastFlushPosition = blockpos;
777 :
778 : /*
779 : * Send feedback so that the server sees the latest WAL locations
780 : * immediately.
781 : */
782 0 : if (!sendFeedback(conn, blockpos, now, false))
783 0 : goto error;
784 0 : last_status = now;
785 : }
786 :
787 : /*
788 : * Potentially send a status message to the primary
789 : */
790 7768 : if (still_sending && stream->standby_message_timeout > 0 &&
791 3790 : feTimestampDifferenceExceeds(last_status, now,
792 : stream->standby_message_timeout))
793 : {
794 : /* Time to send feedback! */
795 272 : if (!sendFeedback(conn, blockpos, now, false))
796 0 : goto error;
797 272 : last_status = now;
798 : }
799 :
800 : /*
801 : * Calculate how long send/receive loops should sleep
802 : */
803 3978 : sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
804 : last_status);
805 :
806 3978 : r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
807 12832 : while (r != 0)
808 : {
809 9126 : if (r == -1)
810 0 : goto error;
811 9126 : if (r == -2)
812 : {
813 272 : PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814 :
815 272 : if (res == NULL)
816 0 : goto error;
817 : else
818 272 : return res;
819 : }
820 :
821 : /* Check the message type. */
822 8854 : if (copybuf[0] == 'k')
823 : {
824 0 : if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825 : &last_status))
826 0 : goto error;
827 : }
828 8854 : else if (copybuf[0] == 'w')
829 : {
830 8854 : if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831 0 : goto error;
832 :
833 : /*
834 : * Check if we should continue streaming, or abort at this
835 : * point.
836 : */
837 8854 : if (!CheckCopyStreamStop(conn, stream, blockpos))
838 0 : goto error;
839 : }
840 : else
841 : {
842 0 : pg_log_error("unrecognized streaming header: \"%c\"",
843 : copybuf[0]);
844 0 : goto error;
845 : }
846 :
847 : /*
848 : * Process the received data, and any subsequent data we can read
849 : * without blocking.
850 : */
851 8854 : r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
852 : }
853 : }
854 :
855 0 : error:
856 0 : PQfreemem(copybuf);
857 0 : return NULL;
858 : }
859 :
860 : /*
861 : * Wait until we can read a CopyData message,
862 : * or timeout, or occurrence of a signal or input on the stop_socket.
863 : * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
864 : *
865 : * Returns 1 if data has become available for reading, 0 if timed out
866 : * or interrupted by signal or stop_socket input, and -1 on an error.
867 : */
868 : static int
869 12460 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
870 : {
871 : int ret;
872 : fd_set input_mask;
873 : int connsocket;
874 : int maxfd;
875 : struct timeval timeout;
876 : struct timeval *timeoutptr;
877 :
878 12460 : connsocket = PQsocket(conn);
879 12460 : if (connsocket < 0)
880 : {
881 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
882 0 : return -1;
883 : }
884 :
885 12460 : FD_ZERO(&input_mask);
886 12460 : FD_SET(connsocket, &input_mask);
887 12460 : maxfd = connsocket;
888 12460 : if (stop_socket != PGINVALID_SOCKET)
889 : {
890 12164 : FD_SET(stop_socket, &input_mask);
891 12164 : maxfd = Max(maxfd, stop_socket);
892 : }
893 :
894 12460 : if (timeout_ms < 0)
895 188 : timeoutptr = NULL;
896 : else
897 : {
898 12272 : timeout.tv_sec = timeout_ms / 1000L;
899 12272 : timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900 12272 : timeoutptr = &timeout;
901 : }
902 :
903 12460 : ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
904 :
905 12460 : if (ret < 0)
906 : {
907 0 : if (errno == EINTR)
908 0 : return 0; /* Got a signal, so not an error */
909 0 : pg_log_error("%s() failed: %m", "select");
910 0 : return -1;
911 : }
912 12460 : if (ret > 0 && FD_ISSET(connsocket, &input_mask))
913 9282 : return 1; /* Got input on connection socket */
914 :
915 3178 : return 0; /* Got timeout or input on stop_socket */
916 : }
917 :
918 : /*
919 : * Receive CopyData message available from XLOG stream, blocking for
920 : * maximum of 'timeout' ms.
921 : *
922 : * If data was received, returns the length of the data. *buffer is set to
923 : * point to a buffer holding the received message. The buffer is only valid
924 : * until the next CopyStreamReceive call.
925 : *
926 : * Returns 0 if no data was available within timeout, or if wait was
927 : * interrupted by signal or stop_socket input.
928 : * -1 on error. -2 if the server ended the COPY.
929 : */
930 : static int
931 12832 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
932 : char **buffer)
933 : {
934 12832 : char *copybuf = NULL;
935 : int rawlen;
936 :
937 12832 : PQfreemem(*buffer);
938 12832 : *buffer = NULL;
939 :
940 : /* Try to receive a CopyData message */
941 12832 : rawlen = PQgetCopyData(conn, ©buf, 1);
942 12832 : if (rawlen == 0)
943 : {
944 : int ret;
945 :
946 : /*
947 : * No data available. Wait for some to appear, but not longer than
948 : * the specified timeout, so that we can ping the server. Also stop
949 : * waiting if input appears on stop_socket.
950 : */
951 12460 : ret = CopyStreamPoll(conn, timeout, stop_socket);
952 12460 : if (ret <= 0)
953 3178 : return ret;
954 :
955 : /* Now there is actually data on the socket */
956 9282 : if (PQconsumeInput(conn) == 0)
957 : {
958 0 : pg_log_error("could not receive data from WAL stream: %s",
959 : PQerrorMessage(conn));
960 0 : return -1;
961 : }
962 :
963 : /* Now that we've consumed some input, try again */
964 9282 : rawlen = PQgetCopyData(conn, ©buf, 1);
965 9282 : if (rawlen == 0)
966 528 : return 0;
967 : }
968 9126 : if (rawlen == -1) /* end-of-streaming or error */
969 272 : return -2;
970 8854 : if (rawlen == -2)
971 : {
972 0 : pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
973 0 : return -1;
974 : }
975 :
976 : /* Return received messages to caller */
977 8854 : *buffer = copybuf;
978 8854 : return rawlen;
979 : }
980 :
981 : /*
982 : * Process the keepalive message.
983 : */
984 : static bool
985 0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
986 : XLogRecPtr blockpos, TimestampTz *last_status)
987 : {
988 : int pos;
989 : bool replyRequested;
990 : TimestampTz now;
991 :
992 : /*
993 : * Parse the keepalive message, enclosed in the CopyData message. We just
994 : * check if the server requested a reply, and ignore the rest.
995 : */
996 0 : pos = 1; /* skip msgtype 'k' */
997 0 : pos += 8; /* skip walEnd */
998 0 : pos += 8; /* skip sendTime */
999 :
1000 0 : if (len < pos + 1)
1001 : {
1002 0 : pg_log_error("streaming header too small: %d", len);
1003 0 : return false;
1004 : }
1005 0 : replyRequested = copybuf[pos];
1006 :
1007 : /* If the server requested an immediate reply, send one. */
1008 0 : if (replyRequested && still_sending)
1009 : {
1010 0 : if (reportFlushPosition && lastFlushPosition < blockpos &&
1011 0 : walfile != NULL)
1012 : {
1013 : /*
1014 : * If a valid flush location needs to be reported, flush the
1015 : * current WAL file so that the latest flush location is sent back
1016 : * to the server. This is necessary to see whether the last WAL
1017 : * data has been successfully replicated or not, at the normal
1018 : * shutdown of the server.
1019 : */
1020 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1021 0 : pg_fatal("could not fsync file \"%s\": %s",
1022 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
1023 0 : lastFlushPosition = blockpos;
1024 : }
1025 :
1026 0 : now = feGetCurrentTimestamp();
1027 0 : if (!sendFeedback(conn, blockpos, now, false))
1028 0 : return false;
1029 0 : *last_status = now;
1030 : }
1031 :
1032 0 : return true;
1033 : }
1034 :
1035 : /*
1036 : * Process XLogData message.
1037 : */
1038 : static bool
1039 8854 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1040 : XLogRecPtr *blockpos)
1041 : {
1042 : int xlogoff;
1043 : int bytes_left;
1044 : int bytes_written;
1045 : int hdr_len;
1046 :
1047 : /*
1048 : * Once we've decided we don't want to receive any more, just ignore any
1049 : * subsequent XLogData messages.
1050 : */
1051 8854 : if (!(still_sending))
1052 382 : return true;
1053 :
1054 : /*
1055 : * Read the header of the XLogData message, enclosed in the CopyData
1056 : * message. We only need the WAL location field (dataStart), the rest of
1057 : * the header is ignored.
1058 : */
1059 8472 : hdr_len = 1; /* msgtype 'w' */
1060 8472 : hdr_len += 8; /* dataStart */
1061 8472 : hdr_len += 8; /* walEnd */
1062 8472 : hdr_len += 8; /* sendTime */
1063 8472 : if (len < hdr_len)
1064 : {
1065 0 : pg_log_error("streaming header too small: %d", len);
1066 0 : return false;
1067 : }
1068 8472 : *blockpos = fe_recvint64(©buf[1]);
1069 :
1070 : /* Extract WAL location for this block */
1071 8472 : xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1072 :
1073 : /*
1074 : * Verify that the initial location in the stream matches where we think
1075 : * we are.
1076 : */
1077 8472 : if (walfile == NULL)
1078 : {
1079 : /* No file open yet */
1080 282 : if (xlogoff != 0)
1081 : {
1082 0 : pg_log_error("received write-ahead log record for offset %u with no file open",
1083 : xlogoff);
1084 0 : return false;
1085 : }
1086 : }
1087 : else
1088 : {
1089 : /* More data in existing segment */
1090 8190 : if (walfile->currpos != xlogoff)
1091 : {
1092 0 : pg_log_error("got WAL data offset %08x, expected %08x",
1093 : xlogoff, (int) walfile->currpos);
1094 0 : return false;
1095 : }
1096 : }
1097 :
1098 8472 : bytes_left = len - hdr_len;
1099 8472 : bytes_written = 0;
1100 :
1101 16944 : while (bytes_left)
1102 : {
1103 : int bytes_to_write;
1104 :
1105 : /*
1106 : * If crossing a WAL boundary, only write up until we reach wal
1107 : * segment size.
1108 : */
1109 8472 : if (xlogoff + bytes_left > WalSegSz)
1110 0 : bytes_to_write = WalSegSz - xlogoff;
1111 : else
1112 8472 : bytes_to_write = bytes_left;
1113 :
1114 8472 : if (walfile == NULL)
1115 : {
1116 282 : if (!open_walfile(stream, *blockpos))
1117 : {
1118 : /* Error logged by open_walfile */
1119 0 : return false;
1120 : }
1121 : }
1122 :
1123 16944 : if (stream->walmethod->ops->write(walfile,
1124 8472 : copybuf + hdr_len + bytes_written,
1125 8472 : bytes_to_write) != bytes_to_write)
1126 : {
1127 0 : pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1128 : bytes_to_write, walfile->pathname,
1129 : GetLastWalMethodError(stream->walmethod));
1130 0 : return false;
1131 : }
1132 :
1133 : /* Write was successful, advance our position */
1134 8472 : bytes_written += bytes_to_write;
1135 8472 : bytes_left -= bytes_to_write;
1136 8472 : *blockpos += bytes_to_write;
1137 8472 : xlogoff += bytes_to_write;
1138 :
1139 : /* Did we reach the end of a WAL segment? */
1140 8472 : if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1141 : {
1142 46 : if (!close_walfile(stream, *blockpos))
1143 : /* Error message written in close_walfile() */
1144 0 : return false;
1145 :
1146 46 : xlogoff = 0;
1147 :
1148 46 : if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1149 : {
1150 0 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1151 : {
1152 0 : pg_log_error("could not send copy-end packet: %s",
1153 : PQerrorMessage(conn));
1154 0 : return false;
1155 : }
1156 0 : still_sending = false;
1157 0 : return true; /* ignore the rest of this XLogData packet */
1158 : }
1159 : }
1160 : }
1161 : /* No more data left to write, receive next copy packet */
1162 :
1163 8472 : return true;
1164 : }
1165 :
1166 : /*
1167 : * Handle end of the copy stream.
1168 : */
1169 : static PGresult *
1170 272 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1171 : XLogRecPtr blockpos, XLogRecPtr *stoppos)
1172 : {
1173 272 : PGresult *res = PQgetResult(conn);
1174 :
1175 : /*
1176 : * The server closed its end of the copy stream. If we haven't closed
1177 : * ours already, we need to do so now, unless the server threw an error,
1178 : * in which case we don't.
1179 : */
1180 272 : if (still_sending)
1181 : {
1182 4 : if (!close_walfile(stream, blockpos))
1183 : {
1184 : /* Error message written in close_walfile() */
1185 0 : PQclear(res);
1186 0 : return NULL;
1187 : }
1188 4 : if (PQresultStatus(res) == PGRES_COPY_IN)
1189 : {
1190 2 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1191 : {
1192 0 : pg_log_error("could not send copy-end packet: %s",
1193 : PQerrorMessage(conn));
1194 0 : PQclear(res);
1195 0 : return NULL;
1196 : }
1197 2 : res = PQgetResult(conn);
1198 : }
1199 4 : still_sending = false;
1200 : }
1201 272 : PQfreemem(copybuf);
1202 272 : *stoppos = blockpos;
1203 272 : return res;
1204 : }
1205 :
1206 : /*
1207 : * Check if we should continue streaming, or abort at this point.
1208 : */
1209 : static bool
1210 12832 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1211 : {
1212 12832 : if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1213 : {
1214 268 : if (!close_walfile(stream, blockpos))
1215 : {
1216 : /* Potential error message is written by close_walfile */
1217 0 : return false;
1218 : }
1219 268 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220 : {
1221 0 : pg_log_error("could not send copy-end packet: %s",
1222 : PQerrorMessage(conn));
1223 0 : return false;
1224 : }
1225 268 : still_sending = false;
1226 : }
1227 :
1228 12832 : return true;
1229 : }
1230 :
1231 : /*
1232 : * Calculate how long send/receive loops should sleep
1233 : */
1234 : static long
1235 3978 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1236 : TimestampTz last_status)
1237 : {
1238 3978 : TimestampTz status_targettime = 0;
1239 : long sleeptime;
1240 :
1241 3978 : if (standby_message_timeout && still_sending)
1242 3790 : status_targettime = last_status +
1243 3790 : (standby_message_timeout - 1) * ((int64) 1000);
1244 :
1245 3978 : if (status_targettime > 0)
1246 : {
1247 : long secs;
1248 : int usecs;
1249 :
1250 3790 : feTimestampDifference(now,
1251 : status_targettime,
1252 : &secs,
1253 : &usecs);
1254 : /* Always sleep at least 1 sec */
1255 3790 : if (secs <= 0)
1256 : {
1257 0 : secs = 1;
1258 0 : usecs = 0;
1259 : }
1260 :
1261 3790 : sleeptime = secs * 1000 + usecs / 1000;
1262 : }
1263 : else
1264 188 : sleeptime = -1;
1265 :
1266 3978 : return sleeptime;
1267 : }
|