LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 216 422 51.2 %
Date: 2019-06-19 14:06:47 Functions: 14 17 82.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * receivelog.c - receive WAL files using the streaming
       4             :  *                replication protocol.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/receivelog.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/stat.h>
      18             : #include <unistd.h>
      19             : #ifdef HAVE_SYS_SELECT_H
      20             : #include <sys/select.h>
      21             : #endif
      22             : 
      23             : /* local includes */
      24             : #include "receivelog.h"
      25             : #include "streamutil.h"
      26             : 
      27             : #include "libpq-fe.h"
      28             : #include "access/xlog_internal.h"
      29             : #include "common/file_utils.h"
      30             : #include "common/logging.h"
      31             : 
      32             : 
      33             : /* Handle and base file name of the WAL file currently open for writing */
      34             : static Walfile *walfile = NULL;
      35             : static char current_walfile_name[MAXPGPATH] = "";
      36             : static bool reportFlushPosition = false;    /* send lastFlushPosition in feedback? */
      37             : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;    /* flush position reported to server */
      38             : 
      39             : static bool still_sending = true;   /* feedback still needs to be sent? */
      40             : /* Prototypes for the file-local (static) helper functions */
      41             : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      42             :                                   XLogRecPtr *stoppos);
      43             : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      44             : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      45             :                               char **buffer);
      46             : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      47             :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      48             : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      49             :                                XLogRecPtr *blockpos);
      50             : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      51             :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      52             : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
      53             :                                 XLogRecPtr *stoppos);
      54             : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      55             :                                          TimestampTz last_status);
      56             : 
      57             : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      58             :                                      uint32 *timeline);
      59             : /*
                     :  * Create "archive_status/<fname>.done" through the stream's WAL method,
                     :  * without writing any content to it.
                     :  *
                     :  * Returns true on success.  If the file cannot be opened, logs an error and
                     :  * returns false.
                     :  *
                     :  * NOTE(review): the result of close() is ignored here, whereas
                     :  * close_walfile() and writeTimeLineHistoryFile() check it; a failure while
                     :  * closing the status file is therefore reported as success.
                     :  */
      60             : static bool
      61          12 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      62             : {
      63             :     Walfile    *f;
      64             :     static char tmppath[MAXPGPATH]; /* static buffer, reused by every call */
      65             : 
      66          12 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      67             :              fname);
      68             : 
      69          12 :     f = stream->walmethod->open_for_write(tmppath, NULL, 0);
      70          12 :     if (f == NULL)
      71             :     {
      72           0 :         pg_log_error("could not create archive status file \"%s\": %s",
      73             :                      tmppath, stream->walmethod->getlasterror());
      74           0 :         return false;
      75             :     }
      76             : 
      77          12 :     stream->walmethod->close(f, CLOSE_NORMAL);  /* NOTE(review): return value not checked */
      78             : 
      79          12 :     return true;
      80             : }
      81             : 
      82             : /*
      83             :  * Open the WAL segment file that contains 'startpoint' for writing.
      84             :  *
      85             :  * Returns true if OK; on failure, returns false after printing an error msg.
      86             :  * On success, 'walfile' is set to the opened Walfile handle.  The base
      87             :  * filename (without partial_suffix) is stored in 'current_walfile_name'
      88             :  * before any check is made, so it is updated even when this fails.
      89             :  *
                     :  * An existing file is reused only if it is empty or exactly WalSegSz bytes
                     :  * long; any other size is reported as an error.  A new (or empty) file is
                     :  * opened with WalSegSz as the size argument, i.e. it will be padded with
                     :  * zeroes to the segment size.
      90             :  */
      91             : static bool
      92          82 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      93             : {
      94             :     Walfile    *f;
      95             :     char        fn[MAXPGPATH];  /* file name including partial_suffix, if any */
      96             :     ssize_t     size;
      97             :     XLogSegNo   segno;
      98             : 
      99          82 :     XLByteToSeg(startpoint, segno, WalSegSz);
     100          82 :     XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
     101             : 
     102          82 :     snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
     103          82 :              stream->partial_suffix ? stream->partial_suffix : "");
     104             : 
     105             :     /*
     106             :      * When streaming to files, if an existing file exists we verify that it's
     107             :      * either empty (just created), or a complete WalSegSz segment (in which
     108             :      * case it has been created and padded). Anything else indicates a corrupt
     109             :      * file.
     110             :      *
     111             :      * When streaming to tar, no file with this name will exist before, so we
     112             :      * never have to verify a size.
     113             :      */
     114          82 :     if (stream->walmethod->existsfile(fn))
     115             :     {
     116           0 :         size = stream->walmethod->get_file_size(fn);
     117           0 :         if (size < 0)
     118             :         {
     119           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     120             :                          fn, stream->walmethod->getlasterror());
     121           0 :             return false;
     122             :         }
     123           0 :         if (size == WalSegSz)
     124             :         {
     125             :             /* Already padded file. Open it for use, with no further padding */
     126           0 :             f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
     127           0 :             if (f == NULL)
     128             :             {
     129           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     130             :                              fn, stream->walmethod->getlasterror());
     131           0 :                 return false;
     132             :             }
     133             : 
     134             :             /* fsync file in case of a previous crash */
     135           0 :             if (stream->walmethod->sync(f) != 0)
     136             :             {
     137           0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     138             :                              fn, stream->walmethod->getlasterror());
     139           0 :                 stream->walmethod->close(f, CLOSE_UNLINK);
     140           0 :                 return false;
     141             :             }
     142             : 
     143           0 :             walfile = f;
     144           0 :             return true;
     145             :         }
     146           0 :         if (size != 0)
     147             :         {
     148             :             /* if write didn't set errno, assume problem is no disk space
                     :              *
                     :              * NOTE(review): no write precedes this point in the function
                     :              * and the message below does not print errno, so this
                     :              * adjustment looks vestigial -- confirm before relying on it.
                     :              */
     149           0 :             if (errno == 0)
     150           0 :                 errno = ENOSPC;
     151           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
     152             :                                   "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
     153             :                                   size),
     154             :                          fn, (int) size, WalSegSz);
     155           0 :             return false;
     156             :         }
     157             :         /* File existed and was empty, so fall through and open */
     158             :     }
     159             : 
     160             :     /* No file existed, or it was empty, so create/open one */
     161             : 
     162         164 :     f = stream->walmethod->open_for_write(current_walfile_name,
     163          82 :                                           stream->partial_suffix, WalSegSz);
     164          82 :     if (f == NULL)
     165             :     {
     166           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     167             :                      fn, stream->walmethod->getlasterror());
     168           0 :         return false;
     169             :     }
     170             : 
     171          82 :     walfile = f;
     172          82 :     return true;
     173             : }
     174             : 
     175             : /*
     176             :  * Close the current WAL file (if open), and rename it to the correct
     177             :  * filename if it's complete. On failure, prints an error message
     178             :  * and returns false, otherwise returns true.
                     :  *
                     :  * Returns true immediately when no file is open.  Once a file was open,
                     :  * 'walfile' is reset to NULL on every path, success or failure.  A segment
                     :  * counts as complete when the current write position equals WalSegSz; if
                     :  * stream->mark_done is set, a complete segment also gets an archive status
                     :  * file.  'pos' is stored in lastFlushPosition only when everything
                     :  * succeeded.
     179             :  */
     180             : static bool
     181          84 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     182             : {
     183             :     off_t       currpos;
     184             :     int         r;
     185             : 
     186          84 :     if (walfile == NULL)
     187           2 :         return true;
     188             : 
     189          82 :     currpos = stream->walmethod->get_current_pos(walfile);
     190          82 :     if (currpos == -1)
     191             :     {
     192           0 :         pg_log_error("could not determine seek position in file \"%s\": %s",
     193             :                      current_walfile_name, stream->walmethod->getlasterror());
     194           0 :         stream->walmethod->close(walfile, CLOSE_UNLINK);
     195           0 :         walfile = NULL;
     196             : 
     197           0 :         return false;
     198             :     }
     199             : 
     200          82 :     if (stream->partial_suffix)
     201             :     {
     202           2 :         if (currpos == WalSegSz)
     203           0 :             r = stream->walmethod->close(walfile, CLOSE_NORMAL);
     204             :         else
     205             :         {
     206           2 :             pg_log_info("not renaming \"%s%s\", segment is not complete",
     207             :                         current_walfile_name, stream->partial_suffix);
     208           2 :             r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
     209             :         }
     210             :     }
     211             :     else
     212          80 :         r = stream->walmethod->close(walfile, CLOSE_NORMAL);
     213             : 
     214          82 :     walfile = NULL; /* cleared before the close result is examined */
     215             : 
     216          82 :     if (r != 0)
     217             :     {
     218           0 :         pg_log_error("could not close file \"%s\": %s",
     219             :                      current_walfile_name, stream->walmethod->getlasterror());
     220           0 :         return false;
     221             :     }
     222             : 
     223             :     /*
     224             :      * Mark file as archived if requested by the caller - pg_basebackup needs
     225             :      * to do so as files can otherwise get archived again after promotion of a
     226             :      * new node. This is in line with walreceiver.c always doing a
     227             :      * XLogArchiveForceDone() after a complete segment.
     228             :      */
     229          82 :     if (currpos == WalSegSz && stream->mark_done)
     230             :     {
     231             :         /* writes error message if failed */
     232          12 :         if (!mark_file_as_archived(stream, current_walfile_name))
     233           0 :             return false;
     234             :     }
     235             : 
     236          82 :     lastFlushPosition = pos;
     237          82 :     return true;
     238             : }
     239             : 
     240             : 
     241             : /*
     242             :  * Check if the timeline history file for stream->timeline exists, as
                     :  * reported by the WAL method's existsfile().
                     :  *
                     :  * Always returns true for timeline 1 without looking for a file.
     243             :  */
     244             : static bool
     245          84 : existsTimeLineHistoryFile(StreamCtl *stream)
     246             : {
     247             :     char        histfname[MAXFNAMELEN];
     248             : 
     249             :     /*
     250             :      * Timeline 1 never has a history file. We treat that as if it existed,
     251             :      * since we never need to stream it.
     252             :      */
     253          84 :     if (stream->timeline == 1)
     254          84 :         return true;
     255             : 
     256           0 :     TLHistoryFileName(histfname, stream->timeline);
     257             : 
     258           0 :     return stream->walmethod->existsfile(histfname);
     259             : }
     260             : /*
                     :  * Store a timeline history file received from the server.
                     :  *
                     :  * 'filename' is the name reported by the server; it must equal the name
                     :  * TLHistoryFileName() produces for stream->timeline.  'content' must be a
                     :  * NUL-terminated string, since its length is taken with strlen().
                     :  *
                     :  * The file is opened under the history file name with a ".tmp" suffix
                     :  * (presumably dropped by the WAL method on CLOSE_NORMAL -- confirm),
                     :  * written in a single call, and closed.  If stream->mark_done is set, an
                     :  * archive status file is created for it as well.
                     :  *
                     :  * Returns true on success; logs an error and returns false otherwise.
                     :  */
     261             : static bool
     262           0 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     263             : {
     264           0 :     int         size = strlen(content);
     265             :     char        histfname[MAXFNAMELEN];
     266             :     Walfile    *f;
     267             : 
     268             :     /*
     269             :      * Check that the server's idea of how timeline history files should be
     270             :      * named matches ours.
     271             :      */
     272           0 :     TLHistoryFileName(histfname, stream->timeline);
     273           0 :     if (strcmp(histfname, filename) != 0)
     274             :     {
     275           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     276             :                      stream->timeline, filename);
     277           0 :         return false;
     278             :     }
     279             : 
     280           0 :     f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
     281           0 :     if (f == NULL)
     282             :     {
     283           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     284             :                      histfname, stream->walmethod->getlasterror());
     285           0 :         return false;
     286             :     }
     287             : 
     288           0 :     if ((int) stream->walmethod->write(f, content, size) != size)   /* short write counts as failure */
     289             :     {
     290           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     291             :                      histfname, stream->walmethod->getlasterror());
     292             : 
     293             :         /*
     294             :          * If we fail to make the file, delete it to release disk space
     295             :          */
     296           0 :         stream->walmethod->close(f, CLOSE_UNLINK);
     297             : 
     298           0 :         return false;
     299             :     }
     300             : 
     301           0 :     if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
     302             :     {
     303           0 :         pg_log_error("could not close file \"%s\": %s",
     304             :                      histfname, stream->walmethod->getlasterror());
     305           0 :         return false;
     306             :     }
     307             : 
     308             :     /* Maintain archive_status, check close_walfile() for details. */
     309           0 :     if (stream->mark_done)
     310             :     {
     311             :         /* writes error message if failed */
     312           0 :         if (!mark_file_as_archived(stream, histfname))
     313           0 :             return false;
     314             :     }
     315             : 
     316           0 :     return true;
     317             : }
     318             : 
     319             : /*
     320             :  * Send a Standby Status Update message to server.
                     :  *
                     :  * The message is 34 bytes long: the type byte 'r', four 8-byte fields
                     :  * written with fe_sendint64() (write position, flush position, apply
                     :  * position, send time) and a final one-byte reply-requested flag.  The
                     :  * flush field carries lastFlushPosition only when reportFlushPosition is
                     :  * set, otherwise InvalidXLogRecPtr; the apply field is always
                     :  * InvalidXLogRecPtr.
                     :  *
                     :  * Returns true on success.  If PQputCopyData() or PQflush() reports a
                     :  * problem, logs an error and returns false.
     321             :  */
     322             : static bool
     323          82 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     324             : {
     325             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];    /* one term per message field */
     326          82 :     int         len = 0;    /* bytes filled in so far */
     327             : 
     328          82 :     replybuf[len] = 'r';
     329          82 :     len += 1;
     330          82 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     331          82 :     len += 8;
     332          82 :     if (reportFlushPosition)
     333          80 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     334             :     else
     335           2 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     336          82 :     len += 8;
     337          82 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     338          82 :     len += 8;
     339          82 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     340          82 :     len += 8;
     341          82 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     342          82 :     len += 1;
     343             : 
     344          82 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     345             :     {
     346           0 :         pg_log_error("could not send feedback packet: %s",
     347             :                      PQerrorMessage(conn));
     348           0 :         return false;
     349             :     }
     350             : 
     351          82 :     return true;
     352             : }
     353             : 
     354             : /*
     355             :  * Check that the server version we're connected to is supported by
     356             :  * ReceiveXlogStream().
     357             :  *
     358             :  * If it's not, an error message is printed, and false is returned.
                     :  *
                     :  * Versions are compared after dividing the numeric version by 100, so the
                     :  * last two digits are ignored.  The accepted range runs from 903 (9.3) up
                     :  * to this client's own PG_VERSION_NUM / 100, both inclusive.
     359             :  */
     360             : bool
     361         180 : CheckServerVersionForStreaming(PGconn *conn)
     362             : {
     363             :     int         minServerMajor,
     364             :                 maxServerMajor;
     365             :     int         serverMajor;
     366             : 
     367             :     /*
     368             :      * The message format used in streaming replication changed in 9.3, so we
     369             :      * cannot stream from older servers. And we don't support servers newer
     370             :      * than the client; it might work, but we don't know, so err on the safe
     371             :      * side.
     372             :      */
     373         180 :     minServerMajor = 903;
     374         180 :     maxServerMajor = PG_VERSION_NUM / 100;
     375         180 :     serverMajor = PQserverVersion(conn) / 100;
     376         180 :     if (serverMajor < minServerMajor)
     377             :     {
     378           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     379             : 
     380           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     381             :                      serverver ? serverver : "'unknown'",
     382             :                      "9.3");
     383           0 :         return false;
     384             :     }
     385         180 :     else if (serverMajor > maxServerMajor)
     386             :     {
     387           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     388             : 
     389           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     390             :                      serverver ? serverver : "'unknown'",
     391             :                      PG_VERSION);
     392           0 :         return false;
     393             :     }
     394         180 :     return true;
     395             : }
     396             : 
     397             : /*
     398             :  * Receive a log stream starting at the specified position.
     399             :  *
     400             :  * Individual parameters are passed through the StreamCtl structure.
     401             :  *
     402             :  * If sysidentifier is specified, validate that the system identifier
     403             :  * matches and that the starting timeline is not ahead of the server's
     404             :  * (by sending an extra IDENTIFY_SYSTEM command).
     405             :  *
     406             :  * All received segments will be written to the directory
     407             :  * specified by basedir. This will also fetch any missing timeline history
     408             :  * files.
     409             :  *
     410             :  * The stream_stop callback will be called every time data
     411             :  * is received, and whenever a segment is completed. If it returns
     412             :  * true, the streaming will stop and the function
     413             :  * return. As long as it returns false, streaming will continue
     414             :  * indefinitely.
     415             :  *
     416             :  * If stream_stop() checks for external input, stop_socket should be set to
     417             :  * the FD it checks.  This will allow such input to be detected promptly
     418             :  * rather than after standby_message_timeout (which might be indefinite).
     419             :  * Note that signals will interrupt waits for input as well, but that is
     420             :  * race-y since a signal received while busy won't interrupt the wait.
     421             :  *
     422             :  * standby_message_timeout controls how often we send a message
     423             :  * back to the master letting it know our progress, in milliseconds.
     424             :  * Zero means no messages are sent.
     425             :  * This message always contains the write location; the flush location is
     426             :  * included only with a replication slot or 'synchronous'; replay never is.
     427             :  *
     428             :  * If 'partial_suffix' is not NULL, files are initially created with the
     429             :  * given suffix, and the suffix is removed once the file is finished. That
     430             :  * allows you to tell the difference between partial and completed files,
     431             :  * so that you can continue later where you left.
     432             :  *
     433             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     434             :  * otherwise only when the WAL file is closed.
     435             :  *
     436             :  * Note: The WAL location *must* be at a log segment start!
     437             :  */
     438             : bool
     439          84 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     440             : {
     441             :     char        query[128];
     442             :     char        slotcmd[128];
     443             :     PGresult   *res;
     444             :     XLogRecPtr  stoppos;
     445             : 
     446             :     /*
     447             :      * The caller should've checked the server version already, but doesn't do
     448             :      * any harm to check it here too.
     449             :      */
     450          84 :     if (!CheckServerVersionForStreaming(conn))
     451           0 :         return false;
     452             : 
     453             :     /*
     454             :      * Decide whether we want to report the flush position. If we report the
     455             :      * flush position, the primary will know what WAL we'll possibly
     456             :      * re-request, and it can then remove older WAL safely. We must always do
     457             :      * that when we are using slots.
     458             :      *
     459             :      * Reporting the flush position makes one eligible as a synchronous
     460             :      * replica. People shouldn't include generic names in
     461             :      * synchronous_standby_names, but we've protected them against it so far,
     462             :      * so let's continue to do so unless specifically requested.
     463             :      */
     464          84 :     if (stream->replication_slot != NULL)
     465             :     {
     466          80 :         reportFlushPosition = true;
     467          80 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     468             :     }
     469             :     else
     470             :     {
     471           4 :         if (stream->synchronous)
     472           2 :             reportFlushPosition = true;
     473             :         else
     474           2 :             reportFlushPosition = false;
     475           4 :         slotcmd[0] = 0;
     476             :     }
     477             : 
     478          84 :     if (stream->sysidentifier != NULL)
     479             :     {
     480             :         /* Validate system identifier hasn't changed */
     481          82 :         res = PQexec(conn, "IDENTIFY_SYSTEM");
     482          82 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     483             :         {
     484           0 :             pg_log_error("could not send replication command \"%s\": %s",
     485             :                          "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     486           0 :             PQclear(res);
     487           0 :             return false;
     488             :         }
     489          82 :         if (PQntuples(res) != 1 || PQnfields(res) < 3)
     490             :         {
     491           0 :             pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     492             :                          PQntuples(res), PQnfields(res), 1, 3);
     493           0 :             PQclear(res);
     494           0 :             return false;
     495             :         }
     496          82 :         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
     497             :         {
     498           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     499           0 :             PQclear(res);
     500           0 :             return false;
     501             :         }
     502          82 :         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
     503             :         {
     504           0 :             pg_log_error("starting timeline %u is not present in the server",
     505             :                          stream->timeline);
     506           0 :             PQclear(res);
     507           0 :             return false;
     508             :         }
     509          82 :         PQclear(res);
     510             :     }
     511             : 
     512             :     /*
     513             :      * initialize flush position to starting point, it's the caller's
     514             :      * responsibility that that's sane.
     515             :      */
     516          84 :     lastFlushPosition = stream->startpos;
     517             : 
     518             :     while (1)
     519             :     {
     520             :         /*
     521             :          * Fetch the timeline history file for this timeline, if we don't have
     522             :          * it already. When streaming log to tar, this will always return
     523             :          * false, as we are never streaming into an existing file and
     524             :          * therefore there can be no pre-existing timeline history file.
     525             :          */
     526          84 :         if (!existsTimeLineHistoryFile(stream))
     527             :         {
     528           0 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     529           0 :             res = PQexec(conn, query);
     530           0 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     531             :             {
     532             :                 /* FIXME: we might send it ok, but get an error */
     533           0 :                 pg_log_error("could not send replication command \"%s\": %s",
     534             :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     535           0 :                 PQclear(res);
     536           0 :                 return false;
     537             :             }
     538             : 
     539             :             /*
     540             :              * The response to TIMELINE_HISTORY is a single row result set
     541             :              * with two fields: filename and content
     542             :              */
     543           0 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     544             :             {
     545           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     546             :                                PQntuples(res), PQnfields(res), 1, 2);
     547             :             }
     548             : 
     549             :             /* Write the history file to disk */
     550           0 :             writeTimeLineHistoryFile(stream,
     551             :                                      PQgetvalue(res, 0, 0),
     552             :                                      PQgetvalue(res, 0, 1));
     553             : 
     554           0 :             PQclear(res);
     555             :         }
     556             : 
     557             :         /*
     558             :          * Before we start streaming from the requested location, check if the
     559             :          * callback tells us to stop here.
     560             :          */
     561          84 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     562           0 :             return true;
     563             : 
     564             :         /* Initiate the replication stream at specified location */
     565         252 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
     566             :                  slotcmd,
     567         168 :                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
     568             :                  stream->timeline);
     569          84 :         res = PQexec(conn, query);
     570          84 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     571             :         {
     572           2 :             pg_log_error("could not send replication command \"%s\": %s",
     573             :                          "START_REPLICATION", PQresultErrorMessage(res));
     574           2 :             PQclear(res);
     575           2 :             return false;
     576             :         }
     577          82 :         PQclear(res);
     578             : 
     579             :         /* Stream the WAL */
     580          82 :         res = HandleCopyStream(conn, stream, &stoppos);
     581          82 :         if (res == NULL)
     582           0 :             goto error;
     583             : 
     584             :         /*
     585             :          * Streaming finished.
     586             :          *
     587             :          * There are two possible reasons for that: a controlled shutdown, or
     588             :          * we reached the end of the current timeline. In case of
     589             :          * end-of-timeline, the server sends a result set after Copy has
     590             :          * finished, containing information about the next timeline. Read
     591             :          * that, and restart streaming from the next timeline. In case of
     592             :          * controlled shutdown, stop here.
     593             :          */
     594          82 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     595             :         {
     596             :             /*
     597             :              * End-of-timeline. Read the next timeline's ID and starting
     598             :              * position. Usually, the starting position will match the end of
     599             :              * the previous timeline, but there are corner cases like if the
     600             :              * server had sent us half of a WAL record, when it was promoted.
     601             :              * The new timeline will begin at the end of the last complete
     602             :              * record in that case, overlapping the partial WAL record on the
     603             :              * old timeline.
     604             :              */
     605             :             uint32      newtimeline;
     606             :             bool        parsed;
     607             : 
     608           0 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     609           0 :             PQclear(res);
     610           0 :             if (!parsed)
     611           0 :                 goto error;
     612             : 
     613             :             /* Sanity check the values the server gave us */
     614           0 :             if (newtimeline <= stream->timeline)
     615             :             {
     616           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     617             :                              newtimeline, stream->timeline);
     618           0 :                 goto error;
     619             :             }
     620           0 :             if (stream->startpos > stoppos)
     621             :             {
     622           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
     623             :                              stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
     624             :                              newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
     625           0 :                 goto error;
     626             :             }
     627             : 
     628             :             /* Read the final result, which should be CommandComplete. */
     629           0 :             res = PQgetResult(conn);
     630           0 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     631             :             {
     632           0 :                 pg_log_error("unexpected termination of replication stream: %s",
     633             :                              PQresultErrorMessage(res));
     634           0 :                 PQclear(res);
     635           0 :                 goto error;
     636             :             }
     637           0 :             PQclear(res);
     638             : 
     639             :             /*
     640             :              * Loop back to start streaming from the new timeline. Always
     641             :              * start streaming at the beginning of a segment.
     642             :              */
     643           0 :             stream->timeline = newtimeline;
     644           0 :             stream->startpos = stream->startpos -
     645           0 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     646           0 :             continue;
     647             :         }
     648          82 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     649             :         {
     650          82 :             PQclear(res);
     651             : 
     652             :             /*
     653             :              * End of replication (ie. controlled shut down of the server).
     654             :              *
     655             :              * Check if the callback thinks it's OK to stop here. If not,
     656             :              * complain.
     657             :              */
     658          82 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     659          82 :                 return true;
     660             :             else
     661             :             {
     662           0 :                 pg_log_error("replication stream was terminated before stop point");
     663           0 :                 goto error;
     664             :             }
     665             :         }
     666             :         else
     667             :         {
     668             :             /* Server returned an error. */
     669           0 :             pg_log_error("unexpected termination of replication stream: %s",
     670             :                          PQresultErrorMessage(res));
     671           0 :             PQclear(res);
     672           0 :             goto error;
     673             :         }
     674             :     }
     675             : 
     676             : error:
     677           0 :     if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
     678           0 :         pg_log_error("could not close file \"%s\": %s",
     679             :                      current_walfile_name, stream->walmethod->getlasterror());
     680           0 :     walfile = NULL;
     681           0 :     return false;
     682             : }
     683             : 
     684             : /*
     685             :  * Helper function to parse the result set returned by server after streaming
     686             :  * has finished. On failure, prints an error to stderr and returns false.
     687             :  */
     688             : static bool
     689           0 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     690             : {
     691             :     uint32      startpos_xlogid,
     692             :                 startpos_xrecoff;
     693             : 
     694             :     /*----------
     695             :      * The result set consists of one row and two columns, e.g:
     696             :      *
     697             :      *  next_tli | next_tli_startpos
     698             :      * ----------+-------------------
     699             :      *         4 | 0/9949AE0
     700             :      *
     701             :      * next_tli is the timeline ID of the next timeline after the one that
     702             :      * just finished streaming. next_tli_startpos is the WAL location where
     703             :      * the server switched to it.
     704             :      *----------
     705             :      */
     706           0 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     707             :     {
     708           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     709             :                      PQntuples(res), PQnfields(res), 1, 2);
     710           0 :         return false;
     711             :     }
     712             : 
     713           0 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     714           0 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
     715             :                &startpos_xrecoff) != 2)
     716             :     {
     717           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     718             :                      PQgetvalue(res, 0, 1));
     719           0 :         return false;
     720             :     }
     721           0 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     722             : 
     723           0 :     return true;
     724             : }
     725             : 
     726             : /*
     727             :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     728             :  * initiating streaming with the START_REPLICATION command.
     729             :  *
     730             :  * If the COPY ends (not necessarily successfully) due a message from the
     731             :  * server, returns a PGresult and sets *stoppos to the last byte written.
     732             :  * On any other sort of error, returns NULL.
     733             :  */
     734             : static PGresult *
     735          82 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     736             :                  XLogRecPtr *stoppos)
     737             : {
     738          82 :     char       *copybuf = NULL;
     739          82 :     TimestampTz last_status = -1;
     740          82 :     XLogRecPtr  blockpos = stream->startpos;
     741             : 
     742          82 :     still_sending = true;
     743             : 
     744             :     while (1)
     745        1852 :     {
     746             :         int         r;
     747             :         TimestampTz now;
     748             :         long        sleeptime;
     749             : 
     750             :         /*
     751             :          * Check if we should continue streaming, or abort at this point.
     752             :          */
     753        1934 :         if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
     754           0 :             goto error;
     755             : 
     756        1934 :         now = feGetCurrentTimestamp();
     757             : 
     758             :         /*
     759             :          * If synchronous option is true, issue sync command as soon as there
     760             :          * are WAL data which has not been flushed yet.
     761             :          */
     762        1934 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     763             :         {
     764           0 :             if (stream->walmethod->sync(walfile) != 0)
     765             :             {
     766           0 :                 pg_log_error("could not fsync file \"%s\": %s",
     767             :                              current_walfile_name, stream->walmethod->getlasterror());
     768           0 :                 goto error;
     769             :             }
     770           0 :             lastFlushPosition = blockpos;
     771             : 
     772             :             /*
     773             :              * Send feedback so that the server sees the latest WAL locations
     774             :              * immediately.
     775             :              */
     776           0 :             if (!sendFeedback(conn, blockpos, now, false))
     777           0 :                 goto error;
     778           0 :             last_status = now;
     779             :         }
     780             : 
     781             :         /*
     782             :          * Potentially send a status message to the master
     783             :          */
     784        3818 :         if (still_sending && stream->standby_message_timeout > 0 &&
     785        1884 :             feTimestampDifferenceExceeds(last_status, now,
     786             :                                          stream->standby_message_timeout))
     787             :         {
     788             :             /* Time to send feedback! */
     789          82 :             if (!sendFeedback(conn, blockpos, now, false))
     790           0 :                 goto error;
     791          82 :             last_status = now;
     792             :         }
     793             : 
     794             :         /*
     795             :          * Calculate how long send/receive loops should sleep
     796             :          */
     797        1934 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     798             :                                                  last_status);
     799             : 
     800        1934 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     801        7914 :         while (r != 0)
     802             :         {
     803        4128 :             if (r == -1)
     804           0 :                 goto error;
     805        4128 :             if (r == -2)
     806             :             {
     807          82 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     808             : 
     809          82 :                 if (res == NULL)
     810           0 :                     goto error;
     811             :                 else
     812          82 :                     return res;
     813             :             }
     814             : 
     815             :             /* Check the message type. */
     816        4046 :             if (copybuf[0] == 'k')
     817             :             {
     818           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     819             :                                          &last_status))
     820           0 :                     goto error;
     821             :             }
     822        4046 :             else if (copybuf[0] == 'w')
     823             :             {
     824        4046 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
     825           0 :                     goto error;
     826             : 
     827             :                 /*
     828             :                  * Check if we should continue streaming, or abort at this
     829             :                  * point.
     830             :                  */
     831        4046 :                 if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
     832           0 :                     goto error;
     833             :             }
     834             :             else
     835             :             {
     836           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     837             :                              copybuf[0]);
     838           0 :                 goto error;
     839             :             }
     840             : 
     841             :             /*
     842             :              * Process the received data, and any subsequent data we can read
     843             :              * without blocking.
     844             :              */
     845        4046 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     846             :         }
     847             :     }
     848             : 
     849             : error:
     850           0 :     if (copybuf != NULL)
     851           0 :         PQfreemem(copybuf);
     852           0 :     return NULL;
     853             : }
     854             : 
     855             : /*
     856             :  * Wait until we can read a CopyData message,
     857             :  * or timeout, or occurrence of a signal or input on the stop_socket.
     858             :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     859             :  *
     860             :  * Returns 1 if data has become available for reading, 0 if timed out
     861             :  * or interrupted by signal or stop_socket input, and -1 on an error.
     862             :  */
     863             : static int
     864        5872 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     865             : {
     866             :     int         ret;
     867             :     fd_set      input_mask;
     868             :     int         connsocket;
     869             :     int         maxfd;
     870             :     struct timeval timeout;
     871             :     struct timeval *timeoutptr;
     872             : 
     873        5872 :     connsocket = PQsocket(conn);
     874        5872 :     if (connsocket < 0)
     875             :     {
     876           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     877           0 :         return -1;
     878             :     }
     879             : 
     880        5872 :     FD_ZERO(&input_mask);
     881        5872 :     FD_SET(connsocket, &input_mask);
     882        5872 :     maxfd = connsocket;
     883        5872 :     if (stop_socket != PGINVALID_SOCKET)
     884             :     {
     885        5866 :         FD_SET(stop_socket, &input_mask);
     886        5866 :         maxfd = Max(maxfd, stop_socket);
     887             :     }
     888             : 
     889        5872 :     if (timeout_ms < 0)
     890          50 :         timeoutptr = NULL;
     891             :     else
     892             :     {
     893        5822 :         timeout.tv_sec = timeout_ms / 1000L;
     894        5822 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     895        5822 :         timeoutptr = &timeout;
     896             :     }
     897             : 
     898        5872 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     899             : 
     900        5872 :     if (ret < 0)
     901             :     {
     902           0 :         if (errno == EINTR)
     903           0 :             return 0;           /* Got a signal, so not an error */
     904           0 :         pg_log_error("select() failed: %m");
     905           0 :         return -1;
     906             :     }
     907        5872 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     908        4136 :         return 1;               /* Got input on connection socket */
     909             : 
     910        1736 :     return 0;                   /* Got timeout or input on stop_socket */
     911             : }
     912             : 
     913             : /*
     914             :  * Receive CopyData message available from XLOG stream, blocking for
     915             :  * maximum of 'timeout' ms.
     916             :  *
     917             :  * If data was received, returns the length of the data. *buffer is set to
     918             :  * point to a buffer holding the received message. The buffer is only valid
     919             :  * until the next CopyStreamReceive call.
     920             :  *
     921             :  * Returns 0 if no data was available within timeout, or if wait was
     922             :  * interrupted by signal or stop_socket input.
     923             :  * -1 on error. -2 if the server ended the COPY.
     924             :  */
     925             : static int
     926        5980 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     927             :                   char **buffer)
     928             : {
     929        5980 :     char       *copybuf = NULL;
     930             :     int         rawlen;
     931             : 
     932        5980 :     if (*buffer != NULL)
     933        4046 :         PQfreemem(*buffer);
     934        5980 :     *buffer = NULL;
     935             : 
     936             :     /* Try to receive a CopyData message */
     937        5980 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     938        5980 :     if (rawlen == 0)
     939             :     {
     940             :         int         ret;
     941             : 
     942             :         /*
     943             :          * No data available.  Wait for some to appear, but not longer than
     944             :          * the specified timeout, so that we can ping the server.  Also stop
     945             :          * waiting if input appears on stop_socket.
     946             :          */
     947        5872 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     948        5872 :         if (ret <= 0)
     949        1736 :             return ret;
     950             : 
     951             :         /* Now there is actually data on the socket */
     952        4136 :         if (PQconsumeInput(conn) == 0)
     953             :         {
     954           0 :             pg_log_error("could not receive data from WAL stream: %s",
     955             :                          PQerrorMessage(conn));
     956           0 :             return -1;
     957             :         }
     958             : 
     959             :         /* Now that we've consumed some input, try again */
     960        4136 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     961        4136 :         if (rawlen == 0)
     962         116 :             return 0;
     963             :     }
     964        4128 :     if (rawlen == -1)           /* end-of-streaming or error */
     965          82 :         return -2;
     966        4046 :     if (rawlen == -2)
     967             :     {
     968           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     969           0 :         return -1;
     970             :     }
     971             : 
     972             :     /* Return received messages to caller */
     973        4046 :     *buffer = copybuf;
     974        4046 :     return rawlen;
     975             : }
     976             : 
     977             : /*
     978             :  * Process the keepalive message.
     979             :  */
     980             : static bool
     981           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     982             :                     XLogRecPtr blockpos, TimestampTz *last_status)
     983             : {
     984             :     int         pos;
     985             :     bool        replyRequested;
     986             :     TimestampTz now;
     987             : 
     988             :     /*
     989             :      * Parse the keepalive message, enclosed in the CopyData message. We just
     990             :      * check if the server requested a reply, and ignore the rest.
     991             :      */
     992           0 :     pos = 1;                    /* skip msgtype 'k' */
     993           0 :     pos += 8;                   /* skip walEnd */
     994           0 :     pos += 8;                   /* skip sendTime */
     995             : 
     996           0 :     if (len < pos + 1)
     997             :     {
     998           0 :         pg_log_error("streaming header too small: %d", len);
     999           0 :         return false;
    1000             :     }
    1001           0 :     replyRequested = copybuf[pos];
    1002             : 
    1003             :     /* If the server requested an immediate reply, send one. */
    1004           0 :     if (replyRequested && still_sending)
    1005             :     {
    1006           0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1007           0 :             walfile != NULL)
    1008             :         {
    1009             :             /*
    1010             :              * If a valid flush location needs to be reported, flush the
    1011             :              * current WAL file so that the latest flush location is sent back
    1012             :              * to the server. This is necessary to see whether the last WAL
    1013             :              * data has been successfully replicated or not, at the normal
    1014             :              * shutdown of the server.
    1015             :              */
    1016           0 :             if (stream->walmethod->sync(walfile) != 0)
    1017             :             {
    1018           0 :                 pg_log_error("could not fsync file \"%s\": %s",
    1019             :                              current_walfile_name, stream->walmethod->getlasterror());
    1020           0 :                 return false;
    1021             :             }
    1022           0 :             lastFlushPosition = blockpos;
    1023             :         }
    1024             : 
    1025           0 :         now = feGetCurrentTimestamp();
    1026           0 :         if (!sendFeedback(conn, blockpos, now, false))
    1027           0 :             return false;
    1028           0 :         *last_status = now;
    1029             :     }
    1030             : 
    1031           0 :     return true;
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Process XLogData message.
    1036             :  */
    1037             : static bool
    1038        4046 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1039             :                    XLogRecPtr *blockpos)
    1040             : {
    1041             :     int         xlogoff;
    1042             :     int         bytes_left;
    1043             :     int         bytes_written;
    1044             :     int         hdr_len;
    1045             : 
    1046             :     /*
    1047             :      * Once we've decided we don't want to receive any more, just ignore any
    1048             :      * subsequent XLogData messages.
    1049             :      */
    1050        4046 :     if (!(still_sending))
    1051         120 :         return true;
    1052             : 
    1053             :     /*
    1054             :      * Read the header of the XLogData message, enclosed in the CopyData
    1055             :      * message. We only need the WAL location field (dataStart), the rest of
    1056             :      * the header is ignored.
    1057             :      */
    1058        3926 :     hdr_len = 1;                /* msgtype 'w' */
    1059        3926 :     hdr_len += 8;               /* dataStart */
    1060        3926 :     hdr_len += 8;               /* walEnd */
    1061        3926 :     hdr_len += 8;               /* sendTime */
    1062        3926 :     if (len < hdr_len)
    1063             :     {
    1064           0 :         pg_log_error("streaming header too small: %d", len);
    1065           0 :         return false;
    1066             :     }
    1067        3926 :     *blockpos = fe_recvint64(&copybuf[1]);
    1068             : 
    1069             :     /* Extract WAL location for this block */
    1070        3926 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1071             : 
    1072             :     /*
    1073             :      * Verify that the initial location in the stream matches where we think
    1074             :      * we are.
    1075             :      */
    1076        3926 :     if (walfile == NULL)
    1077             :     {
    1078             :         /* No file open yet */
    1079          82 :         if (xlogoff != 0)
    1080             :         {
    1081           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1082             :                          xlogoff);
    1083           0 :             return false;
    1084             :         }
    1085             :     }
    1086             :     else
    1087             :     {
    1088             :         /* More data in existing segment */
    1089        3844 :         if (stream->walmethod->get_current_pos(walfile) != xlogoff)
    1090             :         {
    1091           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1092             :                          xlogoff, (int) stream->walmethod->get_current_pos(walfile));
    1093           0 :             return false;
    1094             :         }
    1095             :     }
    1096             : 
    1097        3926 :     bytes_left = len - hdr_len;
    1098        3926 :     bytes_written = 0;
    1099             : 
    1100       11768 :     while (bytes_left)
    1101             :     {
    1102             :         int         bytes_to_write;
    1103             : 
    1104             :         /*
    1105             :          * If crossing a WAL boundary, only write up until we reach wal
    1106             :          * segment size.
    1107             :          */
    1108        3926 :         if (xlogoff + bytes_left > WalSegSz)
    1109           0 :             bytes_to_write = WalSegSz - xlogoff;
    1110             :         else
    1111        3926 :             bytes_to_write = bytes_left;
    1112             : 
    1113        3926 :         if (walfile == NULL)
    1114             :         {
    1115          82 :             if (!open_walfile(stream, *blockpos))
    1116             :             {
    1117             :                 /* Error logged by open_walfile */
    1118           0 :                 return false;
    1119             :             }
    1120             :         }
    1121             : 
    1122        7852 :         if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
    1123        3926 :                                      bytes_to_write) != bytes_to_write)
    1124             :         {
    1125           0 :             pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
    1126             :                          bytes_to_write, current_walfile_name,
    1127             :                          stream->walmethod->getlasterror());
    1128           0 :             return false;
    1129             :         }
    1130             : 
    1131             :         /* Write was successful, advance our position */
    1132        3926 :         bytes_written += bytes_to_write;
    1133        3926 :         bytes_left -= bytes_to_write;
    1134        3926 :         *blockpos += bytes_to_write;
    1135        3926 :         xlogoff += bytes_to_write;
    1136             : 
    1137             :         /* Did we reach the end of a WAL segment? */
    1138        3926 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1139             :         {
    1140          12 :             if (!close_walfile(stream, *blockpos))
    1141             :                 /* Error message written in close_walfile() */
    1142           0 :                 return false;
    1143             : 
    1144          12 :             xlogoff = 0;
    1145             : 
    1146          12 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1147             :             {
    1148          10 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1149             :                 {
    1150           0 :                     pg_log_error("could not send copy-end packet: %s",
    1151             :                                  PQerrorMessage(conn));
    1152           0 :                     return false;
    1153             :                 }
    1154          10 :                 still_sending = false;
    1155          10 :                 return true;    /* ignore the rest of this XLogData packet */
    1156             :             }
    1157             :         }
    1158             :     }
    1159             :     /* No more data left to write, receive next copy packet */
    1160             : 
    1161        3916 :     return true;
    1162             : }
    1163             : 
    1164             : /*
    1165             :  * Handle end of the copy stream: finish our side of the copy if we are still sending, free copybuf, and store blockpos in *stoppos.
    1166             :  * Returns the PGresult fetched from the connection, or NULL if closing the WAL file or sending copy-end failed (res is cleared first). */
    1167             : static PGresult *
    1168          82 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1169             :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1170             : {
    1171          82 :     PGresult   *res = PQgetResult(conn);
    1172             : 
    1173             :     /*
    1174             :      * The server closed its end of the copy stream.  If we haven't closed
    1175             :      * ours already, we need to do so now, unless the server threw an error,
    1176             :      * in which case we don't (copy-end is only sent when res is PGRES_COPY_IN).
    1177             :      */
    1178          82 :     if (still_sending)
    1179             :     {
    1180           0 :         if (!close_walfile(stream, blockpos))
    1181             :         {
    1182             :             /* Error message written in close_walfile(); copybuf is not freed on this path */
    1183           0 :             PQclear(res);
    1184           0 :             return NULL;
    1185             :         }
    1186           0 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1187             :         {
    1188           0 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1189             :             {
    1190           0 :                 pg_log_error("could not send copy-end packet: %s",
    1191             :                              PQerrorMessage(conn));
    1192           0 :                 PQclear(res);
    1193           0 :                 return NULL;
    1194             :             }
    1195           0 :             res = PQgetResult(conn);    /* NOTE(review): the earlier res is overwritten without PQclear() -- looks like a small leak; confirm */
    1196             :         }
    1197           0 :         still_sending = false;
    1198             :     }
    1199          82 :     if (copybuf != NULL)
    1200           0 :         PQfreemem(copybuf);
    1201          82 :     *stoppos = blockpos;    /* hand the final position back through the out parameter */
    1202          82 :     return res;
    1203             : }
    1204             : 
    1205             : /*
    1206             :  * Check if we should continue streaming, or abort at this point: if still sending and stream_stop() says to stop at blockpos, close the WAL file and send copy-end.
    1207             :  * Returns false only when closing the file or sending copy-end fails; true otherwise, whether or not a stop was initiated.  stoppos is not used in this function. */
    1208             : static bool
    1209        5980 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
    1210             :                     XLogRecPtr *stoppos)
    1211             : {
    1212        5980 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))    /* third argument is false here; presumably a "segment finished" flag -- confirm against the callback's declaration */
    1213             :     {
    1214          72 :         if (!close_walfile(stream, blockpos))
    1215             :         {
    1216             :             /* Potential error message is written by close_walfile */
    1217           0 :             return false;
    1218             :         }
    1219          72 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1220             :         {
    1221           0 :             pg_log_error("could not send copy-end packet: %s",
    1222             :                          PQerrorMessage(conn));
    1223           0 :             return false;
    1224             :         }
    1225          72 :         still_sending = false;    /* remember that our side of the copy has been ended */
    1226             :     }
    1227             : 
    1228        5980 :     return true;
    1229             : }
    1230             : 
    1231             : /*
    1232             :  * Calculate how long send/receive loops should sleep.  Returns secs * 1000 + usecs / 1000 (i.e. milliseconds) until the next status target time,
    1233             :  * or -1 when no target time is set (standby_message_timeout is 0 or we are no longer sending). */
    1234             : static long
    1235        1934 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1236             :                              TimestampTz last_status)
    1237             : {
    1238        1934 :     TimestampTz status_targettime = 0;
    1239             :     long        sleeptime;
    1240             : 
    1241        1934 :     if (standby_message_timeout && still_sending)
    1242        1884 :         status_targettime = last_status +
    1243        1884 :             (standby_message_timeout - 1) * ((int64) 1000);    /* presumably converts a millisecond timeout to TimestampTz units -- confirm; reason for the "- 1" is not visible here */
    1244             : 
    1245        1934 :     if (status_targettime > 0)
    1246             :     {
    1247             :         long        secs;
    1248             :         int         usecs;
    1249             : 
    1250        1884 :         feTimestampDifference(now,
    1251             :                               status_targettime,
    1252             :                               &secs,
    1253             :                               &usecs);
    1254             :         /* Always sleep at least 1 sec; this also rounds up when less than a full second remains (secs == 0) */
    1255        1884 :         if (secs <= 0)
    1256             :         {
    1257           0 :             secs = 1;
    1258           0 :             usecs = 0;
    1259             :         }
    1260             : 
    1261        1884 :         sleeptime = secs * 1000 + usecs / 1000;
    1262             :     }
    1263             :     else
    1264          50 :         sleeptime = -1;    /* no target time; presumably the caller treats -1 as "no timeout" -- confirm */
    1265             : 
    1266        1934 :     return sleeptime;
    1267             : }

Generated by: LCOV version 1.13