Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * receivelog.c - receive WAL files using the streaming
4 : : * replication protocol.
5 : : *
6 : : * Author: Magnus Hagander <magnus@hagander.net>
7 : : *
8 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_basebackup/receivelog.c
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres_fe.h"
16 : :
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "access/xlog_internal.h"
22 : : #include "common/logging.h"
23 : : #include "libpq-fe.h"
24 : : #include "libpq/protocol.h"
25 : : #include "receivelog.h"
26 : : #include "streamutil.h"
27 : :
28 : : /* currently open WAL file */
29 : : static Walfile *walfile = NULL;
30 : : static bool reportFlushPosition = false;
31 : : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
32 : :
33 : : static bool still_sending = true; /* feedback still needs to be sent? */
34 : :
35 : : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
36 : : XLogRecPtr *stoppos);
37 : : static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
38 : : static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
39 : : char **buffer);
40 : : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
41 : : int len, XLogRecPtr blockpos, TimestampTz *last_status);
42 : : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
43 : : XLogRecPtr *blockpos);
44 : : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
45 : : XLogRecPtr blockpos, XLogRecPtr *stoppos);
46 : : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47 : : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
48 : : TimestampTz last_status);
49 : :
50 : : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
51 : : uint32 *timeline);
52 : :
53 : : static bool
3537 magnus@hagander.net 54 :CBC 9 : mark_file_as_archived(StreamCtl *stream, const char *fname)
55 : : {
56 : : Walfile *f;
57 : : static char tmppath[MAXPGPATH];
58 : :
59 : 9 : snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
60 : : fname);
61 : :
1380 rhaas@postgresql.org 62 : 9 : f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
63 : : NULL, 0);
3537 magnus@hagander.net 64 [ - + ]: 9 : if (f == NULL)
65 : : {
2647 peter@eisentraut.org 66 :UBC 0 : pg_log_error("could not create archive status file \"%s\": %s",
67 : : tmppath, GetLastWalMethodError(stream->walmethod));
4196 andres@anarazel.de 68 : 0 : return false;
69 : : }
70 : :
1380 rhaas@postgresql.org 71 [ - + ]:CBC 9 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
72 : : {
1686 tgl@sss.pgh.pa.us 73 :UBC 0 : pg_log_error("could not close archive status file \"%s\": %s",
74 : : tmppath, GetLastWalMethodError(stream->walmethod));
75 : 0 : return false;
76 : : }
77 : :
4196 andres@anarazel.de 78 :CBC 9 : return true;
79 : : }
80 : :
81 : : /*
82 : : * Open a new WAL file in the specified directory.
83 : : *
84 : : * Returns true if OK; on failure, returns false after printing an error msg.
85 : : * On success, 'walfile' is set to the opened WAL file.
86 : : *
87 : : * The file will be padded to 16Mb with zeroes.
88 : : */
89 : : static bool
3763 magnus@hagander.net 90 : 162 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 : : {
92 : : Walfile *f;
93 : : char *fn;
94 : : ssize_t size;
95 : : XLogSegNo segno;
96 : : char walfile_name[MAXPGPATH];
97 : :
3206 andres@anarazel.de 98 : 162 : XLByteToSeg(startpoint, segno, WalSegSz);
1380 rhaas@postgresql.org 99 : 162 : XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
100 : :
101 : : /* Note that this considers the compression used if necessary */
102 : 162 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
103 : : walfile_name,
104 : 162 : stream->partial_suffix);
105 : :
106 : : /*
107 : : * When streaming to files, if an existing file exists we verify that it's
108 : : * either empty (just created), or a complete WalSegSz segment (in which
109 : : * case it has been created and padded). Anything else indicates a corrupt
110 : : * file. Compressed files have no need for padding, so just ignore this
111 : : * case.
112 : : *
113 : : * When streaming to tar, no file with this name will exist before, so we
114 : : * never have to verify a size.
115 : : */
116 [ + + - + ]: 317 : if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
117 : 155 : stream->walmethod->ops->existsfile(stream->walmethod, fn))
118 : : {
1380 rhaas@postgresql.org 119 :UBC 0 : size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
3537 magnus@hagander.net 120 [ # # ]: 0 : if (size < 0)
121 : : {
2647 peter@eisentraut.org 122 : 0 : pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 : : fn, GetLastWalMethodError(stream->walmethod));
1800 michael@paquier.xyz 124 : 0 : pg_free(fn);
3537 magnus@hagander.net 125 : 0 : return false;
126 : : }
3206 andres@anarazel.de 127 [ # # ]: 0 : if (size == WalSegSz)
128 : : {
129 : : /* Already padded file. Open it for use */
1380 rhaas@postgresql.org 130 : 0 : f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
3537 magnus@hagander.net 131 [ # # ]: 0 : if (f == NULL)
132 : : {
2647 peter@eisentraut.org 133 : 0 : pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 : : fn, GetLastWalMethodError(stream->walmethod));
1800 michael@paquier.xyz 135 : 0 : pg_free(fn);
3558 tgl@sss.pgh.pa.us 136 : 0 : return false;
137 : : }
138 : :
139 : : /* fsync file in case of a previous crash */
1380 rhaas@postgresql.org 140 [ # # ]: 0 : if (stream->walmethod->ops->sync(f) != 0)
141 : : {
1544 tgl@sss.pgh.pa.us 142 : 0 : pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 : : fn, GetLastWalMethodError(stream->walmethod));
1380 rhaas@postgresql.org 144 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
2528 peter@eisentraut.org 145 : 0 : exit(1);
146 : : }
147 : :
3537 magnus@hagander.net 148 : 0 : walfile = f;
1800 michael@paquier.xyz 149 : 0 : pg_free(fn);
3537 magnus@hagander.net 150 : 0 : return true;
151 : : }
152 [ # # ]: 0 : if (size != 0)
153 : : {
154 : : /* if write didn't set errno, assume problem is no disk space */
3558 tgl@sss.pgh.pa.us 155 [ # # ]: 0 : if (errno == 0)
156 : 0 : errno = ENOSPC;
1787 peter@eisentraut.org 157 : 0 : pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
158 : : "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
159 : : size),
160 : : fn, size, WalSegSz);
1800 michael@paquier.xyz 161 : 0 : pg_free(fn);
4912 heikki.linnakangas@i 162 : 0 : return false;
163 : : }
164 : : /* File existed and was empty, so fall through and open */
165 : : }
166 : :
167 : : /* No file existed, so create one */
168 : :
1380 rhaas@postgresql.org 169 :CBC 162 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
170 : : walfile_name,
171 : 162 : stream->partial_suffix,
172 : : WalSegSz);
3537 magnus@hagander.net 173 [ - + ]: 162 : if (f == NULL)
174 : : {
2647 peter@eisentraut.org 175 :UBC 0 : pg_log_error("could not open write-ahead log file \"%s\": %s",
176 : : fn, GetLastWalMethodError(stream->walmethod));
1800 michael@paquier.xyz 177 : 0 : pg_free(fn);
4912 heikki.linnakangas@i 178 : 0 : return false;
179 : : }
180 : :
1800 michael@paquier.xyz 181 :CBC 162 : pg_free(fn);
4912 heikki.linnakangas@i 182 : 162 : walfile = f;
183 : 162 : return true;
184 : : }
185 : :
186 : : /*
187 : : * Close the current WAL file (if open), and rename it to the correct
188 : : * filename if it's complete. On failure, prints an error message to stderr
189 : : * and returns false, otherwise returns true.
190 : : */
191 : : static bool
3763 magnus@hagander.net 192 : 167 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
193 : : {
194 : : char *fn;
195 : : pgoff_t currpos;
196 : : int r;
197 : : char walfile_name[MAXPGPATH];
198 : :
3537 199 [ + + ]: 167 : if (walfile == NULL)
4912 heikki.linnakangas@i 200 : 5 : return true;
201 : :
1380 rhaas@postgresql.org 202 : 162 : strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
203 : 162 : currpos = walfile->currpos;
204 : :
205 : : /* Note that this considers the compression used if necessary */
206 : 162 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
207 : : walfile_name,
208 : 162 : stream->partial_suffix);
209 : :
3537 magnus@hagander.net 210 [ + + ]: 162 : if (stream->partial_suffix)
211 : : {
3206 andres@anarazel.de 212 [ + + ]: 12 : if (currpos == WalSegSz)
1380 rhaas@postgresql.org 213 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
214 : : else
215 : : {
1747 michael@paquier.xyz 216 : 6 : pg_log_info("not renaming \"%s\", segment is not complete", fn);
1380 rhaas@postgresql.org 217 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
218 : : }
219 : : }
220 : : else
221 : 150 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
222 : :
3537 magnus@hagander.net 223 : 162 : walfile = NULL;
224 : :
225 [ - + ]: 162 : if (r != 0)
226 : : {
2647 peter@eisentraut.org 227 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
228 : : fn, GetLastWalMethodError(stream->walmethod));
229 : :
1747 michael@paquier.xyz 230 : 0 : pg_free(fn);
5353 magnus@hagander.net 231 : 0 : return false;
232 : : }
233 : :
1747 michael@paquier.xyz 234 :CBC 162 : pg_free(fn);
235 : :
236 : : /*
237 : : * Mark file as archived if requested by the caller - pg_basebackup needs
238 : : * to do so as files can otherwise get archived again after promotion of a
239 : : * new node. This is in line with walreceiver.c always doing a
240 : : * XLogArchiveForceDone() after a complete segment.
241 : : */
3206 andres@anarazel.de 242 [ + + + + ]: 162 : if (currpos == WalSegSz && stream->mark_done)
243 : : {
244 : : /* writes error message if failed */
1380 rhaas@postgresql.org 245 [ - + ]: 5 : if (!mark_file_as_archived(stream, walfile_name))
4196 andres@anarazel.de 246 :UBC 0 : return false;
247 : : }
248 : :
4533 rhaas@postgresql.org 249 :CBC 162 : lastFlushPosition = pos;
5353 magnus@hagander.net 250 : 162 : return true;
251 : : }
252 : :
253 : :
254 : : /*
255 : : * Check if a timeline history file exists.
256 : : */
257 : : static bool
3763 258 : 157 : existsTimeLineHistoryFile(StreamCtl *stream)
259 : : {
260 : : char histfname[MAXFNAMELEN];
261 : :
262 : : /*
263 : : * Timeline 1 never has a history file. We treat that as if it existed,
264 : : * since we never need to stream it.
265 : : */
266 [ + + ]: 157 : if (stream->timeline == 1)
4912 heikki.linnakangas@i 267 : 152 : return true;
268 : :
3763 magnus@hagander.net 269 : 5 : TLHistoryFileName(histfname, stream->timeline);
270 : :
1380 rhaas@postgresql.org 271 : 5 : return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
272 : : }
273 : :
274 : : static bool
3763 magnus@hagander.net 275 : 5 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
276 : : {
4912 heikki.linnakangas@i 277 : 5 : int size = strlen(content);
278 : : char histfname[MAXFNAMELEN];
279 : : Walfile *f;
280 : :
281 : : /*
282 : : * Check that the server's idea of how timeline history files should be
283 : : * named matches ours.
284 : : */
3763 magnus@hagander.net 285 : 5 : TLHistoryFileName(histfname, stream->timeline);
4912 heikki.linnakangas@i 286 [ - + ]: 5 : if (strcmp(histfname, filename) != 0)
287 : : {
2647 peter@eisentraut.org 288 :UBC 0 : pg_log_error("server reported unexpected history file name for timeline %u: %s",
289 : : stream->timeline, filename);
4912 heikki.linnakangas@i 290 : 0 : return false;
291 : : }
292 : :
1380 rhaas@postgresql.org 293 :CBC 5 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
294 : : histfname, ".tmp", 0);
3537 magnus@hagander.net 295 [ - + ]: 5 : if (f == NULL)
296 : : {
2647 peter@eisentraut.org 297 :UBC 0 : pg_log_error("could not create timeline history file \"%s\": %s",
298 : : histfname, GetLastWalMethodError(stream->walmethod));
4912 heikki.linnakangas@i 299 : 0 : return false;
300 : : }
301 : :
1380 rhaas@postgresql.org 302 [ - + ]:CBC 5 : if ((int) stream->walmethod->ops->write(f, content, size) != size)
303 : : {
2647 peter@eisentraut.org 304 :UBC 0 : pg_log_error("could not write timeline history file \"%s\": %s",
305 : : histfname, GetLastWalMethodError(stream->walmethod));
306 : :
307 : : /*
308 : : * If we fail to make the file, delete it to release disk space
309 : : */
1380 rhaas@postgresql.org 310 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
311 : :
4912 heikki.linnakangas@i 312 : 0 : return false;
313 : : }
314 : :
1380 rhaas@postgresql.org 315 [ - + ]:CBC 5 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
316 : : {
2647 peter@eisentraut.org 317 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
318 : : histfname, GetLastWalMethodError(stream->walmethod));
4912 heikki.linnakangas@i 319 : 0 : return false;
320 : : }
321 : :
322 : : /* Maintain archive_status, check close_walfile() for details. */
3763 magnus@hagander.net 323 [ + + ]:CBC 5 : if (stream->mark_done)
324 : : {
325 : : /* writes error message if failed */
3537 326 [ - + ]: 4 : if (!mark_file_as_archived(stream, histfname))
4196 andres@anarazel.de 327 :UBC 0 : return false;
328 : : }
329 : :
4912 heikki.linnakangas@i 330 :CBC 5 : return true;
331 : : }
332 : :
333 : : /*
334 : : * Send a Standby Status Update message to server.
335 : : */
336 : : static bool
3414 tgl@sss.pgh.pa.us 337 : 156 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
338 : : {
339 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
4780 bruce@momjian.us 340 : 156 : int len = 0;
341 : :
328 nathan@postgresql.or 342 :GNC 156 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
4983 heikki.linnakangas@i 343 :CBC 156 : len += 1;
3296 tgl@sss.pgh.pa.us 344 : 156 : fe_sendint64(blockpos, &replybuf[len]); /* write */
4983 heikki.linnakangas@i 345 : 156 : len += 8;
4533 rhaas@postgresql.org 346 [ + + ]: 156 : if (reportFlushPosition)
3296 tgl@sss.pgh.pa.us 347 : 152 : fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
348 : : else
349 : 4 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
4983 heikki.linnakangas@i 350 : 156 : len += 8;
4438 bruce@momjian.us 351 : 156 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4983 heikki.linnakangas@i 352 : 156 : len += 8;
4438 bruce@momjian.us 353 : 156 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4983 heikki.linnakangas@i 354 : 156 : len += 8;
3296 tgl@sss.pgh.pa.us 355 : 156 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4983 heikki.linnakangas@i 356 : 156 : len += 1;
357 : :
358 [ + - - + ]: 156 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359 : : {
2647 peter@eisentraut.org 360 :UBC 0 : pg_log_error("could not send feedback packet: %s",
361 : : PQerrorMessage(conn));
4983 heikki.linnakangas@i 362 : 0 : return false;
363 : : }
364 : :
4983 heikki.linnakangas@i 365 :CBC 156 : return true;
366 : : }
367 : :
368 : : /*
369 : : * Check that the server version we're connected to is supported by
370 : : * ReceiveXlogStream().
371 : : *
372 : : * If it's not, an error message is printed to stderr, and false is returned.
373 : : */
374 : : bool
4848 375 : 333 : CheckServerVersionForStreaming(PGconn *conn)
376 : : {
377 : : int minServerMajor,
378 : : maxServerMajor;
379 : : int serverMajor;
380 : :
381 : : /*
382 : : * The message format used in streaming replication changed in 9.3, so we
383 : : * cannot stream from older servers. And we don't support servers newer
384 : : * than the client; it might work, but we don't know, so err on the safe
385 : : * side.
386 : : */
387 : 333 : minServerMajor = 903;
388 : 333 : maxServerMajor = PG_VERSION_NUM / 100;
389 : 333 : serverMajor = PQserverVersion(conn) / 100;
4438 simon@2ndQuadrant.co 390 [ - + ]: 333 : if (serverMajor < minServerMajor)
391 : : {
4848 heikki.linnakangas@i 392 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
393 : :
2647 peter@eisentraut.org 394 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395 : : serverver ? serverver : "'unknown'",
396 : : "9.3");
4438 simon@2ndQuadrant.co 397 : 0 : return false;
398 : : }
4438 simon@2ndQuadrant.co 399 [ - + ]:CBC 333 : else if (serverMajor > maxServerMajor)
400 : : {
4438 simon@2ndQuadrant.co 401 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
402 : :
2647 peter@eisentraut.org 403 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404 : : serverver ? serverver : "'unknown'",
405 : : PG_VERSION);
4848 heikki.linnakangas@i 406 : 0 : return false;
407 : : }
4848 heikki.linnakangas@i 408 :CBC 333 : return true;
409 : : }
410 : :
411 : : /*
412 : : * Receive a log stream starting at the specified position.
413 : : *
414 : : * Individual parameters are passed through the StreamCtl structure.
415 : : *
416 : : * If sysidentifier is specified, validate that both the system
417 : : * identifier and the timeline matches the specified ones
418 : : * (by sending an extra IDENTIFY_SYSTEM command)
419 : : *
420 : : * All received segments will be written to the directory
421 : : * specified by basedir. This will also fetch any missing timeline history
422 : : * files.
423 : : *
424 : : * The stream_stop callback will be called every time data
425 : : * is received, and whenever a segment is completed. If it returns
426 : : * true, the streaming will stop and the function
427 : : * return. As long as it returns false, streaming will continue
428 : : * indefinitely.
429 : : *
430 : : * If stream_stop() checks for external input, stop_socket should be set to
431 : : * the FD it checks. This will allow such input to be detected promptly
432 : : * rather than after standby_message_timeout (which might be indefinite).
433 : : * Note that signals will interrupt waits for input as well, but that is
434 : : * race-y since a signal received while busy won't interrupt the wait.
435 : : *
436 : : * standby_message_timeout controls how often we send a message
437 : : * back to the primary letting it know our progress, in milliseconds.
438 : : * Zero means no messages are sent.
439 : : * This message will only contain the write location, and never
440 : : * flush or replay.
441 : : *
442 : : * If 'partial_suffix' is not NULL, files are initially created with the
443 : : * given suffix, and the suffix is removed once the file is finished. That
444 : : * allows you to tell the difference between partial and completed files,
445 : : * so that you can continue later where you left.
446 : : *
447 : : * If 'synchronous' is true, the received WAL is flushed as soon as written,
448 : : * otherwise only when the WAL file is closed.
449 : : *
450 : : * Note: The WAL location *must* be at a log segment start!
451 : : */
452 : : bool
3763 magnus@hagander.net 453 : 156 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
454 : : {
455 : : PQExpBuffer query;
456 : : PGresult *res;
457 : : XLogRecPtr stoppos;
458 : :
459 : : /*
460 : : * The caller should've checked the server version already, but doesn't do
461 : : * any harm to check it here too.
462 : : */
4848 heikki.linnakangas@i 463 [ - + ]: 156 : if (!CheckServerVersionForStreaming(conn))
4914 heikki.linnakangas@i 464 :UBC 0 : return false;
465 : :
466 : : /*
467 : : * Decide whether we want to report the flush position. If we report the
468 : : * flush position, the primary will know what WAL we'll possibly
469 : : * re-request, and it can then remove older WAL safely. We must always do
470 : : * that when we are using slots.
471 : : *
472 : : * Reporting the flush position makes one eligible as a synchronous
473 : : * replica. People shouldn't include generic names in
474 : : * synchronous_standby_names, but we've protected them against it so far,
475 : : * so let's continue to do so unless specifically requested.
476 : : */
3452 magnus@hagander.net 477 [ + + ]:CBC 156 : if (stream->replication_slot != NULL)
478 : : {
4533 rhaas@postgresql.org 479 : 151 : reportFlushPosition = true;
480 : : }
481 : : else
482 : : {
3592 simon@2ndQuadrant.co 483 [ + + ]: 5 : if (stream->synchronous)
484 : 1 : reportFlushPosition = true;
485 : : else
486 : 4 : reportFlushPosition = false;
487 : : }
488 : :
3763 magnus@hagander.net 489 [ + - ]: 156 : if (stream->sysidentifier != NULL)
490 : : {
1764 michael@paquier.xyz 491 : 156 : char *sysidentifier = NULL;
492 : : TimeLineID servertli;
493 : :
494 : : /*
495 : : * Get the server system identifier and timeline, and validate them.
496 : : */
497 [ - + ]: 156 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
498 : : {
1764 michael@paquier.xyz 499 :UBC 0 : pg_free(sysidentifier);
5361 magnus@hagander.net 500 : 0 : return false;
501 : : }
502 : :
1764 michael@paquier.xyz 503 [ - + ]:CBC 156 : if (strcmp(stream->sysidentifier, sysidentifier) != 0)
504 : : {
2647 peter@eisentraut.org 505 :UBC 0 : pg_log_error("system identifier does not match between base backup and streaming connection");
1764 michael@paquier.xyz 506 : 0 : pg_free(sysidentifier);
5361 magnus@hagander.net 507 : 0 : return false;
508 : : }
1764 michael@paquier.xyz 509 :CBC 156 : pg_free(sysidentifier);
510 : :
511 [ - + ]: 156 : if (stream->timeline > servertli)
512 : : {
2647 peter@eisentraut.org 513 :UBC 0 : pg_log_error("starting timeline %u is not present in the server",
514 : : stream->timeline);
5361 magnus@hagander.net 515 : 0 : return false;
516 : : }
517 : : }
518 : :
519 : : /*
520 : : * initialize flush position to starting point, it's the caller's
521 : : * responsibility that that's sane.
522 : : */
3763 magnus@hagander.net 523 :CBC 156 : lastFlushPosition = stream->startpos;
524 : :
525 : : while (1)
5361 magnus@hagander.net 526 :GIC 1 : {
527 : : /*
528 : : * Fetch the timeline history file for this timeline, if we don't have
529 : : * it already. When streaming log to tar, this will always return
530 : : * false, as we are never streaming into an existing file and
531 : : * therefore there can be no pre-existing timeline history file.
532 : : */
3763 magnus@hagander.net 533 [ + + ]:CBC 157 : if (!existsTimeLineHistoryFile(stream))
534 : : {
15 tgl@sss.pgh.pa.us 535 : 5 : query = createPQExpBuffer();
536 : 5 : appendPQExpBuffer(query, "TIMELINE_HISTORY %u", stream->timeline);
537 : 5 : res = PQexec(conn, query->data);
538 : 5 : destroyPQExpBuffer(query);
4912 heikki.linnakangas@i 539 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
540 : : {
541 : : /* FIXME: we might send it ok, but get an error */
2647 peter@eisentraut.org 542 :UBC 0 : pg_log_error("could not send replication command \"%s\": %s",
543 : : "TIMELINE_HISTORY", PQresultErrorMessage(res));
4912 heikki.linnakangas@i 544 : 0 : PQclear(res);
545 : 0 : return false;
546 : : }
547 : :
548 : : /*
549 : : * The response to TIMELINE_HISTORY is a single row result set
550 : : * with two fields: filename and content
551 : : */
4912 heikki.linnakangas@i 552 [ + - - + ]:CBC 5 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
553 : : {
2647 peter@eisentraut.org 554 :UBC 0 : pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555 : : PQntuples(res), PQnfields(res), 1, 2);
556 : : }
557 : :
558 : : /* Write the history file to disk */
3763 magnus@hagander.net 559 :CBC 5 : writeTimeLineHistoryFile(stream,
560 : : PQgetvalue(res, 0, 0),
561 : : PQgetvalue(res, 0, 1));
562 : :
4912 heikki.linnakangas@i 563 : 5 : PQclear(res);
564 : : }
565 : :
566 : : /*
567 : : * Before we start streaming from the requested location, check if the
568 : : * callback tells us to stop here.
569 : : */
3763 magnus@hagander.net 570 [ - + ]: 157 : if (stream->stream_stop(stream->startpos, stream->timeline, false))
4912 heikki.linnakangas@i 571 :UBC 0 : return true;
572 : :
573 : : /* Initiate the replication stream at specified location */
15 tgl@sss.pgh.pa.us 574 :CBC 157 : query = createPQExpBuffer();
575 : 157 : appendPQExpBufferStr(query, "START_REPLICATION");
576 [ + + ]: 157 : if (stream->replication_slot != NULL)
577 : : {
578 : 152 : appendPQExpBufferStr(query, " SLOT ");
579 : 152 : AppendQuotedIdentifier(query, stream->replication_slot);
580 : : }
15 tgl@sss.pgh.pa.us 581 :GNC 157 : appendPQExpBuffer(query, " %X/%08X TIMELINE %u",
15 tgl@sss.pgh.pa.us 582 :CBC 157 : LSN_FORMAT_ARGS(stream->startpos),
583 : : stream->timeline);
584 : 157 : res = PQexec(conn, query->data);
585 : 157 : destroyPQExpBuffer(query);
4912 heikki.linnakangas@i 586 [ + + ]: 157 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
587 : : {
2647 peter@eisentraut.org 588 : 1 : pg_log_error("could not send replication command \"%s\": %s",
589 : : "START_REPLICATION", PQresultErrorMessage(res));
4912 heikki.linnakangas@i 590 : 1 : PQclear(res);
591 : 1 : return false;
592 : : }
5101 magnus@hagander.net 593 : 156 : PQclear(res);
594 : :
595 : : /* Stream the WAL */
3763 596 : 156 : res = HandleCopyStream(conn, stream, &stoppos);
4810 rhaas@postgresql.org 597 [ - + ]: 156 : if (res == NULL)
4912 heikki.linnakangas@i 598 :UBC 0 : goto error;
599 : :
600 : : /*
601 : : * Streaming finished.
602 : : *
603 : : * There are two possible reasons for that: a controlled shutdown, or
604 : : * we reached the end of the current timeline. In case of
605 : : * end-of-timeline, the server sends a result set after Copy has
606 : : * finished, containing information about the next timeline. Read
607 : : * that, and restart streaming from the next timeline. In case of
608 : : * controlled shutdown, stop here.
609 : : */
4912 heikki.linnakangas@i 610 [ + + ]:CBC 156 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
611 : 1 : {
612 : : /*
613 : : * End-of-timeline. Read the next timeline's ID and starting
614 : : * position. Usually, the starting position will match the end of
615 : : * the previous timeline, but there are corner cases like if the
616 : : * server had sent us half of a WAL record, when it was promoted.
617 : : * The new timeline will begin at the end of the last complete
618 : : * record in that case, overlapping the partial WAL record on the
619 : : * old timeline.
620 : : */
621 : : uint32 newtimeline;
622 : : bool parsed;
623 : :
3763 magnus@hagander.net 624 : 1 : parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
4912 heikki.linnakangas@i 625 : 1 : PQclear(res);
4801 626 [ - + ]: 1 : if (!parsed)
4801 heikki.linnakangas@i 627 :UBC 0 : goto error;
628 : :
629 : : /* Sanity check the values the server gave us */
3763 magnus@hagander.net 630 [ - + ]:CBC 1 : if (newtimeline <= stream->timeline)
631 : : {
2647 peter@eisentraut.org 632 :UBC 0 : pg_log_error("server reported unexpected next timeline %u, following timeline %u",
633 : : newtimeline, stream->timeline);
4801 heikki.linnakangas@i 634 : 0 : goto error;
635 : : }
3763 magnus@hagander.net 636 [ - + ]:CBC 1 : if (stream->startpos > stoppos)
637 : : {
358 alvherre@kurilemu.de 638 :UNC 0 : pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
639 : : stream->timeline, LSN_FORMAT_ARGS(stoppos),
640 : : newtimeline, LSN_FORMAT_ARGS(stream->startpos));
4912 heikki.linnakangas@i 641 :UBC 0 : goto error;
642 : : }
643 : :
644 : : /* Read the final result, which should be CommandComplete. */
4912 heikki.linnakangas@i 645 :CBC 1 : res = PQgetResult(conn);
646 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
647 : : {
2647 peter@eisentraut.org 648 :UBC 0 : pg_log_error("unexpected termination of replication stream: %s",
649 : : PQresultErrorMessage(res));
4350 fujii@postgresql.org 650 : 0 : PQclear(res);
4912 heikki.linnakangas@i 651 : 0 : goto error;
652 : : }
4912 heikki.linnakangas@i 653 :CBC 1 : PQclear(res);
654 : :
655 : : /*
656 : : * Loop back to start streaming from the new timeline. Always
657 : : * start streaming at the beginning of a segment.
658 : : */
3763 magnus@hagander.net 659 : 1 : stream->timeline = newtimeline;
3206 andres@anarazel.de 660 : 1 : stream->startpos = stream->startpos -
661 : 1 : XLogSegmentOffset(stream->startpos, WalSegSz);
4912 heikki.linnakangas@i 662 : 1 : continue;
663 : : }
664 [ + + ]: 155 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
665 : : {
4350 fujii@postgresql.org 666 : 154 : PQclear(res);
667 : :
668 : : /*
669 : : * End of replication (ie. controlled shut down of the server).
670 : : *
671 : : * Check if the callback thinks it's OK to stop here. If not,
672 : : * complain.
673 : : */
3763 magnus@hagander.net 674 [ + - ]: 154 : if (stream->stream_stop(stoppos, stream->timeline, false))
4912 heikki.linnakangas@i 675 : 154 : return true;
676 : : else
677 : : {
2647 peter@eisentraut.org 678 :UBC 0 : pg_log_error("replication stream was terminated before stop point");
4912 heikki.linnakangas@i 679 : 0 : goto error;
680 : : }
681 : : }
682 : : else
683 : : {
684 : : /* Server returned an error. */
2647 peter@eisentraut.org 685 :CBC 1 : pg_log_error("unexpected termination of replication stream: %s",
686 : : PQresultErrorMessage(res));
4350 fujii@postgresql.org 687 : 1 : PQclear(res);
4912 heikki.linnakangas@i 688 : 1 : goto error;
689 : : }
690 : : }
691 : :
692 : 1 : error:
1380 rhaas@postgresql.org 693 [ - + - - ]: 1 : if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
2647 peter@eisentraut.org 694 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
695 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
3537 magnus@hagander.net 696 :CBC 1 : walfile = NULL;
4912 heikki.linnakangas@i 697 : 1 : return false;
698 : : }
699 : :
700 : : /*
701 : : * Helper function to parse the result set returned by server after streaming
702 : : * has finished. On failure, prints an error to stderr and returns false.
703 : : */
704 : : static bool
4801 705 : 1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
706 : : {
707 : : uint32 startpos_xlogid,
708 : : startpos_xrecoff;
709 : :
710 : : /*----------
711 : : * The result set consists of one row and two columns, e.g:
712 : : *
713 : : * next_tli | next_tli_startpos
714 : : * ----------+-------------------
715 : : * 4 | 0/9949AE0
716 : : *
717 : : * next_tli is the timeline ID of the next timeline after the one that
718 : : * just finished streaming. next_tli_startpos is the WAL location where
719 : : * the server switched to it.
720 : : *----------
721 : : */
722 [ + - - + ]: 1 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
723 : : {
2647 peter@eisentraut.org 724 :UBC 0 : pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
725 : : PQntuples(res), PQnfields(res), 1, 2);
4801 heikki.linnakangas@i 726 : 0 : return false;
727 : : }
728 : :
4801 heikki.linnakangas@i 729 :CBC 1 : *timeline = atoi(PQgetvalue(res, 0, 0));
358 alvherre@kurilemu.de 730 [ - + ]:GNC 1 : if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
731 : : &startpos_xrecoff) != 2)
732 : : {
2647 peter@eisentraut.org 733 :UBC 0 : pg_log_error("could not parse next timeline's starting point \"%s\"",
734 : : PQgetvalue(res, 0, 1));
4801 heikki.linnakangas@i 735 : 0 : return false;
736 : : }
4801 heikki.linnakangas@i 737 :CBC 1 : *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
738 : :
739 : 1 : return true;
740 : : }
741 : :
742 : : /*
743 : : * The main loop of ReceiveXlogStream. Handles the COPY stream after
744 : : * initiating streaming with the START_REPLICATION command.
745 : : *
746 : : * If the COPY ends (not necessarily successfully) due a message from the
747 : : * server, returns a PGresult and sets *stoppos to the last byte written.
748 : : * On any other sort of error, returns NULL.
749 : : */
750 : : static PGresult *
3763 magnus@hagander.net 751 : 156 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
752 : : XLogRecPtr *stoppos)
753 : : {
4912 heikki.linnakangas@i 754 : 156 : char *copybuf = NULL;
3414 tgl@sss.pgh.pa.us 755 : 156 : TimestampTz last_status = -1;
3763 magnus@hagander.net 756 : 156 : XLogRecPtr blockpos = stream->startpos;
757 : :
4346 fujii@postgresql.org 758 : 156 : still_sending = true;
759 : :
760 : : while (1)
5361 magnus@hagander.net 761 : 958 : {
762 : : int r;
763 : : TimestampTz now;
764 : : long sleeptime;
765 : :
766 : : /*
767 : : * Check if we should continue streaming, or abort at this point.
768 : : */
2119 peter@eisentraut.org 769 [ - + ]: 1114 : if (!CheckCopyStreamStop(conn, stream, blockpos))
4344 fujii@postgresql.org 770 :UBC 0 : goto error;
771 : :
4344 fujii@postgresql.org 772 :CBC 1114 : now = feGetCurrentTimestamp();
773 : :
774 : : /*
775 : : * If synchronous option is true, issue sync command as soon as there
776 : : * are WAL data which has not been flushed yet.
777 : : */
3537 magnus@hagander.net 778 [ + + - + : 1114 : if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
- - ]
779 : : {
1380 rhaas@postgresql.org 780 [ # # ]:UBC 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1544 tgl@sss.pgh.pa.us 781 : 0 : pg_fatal("could not fsync file \"%s\": %s",
782 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
4344 fujii@postgresql.org 783 : 0 : lastFlushPosition = blockpos;
784 : :
785 : : /*
786 : : * Send feedback so that the server sees the latest WAL locations
787 : : * immediately.
788 : : */
4242 789 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
790 : 0 : goto error;
791 : 0 : last_status = now;
792 : : }
793 : :
794 : : /*
795 : : * Potentially send a status message to the primary
796 : : */
3763 magnus@hagander.net 797 [ + + + - :CBC 2131 : if (still_sending && stream->standby_message_timeout > 0 &&
+ + ]
4487 rhaas@postgresql.org 798 : 1017 : feTimestampDifferenceExceeds(last_status, now,
799 : : stream->standby_message_timeout))
800 : : {
801 : : /* Time to send feedback! */
4982 heikki.linnakangas@i 802 [ - + ]: 156 : if (!sendFeedback(conn, blockpos, now, false))
5101 magnus@hagander.net 803 :UBC 0 : goto error;
5361 magnus@hagander.net 804 :CBC 156 : last_status = now;
805 : : }
806 : :
807 : : /*
808 : : * Calculate how long send/receive loops should sleep
809 : : */
3763 810 : 1114 : sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
811 : : last_status);
812 : :
813 : : /* Done with any prior message */
503 tgl@sss.pgh.pa.us 814 : 1114 : PQfreemem(copybuf);
815 : 1114 : copybuf = NULL;
816 : :
3351 817 : 1114 : r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
4344 fujii@postgresql.org 818 [ + + ]: 3098 : while (r != 0)
819 : : {
820 [ - + ]: 2140 : if (r == -1)
4346 fujii@postgresql.org 821 :UBC 0 : goto error;
4344 fujii@postgresql.org 822 [ + + ]:CBC 2140 : if (r == -2)
823 : : {
3763 magnus@hagander.net 824 : 156 : PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
825 : :
4344 fujii@postgresql.org 826 [ - + ]: 156 : if (res == NULL)
4344 fujii@postgresql.org 827 :UBC 0 : goto error;
503 tgl@sss.pgh.pa.us 828 :CBC 156 : PQfreemem(copybuf);
829 : 156 : return res;
830 : : }
831 : :
832 : : /* Check the message type. */
328 nathan@postgresql.or 833 [ - + ]:GNC 1984 : if (copybuf[0] == PqReplMsg_Keepalive)
834 : : {
3561 peter_e@gmx.net 835 [ # # ]:UBC 0 : if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
836 : : &last_status))
4344 fujii@postgresql.org 837 : 0 : goto error;
838 : : }
328 nathan@postgresql.or 839 [ + - ]:GNC 1984 : else if (copybuf[0] == PqReplMsg_WALData)
840 : : {
330 alvherre@kurilemu.de 841 [ - + ]: 1984 : if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
4344 fujii@postgresql.org 842 :UBC 0 : goto error;
843 : :
844 : : /*
845 : : * Check if we should continue streaming, or abort at this
846 : : * point.
847 : : */
2119 peter@eisentraut.org 848 [ - + ]:CBC 1984 : if (!CheckCopyStreamStop(conn, stream, blockpos))
4344 fujii@postgresql.org 849 :UBC 0 : goto error;
850 : : }
851 : : else
852 : : {
2647 peter@eisentraut.org 853 : 0 : pg_log_error("unrecognized streaming header: \"%c\"",
854 : : copybuf[0]);
5101 magnus@hagander.net 855 : 0 : goto error;
856 : : }
857 : :
858 : : /* Done with that message */
503 tgl@sss.pgh.pa.us 859 :CBC 1984 : PQfreemem(copybuf);
860 : 1984 : copybuf = NULL;
861 : :
862 : : /*
863 : : * Process the received data, and any subsequent data we can read
864 : : * without blocking.
865 : : */
3351 866 : 1984 : r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
867 : : }
868 : : }
869 : :
5101 magnus@hagander.net 870 :UBC 0 : error:
1404 peter@eisentraut.org 871 : 0 : PQfreemem(copybuf);
4810 rhaas@postgresql.org 872 : 0 : return NULL;
873 : : }
874 : :
875 : : /*
876 : : * Wait until we can read a CopyData message,
877 : : * or timeout, or occurrence of a signal or input on the stop_socket.
878 : : * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
879 : : *
880 : : * Returns 1 if data has become available for reading, 0 if timed out
881 : : * or interrupted by signal or stop_socket input, and -1 on an error.
882 : : */
883 : : static int
3351 tgl@sss.pgh.pa.us 884 :CBC 2879 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
885 : : {
886 : : int ret;
887 : : fd_set input_mask;
888 : : int connsocket;
889 : : int maxfd;
890 : : struct timeval timeout;
891 : : struct timeval *timeoutptr;
892 : :
893 : 2879 : connsocket = PQsocket(conn);
894 [ - + ]: 2879 : if (connsocket < 0)
895 : : {
2647 peter@eisentraut.org 896 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
4379 fujii@postgresql.org 897 : 0 : return -1;
898 : : }
899 : :
4379 fujii@postgresql.org 900 [ + + ]:CBC 48943 : FD_ZERO(&input_mask);
3351 tgl@sss.pgh.pa.us 901 : 2879 : FD_SET(connsocket, &input_mask);
902 : 2879 : maxfd = connsocket;
903 [ + + ]: 2879 : if (stop_socket != PGINVALID_SOCKET)
904 : : {
905 : 2774 : FD_SET(stop_socket, &input_mask);
906 : 2774 : maxfd = Max(maxfd, stop_socket);
907 : : }
908 : :
4379 fujii@postgresql.org 909 [ + + ]: 2879 : if (timeout_ms < 0)
910 : 97 : timeoutptr = NULL;
911 : : else
912 : : {
913 : 2782 : timeout.tv_sec = timeout_ms / 1000L;
914 : 2782 : timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
915 : 2782 : timeoutptr = &timeout;
916 : : }
917 : :
3351 tgl@sss.pgh.pa.us 918 : 2879 : ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
919 : :
920 [ - + ]: 2879 : if (ret < 0)
921 : : {
3351 tgl@sss.pgh.pa.us 922 [ # # ]:UBC 0 : if (errno == EINTR)
923 : 0 : return 0; /* Got a signal, so not an error */
1894 peter@eisentraut.org 924 : 0 : pg_log_error("%s() failed: %m", "select");
4379 fujii@postgresql.org 925 : 0 : return -1;
926 : : }
3351 tgl@sss.pgh.pa.us 927 [ + + + + ]:CBC 2879 : if (ret > 0 && FD_ISSET(connsocket, &input_mask))
928 : 2321 : return 1; /* Got input on connection socket */
929 : :
930 : 558 : return 0; /* Got timeout or input on stop_socket */
931 : : }
932 : :
933 : : /*
934 : : * Receive CopyData message available from XLOG stream, blocking for
935 : : * maximum of 'timeout' ms.
936 : : *
937 : : * If data was received, returns the length of the data. *buffer is set to
938 : : * point to a buffer holding the received message. The caller must eventually
939 : : * free the buffer with PQfreemem().
940 : : *
941 : : * Returns 0 if no data was available within timeout, or if wait was
942 : : * interrupted by signal or stop_socket input.
943 : : * -1 on error. -2 if the server ended the COPY.
944 : : */
945 : : static int
946 : 3098 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
947 : : char **buffer)
948 : : {
4379 fujii@postgresql.org 949 : 3098 : char *copybuf = NULL;
950 : : int rawlen;
951 : :
952 : : /* Caller should have cleared any prior buffer */
503 tgl@sss.pgh.pa.us 953 [ - + ]: 3098 : Assert(*buffer == NULL);
954 : :
955 : : /* Try to receive a CopyData message */
4379 fujii@postgresql.org 956 : 3098 : rawlen = PQgetCopyData(conn, ©buf, 1);
957 [ + + ]: 3098 : if (rawlen == 0)
958 : : {
959 : : int ret;
960 : :
961 : : /*
962 : : * No data available. Wait for some to appear, but not longer than
963 : : * the specified timeout, so that we can ping the server. Also stop
964 : : * waiting if input appears on stop_socket.
965 : : */
3351 tgl@sss.pgh.pa.us 966 : 2879 : ret = CopyStreamPoll(conn, timeout, stop_socket);
967 [ + + ]: 2879 : if (ret <= 0)
968 : 558 : return ret;
969 : :
970 : : /* Now there is actually data on the socket */
4379 fujii@postgresql.org 971 [ - + ]: 2321 : if (PQconsumeInput(conn) == 0)
972 : : {
2647 peter@eisentraut.org 973 :UBC 0 : pg_log_error("could not receive data from WAL stream: %s",
974 : : PQerrorMessage(conn));
4379 fujii@postgresql.org 975 : 0 : return -1;
976 : : }
977 : :
978 : : /* Now that we've consumed some input, try again */
4379 fujii@postgresql.org 979 :CBC 2321 : rawlen = PQgetCopyData(conn, ©buf, 1);
980 [ + + ]: 2321 : if (rawlen == 0)
981 : 400 : return 0;
982 : : }
983 [ + + ]: 2140 : if (rawlen == -1) /* end-of-streaming or error */
984 : 156 : return -2;
985 [ - + ]: 1984 : if (rawlen == -2)
986 : : {
2647 peter@eisentraut.org 987 :UBC 0 : pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
4379 fujii@postgresql.org 988 : 0 : return -1;
989 : : }
990 : :
991 : : /* Return received messages to caller */
4379 fujii@postgresql.org 992 :CBC 1984 : *buffer = copybuf;
993 : 1984 : return rawlen;
994 : : }
995 : :
996 : : /*
997 : : * Process the keepalive message.
998 : : */
999 : : static bool
3561 peter_e@gmx.net 1000 :UBC 0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1001 : : XLogRecPtr blockpos, TimestampTz *last_status)
1002 : : {
1003 : : int pos;
1004 : : bool replyRequested;
1005 : : TimestampTz now;
1006 : :
1007 : : /*
1008 : : * Parse the keepalive message, enclosed in the CopyData message. We just
1009 : : * check if the server requested a reply, and ignore the rest.
1010 : : */
328 nathan@postgresql.or 1011 :UNC 0 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
4056 bruce@momjian.us 1012 :UBC 0 : pos += 8; /* skip walEnd */
1013 : 0 : pos += 8; /* skip sendTime */
1014 : :
4346 fujii@postgresql.org 1015 [ # # ]: 0 : if (len < pos + 1)
1016 : : {
2647 peter@eisentraut.org 1017 : 0 : pg_log_error("streaming header too small: %d", len);
4346 fujii@postgresql.org 1018 : 0 : return false;
1019 : : }
1020 : 0 : replyRequested = copybuf[pos];
1021 : :
1022 : : /* If the server requested an immediate reply, send one. */
1023 [ # # # # ]: 0 : if (replyRequested && still_sending)
1024 : : {
4241 1025 [ # # # # ]: 0 : if (reportFlushPosition && lastFlushPosition < blockpos &&
3537 magnus@hagander.net 1026 [ # # ]: 0 : walfile != NULL)
1027 : : {
1028 : : /*
1029 : : * If a valid flush location needs to be reported, flush the
1030 : : * current WAL file so that the latest flush location is sent back
1031 : : * to the server. This is necessary to see whether the last WAL
1032 : : * data has been successfully replicated or not, at the normal
1033 : : * shutdown of the server.
1034 : : */
1380 rhaas@postgresql.org 1035 [ # # ]: 0 : if (stream->walmethod->ops->sync(walfile) != 0)
1544 tgl@sss.pgh.pa.us 1036 : 0 : pg_fatal("could not fsync file \"%s\": %s",
1037 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
4241 fujii@postgresql.org 1038 : 0 : lastFlushPosition = blockpos;
1039 : : }
1040 : :
4346 1041 : 0 : now = feGetCurrentTimestamp();
1042 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
1043 : 0 : return false;
1044 : 0 : *last_status = now;
1045 : : }
1046 : :
1047 : 0 : return true;
1048 : : }
1049 : :
1050 : : /*
1051 : : * Process WALData message.
1052 : : */
1053 : : static bool
330 alvherre@kurilemu.de 1054 :GNC 1984 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1055 : : XLogRecPtr *blockpos)
1056 : : {
1057 : : int xlogoff;
1058 : : int bytes_left;
1059 : : int bytes_written;
1060 : : int hdr_len;
1061 : :
1062 : : /*
1063 : : * Once we've decided we don't want to receive any more, just ignore any
1064 : : * subsequent WALData messages.
1065 : : */
4346 fujii@postgresql.org 1066 [ + + ]:CBC 1984 : if (!(still_sending))
1067 : 198 : return true;
1068 : :
1069 : : /*
1070 : : * Read the header of the WALData message, enclosed in the CopyData
1071 : : * message. We only need the WAL location field (dataStart), the rest of
1072 : : * the header is ignored.
1073 : : */
328 nathan@postgresql.or 1074 :GNC 1786 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
4056 bruce@momjian.us 1075 :CBC 1786 : hdr_len += 8; /* dataStart */
1076 : 1786 : hdr_len += 8; /* walEnd */
1077 : 1786 : hdr_len += 8; /* sendTime */
4346 fujii@postgresql.org 1078 [ - + ]: 1786 : if (len < hdr_len)
1079 : : {
2647 peter@eisentraut.org 1080 :UBC 0 : pg_log_error("streaming header too small: %d", len);
4346 fujii@postgresql.org 1081 : 0 : return false;
1082 : : }
4346 fujii@postgresql.org 1083 :CBC 1786 : *blockpos = fe_recvint64(©buf[1]);
1084 : :
1085 : : /* Extract WAL location for this block */
3206 andres@anarazel.de 1086 : 1786 : xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1087 : :
1088 : : /*
1089 : : * Verify that the initial location in the stream matches where we think
1090 : : * we are.
1091 : : */
3537 magnus@hagander.net 1092 [ + + ]: 1786 : if (walfile == NULL)
1093 : : {
1094 : : /* No file open yet */
4346 fujii@postgresql.org 1095 [ - + ]: 162 : if (xlogoff != 0)
1096 : : {
2647 peter@eisentraut.org 1097 :UBC 0 : pg_log_error("received write-ahead log record for offset %u with no file open",
1098 : : xlogoff);
4346 fujii@postgresql.org 1099 : 0 : return false;
1100 : : }
1101 : : }
1102 : : else
1103 : : {
1104 : : /* More data in existing segment */
1380 rhaas@postgresql.org 1105 [ - + ]:CBC 1624 : if (walfile->currpos != xlogoff)
1106 : : {
2647 peter@eisentraut.org 1107 :UBC 0 : pg_log_error("got WAL data offset %08x, expected %08x",
1108 : : xlogoff, (int) walfile->currpos);
4346 fujii@postgresql.org 1109 : 0 : return false;
1110 : : }
1111 : : }
1112 : :
4346 fujii@postgresql.org 1113 :CBC 1786 : bytes_left = len - hdr_len;
1114 : 1786 : bytes_written = 0;
1115 : :
1116 [ + + ]: 3572 : while (bytes_left)
1117 : : {
1118 : : int bytes_to_write;
1119 : :
1120 : : /*
1121 : : * If crossing a WAL boundary, only write up until we reach wal
1122 : : * segment size.
1123 : : */
3206 andres@anarazel.de 1124 [ - + ]: 1786 : if (xlogoff + bytes_left > WalSegSz)
3206 andres@anarazel.de 1125 :UBC 0 : bytes_to_write = WalSegSz - xlogoff;
1126 : : else
4346 fujii@postgresql.org 1127 :CBC 1786 : bytes_to_write = bytes_left;
1128 : :
3537 magnus@hagander.net 1129 [ + + ]: 1786 : if (walfile == NULL)
1130 : : {
3763 1131 [ - + ]: 162 : if (!open_walfile(stream, *blockpos))
1132 : : {
1133 : : /* Error logged by open_walfile */
4346 fujii@postgresql.org 1134 :UBC 0 : return false;
1135 : : }
1136 : : }
1137 : :
1380 rhaas@postgresql.org 1138 :CBC 3572 : if (stream->walmethod->ops->write(walfile,
1139 : 1786 : copybuf + hdr_len + bytes_written,
1140 [ - + ]: 1786 : bytes_to_write) != bytes_to_write)
1141 : : {
1686 peter@eisentraut.org 1142 :UBC 0 : pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1143 : : bytes_to_write, walfile->pathname,
1144 : : GetLastWalMethodError(stream->walmethod));
4346 fujii@postgresql.org 1145 : 0 : return false;
1146 : : }
1147 : :
1148 : : /* Write was successful, advance our position */
4346 fujii@postgresql.org 1149 :CBC 1786 : bytes_written += bytes_to_write;
1150 : 1786 : bytes_left -= bytes_to_write;
1151 : 1786 : *blockpos += bytes_to_write;
1152 : 1786 : xlogoff += bytes_to_write;
1153 : :
1154 : : /* Did we reach the end of a WAL segment? */
3206 andres@anarazel.de 1155 [ + + ]: 1786 : if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1156 : : {
3763 magnus@hagander.net 1157 [ - + ]: 11 : if (!close_walfile(stream, *blockpos))
1158 : : /* Error message written in close_walfile() */
4346 fujii@postgresql.org 1159 :UBC 0 : return false;
1160 : :
4346 fujii@postgresql.org 1161 :CBC 11 : xlogoff = 0;
1162 : :
3763 magnus@hagander.net 1163 [ + - - + ]: 11 : if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1164 : : {
4346 fujii@postgresql.org 1165 [ # # # # ]:UBC 0 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1166 : : {
2647 peter@eisentraut.org 1167 : 0 : pg_log_error("could not send copy-end packet: %s",
1168 : : PQerrorMessage(conn));
4346 fujii@postgresql.org 1169 : 0 : return false;
1170 : : }
1171 : 0 : still_sending = false;
330 alvherre@kurilemu.de 1172 :UNC 0 : return true; /* ignore the rest of this WALData packet */
1173 : : }
1174 : : }
1175 : : }
1176 : : /* No more data left to write, receive next copy packet */
1177 : :
4346 fujii@postgresql.org 1178 :CBC 1786 : return true;
1179 : : }
1180 : :
1181 : : /*
1182 : : * Handle end of the copy stream.
1183 : : */
1184 : : static PGresult *
3763 magnus@hagander.net 1185 : 156 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1186 : : XLogRecPtr blockpos, XLogRecPtr *stoppos)
1187 : : {
4346 fujii@postgresql.org 1188 : 156 : PGresult *res = PQgetResult(conn);
1189 : :
1190 : : /*
1191 : : * The server closed its end of the copy stream. If we haven't closed
1192 : : * ours already, we need to do so now, unless the server threw an error,
1193 : : * in which case we don't.
1194 : : */
1195 [ + + ]: 156 : if (still_sending)
1196 : : {
3763 magnus@hagander.net 1197 [ - + ]: 2 : if (!close_walfile(stream, blockpos))
1198 : : {
1199 : : /* Error message written in close_walfile() */
4346 fujii@postgresql.org 1200 :UBC 0 : PQclear(res);
1201 : 0 : return NULL;
1202 : : }
4346 fujii@postgresql.org 1203 [ + + ]:CBC 2 : if (PQresultStatus(res) == PGRES_COPY_IN)
1204 : : {
1205 [ + - - + ]: 1 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1206 : : {
2647 peter@eisentraut.org 1207 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1208 : : PQerrorMessage(conn));
4346 fujii@postgresql.org 1209 : 0 : PQclear(res);
1210 : 0 : return NULL;
1211 : : }
4346 fujii@postgresql.org 1212 :CBC 1 : res = PQgetResult(conn);
1213 : : }
1214 : 2 : still_sending = false;
1215 : : }
1216 : 156 : *stoppos = blockpos;
1217 : 156 : return res;
1218 : : }
1219 : :
1220 : : /*
1221 : : * Check if we should continue streaming, or abort at this point.
1222 : : */
1223 : : static bool
2119 peter@eisentraut.org 1224 : 3098 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1225 : : {
3763 magnus@hagander.net 1226 [ + + + + ]: 3098 : if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1227 : : {
1228 [ - + ]: 154 : if (!close_walfile(stream, blockpos))
1229 : : {
1230 : : /* Potential error message is written by close_walfile */
4344 fujii@postgresql.org 1231 :UBC 0 : return false;
1232 : : }
4344 fujii@postgresql.org 1233 [ + - - + ]:CBC 154 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1234 : : {
2647 peter@eisentraut.org 1235 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1236 : : PQerrorMessage(conn));
4344 fujii@postgresql.org 1237 : 0 : return false;
1238 : : }
4344 fujii@postgresql.org 1239 :CBC 154 : still_sending = false;
1240 : : }
1241 : :
1242 : 3098 : return true;
1243 : : }
1244 : :
1245 : : /*
1246 : : * Calculate how long send/receive loops should sleep
1247 : : */
1248 : : static long
3414 tgl@sss.pgh.pa.us 1249 : 1114 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1250 : : TimestampTz last_status)
1251 : : {
1252 : 1114 : TimestampTz status_targettime = 0;
1253 : : long sleeptime;
1254 : :
4344 fujii@postgresql.org 1255 [ + - + + ]: 1114 : if (standby_message_timeout && still_sending)
1256 : 1017 : status_targettime = last_status +
1257 : 1017 : (standby_message_timeout - 1) * ((int64) 1000);
1258 : :
4242 1259 [ + + ]: 1114 : if (status_targettime > 0)
1260 : : {
1261 : : long secs;
1262 : : int usecs;
1263 : :
4344 1264 : 1017 : feTimestampDifference(now,
1265 : : status_targettime,
1266 : : &secs,
1267 : : &usecs);
1268 : : /* Always sleep at least 1 sec */
1269 [ - + ]: 1017 : if (secs <= 0)
1270 : : {
4344 fujii@postgresql.org 1271 :UBC 0 : secs = 1;
1272 : 0 : usecs = 0;
1273 : : }
1274 : :
4344 fujii@postgresql.org 1275 :CBC 1017 : sleeptime = secs * 1000 + usecs / 1000;
1276 : : }
1277 : : else
1278 : 97 : sleeptime = -1;
1279 : :
1280 : 1114 : return sleeptime;
1281 : : }
|