Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * receivelog.c - receive WAL files using the streaming
4 : * replication protocol.
5 : *
6 : * Author: Magnus Hagander <magnus@hagander.net>
7 : *
8 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_basebackup/receivelog.c
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/logging.h"
23 : #include "libpq-fe.h"
24 : #include "libpq/protocol.h"
25 : #include "receivelog.h"
26 : #include "streamutil.h"
27 :
28 : /* currently open WAL file; NULL while no segment is open */
29 : static Walfile *walfile = NULL;
30 : static bool reportFlushPosition = false; /* send lastFlushPosition as the flush location in status updates? */
31 : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; /* advanced when a segment is closed or synced */
32 :
33 : static bool still_sending = true; /* feedback still needs to be sent? */
34 : /* forward declarations of file-local (static) helpers */
35 : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
36 : XLogRecPtr *stoppos);
37 : static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
38 : static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
39 : char **buffer);
40 : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
41 : int len, XLogRecPtr blockpos, TimestampTz *last_status);
42 : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
43 : XLogRecPtr *blockpos);
44 : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
45 : XLogRecPtr blockpos, XLogRecPtr *stoppos);
46 : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47 : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
48 : TimestampTz last_status);
49 :
50 : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
51 : uint32 *timeline);
52 :
53 : static bool
54 9 : mark_file_as_archived(StreamCtl *stream, const char *fname)
55 : {
56 : Walfile *f;
57 : static char tmppath[MAXPGPATH];
58 :
59 9 : snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
60 : fname);
61 :
62 9 : f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
63 : NULL, 0);
64 9 : if (f == NULL)
65 : {
66 0 : pg_log_error("could not create archive status file \"%s\": %s",
67 : tmppath, GetLastWalMethodError(stream->walmethod));
68 0 : return false;
69 : }
70 :
71 9 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
72 : {
73 0 : pg_log_error("could not close archive status file \"%s\": %s",
74 : tmppath, GetLastWalMethodError(stream->walmethod));
75 0 : return false;
76 : }
77 :
78 9 : return true;
79 : }
80 :
81 : /*
82 : * Open a new WAL file in the specified directory.
83 : *
84 : * Returns true if OK; on failure, returns false after printing an error msg.
85 : * On success, 'walfile' is set to the opened WAL file.
86 : *
87 : * The file will be padded to 16Mb with zeroes.
88 : */
89 : static bool
90 162 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 : {
92 : Walfile *f;
93 : char *fn;
94 : ssize_t size;
95 : XLogSegNo segno;
96 : char walfile_name[MAXPGPATH];
97 :
98 162 : XLByteToSeg(startpoint, segno, WalSegSz);
99 162 : XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
100 :
101 : /* Note that this considers the compression used if necessary */
102 162 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
103 : walfile_name,
104 162 : stream->partial_suffix);
105 :
106 : /*
107 : * When streaming to files, if an existing file exists we verify that it's
108 : * either empty (just created), or a complete WalSegSz segment (in which
109 : * case it has been created and padded). Anything else indicates a corrupt
110 : * file. Compressed files have no need for padding, so just ignore this
111 : * case.
112 : *
113 : * When streaming to tar, no file with this name will exist before, so we
114 : * never have to verify a size.
115 : */
116 317 : if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
117 155 : stream->walmethod->ops->existsfile(stream->walmethod, fn))
118 : {
119 0 : size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
120 0 : if (size < 0)
121 : {
122 0 : pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 : fn, GetLastWalMethodError(stream->walmethod));
124 0 : pg_free(fn);
125 0 : return false;
126 : }
127 0 : if (size == WalSegSz)
128 : {
129 : /* Already padded file. Open it for use */
130 0 : f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
131 0 : if (f == NULL)
132 : {
133 0 : pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 : fn, GetLastWalMethodError(stream->walmethod));
135 0 : pg_free(fn);
136 0 : return false;
137 : }
138 :
139 : /* fsync file in case of a previous crash */
140 0 : if (stream->walmethod->ops->sync(f) != 0)
141 : {
142 0 : pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 : fn, GetLastWalMethodError(stream->walmethod));
144 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
145 0 : exit(1);
146 : }
147 :
148 0 : walfile = f;
149 0 : pg_free(fn);
150 0 : return true;
151 : }
152 0 : if (size != 0)
153 : {
154 : /* if write didn't set errno, assume problem is no disk space */
155 0 : if (errno == 0)
156 0 : errno = ENOSPC;
157 0 : pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
158 : "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
159 : size),
160 : fn, size, WalSegSz);
161 0 : pg_free(fn);
162 0 : return false;
163 : }
164 : /* File existed and was empty, so fall through and open */
165 : }
166 :
167 : /* No file existed, so create one */
168 :
169 162 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
170 : walfile_name,
171 162 : stream->partial_suffix,
172 : WalSegSz);
173 162 : if (f == NULL)
174 : {
175 0 : pg_log_error("could not open write-ahead log file \"%s\": %s",
176 : fn, GetLastWalMethodError(stream->walmethod));
177 0 : pg_free(fn);
178 0 : return false;
179 : }
180 :
181 162 : pg_free(fn);
182 162 : walfile = f;
183 162 : return true;
184 : }
185 :
186 : /*
187 : * Close the current WAL file (if open), and rename it to the correct
188 : * filename if it's complete. On failure, prints an error message to stderr
189 : * and returns false, otherwise returns true.
190 : */
191 : static bool
192 167 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
193 : {
194 : char *fn;
195 : pgoff_t currpos;
196 : int r;
197 : char walfile_name[MAXPGPATH];
198 :
199 167 : if (walfile == NULL)
200 5 : return true;
201 :
202 162 : strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
203 162 : currpos = walfile->currpos;
204 :
205 : /* Note that this considers the compression used if necessary */
206 162 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
207 : walfile_name,
208 162 : stream->partial_suffix);
209 :
210 162 : if (stream->partial_suffix)
211 : {
212 12 : if (currpos == WalSegSz)
213 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
214 : else
215 : {
216 6 : pg_log_info("not renaming \"%s\", segment is not complete", fn);
217 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
218 : }
219 : }
220 : else
221 150 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
222 :
223 162 : walfile = NULL;
224 :
225 162 : if (r != 0)
226 : {
227 0 : pg_log_error("could not close file \"%s\": %s",
228 : fn, GetLastWalMethodError(stream->walmethod));
229 :
230 0 : pg_free(fn);
231 0 : return false;
232 : }
233 :
234 162 : pg_free(fn);
235 :
236 : /*
237 : * Mark file as archived if requested by the caller - pg_basebackup needs
238 : * to do so as files can otherwise get archived again after promotion of a
239 : * new node. This is in line with walreceiver.c always doing a
240 : * XLogArchiveForceDone() after a complete segment.
241 : */
242 162 : if (currpos == WalSegSz && stream->mark_done)
243 : {
244 : /* writes error message if failed */
245 5 : if (!mark_file_as_archived(stream, walfile_name))
246 0 : return false;
247 : }
248 :
249 162 : lastFlushPosition = pos;
250 162 : return true;
251 : }
252 :
253 :
254 : /*
255 : * Check if a timeline history file exists.
256 : */
257 : static bool
258 157 : existsTimeLineHistoryFile(StreamCtl *stream)
259 : {
260 : char histfname[MAXFNAMELEN];
261 :
262 : /*
263 : * Timeline 1 never has a history file. We treat that as if it existed,
264 : * since we never need to stream it.
265 : */
266 157 : if (stream->timeline == 1)
267 152 : return true;
268 :
269 5 : TLHistoryFileName(histfname, stream->timeline);
270 :
271 5 : return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
272 : }
273 :
274 : static bool
275 5 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
276 : {
277 5 : int size = strlen(content);
278 : char histfname[MAXFNAMELEN];
279 : Walfile *f;
280 :
281 : /*
282 : * Check that the server's idea of how timeline history files should be
283 : * named matches ours.
284 : */
285 5 : TLHistoryFileName(histfname, stream->timeline);
286 5 : if (strcmp(histfname, filename) != 0)
287 : {
288 0 : pg_log_error("server reported unexpected history file name for timeline %u: %s",
289 : stream->timeline, filename);
290 0 : return false;
291 : }
292 :
293 5 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
294 : histfname, ".tmp", 0);
295 5 : if (f == NULL)
296 : {
297 0 : pg_log_error("could not create timeline history file \"%s\": %s",
298 : histfname, GetLastWalMethodError(stream->walmethod));
299 0 : return false;
300 : }
301 :
302 5 : if ((int) stream->walmethod->ops->write(f, content, size) != size)
303 : {
304 0 : pg_log_error("could not write timeline history file \"%s\": %s",
305 : histfname, GetLastWalMethodError(stream->walmethod));
306 :
307 : /*
308 : * If we fail to make the file, delete it to release disk space
309 : */
310 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
311 :
312 0 : return false;
313 : }
314 :
315 5 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
316 : {
317 0 : pg_log_error("could not close file \"%s\": %s",
318 : histfname, GetLastWalMethodError(stream->walmethod));
319 0 : return false;
320 : }
321 :
322 : /* Maintain archive_status, check close_walfile() for details. */
323 5 : if (stream->mark_done)
324 : {
325 : /* writes error message if failed */
326 4 : if (!mark_file_as_archived(stream, histfname))
327 0 : return false;
328 : }
329 :
330 5 : return true;
331 : }
332 :
333 : /*
334 : * Send a Standby Status Update message to server.
335 : */
336 : static bool
337 156 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
338 : {
339 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
340 156 : int len = 0;
341 :
342 156 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
343 156 : len += 1;
344 156 : fe_sendint64(blockpos, &replybuf[len]); /* write */
345 156 : len += 8;
346 156 : if (reportFlushPosition)
347 152 : fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
348 : else
349 4 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
350 156 : len += 8;
351 156 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
352 156 : len += 8;
353 156 : fe_sendint64(now, &replybuf[len]); /* sendTime */
354 156 : len += 8;
355 156 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
356 156 : len += 1;
357 :
358 156 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359 : {
360 0 : pg_log_error("could not send feedback packet: %s",
361 : PQerrorMessage(conn));
362 0 : return false;
363 : }
364 :
365 156 : return true;
366 : }
367 :
368 : /*
369 : * Check that the server version we're connected to is supported by
370 : * ReceiveXlogStream().
371 : *
372 : * If it's not, an error message is printed to stderr, and false is returned.
373 : */
374 : bool
375 333 : CheckServerVersionForStreaming(PGconn *conn)
376 : {
377 : int minServerMajor,
378 : maxServerMajor;
379 : int serverMajor;
380 :
381 : /*
382 : * The message format used in streaming replication changed in 9.3, so we
383 : * cannot stream from older servers. And we don't support servers newer
384 : * than the client; it might work, but we don't know, so err on the safe
385 : * side.
386 : */
387 333 : minServerMajor = 903;
388 333 : maxServerMajor = PG_VERSION_NUM / 100;
389 333 : serverMajor = PQserverVersion(conn) / 100;
390 333 : if (serverMajor < minServerMajor)
391 : {
392 0 : const char *serverver = PQparameterStatus(conn, "server_version");
393 :
394 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395 : serverver ? serverver : "'unknown'",
396 : "9.3");
397 0 : return false;
398 : }
399 333 : else if (serverMajor > maxServerMajor)
400 : {
401 0 : const char *serverver = PQparameterStatus(conn, "server_version");
402 :
403 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404 : serverver ? serverver : "'unknown'",
405 : PG_VERSION);
406 0 : return false;
407 : }
408 333 : return true;
409 : }
410 :
411 : /*
412 : * Receive a log stream starting at the specified position.
413 : *
414 : * Individual parameters are passed through the StreamCtl structure.
415 : *
416 : * If sysidentifier is specified, validate that both the system
417 : * identifier and the timeline matches the specified ones
418 : * (by sending an extra IDENTIFY_SYSTEM command)
419 : *
420 : * All received segments will be written to the directory
421 : * specified by basedir. This will also fetch any missing timeline history
422 : * files.
423 : *
424 : * The stream_stop callback will be called every time data
425 : * is received, and whenever a segment is completed. If it returns
426 : * true, the streaming will stop and the function
427 : * return. As long as it returns false, streaming will continue
428 : * indefinitely.
429 : *
430 : * If stream_stop() checks for external input, stop_socket should be set to
431 : * the FD it checks. This will allow such input to be detected promptly
432 : * rather than after standby_message_timeout (which might be indefinite).
433 : * Note that signals will interrupt waits for input as well, but that is
434 : * race-y since a signal received while busy won't interrupt the wait.
435 : *
436 : * standby_message_timeout controls how often we send a message
437 : * back to the primary letting it know our progress, in milliseconds.
438 : * Zero means no messages are sent.
439 : * This message will only contain the write location, and never
440 : * flush or replay.
441 : *
442 : * If 'partial_suffix' is not NULL, files are initially created with the
443 : * given suffix, and the suffix is removed once the file is finished. That
444 : * allows you to tell the difference between partial and completed files,
445 : * so that you can continue later where you left.
446 : *
447 : * If 'synchronous' is true, the received WAL is flushed as soon as written,
448 : * otherwise only when the WAL file is closed.
449 : *
450 : * Note: The WAL location *must* be at a log segment start!
451 : */
452 : bool
453 156 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
454 : {
455 : PQExpBuffer query;
456 : PGresult *res;
457 : XLogRecPtr stoppos;
458 :
459 : /*
460 : * The caller should've checked the server version already, but doesn't do
461 : * any harm to check it here too.
462 : */
463 156 : if (!CheckServerVersionForStreaming(conn))
464 0 : return false;
465 :
466 : /*
467 : * Decide whether we want to report the flush position. If we report the
468 : * flush position, the primary will know what WAL we'll possibly
469 : * re-request, and it can then remove older WAL safely. We must always do
470 : * that when we are using slots.
471 : *
472 : * Reporting the flush position makes one eligible as a synchronous
473 : * replica. People shouldn't include generic names in
474 : * synchronous_standby_names, but we've protected them against it so far,
475 : * so let's continue to do so unless specifically requested.
476 : */
477 156 : if (stream->replication_slot != NULL)
478 : {
479 151 : reportFlushPosition = true;
480 : }
481 : else
482 : {
483 5 : if (stream->synchronous)
484 1 : reportFlushPosition = true;
485 : else
486 4 : reportFlushPosition = false;
487 : }
488 :
489 156 : if (stream->sysidentifier != NULL)
490 : {
491 156 : char *sysidentifier = NULL;
492 : TimeLineID servertli;
493 :
494 : /*
495 : * Get the server system identifier and timeline, and validate them.
496 : */
497 156 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
498 : {
499 0 : pg_free(sysidentifier);
500 0 : return false;
501 : }
502 :
503 156 : if (strcmp(stream->sysidentifier, sysidentifier) != 0)
504 : {
505 0 : pg_log_error("system identifier does not match between base backup and streaming connection");
506 0 : pg_free(sysidentifier);
507 0 : return false;
508 : }
509 156 : pg_free(sysidentifier);
510 :
511 156 : if (stream->timeline > servertli)
512 : {
513 0 : pg_log_error("starting timeline %u is not present in the server",
514 : stream->timeline);
515 0 : return false;
516 : }
517 : }
518 :
519 : /*
520 : * initialize flush position to starting point, it's the caller's
521 : * responsibility that that's sane.
522 : */
523 156 : lastFlushPosition = stream->startpos;
524 :
525 : while (1)
526 1 : {
527 : /*
528 : * Fetch the timeline history file for this timeline, if we don't have
529 : * it already. When streaming log to tar, this will always return
530 : * false, as we are never streaming into an existing file and
531 : * therefore there can be no pre-existing timeline history file.
532 : */
533 157 : if (!existsTimeLineHistoryFile(stream))
534 : {
535 5 : query = createPQExpBuffer();
536 5 : appendPQExpBuffer(query, "TIMELINE_HISTORY %u", stream->timeline);
537 5 : res = PQexec(conn, query->data);
538 5 : destroyPQExpBuffer(query);
539 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
540 : {
541 : /* FIXME: we might send it ok, but get an error */
542 0 : pg_log_error("could not send replication command \"%s\": %s",
543 : "TIMELINE_HISTORY", PQresultErrorMessage(res));
544 0 : PQclear(res);
545 0 : return false;
546 : }
547 :
548 : /*
549 : * The response to TIMELINE_HISTORY is a single row result set
550 : * with two fields: filename and content
551 : */
552 5 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
553 : {
554 0 : pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555 : PQntuples(res), PQnfields(res), 1, 2);
556 : }
557 :
558 : /* Write the history file to disk */
559 5 : writeTimeLineHistoryFile(stream,
560 : PQgetvalue(res, 0, 0),
561 : PQgetvalue(res, 0, 1));
562 :
563 5 : PQclear(res);
564 : }
565 :
566 : /*
567 : * Before we start streaming from the requested location, check if the
568 : * callback tells us to stop here.
569 : */
570 157 : if (stream->stream_stop(stream->startpos, stream->timeline, false))
571 0 : return true;
572 :
573 : /* Initiate the replication stream at specified location */
574 157 : query = createPQExpBuffer();
575 157 : appendPQExpBufferStr(query, "START_REPLICATION");
576 157 : if (stream->replication_slot != NULL)
577 : {
578 152 : appendPQExpBufferStr(query, " SLOT ");
579 152 : AppendQuotedIdentifier(query, stream->replication_slot);
580 : }
581 157 : appendPQExpBuffer(query, " %X/%08X TIMELINE %u",
582 157 : LSN_FORMAT_ARGS(stream->startpos),
583 : stream->timeline);
584 157 : res = PQexec(conn, query->data);
585 157 : destroyPQExpBuffer(query);
586 157 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
587 : {
588 1 : pg_log_error("could not send replication command \"%s\": %s",
589 : "START_REPLICATION", PQresultErrorMessage(res));
590 1 : PQclear(res);
591 1 : return false;
592 : }
593 156 : PQclear(res);
594 :
595 : /* Stream the WAL */
596 156 : res = HandleCopyStream(conn, stream, &stoppos);
597 156 : if (res == NULL)
598 0 : goto error;
599 :
600 : /*
601 : * Streaming finished.
602 : *
603 : * There are two possible reasons for that: a controlled shutdown, or
604 : * we reached the end of the current timeline. In case of
605 : * end-of-timeline, the server sends a result set after Copy has
606 : * finished, containing information about the next timeline. Read
607 : * that, and restart streaming from the next timeline. In case of
608 : * controlled shutdown, stop here.
609 : */
610 156 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
611 1 : {
612 : /*
613 : * End-of-timeline. Read the next timeline's ID and starting
614 : * position. Usually, the starting position will match the end of
615 : * the previous timeline, but there are corner cases like if the
616 : * server had sent us half of a WAL record, when it was promoted.
617 : * The new timeline will begin at the end of the last complete
618 : * record in that case, overlapping the partial WAL record on the
619 : * old timeline.
620 : */
621 : uint32 newtimeline;
622 : bool parsed;
623 :
624 1 : parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
625 1 : PQclear(res);
626 1 : if (!parsed)
627 0 : goto error;
628 :
629 : /* Sanity check the values the server gave us */
630 1 : if (newtimeline <= stream->timeline)
631 : {
632 0 : pg_log_error("server reported unexpected next timeline %u, following timeline %u",
633 : newtimeline, stream->timeline);
634 0 : goto error;
635 : }
636 1 : if (stream->startpos > stoppos)
637 : {
638 0 : pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
639 : stream->timeline, LSN_FORMAT_ARGS(stoppos),
640 : newtimeline, LSN_FORMAT_ARGS(stream->startpos));
641 0 : goto error;
642 : }
643 :
644 : /* Read the final result, which should be CommandComplete. */
645 1 : res = PQgetResult(conn);
646 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
647 : {
648 0 : pg_log_error("unexpected termination of replication stream: %s",
649 : PQresultErrorMessage(res));
650 0 : PQclear(res);
651 0 : goto error;
652 : }
653 1 : PQclear(res);
654 :
655 : /*
656 : * Loop back to start streaming from the new timeline. Always
657 : * start streaming at the beginning of a segment.
658 : */
659 1 : stream->timeline = newtimeline;
660 1 : stream->startpos = stream->startpos -
661 1 : XLogSegmentOffset(stream->startpos, WalSegSz);
662 1 : continue;
663 : }
664 155 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
665 : {
666 154 : PQclear(res);
667 :
668 : /*
669 : * End of replication (ie. controlled shut down of the server).
670 : *
671 : * Check if the callback thinks it's OK to stop here. If not,
672 : * complain.
673 : */
674 154 : if (stream->stream_stop(stoppos, stream->timeline, false))
675 154 : return true;
676 : else
677 : {
678 0 : pg_log_error("replication stream was terminated before stop point");
679 0 : goto error;
680 : }
681 : }
682 : else
683 : {
684 : /* Server returned an error. */
685 1 : pg_log_error("unexpected termination of replication stream: %s",
686 : PQresultErrorMessage(res));
687 1 : PQclear(res);
688 1 : goto error;
689 : }
690 : }
691 :
692 1 : error:
693 1 : if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
694 0 : pg_log_error("could not close file \"%s\": %s",
695 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
696 1 : walfile = NULL;
697 1 : return false;
698 : }
699 :
700 : /*
701 : * Helper function to parse the result set returned by server after streaming
702 : * has finished. On failure, prints an error to stderr and returns false.
703 : */
704 : static bool
705 1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
706 : {
707 : uint32 startpos_xlogid,
708 : startpos_xrecoff;
709 :
710 : /*----------
711 : * The result set consists of one row and two columns, e.g:
712 : *
713 : * next_tli | next_tli_startpos
714 : * ----------+-------------------
715 : * 4 | 0/9949AE0
716 : *
717 : * next_tli is the timeline ID of the next timeline after the one that
718 : * just finished streaming. next_tli_startpos is the WAL location where
719 : * the server switched to it.
720 : *----------
721 : */
722 1 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
723 : {
724 0 : pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
725 : PQntuples(res), PQnfields(res), 1, 2);
726 0 : return false;
727 : }
728 :
729 1 : *timeline = atoi(PQgetvalue(res, 0, 0));
730 1 : if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
731 : &startpos_xrecoff) != 2)
732 : {
733 0 : pg_log_error("could not parse next timeline's starting point \"%s\"",
734 : PQgetvalue(res, 0, 1));
735 0 : return false;
736 : }
737 1 : *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
738 :
739 1 : return true;
740 : }
741 :
742 : /*
743 : * The main loop of ReceiveXlogStream. Handles the COPY stream after
744 : * initiating streaming with the START_REPLICATION command.
745 : *
746 : * If the COPY ends (not necessarily successfully) due a message from the
747 : * server, returns a PGresult and sets *stoppos to the last byte written.
748 : * On any other sort of error, returns NULL.
749 : */
750 : static PGresult *
751 156 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
752 : XLogRecPtr *stoppos)
753 : {
754 156 : char *copybuf = NULL;
755 156 : TimestampTz last_status = -1;
756 156 : XLogRecPtr blockpos = stream->startpos;
757 :
758 156 : still_sending = true;
759 :
760 : while (1)
761 478 : {
762 : int r;
763 : TimestampTz now;
764 : long sleeptime;
765 :
766 : /*
767 : * Check if we should continue streaming, or abort at this point.
768 : */
769 634 : if (!CheckCopyStreamStop(conn, stream, blockpos))
770 0 : goto error;
771 :
772 634 : now = feGetCurrentTimestamp();
773 :
774 : /*
775 : * If synchronous option is true, issue sync command as soon as there
776 : * are WAL data which has not been flushed yet.
777 : */
778 634 : if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
779 : {
780 0 : if (stream->walmethod->ops->sync(walfile) != 0)
781 0 : pg_fatal("could not fsync file \"%s\": %s",
782 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
783 0 : lastFlushPosition = blockpos;
784 :
785 : /*
786 : * Send feedback so that the server sees the latest WAL locations
787 : * immediately.
788 : */
789 0 : if (!sendFeedback(conn, blockpos, now, false))
790 0 : goto error;
791 0 : last_status = now;
792 : }
793 :
794 : /*
795 : * Potentially send a status message to the primary
796 : */
797 1205 : if (still_sending && stream->standby_message_timeout > 0 &&
798 571 : feTimestampDifferenceExceeds(last_status, now,
799 : stream->standby_message_timeout))
800 : {
801 : /* Time to send feedback! */
802 156 : if (!sendFeedback(conn, blockpos, now, false))
803 0 : goto error;
804 156 : last_status = now;
805 : }
806 :
807 : /*
808 : * Calculate how long send/receive loops should sleep
809 : */
810 634 : sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
811 : last_status);
812 :
813 : /* Done with any prior message */
814 634 : PQfreemem(copybuf);
815 634 : copybuf = NULL;
816 :
817 634 : r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
818 2387 : while (r != 0)
819 : {
820 1909 : if (r == -1)
821 0 : goto error;
822 1909 : if (r == -2)
823 : {
824 156 : PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
825 :
826 156 : if (res == NULL)
827 0 : goto error;
828 156 : PQfreemem(copybuf);
829 156 : return res;
830 : }
831 :
832 : /* Check the message type. */
833 1753 : if (copybuf[0] == PqReplMsg_Keepalive)
834 : {
835 0 : if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
836 : &last_status))
837 0 : goto error;
838 : }
839 1753 : else if (copybuf[0] == PqReplMsg_WALData)
840 : {
841 1753 : if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
842 0 : goto error;
843 :
844 : /*
845 : * Check if we should continue streaming, or abort at this
846 : * point.
847 : */
848 1753 : if (!CheckCopyStreamStop(conn, stream, blockpos))
849 0 : goto error;
850 : }
851 : else
852 : {
853 0 : pg_log_error("unrecognized streaming header: \"%c\"",
854 : copybuf[0]);
855 0 : goto error;
856 : }
857 :
858 : /* Done with that message */
859 1753 : PQfreemem(copybuf);
860 1753 : copybuf = NULL;
861 :
862 : /*
863 : * Process the received data, and any subsequent data we can read
864 : * without blocking.
865 : */
866 1753 : r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
867 : }
868 : }
869 :
870 0 : error:
871 0 : PQfreemem(copybuf);
872 0 : return NULL;
873 : }
874 :
875 : /*
876 : * Wait until we can read a CopyData message,
877 : * or timeout, or occurrence of a signal or input on the stop_socket.
878 : * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
879 : *
880 : * Returns 1 if data has become available for reading, 0 if timed out
881 : * or interrupted by signal or stop_socket input, and -1 on an error.
882 : */
883 : static int
884 2127 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
885 : {
886 : int ret;
887 : fd_set input_mask;
888 : int connsocket;
889 : int maxfd;
890 : struct timeval timeout;
891 : struct timeval *timeoutptr;
892 :
893 2127 : connsocket = PQsocket(conn);
894 2127 : if (connsocket < 0)
895 : {
896 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
897 0 : return -1;
898 : }
899 :
900 36159 : FD_ZERO(&input_mask);
901 2127 : FD_SET(connsocket, &input_mask);
902 2127 : maxfd = connsocket;
903 2127 : if (stop_socket != PGINVALID_SOCKET)
904 : {
905 2063 : FD_SET(stop_socket, &input_mask);
906 2063 : maxfd = Max(maxfd, stop_socket);
907 : }
908 :
909 2127 : if (timeout_ms < 0)
910 63 : timeoutptr = NULL;
911 : else
912 : {
913 2064 : timeout.tv_sec = timeout_ms / 1000L;
914 2064 : timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
915 2064 : timeoutptr = &timeout;
916 : }
917 :
918 2127 : ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
919 :
920 2127 : if (ret < 0)
921 : {
922 0 : if (errno == EINTR)
923 0 : return 0; /* Got a signal, so not an error */
924 0 : pg_log_error("%s() failed: %m", "select");
925 0 : return -1;
926 : }
927 2127 : if (ret > 0 && FD_ISSET(connsocket, &input_mask))
928 1869 : return 1; /* Got input on connection socket */
929 :
930 258 : return 0; /* Got timeout or input on stop_socket */
931 : }
932 :
933 : /*
934 : * Receive CopyData message available from XLOG stream, blocking for
935 : * maximum of 'timeout' ms.
936 : *
937 : * If data was received, returns the length of the data. *buffer is set to
938 : * point to a buffer holding the received message. The caller must eventually
939 : * free the buffer with PQfreemem().
940 : *
941 : * Returns 0 if no data was available within timeout, or if wait was
942 : * interrupted by signal or stop_socket input.
943 : * -1 on error. -2 if the server ended the COPY.
944 : */
945 : static int
946 2387 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
947 : char **buffer)
948 : {
949 2387 : char *copybuf = NULL;
950 : int rawlen;
951 :
952 : /* Caller should have cleared any prior buffer */
953 : Assert(*buffer == NULL);
954 :
955 : /* Try to receive a CopyData message */
956 2387 : rawlen = PQgetCopyData(conn, ©buf, 1);
957 2387 : if (rawlen == 0)
958 : {
959 : int ret;
960 :
961 : /*
962 : * No data available. Wait for some to appear, but not longer than
963 : * the specified timeout, so that we can ping the server. Also stop
964 : * waiting if input appears on stop_socket.
965 : */
966 2127 : ret = CopyStreamPoll(conn, timeout, stop_socket);
967 2127 : if (ret <= 0)
968 258 : return ret;
969 :
970 : /* Now there is actually data on the socket */
971 1869 : if (PQconsumeInput(conn) == 0)
972 : {
973 0 : pg_log_error("could not receive data from WAL stream: %s",
974 : PQerrorMessage(conn));
975 0 : return -1;
976 : }
977 :
978 : /* Now that we've consumed some input, try again */
979 1869 : rawlen = PQgetCopyData(conn, ©buf, 1);
980 1869 : if (rawlen == 0)
981 220 : return 0;
982 : }
983 1909 : if (rawlen == -1) /* end-of-streaming or error */
984 156 : return -2;
985 1753 : if (rawlen == -2)
986 : {
987 0 : pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
988 0 : return -1;
989 : }
990 :
991 : /* Return received messages to caller */
992 1753 : *buffer = copybuf;
993 1753 : return rawlen;
994 : }
995 :
996 : /*
997 : * Process the keepalive message.
998 : */
999 : static bool
1000 0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1001 : XLogRecPtr blockpos, TimestampTz *last_status)
1002 : {
1003 : int pos;
1004 : bool replyRequested;
1005 : TimestampTz now;
1006 :
1007 : /*
1008 : * Parse the keepalive message, enclosed in the CopyData message. We just
1009 : * check if the server requested a reply, and ignore the rest.
1010 : */
1011 0 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
1012 0 : pos += 8; /* skip walEnd */
1013 0 : pos += 8; /* skip sendTime */
1014 :
1015 0 : if (len < pos + 1)
1016 : {
1017 0 : pg_log_error("streaming header too small: %d", len);
1018 0 : return false;
1019 : }
1020 0 : replyRequested = copybuf[pos];
1021 :
1022 : /* If the server requested an immediate reply, send one. */
1023 0 : if (replyRequested && still_sending)
1024 : {
1025 0 : if (reportFlushPosition && lastFlushPosition < blockpos &&
1026 0 : walfile != NULL)
1027 : {
1028 : /*
1029 : * If a valid flush location needs to be reported, flush the
1030 : * current WAL file so that the latest flush location is sent back
1031 : * to the server. This is necessary to see whether the last WAL
1032 : * data has been successfully replicated or not, at the normal
1033 : * shutdown of the server.
1034 : */
1035 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1036 0 : pg_fatal("could not fsync file \"%s\": %s",
1037 : walfile->pathname, GetLastWalMethodError(stream->walmethod));
1038 0 : lastFlushPosition = blockpos;
1039 : }
1040 :
1041 0 : now = feGetCurrentTimestamp();
1042 0 : if (!sendFeedback(conn, blockpos, now, false))
1043 0 : return false;
1044 0 : *last_status = now;
1045 : }
1046 :
1047 0 : return true;
1048 : }
1049 :
1050 : /*
1051 : * Process WALData message.
1052 : */
1053 : static bool
1054 1753 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1055 : XLogRecPtr *blockpos)
1056 : {
1057 : int xlogoff;
1058 : int bytes_left;
1059 : int bytes_written;
1060 : int hdr_len;
1061 :
1062 : /*
1063 : * Once we've decided we don't want to receive any more, just ignore any
1064 : * subsequent WALData messages.
1065 : */
1066 1753 : if (!(still_sending))
1067 231 : return true;
1068 :
1069 : /*
1070 : * Read the header of the WALData message, enclosed in the CopyData
1071 : * message. We only need the WAL location field (dataStart), the rest of
1072 : * the header is ignored.
1073 : */
1074 1522 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
1075 1522 : hdr_len += 8; /* dataStart */
1076 1522 : hdr_len += 8; /* walEnd */
1077 1522 : hdr_len += 8; /* sendTime */
1078 1522 : if (len < hdr_len)
1079 : {
1080 0 : pg_log_error("streaming header too small: %d", len);
1081 0 : return false;
1082 : }
1083 1522 : *blockpos = fe_recvint64(©buf[1]);
1084 :
1085 : /* Extract WAL location for this block */
1086 1522 : xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1087 :
1088 : /*
1089 : * Verify that the initial location in the stream matches where we think
1090 : * we are.
1091 : */
1092 1522 : if (walfile == NULL)
1093 : {
1094 : /* No file open yet */
1095 162 : if (xlogoff != 0)
1096 : {
1097 0 : pg_log_error("received write-ahead log record for offset %u with no file open",
1098 : xlogoff);
1099 0 : return false;
1100 : }
1101 : }
1102 : else
1103 : {
1104 : /* More data in existing segment */
1105 1360 : if (walfile->currpos != xlogoff)
1106 : {
1107 0 : pg_log_error("got WAL data offset %08x, expected %08x",
1108 : xlogoff, (int) walfile->currpos);
1109 0 : return false;
1110 : }
1111 : }
1112 :
1113 1522 : bytes_left = len - hdr_len;
1114 1522 : bytes_written = 0;
1115 :
1116 3044 : while (bytes_left)
1117 : {
1118 : int bytes_to_write;
1119 :
1120 : /*
1121 : * If crossing a WAL boundary, only write up until we reach wal
1122 : * segment size.
1123 : */
1124 1522 : if (xlogoff + bytes_left > WalSegSz)
1125 0 : bytes_to_write = WalSegSz - xlogoff;
1126 : else
1127 1522 : bytes_to_write = bytes_left;
1128 :
1129 1522 : if (walfile == NULL)
1130 : {
1131 162 : if (!open_walfile(stream, *blockpos))
1132 : {
1133 : /* Error logged by open_walfile */
1134 0 : return false;
1135 : }
1136 : }
1137 :
1138 3044 : if (stream->walmethod->ops->write(walfile,
1139 1522 : copybuf + hdr_len + bytes_written,
1140 1522 : bytes_to_write) != bytes_to_write)
1141 : {
1142 0 : pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1143 : bytes_to_write, walfile->pathname,
1144 : GetLastWalMethodError(stream->walmethod));
1145 0 : return false;
1146 : }
1147 :
1148 : /* Write was successful, advance our position */
1149 1522 : bytes_written += bytes_to_write;
1150 1522 : bytes_left -= bytes_to_write;
1151 1522 : *blockpos += bytes_to_write;
1152 1522 : xlogoff += bytes_to_write;
1153 :
1154 : /* Did we reach the end of a WAL segment? */
1155 1522 : if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1156 : {
1157 11 : if (!close_walfile(stream, *blockpos))
1158 : /* Error message written in close_walfile() */
1159 0 : return false;
1160 :
1161 11 : xlogoff = 0;
1162 :
1163 11 : if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1164 : {
1165 0 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1166 : {
1167 0 : pg_log_error("could not send copy-end packet: %s",
1168 : PQerrorMessage(conn));
1169 0 : return false;
1170 : }
1171 0 : still_sending = false;
1172 0 : return true; /* ignore the rest of this WALData packet */
1173 : }
1174 : }
1175 : }
1176 : /* No more data left to write, receive next copy packet */
1177 :
1178 1522 : return true;
1179 : }
1180 :
1181 : /*
1182 : * Handle end of the copy stream.
1183 : */
1184 : static PGresult *
1185 156 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1186 : XLogRecPtr blockpos, XLogRecPtr *stoppos)
1187 : {
1188 156 : PGresult *res = PQgetResult(conn);
1189 :
1190 : /*
1191 : * The server closed its end of the copy stream. If we haven't closed
1192 : * ours already, we need to do so now, unless the server threw an error,
1193 : * in which case we don't.
1194 : */
1195 156 : if (still_sending)
1196 : {
1197 2 : if (!close_walfile(stream, blockpos))
1198 : {
1199 : /* Error message written in close_walfile() */
1200 0 : PQclear(res);
1201 0 : return NULL;
1202 : }
1203 2 : if (PQresultStatus(res) == PGRES_COPY_IN)
1204 : {
1205 1 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1206 : {
1207 0 : pg_log_error("could not send copy-end packet: %s",
1208 : PQerrorMessage(conn));
1209 0 : PQclear(res);
1210 0 : return NULL;
1211 : }
1212 1 : res = PQgetResult(conn);
1213 : }
1214 2 : still_sending = false;
1215 : }
1216 156 : *stoppos = blockpos;
1217 156 : return res;
1218 : }
1219 :
1220 : /*
1221 : * Check if we should continue streaming, or abort at this point.
1222 : */
1223 : static bool
1224 2387 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1225 : {
1226 2387 : if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1227 : {
1228 154 : if (!close_walfile(stream, blockpos))
1229 : {
1230 : /* Potential error message is written by close_walfile */
1231 0 : return false;
1232 : }
1233 154 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1234 : {
1235 0 : pg_log_error("could not send copy-end packet: %s",
1236 : PQerrorMessage(conn));
1237 0 : return false;
1238 : }
1239 154 : still_sending = false;
1240 : }
1241 :
1242 2387 : return true;
1243 : }
1244 :
1245 : /*
1246 : * Calculate how long send/receive loops should sleep
1247 : */
1248 : static long
1249 634 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1250 : TimestampTz last_status)
1251 : {
1252 634 : TimestampTz status_targettime = 0;
1253 : long sleeptime;
1254 :
1255 634 : if (standby_message_timeout && still_sending)
1256 571 : status_targettime = last_status +
1257 571 : (standby_message_timeout - 1) * ((int64) 1000);
1258 :
1259 634 : if (status_targettime > 0)
1260 : {
1261 : long secs;
1262 : int usecs;
1263 :
1264 571 : feTimestampDifference(now,
1265 : status_targettime,
1266 : &secs,
1267 : &usecs);
1268 : /* Always sleep at least 1 sec */
1269 571 : if (secs <= 0)
1270 : {
1271 0 : secs = 1;
1272 0 : usecs = 0;
1273 : }
1274 :
1275 571 : sleeptime = secs * 1000 + usecs / 1000;
1276 : }
1277 : else
1278 63 : sleeptime = -1;
1279 :
1280 634 : return sleeptime;
1281 : }
|