LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 64.0 % 433 277
Test Date: 2026-06-18 00:16:42 Functions: 94.1 % 17 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * receivelog.c - receive WAL files using the streaming
       4              :  *                replication protocol.
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *        src/bin/pg_basebackup/receivelog.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <sys/select.h>
      18              : #include <sys/stat.h>
      19              : #include <unistd.h>
      20              : 
      21              : #include "access/xlog_internal.h"
      22              : #include "common/logging.h"
      23              : #include "libpq-fe.h"
      24              : #include "libpq/protocol.h"
      25              : #include "receivelog.h"
      26              : #include "streamutil.h"
      27              : 
      28              : /* currently open WAL file */
      29              : static Walfile *walfile = NULL;
      30              : static bool reportFlushPosition = false;
      31              : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
      32              : 
      33              : static bool still_sending = true;   /* feedback still needs to be sent? */
      34              : 
      35              : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      36              :                                   XLogRecPtr *stoppos);
      37              : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      38              : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      39              :                               char **buffer);
      40              : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      41              :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      42              : static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      43              :                               XLogRecPtr *blockpos);
      44              : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      45              :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      46              : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      47              : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      48              :                                          TimestampTz last_status);
      49              : 
      50              : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      51              :                                      uint32 *timeline);
      52              : 
      53              : static bool
      54            9 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      55              : {
      56              :     Walfile    *f;
      57              :     static char tmppath[MAXPGPATH];
      58              : 
      59            9 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      60              :              fname);
      61              : 
      62            9 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
      63              :                                                NULL, 0);
      64            9 :     if (f == NULL)
      65              :     {
      66            0 :         pg_log_error("could not create archive status file \"%s\": %s",
      67              :                      tmppath, GetLastWalMethodError(stream->walmethod));
      68            0 :         return false;
      69              :     }
      70              : 
      71            9 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
      72              :     {
      73            0 :         pg_log_error("could not close archive status file \"%s\": %s",
      74              :                      tmppath, GetLastWalMethodError(stream->walmethod));
      75            0 :         return false;
      76              :     }
      77              : 
      78            9 :     return true;
      79              : }
      80              : 
      81              : /*
      82              :  * Open a new WAL file in the specified directory.
      83              :  *
      84              :  * Returns true if OK; on failure, returns false after printing an error msg.
      85              :  * On success, 'walfile' is set to the opened WAL file.
      86              :  *
      87              :  * The file will be padded to 16Mb with zeroes.
      88              :  */
      89              : static bool
      90          162 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      91              : {
      92              :     Walfile    *f;
      93              :     char       *fn;
      94              :     ssize_t     size;
      95              :     XLogSegNo   segno;
      96              :     char        walfile_name[MAXPGPATH];
      97              : 
      98          162 :     XLByteToSeg(startpoint, segno, WalSegSz);
      99          162 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
     100              : 
     101              :     /* Note that this considers the compression used if necessary */
     102          162 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     103              :                                                walfile_name,
     104          162 :                                                stream->partial_suffix);
     105              : 
     106              :     /*
     107              :      * When streaming to files, if an existing file exists we verify that it's
     108              :      * either empty (just created), or a complete WalSegSz segment (in which
     109              :      * case it has been created and padded). Anything else indicates a corrupt
     110              :      * file. Compressed files have no need for padding, so just ignore this
     111              :      * case.
     112              :      *
     113              :      * When streaming to tar, no file with this name will exist before, so we
     114              :      * never have to verify a size.
     115              :      */
     116          317 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
     117          155 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
     118              :     {
     119            0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
     120            0 :         if (size < 0)
     121              :         {
     122            0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     123              :                          fn, GetLastWalMethodError(stream->walmethod));
     124            0 :             pg_free(fn);
     125            0 :             return false;
     126              :         }
     127            0 :         if (size == WalSegSz)
     128              :         {
     129              :             /* Already padded file. Open it for use */
     130            0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
     131            0 :             if (f == NULL)
     132              :             {
     133            0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     134              :                              fn, GetLastWalMethodError(stream->walmethod));
     135            0 :                 pg_free(fn);
     136            0 :                 return false;
     137              :             }
     138              : 
     139              :             /* fsync file in case of a previous crash */
     140            0 :             if (stream->walmethod->ops->sync(f) != 0)
     141              :             {
     142            0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     143              :                              fn, GetLastWalMethodError(stream->walmethod));
     144            0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
     145            0 :                 exit(1);
     146              :             }
     147              : 
     148            0 :             walfile = f;
     149            0 :             pg_free(fn);
     150            0 :             return true;
     151              :         }
     152            0 :         if (size != 0)
     153              :         {
     154              :             /* if write didn't set errno, assume problem is no disk space */
     155            0 :             if (errno == 0)
     156            0 :                 errno = ENOSPC;
     157            0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
     158              :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
     159              :                                   size),
     160              :                          fn, size, WalSegSz);
     161            0 :             pg_free(fn);
     162            0 :             return false;
     163              :         }
     164              :         /* File existed and was empty, so fall through and open */
     165              :     }
     166              : 
     167              :     /* No file existed, so create one */
     168              : 
     169          162 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     170              :                                                walfile_name,
     171          162 :                                                stream->partial_suffix,
     172              :                                                WalSegSz);
     173          162 :     if (f == NULL)
     174              :     {
     175            0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     176              :                      fn, GetLastWalMethodError(stream->walmethod));
     177            0 :         pg_free(fn);
     178            0 :         return false;
     179              :     }
     180              : 
     181          162 :     pg_free(fn);
     182          162 :     walfile = f;
     183          162 :     return true;
     184              : }
     185              : 
     186              : /*
     187              :  * Close the current WAL file (if open), and rename it to the correct
     188              :  * filename if it's complete. On failure, prints an error message to stderr
     189              :  * and returns false, otherwise returns true.
     190              :  */
     191              : static bool
     192          167 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     193              : {
     194              :     char       *fn;
     195              :     pgoff_t     currpos;
     196              :     int         r;
     197              :     char        walfile_name[MAXPGPATH];
     198              : 
     199          167 :     if (walfile == NULL)
     200            5 :         return true;
     201              : 
     202          162 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
     203          162 :     currpos = walfile->currpos;
     204              : 
     205              :     /* Note that this considers the compression used if necessary */
     206          162 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     207              :                                                walfile_name,
     208          162 :                                                stream->partial_suffix);
     209              : 
     210          162 :     if (stream->partial_suffix)
     211              :     {
     212           12 :         if (currpos == WalSegSz)
     213            6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     214              :         else
     215              :         {
     216            6 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
     217            6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
     218              :         }
     219              :     }
     220              :     else
     221          150 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     222              : 
     223          162 :     walfile = NULL;
     224              : 
     225          162 :     if (r != 0)
     226              :     {
     227            0 :         pg_log_error("could not close file \"%s\": %s",
     228              :                      fn, GetLastWalMethodError(stream->walmethod));
     229              : 
     230            0 :         pg_free(fn);
     231            0 :         return false;
     232              :     }
     233              : 
     234          162 :     pg_free(fn);
     235              : 
     236              :     /*
     237              :      * Mark file as archived if requested by the caller - pg_basebackup needs
     238              :      * to do so as files can otherwise get archived again after promotion of a
     239              :      * new node. This is in line with walreceiver.c always doing a
     240              :      * XLogArchiveForceDone() after a complete segment.
     241              :      */
     242          162 :     if (currpos == WalSegSz && stream->mark_done)
     243              :     {
     244              :         /* writes error message if failed */
     245            5 :         if (!mark_file_as_archived(stream, walfile_name))
     246            0 :             return false;
     247              :     }
     248              : 
     249          162 :     lastFlushPosition = pos;
     250          162 :     return true;
     251              : }
     252              : 
     253              : 
     254              : /*
     255              :  * Check if a timeline history file exists.
     256              :  */
     257              : static bool
     258          157 : existsTimeLineHistoryFile(StreamCtl *stream)
     259              : {
     260              :     char        histfname[MAXFNAMELEN];
     261              : 
     262              :     /*
     263              :      * Timeline 1 never has a history file. We treat that as if it existed,
     264              :      * since we never need to stream it.
     265              :      */
     266          157 :     if (stream->timeline == 1)
     267          152 :         return true;
     268              : 
     269            5 :     TLHistoryFileName(histfname, stream->timeline);
     270              : 
     271            5 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
     272              : }
     273              : 
     274              : static bool
     275            5 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     276              : {
     277            5 :     int         size = strlen(content);
     278              :     char        histfname[MAXFNAMELEN];
     279              :     Walfile    *f;
     280              : 
     281              :     /*
     282              :      * Check that the server's idea of how timeline history files should be
     283              :      * named matches ours.
     284              :      */
     285            5 :     TLHistoryFileName(histfname, stream->timeline);
     286            5 :     if (strcmp(histfname, filename) != 0)
     287              :     {
     288            0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     289              :                      stream->timeline, filename);
     290            0 :         return false;
     291              :     }
     292              : 
     293            5 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     294              :                                                histfname, ".tmp", 0);
     295            5 :     if (f == NULL)
     296              :     {
     297            0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     298              :                      histfname, GetLastWalMethodError(stream->walmethod));
     299            0 :         return false;
     300              :     }
     301              : 
     302            5 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
     303              :     {
     304            0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     305              :                      histfname, GetLastWalMethodError(stream->walmethod));
     306              : 
     307              :         /*
     308              :          * If we fail to make the file, delete it to release disk space
     309              :          */
     310            0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
     311              : 
     312            0 :         return false;
     313              :     }
     314              : 
     315            5 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
     316              :     {
     317            0 :         pg_log_error("could not close file \"%s\": %s",
     318              :                      histfname, GetLastWalMethodError(stream->walmethod));
     319            0 :         return false;
     320              :     }
     321              : 
     322              :     /* Maintain archive_status, check close_walfile() for details. */
     323            5 :     if (stream->mark_done)
     324              :     {
     325              :         /* writes error message if failed */
     326            4 :         if (!mark_file_as_archived(stream, histfname))
     327            0 :             return false;
     328              :     }
     329              : 
     330            5 :     return true;
     331              : }
     332              : 
     333              : /*
     334              :  * Send a Standby Status Update message to server.
     335              :  */
     336              : static bool
     337          156 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     338              : {
     339              :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     340          156 :     int         len = 0;
     341              : 
     342          156 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
     343          156 :     len += 1;
     344          156 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     345          156 :     len += 8;
     346          156 :     if (reportFlushPosition)
     347          152 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     348              :     else
     349            4 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     350          156 :     len += 8;
     351          156 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     352          156 :     len += 8;
     353          156 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     354          156 :     len += 8;
     355          156 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     356          156 :     len += 1;
     357              : 
     358          156 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     359              :     {
     360            0 :         pg_log_error("could not send feedback packet: %s",
     361              :                      PQerrorMessage(conn));
     362            0 :         return false;
     363              :     }
     364              : 
     365          156 :     return true;
     366              : }
     367              : 
     368              : /*
     369              :  * Check that the server version we're connected to is supported by
     370              :  * ReceiveXlogStream().
     371              :  *
     372              :  * If it's not, an error message is printed to stderr, and false is returned.
     373              :  */
     374              : bool
     375          333 : CheckServerVersionForStreaming(PGconn *conn)
     376              : {
     377              :     int         minServerMajor,
     378              :                 maxServerMajor;
     379              :     int         serverMajor;
     380              : 
     381              :     /*
     382              :      * The message format used in streaming replication changed in 9.3, so we
     383              :      * cannot stream from older servers. And we don't support servers newer
     384              :      * than the client; it might work, but we don't know, so err on the safe
     385              :      * side.
     386              :      */
     387          333 :     minServerMajor = 903;
     388          333 :     maxServerMajor = PG_VERSION_NUM / 100;
     389          333 :     serverMajor = PQserverVersion(conn) / 100;
     390          333 :     if (serverMajor < minServerMajor)
     391              :     {
     392            0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     393              : 
     394            0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     395              :                      serverver ? serverver : "'unknown'",
     396              :                      "9.3");
     397            0 :         return false;
     398              :     }
     399          333 :     else if (serverMajor > maxServerMajor)
     400              :     {
     401            0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     402              : 
     403            0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     404              :                      serverver ? serverver : "'unknown'",
     405              :                      PG_VERSION);
     406            0 :         return false;
     407              :     }
     408          333 :     return true;
     409              : }
     410              : 
     411              : /*
     412              :  * Receive a log stream starting at the specified position.
     413              :  *
     414              :  * Individual parameters are passed through the StreamCtl structure.
     415              :  *
     416              :  * If sysidentifier is specified, validate that both the system
     417              :  * identifier and the timeline matches the specified ones
     418              :  * (by sending an extra IDENTIFY_SYSTEM command)
     419              :  *
     420              :  * All received segments will be written to the directory
     421              :  * specified by basedir. This will also fetch any missing timeline history
     422              :  * files.
     423              :  *
     424              :  * The stream_stop callback will be called every time data
     425              :  * is received, and whenever a segment is completed. If it returns
     426              :  * true, the streaming will stop and the function
     427              :  * return. As long as it returns false, streaming will continue
     428              :  * indefinitely.
     429              :  *
     430              :  * If stream_stop() checks for external input, stop_socket should be set to
     431              :  * the FD it checks.  This will allow such input to be detected promptly
     432              :  * rather than after standby_message_timeout (which might be indefinite).
     433              :  * Note that signals will interrupt waits for input as well, but that is
     434              :  * race-y since a signal received while busy won't interrupt the wait.
     435              :  *
     436              :  * standby_message_timeout controls how often we send a message
     437              :  * back to the primary letting it know our progress, in milliseconds.
     438              :  * Zero means no messages are sent.
     439              :  * This message will only contain the write location, and never
     440              :  * flush or replay.
     441              :  *
     442              :  * If 'partial_suffix' is not NULL, files are initially created with the
     443              :  * given suffix, and the suffix is removed once the file is finished. That
     444              :  * allows you to tell the difference between partial and completed files,
     445              :  * so that you can continue later where you left.
     446              :  *
     447              :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     448              :  * otherwise only when the WAL file is closed.
     449              :  *
     450              :  * Note: The WAL location *must* be at a log segment start!
     451              :  */
     452              : bool
     453          156 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     454              : {
     455              :     PQExpBuffer query;
     456              :     PGresult   *res;
     457              :     XLogRecPtr  stoppos;
     458              : 
     459              :     /*
     460              :      * The caller should've checked the server version already, but doesn't do
     461              :      * any harm to check it here too.
     462              :      */
     463          156 :     if (!CheckServerVersionForStreaming(conn))
     464            0 :         return false;
     465              : 
     466              :     /*
     467              :      * Decide whether we want to report the flush position. If we report the
     468              :      * flush position, the primary will know what WAL we'll possibly
     469              :      * re-request, and it can then remove older WAL safely. We must always do
     470              :      * that when we are using slots.
     471              :      *
     472              :      * Reporting the flush position makes one eligible as a synchronous
     473              :      * replica. People shouldn't include generic names in
     474              :      * synchronous_standby_names, but we've protected them against it so far,
     475              :      * so let's continue to do so unless specifically requested.
     476              :      */
     477          156 :     if (stream->replication_slot != NULL)
     478              :     {
     479          151 :         reportFlushPosition = true;
     480              :     }
     481              :     else
     482              :     {
     483            5 :         if (stream->synchronous)
     484            1 :             reportFlushPosition = true;
     485              :         else
     486            4 :             reportFlushPosition = false;
     487              :     }
     488              : 
     489          156 :     if (stream->sysidentifier != NULL)
     490              :     {
     491          156 :         char       *sysidentifier = NULL;
     492              :         TimeLineID  servertli;
     493              : 
     494              :         /*
     495              :          * Get the server system identifier and timeline, and validate them.
     496              :          */
     497          156 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
     498              :         {
     499            0 :             pg_free(sysidentifier);
     500            0 :             return false;
     501              :         }
     502              : 
     503          156 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
     504              :         {
     505            0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     506            0 :             pg_free(sysidentifier);
     507            0 :             return false;
     508              :         }
     509          156 :         pg_free(sysidentifier);
     510              : 
     511          156 :         if (stream->timeline > servertli)
     512              :         {
     513            0 :             pg_log_error("starting timeline %u is not present in the server",
     514              :                          stream->timeline);
     515            0 :             return false;
     516              :         }
     517              :     }
     518              : 
     519              :     /*
     520              :      * initialize flush position to starting point, it's the caller's
     521              :      * responsibility that that's sane.
     522              :      */
     523          156 :     lastFlushPosition = stream->startpos;
     524              : 
     525              :     while (1)
     526            1 :     {
     527              :         /*
     528              :          * Fetch the timeline history file for this timeline, if we don't have
     529              :          * it already. When streaming log to tar, this will always return
     530              :          * false, as we are never streaming into an existing file and
     531              :          * therefore there can be no pre-existing timeline history file.
     532              :          */
     533          157 :         if (!existsTimeLineHistoryFile(stream))
     534              :         {
     535            5 :             query = createPQExpBuffer();
     536            5 :             appendPQExpBuffer(query, "TIMELINE_HISTORY %u", stream->timeline);
     537            5 :             res = PQexec(conn, query->data);
     538            5 :             destroyPQExpBuffer(query);
     539            5 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     540              :             {
     541              :                 /* FIXME: we might send it ok, but get an error */
     542            0 :                 pg_log_error("could not send replication command \"%s\": %s",
     543              :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     544            0 :                 PQclear(res);
     545            0 :                 return false;
     546              :             }
     547              : 
     548              :             /*
     549              :              * The response to TIMELINE_HISTORY is a single row result set
     550              :              * with two fields: filename and content
     551              :              */
     552            5 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     553              :             {
     554            0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     555              :                                PQntuples(res), PQnfields(res), 1, 2);
     556              :             }
     557              : 
     558              :             /* Write the history file to disk */
     559            5 :             writeTimeLineHistoryFile(stream,
     560              :                                      PQgetvalue(res, 0, 0),
     561              :                                      PQgetvalue(res, 0, 1));
     562              : 
     563            5 :             PQclear(res);
     564              :         }
     565              : 
     566              :         /*
     567              :          * Before we start streaming from the requested location, check if the
     568              :          * callback tells us to stop here.
     569              :          */
     570          157 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     571            0 :             return true;
     572              : 
     573              :         /* Initiate the replication stream at specified location */
     574          157 :         query = createPQExpBuffer();
     575          157 :         appendPQExpBufferStr(query, "START_REPLICATION");
     576          157 :         if (stream->replication_slot != NULL)
     577              :         {
     578          152 :             appendPQExpBufferStr(query, " SLOT ");
     579          152 :             AppendQuotedIdentifier(query, stream->replication_slot);
     580              :         }
     581          157 :         appendPQExpBuffer(query, " %X/%08X TIMELINE %u",
     582          157 :                           LSN_FORMAT_ARGS(stream->startpos),
     583              :                           stream->timeline);
     584          157 :         res = PQexec(conn, query->data);
     585          157 :         destroyPQExpBuffer(query);
     586          157 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     587              :         {
     588            1 :             pg_log_error("could not send replication command \"%s\": %s",
     589              :                          "START_REPLICATION", PQresultErrorMessage(res));
     590            1 :             PQclear(res);
     591            1 :             return false;
     592              :         }
     593          156 :         PQclear(res);
     594              : 
     595              :         /* Stream the WAL */
     596          156 :         res = HandleCopyStream(conn, stream, &stoppos);
     597          156 :         if (res == NULL)
     598            0 :             goto error;
     599              : 
     600              :         /*
     601              :          * Streaming finished.
     602              :          *
     603              :          * There are two possible reasons for that: a controlled shutdown, or
     604              :          * we reached the end of the current timeline. In case of
     605              :          * end-of-timeline, the server sends a result set after Copy has
     606              :          * finished, containing information about the next timeline. Read
     607              :          * that, and restart streaming from the next timeline. In case of
     608              :          * controlled shutdown, stop here.
     609              :          */
     610          156 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     611            1 :         {
     612              :             /*
     613              :              * End-of-timeline. Read the next timeline's ID and starting
     614              :              * position. Usually, the starting position will match the end of
     615              :              * the previous timeline, but there are corner cases like if the
     616              :              * server had sent us half of a WAL record, when it was promoted.
     617              :              * The new timeline will begin at the end of the last complete
     618              :              * record in that case, overlapping the partial WAL record on the
     619              :              * old timeline.
     620              :              */
     621              :             uint32      newtimeline;
     622              :             bool        parsed;
     623              : 
     624            1 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     625            1 :             PQclear(res);
     626            1 :             if (!parsed)
     627            0 :                 goto error;
     628              : 
     629              :             /* Sanity check the values the server gave us */
     630            1 :             if (newtimeline <= stream->timeline)
     631              :             {
     632            0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     633              :                              newtimeline, stream->timeline);
     634            0 :                 goto error;
     635              :             }
     636            1 :             if (stream->startpos > stoppos)
     637              :             {
     638            0 :                 pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
     639              :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
     640              :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     641            0 :                 goto error;
     642              :             }
     643              : 
     644              :             /* Read the final result, which should be CommandComplete. */
     645            1 :             res = PQgetResult(conn);
     646            1 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     647              :             {
     648            0 :                 pg_log_error("unexpected termination of replication stream: %s",
     649              :                              PQresultErrorMessage(res));
     650            0 :                 PQclear(res);
     651            0 :                 goto error;
     652              :             }
     653            1 :             PQclear(res);
     654              : 
     655              :             /*
     656              :              * Loop back to start streaming from the new timeline. Always
     657              :              * start streaming at the beginning of a segment.
     658              :              */
     659            1 :             stream->timeline = newtimeline;
     660            1 :             stream->startpos = stream->startpos -
     661            1 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     662            1 :             continue;
     663              :         }
     664          155 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     665              :         {
     666          154 :             PQclear(res);
     667              : 
     668              :             /*
     669              :              * End of replication (ie. controlled shut down of the server).
     670              :              *
     671              :              * Check if the callback thinks it's OK to stop here. If not,
     672              :              * complain.
     673              :              */
     674          154 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     675          154 :                 return true;
     676              :             else
     677              :             {
     678            0 :                 pg_log_error("replication stream was terminated before stop point");
     679            0 :                 goto error;
     680              :             }
     681              :         }
     682              :         else
     683              :         {
     684              :             /* Server returned an error. */
     685            1 :             pg_log_error("unexpected termination of replication stream: %s",
     686              :                          PQresultErrorMessage(res));
     687            1 :             PQclear(res);
     688            1 :             goto error;
     689              :         }
     690              :     }
     691              : 
     692            1 : error:
     693            1 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
     694            0 :         pg_log_error("could not close file \"%s\": %s",
     695              :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
     696            1 :     walfile = NULL;
     697            1 :     return false;
     698              : }
     699              : 
     700              : /*
     701              :  * Helper function to parse the result set returned by server after streaming
     702              :  * has finished. On failure, prints an error to stderr and returns false.
     703              :  */
     704              : static bool
     705            1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     706              : {
     707              :     uint32      startpos_xlogid,
     708              :                 startpos_xrecoff;
     709              : 
     710              :     /*----------
     711              :      * The result set consists of one row and two columns, e.g:
     712              :      *
     713              :      *  next_tli | next_tli_startpos
     714              :      * ----------+-------------------
     715              :      *         4 | 0/9949AE0
     716              :      *
     717              :      * next_tli is the timeline ID of the next timeline after the one that
     718              :      * just finished streaming. next_tli_startpos is the WAL location where
     719              :      * the server switched to it.
     720              :      *----------
     721              :      */
     722            1 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     723              :     {
     724            0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     725              :                      PQntuples(res), PQnfields(res), 1, 2);
     726            0 :         return false;
     727              :     }
     728              : 
     729            1 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     730            1 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid,
     731              :                &startpos_xrecoff) != 2)
     732              :     {
     733            0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     734              :                      PQgetvalue(res, 0, 1));
     735            0 :         return false;
     736              :     }
     737            1 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     738              : 
     739            1 :     return true;
     740              : }
     741              : 
     742              : /*
     743              :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     744              :  * initiating streaming with the START_REPLICATION command.
     745              :  *
     746              :  * If the COPY ends (not necessarily successfully) due a message from the
     747              :  * server, returns a PGresult and sets *stoppos to the last byte written.
     748              :  * On any other sort of error, returns NULL.
     749              :  */
     750              : static PGresult *
     751          156 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     752              :                  XLogRecPtr *stoppos)
     753              : {
     754          156 :     char       *copybuf = NULL;
     755          156 :     TimestampTz last_status = -1;
     756          156 :     XLogRecPtr  blockpos = stream->startpos;
     757              : 
     758          156 :     still_sending = true;
     759              : 
     760              :     while (1)
     761          478 :     {
     762              :         int         r;
     763              :         TimestampTz now;
     764              :         long        sleeptime;
     765              : 
     766              :         /*
     767              :          * Check if we should continue streaming, or abort at this point.
     768              :          */
     769          634 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
     770            0 :             goto error;
     771              : 
     772          634 :         now = feGetCurrentTimestamp();
     773              : 
     774              :         /*
     775              :          * If synchronous option is true, issue sync command as soon as there
     776              :          * are WAL data which has not been flushed yet.
     777              :          */
     778          634 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     779              :         {
     780            0 :             if (stream->walmethod->ops->sync(walfile) != 0)
     781            0 :                 pg_fatal("could not fsync file \"%s\": %s",
     782              :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
     783            0 :             lastFlushPosition = blockpos;
     784              : 
     785              :             /*
     786              :              * Send feedback so that the server sees the latest WAL locations
     787              :              * immediately.
     788              :              */
     789            0 :             if (!sendFeedback(conn, blockpos, now, false))
     790            0 :                 goto error;
     791            0 :             last_status = now;
     792              :         }
     793              : 
     794              :         /*
     795              :          * Potentially send a status message to the primary
     796              :          */
     797         1205 :         if (still_sending && stream->standby_message_timeout > 0 &&
     798          571 :             feTimestampDifferenceExceeds(last_status, now,
     799              :                                          stream->standby_message_timeout))
     800              :         {
     801              :             /* Time to send feedback! */
     802          156 :             if (!sendFeedback(conn, blockpos, now, false))
     803            0 :                 goto error;
     804          156 :             last_status = now;
     805              :         }
     806              : 
     807              :         /*
     808              :          * Calculate how long send/receive loops should sleep
     809              :          */
     810          634 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     811              :                                                  last_status);
     812              : 
     813              :         /* Done with any prior message */
     814          634 :         PQfreemem(copybuf);
     815          634 :         copybuf = NULL;
     816              : 
     817          634 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     818         2387 :         while (r != 0)
     819              :         {
     820         1909 :             if (r == -1)
     821            0 :                 goto error;
     822         1909 :             if (r == -2)
     823              :             {
     824          156 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     825              : 
     826          156 :                 if (res == NULL)
     827            0 :                     goto error;
     828          156 :                 PQfreemem(copybuf);
     829          156 :                 return res;
     830              :             }
     831              : 
     832              :             /* Check the message type. */
     833         1753 :             if (copybuf[0] == PqReplMsg_Keepalive)
     834              :             {
     835            0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     836              :                                          &last_status))
     837            0 :                     goto error;
     838              :             }
     839         1753 :             else if (copybuf[0] == PqReplMsg_WALData)
     840              :             {
     841         1753 :                 if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
     842            0 :                     goto error;
     843              : 
     844              :                 /*
     845              :                  * Check if we should continue streaming, or abort at this
     846              :                  * point.
     847              :                  */
     848         1753 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     849            0 :                     goto error;
     850              :             }
     851              :             else
     852              :             {
     853            0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     854              :                              copybuf[0]);
     855            0 :                 goto error;
     856              :             }
     857              : 
     858              :             /* Done with that message */
     859         1753 :             PQfreemem(copybuf);
     860         1753 :             copybuf = NULL;
     861              : 
     862              :             /*
     863              :              * Process the received data, and any subsequent data we can read
     864              :              * without blocking.
     865              :              */
     866         1753 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     867              :         }
     868              :     }
     869              : 
     870            0 : error:
     871            0 :     PQfreemem(copybuf);
     872            0 :     return NULL;
     873              : }
     874              : 
     875              : /*
     876              :  * Wait until we can read a CopyData message,
     877              :  * or timeout, or occurrence of a signal or input on the stop_socket.
     878              :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     879              :  *
     880              :  * Returns 1 if data has become available for reading, 0 if timed out
     881              :  * or interrupted by signal or stop_socket input, and -1 on an error.
     882              :  */
     883              : static int
     884         2127 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     885              : {
     886              :     int         ret;
     887              :     fd_set      input_mask;
     888              :     int         connsocket;
     889              :     int         maxfd;
     890              :     struct timeval timeout;
     891              :     struct timeval *timeoutptr;
     892              : 
     893         2127 :     connsocket = PQsocket(conn);
     894         2127 :     if (connsocket < 0)
     895              :     {
     896            0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     897            0 :         return -1;
     898              :     }
     899              : 
     900        36159 :     FD_ZERO(&input_mask);
     901         2127 :     FD_SET(connsocket, &input_mask);
     902         2127 :     maxfd = connsocket;
     903         2127 :     if (stop_socket != PGINVALID_SOCKET)
     904              :     {
     905         2063 :         FD_SET(stop_socket, &input_mask);
     906         2063 :         maxfd = Max(maxfd, stop_socket);
     907              :     }
     908              : 
     909         2127 :     if (timeout_ms < 0)
     910           63 :         timeoutptr = NULL;
     911              :     else
     912              :     {
     913         2064 :         timeout.tv_sec = timeout_ms / 1000L;
     914         2064 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     915         2064 :         timeoutptr = &timeout;
     916              :     }
     917              : 
     918         2127 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     919              : 
     920         2127 :     if (ret < 0)
     921              :     {
     922            0 :         if (errno == EINTR)
     923            0 :             return 0;           /* Got a signal, so not an error */
     924            0 :         pg_log_error("%s() failed: %m", "select");
     925            0 :         return -1;
     926              :     }
     927         2127 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     928         1869 :         return 1;               /* Got input on connection socket */
     929              : 
     930          258 :     return 0;                   /* Got timeout or input on stop_socket */
     931              : }
     932              : 
     933              : /*
     934              :  * Receive CopyData message available from XLOG stream, blocking for
     935              :  * maximum of 'timeout' ms.
     936              :  *
     937              :  * If data was received, returns the length of the data. *buffer is set to
     938              :  * point to a buffer holding the received message. The caller must eventually
     939              :  * free the buffer with PQfreemem().
     940              :  *
     941              :  * Returns 0 if no data was available within timeout, or if wait was
     942              :  * interrupted by signal or stop_socket input.
     943              :  * -1 on error. -2 if the server ended the COPY.
     944              :  */
     945              : static int
     946         2387 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     947              :                   char **buffer)
     948              : {
     949         2387 :     char       *copybuf = NULL;
     950              :     int         rawlen;
     951              : 
     952              :     /* Caller should have cleared any prior buffer */
     953              :     Assert(*buffer == NULL);
     954              : 
     955              :     /* Try to receive a CopyData message */
     956         2387 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     957         2387 :     if (rawlen == 0)
     958              :     {
     959              :         int         ret;
     960              : 
     961              :         /*
     962              :          * No data available.  Wait for some to appear, but not longer than
     963              :          * the specified timeout, so that we can ping the server.  Also stop
     964              :          * waiting if input appears on stop_socket.
     965              :          */
     966         2127 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     967         2127 :         if (ret <= 0)
     968          258 :             return ret;
     969              : 
     970              :         /* Now there is actually data on the socket */
     971         1869 :         if (PQconsumeInput(conn) == 0)
     972              :         {
     973            0 :             pg_log_error("could not receive data from WAL stream: %s",
     974              :                          PQerrorMessage(conn));
     975            0 :             return -1;
     976              :         }
     977              : 
     978              :         /* Now that we've consumed some input, try again */
     979         1869 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     980         1869 :         if (rawlen == 0)
     981          220 :             return 0;
     982              :     }
     983         1909 :     if (rawlen == -1)           /* end-of-streaming or error */
     984          156 :         return -2;
     985         1753 :     if (rawlen == -2)
     986              :     {
     987            0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     988            0 :         return -1;
     989              :     }
     990              : 
     991              :     /* Return received messages to caller */
     992         1753 :     *buffer = copybuf;
     993         1753 :     return rawlen;
     994              : }
     995              : 
     996              : /*
     997              :  * Process the keepalive message.
     998              :  */
     999              : static bool
    1000            0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1001              :                     XLogRecPtr blockpos, TimestampTz *last_status)
    1002              : {
    1003              :     int         pos;
    1004              :     bool        replyRequested;
    1005              :     TimestampTz now;
    1006              : 
    1007              :     /*
    1008              :      * Parse the keepalive message, enclosed in the CopyData message. We just
    1009              :      * check if the server requested a reply, and ignore the rest.
    1010              :      */
    1011            0 :     pos = 1;                    /* skip msgtype PqReplMsg_Keepalive */
    1012            0 :     pos += 8;                   /* skip walEnd */
    1013            0 :     pos += 8;                   /* skip sendTime */
    1014              : 
    1015            0 :     if (len < pos + 1)
    1016              :     {
    1017            0 :         pg_log_error("streaming header too small: %d", len);
    1018            0 :         return false;
    1019              :     }
    1020            0 :     replyRequested = copybuf[pos];
    1021              : 
    1022              :     /* If the server requested an immediate reply, send one. */
    1023            0 :     if (replyRequested && still_sending)
    1024              :     {
    1025            0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1026            0 :             walfile != NULL)
    1027              :         {
    1028              :             /*
    1029              :              * If a valid flush location needs to be reported, flush the
    1030              :              * current WAL file so that the latest flush location is sent back
    1031              :              * to the server. This is necessary to see whether the last WAL
    1032              :              * data has been successfully replicated or not, at the normal
    1033              :              * shutdown of the server.
    1034              :              */
    1035            0 :             if (stream->walmethod->ops->sync(walfile) != 0)
    1036            0 :                 pg_fatal("could not fsync file \"%s\": %s",
    1037              :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
    1038            0 :             lastFlushPosition = blockpos;
    1039              :         }
    1040              : 
    1041            0 :         now = feGetCurrentTimestamp();
    1042            0 :         if (!sendFeedback(conn, blockpos, now, false))
    1043            0 :             return false;
    1044            0 :         *last_status = now;
    1045              :     }
    1046              : 
    1047            0 :     return true;
    1048              : }
    1049              : 
    1050              : /*
    1051              :  * Process WALData message.
    1052              :  */
    1053              : static bool
    1054         1753 : ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1055              :                   XLogRecPtr *blockpos)
    1056              : {
    1057              :     int         xlogoff;
    1058              :     int         bytes_left;
    1059              :     int         bytes_written;
    1060              :     int         hdr_len;
    1061              : 
    1062              :     /*
    1063              :      * Once we've decided we don't want to receive any more, just ignore any
    1064              :      * subsequent WALData messages.
    1065              :      */
    1066         1753 :     if (!(still_sending))
    1067          231 :         return true;
    1068              : 
    1069              :     /*
    1070              :      * Read the header of the WALData message, enclosed in the CopyData
    1071              :      * message. We only need the WAL location field (dataStart), the rest of
    1072              :      * the header is ignored.
    1073              :      */
    1074         1522 :     hdr_len = 1;                /* msgtype PqReplMsg_WALData */
    1075         1522 :     hdr_len += 8;               /* dataStart */
    1076         1522 :     hdr_len += 8;               /* walEnd */
    1077         1522 :     hdr_len += 8;               /* sendTime */
    1078         1522 :     if (len < hdr_len)
    1079              :     {
    1080            0 :         pg_log_error("streaming header too small: %d", len);
    1081            0 :         return false;
    1082              :     }
    1083         1522 :     *blockpos = fe_recvint64(&copybuf[1]);
    1084              : 
    1085              :     /* Extract WAL location for this block */
    1086         1522 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1087              : 
    1088              :     /*
    1089              :      * Verify that the initial location in the stream matches where we think
    1090              :      * we are.
    1091              :      */
    1092         1522 :     if (walfile == NULL)
    1093              :     {
    1094              :         /* No file open yet */
    1095          162 :         if (xlogoff != 0)
    1096              :         {
    1097            0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1098              :                          xlogoff);
    1099            0 :             return false;
    1100              :         }
    1101              :     }
    1102              :     else
    1103              :     {
    1104              :         /* More data in existing segment */
    1105         1360 :         if (walfile->currpos != xlogoff)
    1106              :         {
    1107            0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1108              :                          xlogoff, (int) walfile->currpos);
    1109            0 :             return false;
    1110              :         }
    1111              :     }
    1112              : 
    1113         1522 :     bytes_left = len - hdr_len;
    1114         1522 :     bytes_written = 0;
    1115              : 
    1116         3044 :     while (bytes_left)
    1117              :     {
    1118              :         int         bytes_to_write;
    1119              : 
    1120              :         /*
    1121              :          * If crossing a WAL boundary, only write up until we reach wal
    1122              :          * segment size.
    1123              :          */
    1124         1522 :         if (xlogoff + bytes_left > WalSegSz)
    1125            0 :             bytes_to_write = WalSegSz - xlogoff;
    1126              :         else
    1127         1522 :             bytes_to_write = bytes_left;
    1128              : 
    1129         1522 :         if (walfile == NULL)
    1130              :         {
    1131          162 :             if (!open_walfile(stream, *blockpos))
    1132              :             {
    1133              :                 /* Error logged by open_walfile */
    1134            0 :                 return false;
    1135              :             }
    1136              :         }
    1137              : 
    1138         3044 :         if (stream->walmethod->ops->write(walfile,
    1139         1522 :                                           copybuf + hdr_len + bytes_written,
    1140         1522 :                                           bytes_to_write) != bytes_to_write)
    1141              :         {
    1142            0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
    1143              :                          bytes_to_write, walfile->pathname,
    1144              :                          GetLastWalMethodError(stream->walmethod));
    1145            0 :             return false;
    1146              :         }
    1147              : 
    1148              :         /* Write was successful, advance our position */
    1149         1522 :         bytes_written += bytes_to_write;
    1150         1522 :         bytes_left -= bytes_to_write;
    1151         1522 :         *blockpos += bytes_to_write;
    1152         1522 :         xlogoff += bytes_to_write;
    1153              : 
    1154              :         /* Did we reach the end of a WAL segment? */
    1155         1522 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1156              :         {
    1157           11 :             if (!close_walfile(stream, *blockpos))
    1158              :                 /* Error message written in close_walfile() */
    1159            0 :                 return false;
    1160              : 
    1161           11 :             xlogoff = 0;
    1162              : 
    1163           11 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1164              :             {
    1165            0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1166              :                 {
    1167            0 :                     pg_log_error("could not send copy-end packet: %s",
    1168              :                                  PQerrorMessage(conn));
    1169            0 :                     return false;
    1170              :                 }
    1171            0 :                 still_sending = false;
    1172            0 :                 return true;    /* ignore the rest of this WALData packet */
    1173              :             }
    1174              :         }
    1175              :     }
    1176              :     /* No more data left to write, receive next copy packet */
    1177              : 
    1178         1522 :     return true;
    1179              : }
    1180              : 
    1181              : /*
    1182              :  * Handle end of the copy stream.
    1183              :  */
    1184              : static PGresult *
    1185          156 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1186              :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1187              : {
    1188          156 :     PGresult   *res = PQgetResult(conn);
    1189              : 
    1190              :     /*
    1191              :      * The server closed its end of the copy stream.  If we haven't closed
    1192              :      * ours already, we need to do so now, unless the server threw an error,
    1193              :      * in which case we don't.
    1194              :      */
    1195          156 :     if (still_sending)
    1196              :     {
    1197            2 :         if (!close_walfile(stream, blockpos))
    1198              :         {
    1199              :             /* Error message written in close_walfile() */
    1200            0 :             PQclear(res);
    1201            0 :             return NULL;
    1202              :         }
    1203            2 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1204              :         {
    1205            1 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1206              :             {
    1207            0 :                 pg_log_error("could not send copy-end packet: %s",
    1208              :                              PQerrorMessage(conn));
    1209            0 :                 PQclear(res);
    1210            0 :                 return NULL;
    1211              :             }
    1212            1 :             res = PQgetResult(conn);
    1213              :         }
    1214            2 :         still_sending = false;
    1215              :     }
    1216          156 :     *stoppos = blockpos;
    1217          156 :     return res;
    1218              : }
    1219              : 
    1220              : /*
    1221              :  * Check if we should continue streaming, or abort at this point.
    1222              :  */
    1223              : static bool
    1224         2387 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1225              : {
    1226         2387 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1227              :     {
    1228          154 :         if (!close_walfile(stream, blockpos))
    1229              :         {
    1230              :             /* Potential error message is written by close_walfile */
    1231            0 :             return false;
    1232              :         }
    1233          154 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1234              :         {
    1235            0 :             pg_log_error("could not send copy-end packet: %s",
    1236              :                          PQerrorMessage(conn));
    1237            0 :             return false;
    1238              :         }
    1239          154 :         still_sending = false;
    1240              :     }
    1241              : 
    1242         2387 :     return true;
    1243              : }
    1244              : 
    1245              : /*
    1246              :  * Calculate how long send/receive loops should sleep
    1247              :  */
    1248              : static long
    1249          634 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1250              :                              TimestampTz last_status)
    1251              : {
    1252          634 :     TimestampTz status_targettime = 0;
    1253              :     long        sleeptime;
    1254              : 
    1255          634 :     if (standby_message_timeout && still_sending)
    1256          571 :         status_targettime = last_status +
    1257          571 :             (standby_message_timeout - 1) * ((int64) 1000);
    1258              : 
    1259          634 :     if (status_targettime > 0)
    1260              :     {
    1261              :         long        secs;
    1262              :         int         usecs;
    1263              : 
    1264          571 :         feTimestampDifference(now,
    1265              :                               status_targettime,
    1266              :                               &secs,
    1267              :                               &usecs);
    1268              :         /* Always sleep at least 1 sec */
    1269          571 :         if (secs <= 0)
    1270              :         {
    1271            0 :             secs = 1;
    1272            0 :             usecs = 0;
    1273              :         }
    1274              : 
    1275          571 :         sleeptime = secs * 1000 + usecs / 1000;
    1276              :     }
    1277              :     else
    1278           63 :         sleeptime = -1;
    1279              : 
    1280          634 :     return sleeptime;
    1281              : }
        

Generated by: LCOV version 2.0-1