LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 213 424 50.2 %
Date: 2021-03-02 10:06:42 Functions: 14 17 82.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * receivelog.c - receive WAL files using the streaming
       4             :  *                replication protocol.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/receivelog.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/stat.h>
      18             : #include <unistd.h>
      19             : #ifdef HAVE_SYS_SELECT_H
      20             : #include <sys/select.h>
      21             : #endif
      22             : 
      23             : #include "access/xlog_internal.h"
      24             : #include "common/file_utils.h"
      25             : #include "common/logging.h"
      26             : #include "libpq-fe.h"
      27             : #include "receivelog.h"
      28             : #include "streamutil.h"
      29             : 
      30             : /* fd and filename for currently open WAL file */
      31             : static Walfile *walfile = NULL;
      32             : static char current_walfile_name[MAXPGPATH] = "";
      33             : static bool reportFlushPosition = false;
      34             : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
      35             : 
      36             : static bool still_sending = true;   /* feedback still needs to be sent? */
      37             : 
      38             : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      39             :                                   XLogRecPtr *stoppos);
      40             : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      41             : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      42             :                               char **buffer);
      43             : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      44             :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      45             : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      46             :                                XLogRecPtr *blockpos);
      47             : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      48             :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      49             : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      50             : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      51             :                                          TimestampTz last_status);
      52             : 
      53             : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      54             :                                      uint32 *timeline);
      55             : 
      56             : static bool
      57          24 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      58             : {
      59             :     Walfile    *f;
      60             :     static char tmppath[MAXPGPATH];
      61             : 
      62          24 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      63             :              fname);
      64             : 
      65          24 :     f = stream->walmethod->open_for_write(tmppath, NULL, 0);
      66          24 :     if (f == NULL)
      67             :     {
      68           0 :         pg_log_error("could not create archive status file \"%s\": %s",
      69             :                      tmppath, stream->walmethod->getlasterror());
      70           0 :         return false;
      71             :     }
      72             : 
      73          24 :     stream->walmethod->close(f, CLOSE_NORMAL);
      74             : 
      75          24 :     return true;
      76             : }
      77             : 
      78             : /*
      79             :  * Open a new WAL file in the specified directory.
      80             :  *
      81             :  * Returns true if OK; on failure, returns false after printing an error msg.
      82             :  * On success, 'walfile' is set to the FD for the file, and the base filename
      83             :  * (without partial_suffix) is stored in 'current_walfile_name'.
      84             :  *
      85             :  * The file will be padded to 16Mb with zeroes.
      86             :  */
      87             : static bool
      88         138 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      89             : {
      90             :     Walfile    *f;
      91             :     char        fn[MAXPGPATH];
      92             :     ssize_t     size;
      93             :     XLogSegNo   segno;
      94             : 
      95         138 :     XLByteToSeg(startpoint, segno, WalSegSz);
      96         138 :     XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
      97             : 
      98         138 :     snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
      99         138 :              stream->partial_suffix ? stream->partial_suffix : "");
     100             : 
     101             :     /*
     102             :      * When streaming to files, if an existing file exists we verify that it's
     103             :      * either empty (just created), or a complete WalSegSz segment (in which
     104             :      * case it has been created and padded). Anything else indicates a corrupt
     105             :      * file.
     106             :      *
     107             :      * When streaming to tar, no file with this name will exist before, so we
     108             :      * never have to verify a size.
     109             :      */
     110         138 :     if (stream->walmethod->existsfile(fn))
     111             :     {
     112           0 :         size = stream->walmethod->get_file_size(fn);
     113           0 :         if (size < 0)
     114             :         {
     115           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     116             :                          fn, stream->walmethod->getlasterror());
     117           0 :             return false;
     118             :         }
     119           0 :         if (size == WalSegSz)
     120             :         {
     121             :             /* Already padded file. Open it for use */
     122           0 :             f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
     123           0 :             if (f == NULL)
     124             :             {
     125           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     126             :                              fn, stream->walmethod->getlasterror());
     127           0 :                 return false;
     128             :             }
     129             : 
     130             :             /* fsync file in case of a previous crash */
     131           0 :             if (stream->walmethod->sync(f) != 0)
     132             :             {
     133           0 :                 pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
     134             :                              fn, stream->walmethod->getlasterror());
     135           0 :                 stream->walmethod->close(f, CLOSE_UNLINK);
     136           0 :                 exit(1);
     137             :             }
     138             : 
     139           0 :             walfile = f;
     140           0 :             return true;
     141             :         }
     142           0 :         if (size != 0)
     143             :         {
     144             :             /* if write didn't set errno, assume problem is no disk space */
     145           0 :             if (errno == 0)
     146           0 :                 errno = ENOSPC;
     147           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
     148             :                                   "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
     149             :                                   size),
     150             :                          fn, (int) size, WalSegSz);
     151           0 :             return false;
     152             :         }
     153             :         /* File existed and was empty, so fall through and open */
     154             :     }
     155             : 
     156             :     /* No file existed, so create one */
     157             : 
     158         276 :     f = stream->walmethod->open_for_write(current_walfile_name,
     159         138 :                                           stream->partial_suffix, WalSegSz);
     160         138 :     if (f == NULL)
     161             :     {
     162           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     163             :                      fn, stream->walmethod->getlasterror());
     164           0 :         return false;
     165             :     }
     166             : 
     167         138 :     walfile = f;
     168         138 :     return true;
     169             : }
     170             : 
     171             : /*
     172             :  * Close the current WAL file (if open), and rename it to the correct
     173             :  * filename if it's complete. On failure, prints an error message to stderr
     174             :  * and returns false, otherwise returns true.
     175             :  */
     176             : static bool
     177         162 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     178             : {
     179             :     off_t       currpos;
     180             :     int         r;
     181             : 
     182         162 :     if (walfile == NULL)
     183          24 :         return true;
     184             : 
     185         138 :     currpos = stream->walmethod->get_current_pos(walfile);
     186         138 :     if (currpos == -1)
     187             :     {
     188           0 :         pg_log_error("could not determine seek position in file \"%s\": %s",
     189             :                      current_walfile_name, stream->walmethod->getlasterror());
     190           0 :         stream->walmethod->close(walfile, CLOSE_UNLINK);
     191           0 :         walfile = NULL;
     192             : 
     193           0 :         return false;
     194             :     }
     195             : 
     196         138 :     if (stream->partial_suffix)
     197             :     {
     198           2 :         if (currpos == WalSegSz)
     199           0 :             r = stream->walmethod->close(walfile, CLOSE_NORMAL);
     200             :         else
     201             :         {
     202           2 :             pg_log_info("not renaming \"%s%s\", segment is not complete",
     203             :                         current_walfile_name, stream->partial_suffix);
     204           2 :             r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
     205             :         }
     206             :     }
     207             :     else
     208         136 :         r = stream->walmethod->close(walfile, CLOSE_NORMAL);
     209             : 
     210         138 :     walfile = NULL;
     211             : 
     212         138 :     if (r != 0)
     213             :     {
     214           0 :         pg_log_error("could not close file \"%s\": %s",
     215             :                      current_walfile_name, stream->walmethod->getlasterror());
     216           0 :         return false;
     217             :     }
     218             : 
     219             :     /*
     220             :      * Mark file as archived if requested by the caller - pg_basebackup needs
     221             :      * to do so as files can otherwise get archived again after promotion of a
     222             :      * new node. This is in line with walreceiver.c always doing a
     223             :      * XLogArchiveForceDone() after a complete segment.
     224             :      */
     225         138 :     if (currpos == WalSegSz && stream->mark_done)
     226             :     {
     227             :         /* writes error message if failed */
     228          24 :         if (!mark_file_as_archived(stream, current_walfile_name))
     229           0 :             return false;
     230             :     }
     231             : 
     232         138 :     lastFlushPosition = pos;
     233         138 :     return true;
     234             : }
     235             : 
     236             : 
     237             : /*
     238             :  * Check if a timeline history file exists.
     239             :  */
     240             : static bool
     241         140 : existsTimeLineHistoryFile(StreamCtl *stream)
     242             : {
     243             :     char        histfname[MAXFNAMELEN];
     244             : 
     245             :     /*
     246             :      * Timeline 1 never has a history file. We treat that as if it existed,
     247             :      * since we never need to stream it.
     248             :      */
     249         140 :     if (stream->timeline == 1)
     250         140 :         return true;
     251             : 
     252           0 :     TLHistoryFileName(histfname, stream->timeline);
     253             : 
     254           0 :     return stream->walmethod->existsfile(histfname);
     255             : }
     256             : 
     257             : static bool
     258           0 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     259             : {
     260           0 :     int         size = strlen(content);
     261             :     char        histfname[MAXFNAMELEN];
     262             :     Walfile    *f;
     263             : 
     264             :     /*
     265             :      * Check that the server's idea of how timeline history files should be
     266             :      * named matches ours.
     267             :      */
     268           0 :     TLHistoryFileName(histfname, stream->timeline);
     269           0 :     if (strcmp(histfname, filename) != 0)
     270             :     {
     271           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     272             :                      stream->timeline, filename);
     273           0 :         return false;
     274             :     }
     275             : 
     276           0 :     f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
     277           0 :     if (f == NULL)
     278             :     {
     279           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     280             :                      histfname, stream->walmethod->getlasterror());
     281           0 :         return false;
     282             :     }
     283             : 
     284           0 :     if ((int) stream->walmethod->write(f, content, size) != size)
     285             :     {
     286           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     287             :                      histfname, stream->walmethod->getlasterror());
     288             : 
     289             :         /*
     290             :          * If we fail to make the file, delete it to release disk space
     291             :          */
     292           0 :         stream->walmethod->close(f, CLOSE_UNLINK);
     293             : 
     294           0 :         return false;
     295             :     }
     296             : 
     297           0 :     if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
     298             :     {
     299           0 :         pg_log_error("could not close file \"%s\": %s",
     300             :                      histfname, stream->walmethod->getlasterror());
     301           0 :         return false;
     302             :     }
     303             : 
     304             :     /* Maintain archive_status, check close_walfile() for details. */
     305           0 :     if (stream->mark_done)
     306             :     {
     307             :         /* writes error message if failed */
     308           0 :         if (!mark_file_as_archived(stream, histfname))
     309           0 :             return false;
     310             :     }
     311             : 
     312           0 :     return true;
     313             : }
     314             : 
     315             : /*
     316             :  * Send a Standby Status Update message to server.
     317             :  */
     318             : static bool
     319         138 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     320             : {
     321             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     322         138 :     int         len = 0;
     323             : 
     324         138 :     replybuf[len] = 'r';
     325         138 :     len += 1;
     326         138 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     327         138 :     len += 8;
     328         138 :     if (reportFlushPosition)
     329         136 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     330             :     else
     331           2 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     332         138 :     len += 8;
     333         138 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     334         138 :     len += 8;
     335         138 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     336         138 :     len += 8;
     337         138 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     338         138 :     len += 1;
     339             : 
     340         138 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     341             :     {
     342           0 :         pg_log_error("could not send feedback packet: %s",
     343             :                      PQerrorMessage(conn));
     344           0 :         return false;
     345             :     }
     346             : 
     347         138 :     return true;
     348             : }
     349             : 
     350             : /*
     351             :  * Check that the server version we're connected to is supported by
     352             :  * ReceiveXlogStream().
     353             :  *
     354             :  * If it's not, an error message is printed to stderr, and false is returned.
     355             :  */
     356             : bool
     357         294 : CheckServerVersionForStreaming(PGconn *conn)
     358             : {
     359             :     int         minServerMajor,
     360             :                 maxServerMajor;
     361             :     int         serverMajor;
     362             : 
     363             :     /*
     364             :      * The message format used in streaming replication changed in 9.3, so we
     365             :      * cannot stream from older servers. And we don't support servers newer
     366             :      * than the client; it might work, but we don't know, so err on the safe
     367             :      * side.
     368             :      */
     369         294 :     minServerMajor = 903;
     370         294 :     maxServerMajor = PG_VERSION_NUM / 100;
     371         294 :     serverMajor = PQserverVersion(conn) / 100;
     372         294 :     if (serverMajor < minServerMajor)
     373             :     {
     374           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     375             : 
     376           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     377             :                      serverver ? serverver : "'unknown'",
     378             :                      "9.3");
     379           0 :         return false;
     380             :     }
     381         294 :     else if (serverMajor > maxServerMajor)
     382             :     {
     383           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     384             : 
     385           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     386             :                      serverver ? serverver : "'unknown'",
     387             :                      PG_VERSION);
     388           0 :         return false;
     389             :     }
     390         294 :     return true;
     391             : }
     392             : 
     393             : /*
     394             :  * Receive a log stream starting at the specified position.
     395             :  *
     396             :  * Individual parameters are passed through the StreamCtl structure.
     397             :  *
     398             :  * If sysidentifier is specified, validate that both the system
     399             :  * identifier and the timeline matches the specified ones
     400             :  * (by sending an extra IDENTIFY_SYSTEM command)
     401             :  *
     402             :  * All received segments will be written to the directory
     403             :  * specified by basedir. This will also fetch any missing timeline history
     404             :  * files.
     405             :  *
     406             :  * The stream_stop callback will be called every time data
     407             :  * is received, and whenever a segment is completed. If it returns
     408             :  * true, the streaming will stop and the function
     409             :  * return. As long as it returns false, streaming will continue
     410             :  * indefinitely.
     411             :  *
     412             :  * If stream_stop() checks for external input, stop_socket should be set to
     413             :  * the FD it checks.  This will allow such input to be detected promptly
     414             :  * rather than after standby_message_timeout (which might be indefinite).
     415             :  * Note that signals will interrupt waits for input as well, but that is
     416             :  * race-y since a signal received while busy won't interrupt the wait.
     417             :  *
     418             :  * standby_message_timeout controls how often we send a message
     419             :  * back to the primary letting it know our progress, in milliseconds.
     420             :  * Zero means no messages are sent.
     421             :  * This message will only contain the write location, and never
     422             :  * flush or replay.
     423             :  *
     424             :  * If 'partial_suffix' is not NULL, files are initially created with the
     425             :  * given suffix, and the suffix is removed once the file is finished. That
     426             :  * allows you to tell the difference between partial and completed files,
     427             :  * so that you can continue later where you left.
     428             :  *
     429             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     430             :  * otherwise only when the WAL file is closed.
     431             :  *
     432             :  * Note: The WAL location *must* be at a log segment start!
     433             :  */
     434             : bool
     435         140 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     436             : {
     437             :     char        query[128];
     438             :     char        slotcmd[128];
     439             :     PGresult   *res;
     440             :     XLogRecPtr  stoppos;
     441             : 
     442             :     /*
     443             :      * The caller should've checked the server version already, but doesn't do
     444             :      * any harm to check it here too.
     445             :      */
     446         140 :     if (!CheckServerVersionForStreaming(conn))
     447           0 :         return false;
     448             : 
     449             :     /*
     450             :      * Decide whether we want to report the flush position. If we report the
     451             :      * flush position, the primary will know what WAL we'll possibly
     452             :      * re-request, and it can then remove older WAL safely. We must always do
     453             :      * that when we are using slots.
     454             :      *
     455             :      * Reporting the flush position makes one eligible as a synchronous
     456             :      * replica. People shouldn't include generic names in
     457             :      * synchronous_standby_names, but we've protected them against it so far,
     458             :      * so let's continue to do so unless specifically requested.
     459             :      */
     460         140 :     if (stream->replication_slot != NULL)
     461             :     {
     462         136 :         reportFlushPosition = true;
     463         136 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     464             :     }
     465             :     else
     466             :     {
     467           4 :         if (stream->synchronous)
     468           2 :             reportFlushPosition = true;
     469             :         else
     470           2 :             reportFlushPosition = false;
     471           4 :         slotcmd[0] = 0;
     472             :     }
     473             : 
     474         140 :     if (stream->sysidentifier != NULL)
     475             :     {
     476             :         /* Validate system identifier hasn't changed */
     477         138 :         res = PQexec(conn, "IDENTIFY_SYSTEM");
     478         138 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     479             :         {
     480           0 :             pg_log_error("could not send replication command \"%s\": %s",
     481             :                          "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     482           0 :             PQclear(res);
     483           0 :             return false;
     484             :         }
     485         138 :         if (PQntuples(res) != 1 || PQnfields(res) < 3)
     486             :         {
     487           0 :             pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     488             :                          PQntuples(res), PQnfields(res), 1, 3);
     489           0 :             PQclear(res);
     490           0 :             return false;
     491             :         }
     492         138 :         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
     493             :         {
     494           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     495           0 :             PQclear(res);
     496           0 :             return false;
     497             :         }
     498         138 :         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
     499             :         {
     500           0 :             pg_log_error("starting timeline %u is not present in the server",
     501             :                          stream->timeline);
     502           0 :             PQclear(res);
     503           0 :             return false;
     504             :         }
     505         138 :         PQclear(res);
     506             :     }
     507             : 
     508             :     /*
     509             :      * initialize flush position to starting point, it's the caller's
     510             :      * responsibility that that's sane.
     511             :      */
     512         140 :     lastFlushPosition = stream->startpos;
     513             : 
     514             :     while (1)
     515             :     {
     516             :         /*
     517             :          * Fetch the timeline history file for this timeline, if we don't have
     518             :          * it already. When streaming log to tar, this will always return
     519             :          * false, as we are never streaming into an existing file and
     520             :          * therefore there can be no pre-existing timeline history file.
     521             :          */
     522         140 :         if (!existsTimeLineHistoryFile(stream))
     523             :         {
     524           0 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     525           0 :             res = PQexec(conn, query);
     526           0 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     527             :             {
     528             :                 /* FIXME: we might send it ok, but get an error */
     529           0 :                 pg_log_error("could not send replication command \"%s\": %s",
     530             :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     531           0 :                 PQclear(res);
     532           0 :                 return false;
     533             :             }
     534             : 
     535             :             /*
     536             :              * The response to TIMELINE_HISTORY is a single row result set
     537             :              * with two fields: filename and content
     538             :              */
     539           0 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     540             :             {
     541           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     542             :                                PQntuples(res), PQnfields(res), 1, 2);
     543             :             }
     544             : 
     545             :             /* Write the history file to disk */
     546           0 :             writeTimeLineHistoryFile(stream,
     547             :                                      PQgetvalue(res, 0, 0),
     548             :                                      PQgetvalue(res, 0, 1));
     549             : 
     550           0 :             PQclear(res);
     551             :         }
     552             : 
     553             :         /*
     554             :          * Before we start streaming from the requested location, check if the
     555             :          * callback tells us to stop here.
     556             :          */
     557         140 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     558           0 :             return true;
     559             : 
     560             :         /* Initiate the replication stream at specified location */
     561         420 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
     562             :                  slotcmd,
     563         140 :                  LSN_FORMAT_ARGS(stream->startpos),
     564             :                  stream->timeline);
     565         140 :         res = PQexec(conn, query);
     566         140 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     567             :         {
     568           2 :             pg_log_error("could not send replication command \"%s\": %s",
     569             :                          "START_REPLICATION", PQresultErrorMessage(res));
     570           2 :             PQclear(res);
     571           2 :             return false;
     572             :         }
     573         138 :         PQclear(res);
     574             : 
     575             :         /* Stream the WAL */
     576         138 :         res = HandleCopyStream(conn, stream, &stoppos);
     577         138 :         if (res == NULL)
     578           0 :             goto error;
     579             : 
     580             :         /*
     581             :          * Streaming finished.
     582             :          *
     583             :          * There are two possible reasons for that: a controlled shutdown, or
     584             :          * we reached the end of the current timeline. In case of
     585             :          * end-of-timeline, the server sends a result set after Copy has
     586             :          * finished, containing information about the next timeline. Read
     587             :          * that, and restart streaming from the next timeline. In case of
     588             :          * controlled shutdown, stop here.
     589             :          */
     590         138 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     591             :         {
     592             :             /*
     593             :              * End-of-timeline. Read the next timeline's ID and starting
     594             :              * position. Usually, the starting position will match the end of
     595             :              * the previous timeline, but there are corner cases like if the
     596             :              * server had sent us half of a WAL record, when it was promoted.
     597             :              * The new timeline will begin at the end of the last complete
     598             :              * record in that case, overlapping the partial WAL record on the
     599             :              * old timeline.
     600             :              */
     601             :             uint32      newtimeline;
     602             :             bool        parsed;
     603             : 
     604           0 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     605           0 :             PQclear(res);
     606           0 :             if (!parsed)
     607           0 :                 goto error;
     608             : 
     609             :             /* Sanity check the values the server gave us */
     610           0 :             if (newtimeline <= stream->timeline)
     611             :             {
     612           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     613             :                              newtimeline, stream->timeline);
     614           0 :                 goto error;
     615             :             }
     616           0 :             if (stream->startpos > stoppos)
     617             :             {
     618           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
     619             :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
     620             :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     621           0 :                 goto error;
     622             :             }
     623             : 
     624             :             /* Read the final result, which should be CommandComplete. */
     625           0 :             res = PQgetResult(conn);
     626           0 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     627             :             {
     628           0 :                 pg_log_error("unexpected termination of replication stream: %s",
     629             :                              PQresultErrorMessage(res));
     630           0 :                 PQclear(res);
     631           0 :                 goto error;
     632             :             }
     633           0 :             PQclear(res);
     634             : 
     635             :             /*
     636             :              * Loop back to start streaming from the new timeline. Always
     637             :              * start streaming at the beginning of a segment.
     638             :              */
     639           0 :             stream->timeline = newtimeline;
     640           0 :             stream->startpos = stream->startpos -
     641           0 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     642           0 :             continue;
     643             :         }
     644         138 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     645             :         {
     646         138 :             PQclear(res);
     647             : 
     648             :             /*
     649             :              * End of replication (ie. controlled shut down of the server).
     650             :              *
     651             :              * Check if the callback thinks it's OK to stop here. If not,
     652             :              * complain.
     653             :              */
     654         138 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     655         138 :                 return true;
     656             :             else
     657             :             {
     658           0 :                 pg_log_error("replication stream was terminated before stop point");
     659           0 :                 goto error;
     660             :             }
     661             :         }
     662             :         else
     663             :         {
     664             :             /* Server returned an error. */
     665           0 :             pg_log_error("unexpected termination of replication stream: %s",
     666             :                          PQresultErrorMessage(res));
     667           0 :             PQclear(res);
     668           0 :             goto error;
     669             :         }
     670             :     }
     671             : 
     672           0 : error:
     673           0 :     if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
     674           0 :         pg_log_error("could not close file \"%s\": %s",
     675             :                      current_walfile_name, stream->walmethod->getlasterror());
     676           0 :     walfile = NULL;
     677           0 :     return false;
     678             : }
     679             : 
     680             : /*
     681             :  * Helper function to parse the result set returned by server after streaming
     682             :  * has finished. On failure, prints an error to stderr and returns false.
     683             :  */
     684             : static bool
     685           0 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     686             : {
     687             :     uint32      startpos_xlogid,
     688             :                 startpos_xrecoff;
     689             : 
     690             :     /*----------
     691             :      * The result set consists of one row and two columns, e.g:
     692             :      *
     693             :      *  next_tli | next_tli_startpos
     694             :      * ----------+-------------------
     695             :      *         4 | 0/9949AE0
     696             :      *
     697             :      * next_tli is the timeline ID of the next timeline after the one that
     698             :      * just finished streaming. next_tli_startpos is the WAL location where
     699             :      * the server switched to it.
     700             :      *----------
     701             :      */
     702           0 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     703             :     {
     704           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     705             :                      PQntuples(res), PQnfields(res), 1, 2);
     706           0 :         return false;
     707             :     }
     708             : 
     709           0 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     710           0 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
     711             :                &startpos_xrecoff) != 2)
     712             :     {
     713           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     714             :                      PQgetvalue(res, 0, 1));
     715           0 :         return false;
     716             :     }
     717           0 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     718             : 
     719           0 :     return true;
     720             : }
     721             : 
     722             : /*
     723             :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     724             :  * initiating streaming with the START_REPLICATION command.
     725             :  *
     726             :  * If the COPY ends (not necessarily successfully) due a message from the
     727             :  * server, returns a PGresult and sets *stoppos to the last byte written.
     728             :  * On any other sort of error, returns NULL.
     729             :  */
     730             : static PGresult *
     731         138 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     732             :                  XLogRecPtr *stoppos)
     733             : {
     734         138 :     char       *copybuf = NULL;
     735         138 :     TimestampTz last_status = -1;
     736         138 :     XLogRecPtr  blockpos = stream->startpos;
     737             : 
     738         138 :     still_sending = true;
     739             : 
     740             :     while (1)
     741        1428 :     {
     742             :         int         r;
     743             :         TimestampTz now;
     744             :         long        sleeptime;
     745             : 
     746             :         /*
     747             :          * Check if we should continue streaming, or abort at this point.
     748             :          */
     749        1566 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
     750           0 :             goto error;
     751             : 
     752        1566 :         now = feGetCurrentTimestamp();
     753             : 
     754             :         /*
     755             :          * If synchronous option is true, issue sync command as soon as there
     756             :          * are WAL data which has not been flushed yet.
     757             :          */
     758        1566 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     759             :         {
     760           0 :             if (stream->walmethod->sync(walfile) != 0)
     761             :             {
     762           0 :                 pg_log_fatal("could not fsync file \"%s\": %s",
     763             :                              current_walfile_name, stream->walmethod->getlasterror());
     764           0 :                 exit(1);
     765             :             }
     766           0 :             lastFlushPosition = blockpos;
     767             : 
     768             :             /*
     769             :              * Send feedback so that the server sees the latest WAL locations
     770             :              * immediately.
     771             :              */
     772           0 :             if (!sendFeedback(conn, blockpos, now, false))
     773           0 :                 goto error;
     774           0 :             last_status = now;
     775             :         }
     776             : 
     777             :         /*
     778             :          * Potentially send a status message to the primary
     779             :          */
     780        3050 :         if (still_sending && stream->standby_message_timeout > 0 &&
     781        1484 :             feTimestampDifferenceExceeds(last_status, now,
     782             :                                          stream->standby_message_timeout))
     783             :         {
     784             :             /* Time to send feedback! */
     785         138 :             if (!sendFeedback(conn, blockpos, now, false))
     786           0 :                 goto error;
     787         138 :             last_status = now;
     788             :         }
     789             : 
     790             :         /*
     791             :          * Calculate how long send/receive loops should sleep
     792             :          */
     793        1566 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     794             :                                                  last_status);
     795             : 
     796        1566 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     797        6570 :         while (r != 0)
     798             :         {
     799        5142 :             if (r == -1)
     800           0 :                 goto error;
     801        5142 :             if (r == -2)
     802             :             {
     803         138 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     804             : 
     805         138 :                 if (res == NULL)
     806           0 :                     goto error;
     807             :                 else
     808         138 :                     return res;
     809             :             }
     810             : 
     811             :             /* Check the message type. */
     812        5004 :             if (copybuf[0] == 'k')
     813             :             {
     814           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     815             :                                          &last_status))
     816           0 :                     goto error;
     817             :             }
     818        5004 :             else if (copybuf[0] == 'w')
     819             :             {
     820        5004 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
     821           0 :                     goto error;
     822             : 
     823             :                 /*
     824             :                  * Check if we should continue streaming, or abort at this
     825             :                  * point.
     826             :                  */
     827        5004 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     828           0 :                     goto error;
     829             :             }
     830             :             else
     831             :             {
     832           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     833             :                              copybuf[0]);
     834           0 :                 goto error;
     835             :             }
     836             : 
     837             :             /*
     838             :              * Process the received data, and any subsequent data we can read
     839             :              * without blocking.
     840             :              */
     841        5004 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     842             :         }
     843             :     }
     844             : 
     845           0 : error:
     846           0 :     if (copybuf != NULL)
     847           0 :         PQfreemem(copybuf);
     848           0 :     return NULL;
     849             : }
     850             : 
     851             : /*
     852             :  * Wait until we can read a CopyData message,
     853             :  * or timeout, or occurrence of a signal or input on the stop_socket.
     854             :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     855             :  *
     856             :  * Returns 1 if data has become available for reading, 0 if timed out
     857             :  * or interrupted by signal or stop_socket input, and -1 on an error.
     858             :  */
     859             : static int
     860        6402 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     861             : {
     862             :     int         ret;
     863             :     fd_set      input_mask;
     864             :     int         connsocket;
     865             :     int         maxfd;
     866             :     struct timeval timeout;
     867             :     struct timeval *timeoutptr;
     868             : 
     869        6402 :     connsocket = PQsocket(conn);
     870        6402 :     if (connsocket < 0)
     871             :     {
     872           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     873           0 :         return -1;
     874             :     }
     875             : 
     876        6402 :     FD_ZERO(&input_mask);
     877        6402 :     FD_SET(connsocket, &input_mask);
     878        6402 :     maxfd = connsocket;
     879        6402 :     if (stop_socket != PGINVALID_SOCKET)
     880             :     {
     881        6396 :         FD_SET(stop_socket, &input_mask);
     882        6396 :         maxfd = Max(maxfd, stop_socket);
     883             :     }
     884             : 
     885        6402 :     if (timeout_ms < 0)
     886          82 :         timeoutptr = NULL;
     887             :     else
     888             :     {
     889        6320 :         timeout.tv_sec = timeout_ms / 1000L;
     890        6320 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     891        6320 :         timeoutptr = &timeout;
     892             :     }
     893             : 
     894        6402 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     895             : 
     896        6402 :     if (ret < 0)
     897             :     {
     898           0 :         if (errno == EINTR)
     899           0 :             return 0;           /* Got a signal, so not an error */
     900           0 :         pg_log_error("select() failed: %m");
     901           0 :         return -1;
     902             :     }
     903        6402 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     904        5182 :         return 1;               /* Got input on connection socket */
     905             : 
     906        1220 :     return 0;                   /* Got timeout or input on stop_socket */
     907             : }
     908             : 
     909             : /*
     910             :  * Receive CopyData message available from XLOG stream, blocking for
     911             :  * maximum of 'timeout' ms.
     912             :  *
     913             :  * If data was received, returns the length of the data. *buffer is set to
     914             :  * point to a buffer holding the received message. The buffer is only valid
     915             :  * until the next CopyStreamReceive call.
     916             :  *
     917             :  * Returns 0 if no data was available within timeout, or if wait was
     918             :  * interrupted by signal or stop_socket input.
     919             :  * -1 on error. -2 if the server ended the COPY.
     920             :  */
     921             : static int
     922        6570 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     923             :                   char **buffer)
     924             : {
     925        6570 :     char       *copybuf = NULL;
     926             :     int         rawlen;
     927             : 
     928        6570 :     if (*buffer != NULL)
     929        5004 :         PQfreemem(*buffer);
     930        6570 :     *buffer = NULL;
     931             : 
     932             :     /* Try to receive a CopyData message */
     933        6570 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     934        6570 :     if (rawlen == 0)
     935             :     {
     936             :         int         ret;
     937             : 
     938             :         /*
     939             :          * No data available.  Wait for some to appear, but not longer than
     940             :          * the specified timeout, so that we can ping the server.  Also stop
     941             :          * waiting if input appears on stop_socket.
     942             :          */
     943        6402 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     944        6402 :         if (ret <= 0)
     945        1220 :             return ret;
     946             : 
     947             :         /* Now there is actually data on the socket */
     948        5182 :         if (PQconsumeInput(conn) == 0)
     949             :         {
     950           0 :             pg_log_error("could not receive data from WAL stream: %s",
     951             :                          PQerrorMessage(conn));
     952           0 :             return -1;
     953             :         }
     954             : 
     955             :         /* Now that we've consumed some input, try again */
     956        5182 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     957        5182 :         if (rawlen == 0)
     958         208 :             return 0;
     959             :     }
     960        5142 :     if (rawlen == -1)           /* end-of-streaming or error */
     961         138 :         return -2;
     962        5004 :     if (rawlen == -2)
     963             :     {
     964           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     965           0 :         return -1;
     966             :     }
     967             : 
     968             :     /* Return received messages to caller */
     969        5004 :     *buffer = copybuf;
     970        5004 :     return rawlen;
     971             : }
     972             : 
     973             : /*
     974             :  * Process the keepalive message.
     975             :  */
     976             : static bool
     977           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     978             :                     XLogRecPtr blockpos, TimestampTz *last_status)
     979             : {
     980             :     int         pos;
     981             :     bool        replyRequested;
     982             :     TimestampTz now;
     983             : 
     984             :     /*
     985             :      * Parse the keepalive message, enclosed in the CopyData message. We just
     986             :      * check if the server requested a reply, and ignore the rest.
     987             :      */
     988           0 :     pos = 1;                    /* skip msgtype 'k' */
     989           0 :     pos += 8;                   /* skip walEnd */
     990           0 :     pos += 8;                   /* skip sendTime */
     991             : 
     992           0 :     if (len < pos + 1)
     993             :     {
     994           0 :         pg_log_error("streaming header too small: %d", len);
     995           0 :         return false;
     996             :     }
     997           0 :     replyRequested = copybuf[pos];
     998             : 
     999             :     /* If the server requested an immediate reply, send one. */
    1000           0 :     if (replyRequested && still_sending)
    1001             :     {
    1002           0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1003           0 :             walfile != NULL)
    1004             :         {
    1005             :             /*
    1006             :              * If a valid flush location needs to be reported, flush the
    1007             :              * current WAL file so that the latest flush location is sent back
    1008             :              * to the server. This is necessary to see whether the last WAL
    1009             :              * data has been successfully replicated or not, at the normal
    1010             :              * shutdown of the server.
    1011             :              */
    1012           0 :             if (stream->walmethod->sync(walfile) != 0)
    1013             :             {
    1014           0 :                 pg_log_fatal("could not fsync file \"%s\": %s",
    1015             :                              current_walfile_name, stream->walmethod->getlasterror());
    1016           0 :                 exit(1);
    1017             :             }
    1018           0 :             lastFlushPosition = blockpos;
    1019             :         }
    1020             : 
    1021           0 :         now = feGetCurrentTimestamp();
    1022           0 :         if (!sendFeedback(conn, blockpos, now, false))
    1023           0 :             return false;
    1024           0 :         *last_status = now;
    1025             :     }
    1026             : 
    1027           0 :     return true;
    1028             : }
    1029             : 
    1030             : /*
    1031             :  * Process XLogData message.
    1032             :  */
    1033             : static bool
    1034        5004 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1035             :                    XLogRecPtr *blockpos)
    1036             : {
    1037             :     int         xlogoff;
    1038             :     int         bytes_left;
    1039             :     int         bytes_written;
    1040             :     int         hdr_len;
    1041             : 
    1042             :     /*
    1043             :      * Once we've decided we don't want to receive any more, just ignore any
    1044             :      * subsequent XLogData messages.
    1045             :      */
    1046        5004 :     if (!(still_sending))
    1047         220 :         return true;
    1048             : 
    1049             :     /*
    1050             :      * Read the header of the XLogData message, enclosed in the CopyData
    1051             :      * message. We only need the WAL location field (dataStart), the rest of
    1052             :      * the header is ignored.
    1053             :      */
    1054        4784 :     hdr_len = 1;                /* msgtype 'w' */
    1055        4784 :     hdr_len += 8;               /* dataStart */
    1056        4784 :     hdr_len += 8;               /* walEnd */
    1057        4784 :     hdr_len += 8;               /* sendTime */
    1058        4784 :     if (len < hdr_len)
    1059             :     {
    1060           0 :         pg_log_error("streaming header too small: %d", len);
    1061           0 :         return false;
    1062             :     }
    1063        4784 :     *blockpos = fe_recvint64(&copybuf[1]);
    1064             : 
    1065             :     /* Extract WAL location for this block */
    1066        4784 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1067             : 
    1068             :     /*
    1069             :      * Verify that the initial location in the stream matches where we think
    1070             :      * we are.
    1071             :      */
    1072        4784 :     if (walfile == NULL)
    1073             :     {
    1074             :         /* No file open yet */
    1075         138 :         if (xlogoff != 0)
    1076             :         {
    1077           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1078             :                          xlogoff);
    1079           0 :             return false;
    1080             :         }
    1081             :     }
    1082             :     else
    1083             :     {
    1084             :         /* More data in existing segment */
    1085        4646 :         if (stream->walmethod->get_current_pos(walfile) != xlogoff)
    1086             :         {
    1087           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1088             :                          xlogoff, (int) stream->walmethod->get_current_pos(walfile));
    1089           0 :             return false;
    1090             :         }
    1091             :     }
    1092             : 
    1093        4784 :     bytes_left = len - hdr_len;
    1094        4784 :     bytes_written = 0;
    1095             : 
    1096        9568 :     while (bytes_left)
    1097             :     {
    1098             :         int         bytes_to_write;
    1099             : 
    1100             :         /*
    1101             :          * If crossing a WAL boundary, only write up until we reach wal
    1102             :          * segment size.
    1103             :          */
    1104        4784 :         if (xlogoff + bytes_left > WalSegSz)
    1105           0 :             bytes_to_write = WalSegSz - xlogoff;
    1106             :         else
    1107        4784 :             bytes_to_write = bytes_left;
    1108             : 
    1109        4784 :         if (walfile == NULL)
    1110             :         {
    1111         138 :             if (!open_walfile(stream, *blockpos))
    1112             :             {
    1113             :                 /* Error logged by open_walfile */
    1114           0 :                 return false;
    1115             :             }
    1116             :         }
    1117             : 
    1118        4784 :         if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
    1119        4784 :                                      bytes_to_write) != bytes_to_write)
    1120             :         {
    1121           0 :             pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
    1122             :                          bytes_to_write, current_walfile_name,
    1123             :                          stream->walmethod->getlasterror());
    1124           0 :             return false;
    1125             :         }
    1126             : 
    1127             :         /* Write was successful, advance our position */
    1128        4784 :         bytes_written += bytes_to_write;
    1129        4784 :         bytes_left -= bytes_to_write;
    1130        4784 :         *blockpos += bytes_to_write;
    1131        4784 :         xlogoff += bytes_to_write;
    1132             : 
    1133             :         /* Did we reach the end of a WAL segment? */
    1134        4784 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1135             :         {
    1136          24 :             if (!close_walfile(stream, *blockpos))
    1137             :                 /* Error message written in close_walfile() */
    1138           0 :                 return false;
    1139             : 
    1140          24 :             xlogoff = 0;
    1141             : 
    1142          24 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1143             :             {
    1144           0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1145             :                 {
    1146           0 :                     pg_log_error("could not send copy-end packet: %s",
    1147             :                                  PQerrorMessage(conn));
    1148           0 :                     return false;
    1149             :                 }
    1150           0 :                 still_sending = false;
    1151           0 :                 return true;    /* ignore the rest of this XLogData packet */
    1152             :             }
    1153             :         }
    1154             :     }
    1155             :     /* No more data left to write, receive next copy packet */
    1156             : 
    1157        4784 :     return true;
    1158             : }
    1159             : 
    1160             : /*
    1161             :  * Handle end of the copy stream.
    1162             :  */
    1163             : static PGresult *
    1164         138 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1165             :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1166             : {
    1167         138 :     PGresult   *res = PQgetResult(conn);
    1168             : 
    1169             :     /*
    1170             :      * The server closed its end of the copy stream.  If we haven't closed
    1171             :      * ours already, we need to do so now, unless the server threw an error,
    1172             :      * in which case we don't.
    1173             :      */
    1174         138 :     if (still_sending)
    1175             :     {
    1176           0 :         if (!close_walfile(stream, blockpos))
    1177             :         {
    1178             :             /* Error message written in close_walfile() */
    1179           0 :             PQclear(res);
    1180           0 :             return NULL;
    1181             :         }
    1182           0 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1183             :         {
    1184           0 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1185             :             {
    1186           0 :                 pg_log_error("could not send copy-end packet: %s",
    1187             :                              PQerrorMessage(conn));
    1188           0 :                 PQclear(res);
    1189           0 :                 return NULL;
    1190             :             }
    1191           0 :             res = PQgetResult(conn);
    1192             :         }
    1193           0 :         still_sending = false;
    1194             :     }
    1195         138 :     if (copybuf != NULL)
    1196           0 :         PQfreemem(copybuf);
    1197         138 :     *stoppos = blockpos;
    1198         138 :     return res;
    1199             : }
    1200             : 
    1201             : /*
    1202             :  * Check if we should continue streaming, or abort at this point.
    1203             :  */
    1204             : static bool
    1205        6570 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1206             : {
    1207        6570 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1208             :     {
    1209         138 :         if (!close_walfile(stream, blockpos))
    1210             :         {
    1211             :             /* Potential error message is written by close_walfile */
    1212           0 :             return false;
    1213             :         }
    1214         138 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1215             :         {
    1216           0 :             pg_log_error("could not send copy-end packet: %s",
    1217             :                          PQerrorMessage(conn));
    1218           0 :             return false;
    1219             :         }
    1220         138 :         still_sending = false;
    1221             :     }
    1222             : 
    1223        6570 :     return true;
    1224             : }
    1225             : 
    1226             : /*
    1227             :  * Calculate how long send/receive loops should sleep
    1228             :  */
    1229             : static long
    1230        1566 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1231             :                              TimestampTz last_status)
    1232             : {
    1233        1566 :     TimestampTz status_targettime = 0;
    1234             :     long        sleeptime;
    1235             : 
    1236        1566 :     if (standby_message_timeout && still_sending)
    1237        1484 :         status_targettime = last_status +
    1238        1484 :             (standby_message_timeout - 1) * ((int64) 1000);
    1239             : 
    1240        1566 :     if (status_targettime > 0)
    1241             :     {
    1242             :         long        secs;
    1243             :         int         usecs;
    1244             : 
    1245        1484 :         feTimestampDifference(now,
    1246             :                               status_targettime,
    1247             :                               &secs,
    1248             :                               &usecs);
    1249             :         /* Always sleep at least 1 sec */
    1250        1484 :         if (secs <= 0)
    1251             :         {
    1252           0 :             secs = 1;
    1253           0 :             usecs = 0;
    1254             :         }
    1255             : 
    1256        1484 :         sleeptime = secs * 1000 + usecs / 1000;
    1257             :     }
    1258             :     else
    1259          82 :         sleeptime = -1;
    1260             : 
    1261        1566 :     return sleeptime;
    1262             : }

Generated by: LCOV version 1.13