LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 267 423 63.1 %
Date: 2024-09-16 13:12:04 Functions: 16 17 94.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * receivelog.c - receive WAL files using the streaming
       4             :  *                replication protocol.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/receivelog.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/select.h>
      18             : #include <sys/stat.h>
      19             : #include <unistd.h>
      20             : 
      21             : #include "access/xlog_internal.h"
      22             : #include "common/file_utils.h"
      23             : #include "common/logging.h"
      24             : #include "libpq-fe.h"
      25             : #include "receivelog.h"
      26             : #include "streamutil.h"
      27             : 
      28             : /* currently open WAL file */
      29             : static Walfile *walfile = NULL;
      30             : static bool reportFlushPosition = false;
      31             : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
      32             : 
      33             : static bool still_sending = true;   /* feedback still needs to be sent? */
      34             : 
      35             : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      36             :                                   XLogRecPtr *stoppos);
      37             : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      38             : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      39             :                               char **buffer);
      40             : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      41             :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      42             : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      43             :                                XLogRecPtr *blockpos);
      44             : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      45             :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      46             : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      47             : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      48             :                                          TimestampTz last_status);
      49             : 
      50             : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      51             :                                      uint32 *timeline);
      52             : 
      53             : static bool
      54          30 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      55             : {
      56             :     Walfile    *f;
      57             :     static char tmppath[MAXPGPATH];
      58             : 
      59          30 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      60             :              fname);
      61             : 
      62          30 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
      63             :                                                NULL, 0);
      64          30 :     if (f == NULL)
      65             :     {
      66           0 :         pg_log_error("could not create archive status file \"%s\": %s",
      67             :                      tmppath, GetLastWalMethodError(stream->walmethod));
      68           0 :         return false;
      69             :     }
      70             : 
      71          30 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
      72             :     {
      73           0 :         pg_log_error("could not close archive status file \"%s\": %s",
      74             :                      tmppath, GetLastWalMethodError(stream->walmethod));
      75           0 :         return false;
      76             :     }
      77             : 
      78          30 :     return true;
      79             : }
      80             : 
      81             : /*
      82             :  * Open a new WAL file in the specified directory.
      83             :  *
      84             :  * Returns true if OK; on failure, returns false after printing an error msg.
      85             :  * On success, 'walfile' is set to the opened WAL file.
      86             :  *
      87             :  * The file will be padded to 16Mb with zeroes.
      88             :  */
      89             : static bool
      90         264 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      91             : {
      92             :     Walfile    *f;
      93             :     char       *fn;
      94             :     ssize_t     size;
      95             :     XLogSegNo   segno;
      96             :     char        walfile_name[MAXPGPATH];
      97             : 
      98         264 :     XLByteToSeg(startpoint, segno, WalSegSz);
      99         264 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
     100             : 
     101             :     /* Note that this considers the compression used if necessary */
     102         264 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     103             :                                                walfile_name,
     104         264 :                                                stream->partial_suffix);
     105             : 
     106             :     /*
     107             :      * When streaming to files, if an existing file exists we verify that it's
     108             :      * either empty (just created), or a complete WalSegSz segment (in which
     109             :      * case it has been created and padded). Anything else indicates a corrupt
     110             :      * file. Compressed files have no need for padding, so just ignore this
     111             :      * case.
     112             :      *
     113             :      * When streaming to tar, no file with this name will exist before, so we
     114             :      * never have to verify a size.
     115             :      */
     116         514 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
     117         250 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
     118             :     {
     119           0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
     120           0 :         if (size < 0)
     121             :         {
     122           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     123             :                          fn, GetLastWalMethodError(stream->walmethod));
     124           0 :             pg_free(fn);
     125           0 :             return false;
     126             :         }
     127           0 :         if (size == WalSegSz)
     128             :         {
     129             :             /* Already padded file. Open it for use */
     130           0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
     131           0 :             if (f == NULL)
     132             :             {
     133           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     134             :                              fn, GetLastWalMethodError(stream->walmethod));
     135           0 :                 pg_free(fn);
     136           0 :                 return false;
     137             :             }
     138             : 
     139             :             /* fsync file in case of a previous crash */
     140           0 :             if (stream->walmethod->ops->sync(f) != 0)
     141             :             {
     142           0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     143             :                              fn, GetLastWalMethodError(stream->walmethod));
     144           0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
     145           0 :                 exit(1);
     146             :             }
     147             : 
     148           0 :             walfile = f;
     149           0 :             pg_free(fn);
     150           0 :             return true;
     151             :         }
     152           0 :         if (size != 0)
     153             :         {
     154             :             /* if write didn't set errno, assume problem is no disk space */
     155           0 :             if (errno == 0)
     156           0 :                 errno = ENOSPC;
     157           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
     158             :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
     159             :                                   size),
     160             :                          fn, size, WalSegSz);
     161           0 :             pg_free(fn);
     162           0 :             return false;
     163             :         }
     164             :         /* File existed and was empty, so fall through and open */
     165             :     }
     166             : 
     167             :     /* No file existed, so create one */
     168             : 
     169         264 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     170             :                                                walfile_name,
     171         264 :                                                stream->partial_suffix,
     172             :                                                WalSegSz);
     173         264 :     if (f == NULL)
     174             :     {
     175           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     176             :                      fn, GetLastWalMethodError(stream->walmethod));
     177           0 :         pg_free(fn);
     178           0 :         return false;
     179             :     }
     180             : 
     181         264 :     pg_free(fn);
     182         264 :     walfile = f;
     183         264 :     return true;
     184             : }
     185             : 
     186             : /*
     187             :  * Close the current WAL file (if open), and rename it to the correct
     188             :  * filename if it's complete. On failure, prints an error message to stderr
     189             :  * and returns false, otherwise returns true.
     190             :  */
     191             : static bool
     192         292 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     193             : {
     194             :     char       *fn;
     195             :     off_t       currpos;
     196             :     int         r;
     197             :     char        walfile_name[MAXPGPATH];
     198             : 
     199         292 :     if (walfile == NULL)
     200          28 :         return true;
     201             : 
     202         264 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
     203         264 :     currpos = walfile->currpos;
     204             : 
     205             :     /* Note that this considers the compression used if necessary */
     206         264 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     207             :                                                walfile_name,
     208         264 :                                                stream->partial_suffix);
     209             : 
     210         264 :     if (stream->partial_suffix)
     211             :     {
     212          24 :         if (currpos == WalSegSz)
     213          12 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     214             :         else
     215             :         {
     216          12 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
     217          12 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
     218             :         }
     219             :     }
     220             :     else
     221         240 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     222             : 
     223         264 :     walfile = NULL;
     224             : 
     225         264 :     if (r != 0)
     226             :     {
     227           0 :         pg_log_error("could not close file \"%s\": %s",
     228             :                      fn, GetLastWalMethodError(stream->walmethod));
     229             : 
     230           0 :         pg_free(fn);
     231           0 :         return false;
     232             :     }
     233             : 
     234         264 :     pg_free(fn);
     235             : 
     236             :     /*
     237             :      * Mark file as archived if requested by the caller - pg_basebackup needs
     238             :      * to do so as files can otherwise get archived again after promotion of a
     239             :      * new node. This is in line with walreceiver.c always doing a
     240             :      * XLogArchiveForceDone() after a complete segment.
     241             :      */
     242         264 :     if (currpos == WalSegSz && stream->mark_done)
     243             :     {
     244             :         /* writes error message if failed */
     245          26 :         if (!mark_file_as_archived(stream, walfile_name))
     246           0 :             return false;
     247             :     }
     248             : 
     249         264 :     lastFlushPosition = pos;
     250         264 :     return true;
     251             : }
     252             : 
     253             : 
     254             : /*
     255             :  * Check if a timeline history file exists.
     256             :  */
     257             : static bool
     258         258 : existsTimeLineHistoryFile(StreamCtl *stream)
     259             : {
     260             :     char        histfname[MAXFNAMELEN];
     261             : 
     262             :     /*
     263             :      * Timeline 1 never has a history file. We treat that as if it existed,
     264             :      * since we never need to stream it.
     265             :      */
     266         258 :     if (stream->timeline == 1)
     267         252 :         return true;
     268             : 
     269           6 :     TLHistoryFileName(histfname, stream->timeline);
     270             : 
     271           6 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
     272             : }
     273             : 
     274             : static bool
     275           6 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     276             : {
     277           6 :     int         size = strlen(content);
     278             :     char        histfname[MAXFNAMELEN];
     279             :     Walfile    *f;
     280             : 
     281             :     /*
     282             :      * Check that the server's idea of how timeline history files should be
     283             :      * named matches ours.
     284             :      */
     285           6 :     TLHistoryFileName(histfname, stream->timeline);
     286           6 :     if (strcmp(histfname, filename) != 0)
     287             :     {
     288           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     289             :                      stream->timeline, filename);
     290           0 :         return false;
     291             :     }
     292             : 
     293           6 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     294             :                                                histfname, ".tmp", 0);
     295           6 :     if (f == NULL)
     296             :     {
     297           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     298             :                      histfname, GetLastWalMethodError(stream->walmethod));
     299           0 :         return false;
     300             :     }
     301             : 
     302           6 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
     303             :     {
     304           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     305             :                      histfname, GetLastWalMethodError(stream->walmethod));
     306             : 
     307             :         /*
     308             :          * If we fail to make the file, delete it to release disk space
     309             :          */
     310           0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
     311             : 
     312           0 :         return false;
     313             :     }
     314             : 
     315           6 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
     316             :     {
     317           0 :         pg_log_error("could not close file \"%s\": %s",
     318             :                      histfname, GetLastWalMethodError(stream->walmethod));
     319           0 :         return false;
     320             :     }
     321             : 
     322             :     /* Maintain archive_status, check close_walfile() for details. */
     323           6 :     if (stream->mark_done)
     324             :     {
     325             :         /* writes error message if failed */
     326           4 :         if (!mark_file_as_archived(stream, histfname))
     327           0 :             return false;
     328             :     }
     329             : 
     330           6 :     return true;
     331             : }
     332             : 
     333             : /*
     334             :  * Send a Standby Status Update message to server.
     335             :  */
     336             : static bool
     337         254 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     338             : {
     339             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     340         254 :     int         len = 0;
     341             : 
     342         254 :     replybuf[len] = 'r';
     343         254 :     len += 1;
     344         254 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     345         254 :     len += 8;
     346         254 :     if (reportFlushPosition)
     347         246 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     348             :     else
     349           8 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     350         254 :     len += 8;
     351         254 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     352         254 :     len += 8;
     353         254 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     354         254 :     len += 8;
     355         254 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     356         254 :     len += 1;
     357             : 
     358         254 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     359             :     {
     360           0 :         pg_log_error("could not send feedback packet: %s",
     361             :                      PQerrorMessage(conn));
     362           0 :         return false;
     363             :     }
     364             : 
     365         254 :     return true;
     366             : }
     367             : 
     368             : /*
     369             :  * Check that the server version we're connected to is supported by
     370             :  * ReceiveXlogStream().
     371             :  *
     372             :  * If it's not, an error message is printed to stderr, and false is returned.
     373             :  */
     374             : bool
     375         552 : CheckServerVersionForStreaming(PGconn *conn)
     376             : {
     377             :     int         minServerMajor,
     378             :                 maxServerMajor;
     379             :     int         serverMajor;
     380             : 
     381             :     /*
     382             :      * The message format used in streaming replication changed in 9.3, so we
     383             :      * cannot stream from older servers. And we don't support servers newer
     384             :      * than the client; it might work, but we don't know, so err on the safe
     385             :      * side.
     386             :      */
     387         552 :     minServerMajor = 903;
     388         552 :     maxServerMajor = PG_VERSION_NUM / 100;
     389         552 :     serverMajor = PQserverVersion(conn) / 100;
     390         552 :     if (serverMajor < minServerMajor)
     391             :     {
     392           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     393             : 
     394           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     395             :                      serverver ? serverver : "'unknown'",
     396             :                      "9.3");
     397           0 :         return false;
     398             :     }
     399         552 :     else if (serverMajor > maxServerMajor)
     400             :     {
     401           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     402             : 
     403           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     404             :                      serverver ? serverver : "'unknown'",
     405             :                      PG_VERSION);
     406           0 :         return false;
     407             :     }
     408         552 :     return true;
     409             : }
     410             : 
     411             : /*
     412             :  * Receive a log stream starting at the specified position.
     413             :  *
     414             :  * Individual parameters are passed through the StreamCtl structure.
     415             :  *
     416             :  * If sysidentifier is specified, validate that both the system
     417             :  * identifier and the timeline matches the specified ones
     418             :  * (by sending an extra IDENTIFY_SYSTEM command)
     419             :  *
     420             :  * All received segments will be written to the directory
     421             :  * specified by basedir. This will also fetch any missing timeline history
     422             :  * files.
     423             :  *
     424             :  * The stream_stop callback will be called every time data
     425             :  * is received, and whenever a segment is completed. If it returns
     426             :  * true, the streaming will stop and the function
     427             :  * return. As long as it returns false, streaming will continue
     428             :  * indefinitely.
     429             :  *
     430             :  * If stream_stop() checks for external input, stop_socket should be set to
     431             :  * the FD it checks.  This will allow such input to be detected promptly
     432             :  * rather than after standby_message_timeout (which might be indefinite).
     433             :  * Note that signals will interrupt waits for input as well, but that is
     434             :  * race-y since a signal received while busy won't interrupt the wait.
     435             :  *
     436             :  * standby_message_timeout controls how often we send a message
     437             :  * back to the primary letting it know our progress, in milliseconds.
     438             :  * Zero means no messages are sent.
     439             :  * This message will only contain the write location, and never
     440             :  * flush or replay.
     441             :  *
     442             :  * If 'partial_suffix' is not NULL, files are initially created with the
     443             :  * given suffix, and the suffix is removed once the file is finished. That
     444             :  * allows you to tell the difference between partial and completed files,
     445             :  * so that you can continue later where you left.
     446             :  *
     447             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     448             :  * otherwise only when the WAL file is closed.
     449             :  *
     450             :  * Note: The WAL location *must* be at a log segment start!
     451             :  */
     452             : bool
     453         256 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     454             : {
     455             :     char        query[128];
     456             :     char        slotcmd[128];
     457             :     PGresult   *res;
     458             :     XLogRecPtr  stoppos;
     459             : 
     460             :     /*
     461             :      * The caller should've checked the server version already, but doesn't do
     462             :      * any harm to check it here too.
     463             :      */
     464         256 :     if (!CheckServerVersionForStreaming(conn))
     465           0 :         return false;
     466             : 
     467             :     /*
     468             :      * Decide whether we want to report the flush position. If we report the
     469             :      * flush position, the primary will know what WAL we'll possibly
     470             :      * re-request, and it can then remove older WAL safely. We must always do
     471             :      * that when we are using slots.
     472             :      *
     473             :      * Reporting the flush position makes one eligible as a synchronous
     474             :      * replica. People shouldn't include generic names in
     475             :      * synchronous_standby_names, but we've protected them against it so far,
     476             :      * so let's continue to do so unless specifically requested.
     477             :      */
     478         256 :     if (stream->replication_slot != NULL)
     479             :     {
     480         246 :         reportFlushPosition = true;
     481         246 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     482             :     }
     483             :     else
     484             :     {
     485          10 :         if (stream->synchronous)
     486           2 :             reportFlushPosition = true;
     487             :         else
     488           8 :             reportFlushPosition = false;
     489          10 :         slotcmd[0] = 0;
     490             :     }
     491             : 
     492         256 :     if (stream->sysidentifier != NULL)
     493             :     {
     494         256 :         char       *sysidentifier = NULL;
     495             :         TimeLineID  servertli;
     496             : 
     497             :         /*
     498             :          * Get the server system identifier and timeline, and validate them.
     499             :          */
     500         256 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
     501             :         {
     502           0 :             pg_free(sysidentifier);
     503           0 :             return false;
     504             :         }
     505             : 
     506         256 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
     507             :         {
     508           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     509           0 :             pg_free(sysidentifier);
     510           0 :             return false;
     511             :         }
     512         256 :         pg_free(sysidentifier);
     513             : 
     514         256 :         if (stream->timeline > servertli)
     515             :         {
     516           0 :             pg_log_error("starting timeline %u is not present in the server",
     517             :                          stream->timeline);
     518           0 :             return false;
     519             :         }
     520             :     }
     521             : 
     522             :     /*
     523             :      * initialize flush position to starting point, it's the caller's
     524             :      * responsibility that that's sane.
     525             :      */
     526         256 :     lastFlushPosition = stream->startpos;
     527             : 
     528             :     while (1)
     529             :     {
     530             :         /*
     531             :          * Fetch the timeline history file for this timeline, if we don't have
     532             :          * it already. When streaming log to tar, this will always return
     533             :          * false, as we are never streaming into an existing file and
     534             :          * therefore there can be no pre-existing timeline history file.
     535             :          */
     536         258 :         if (!existsTimeLineHistoryFile(stream))
     537             :         {
     538           6 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     539           6 :             res = PQexec(conn, query);
     540           6 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     541             :             {
     542             :                 /* FIXME: we might send it ok, but get an error */
     543           0 :                 pg_log_error("could not send replication command \"%s\": %s",
     544             :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     545           0 :                 PQclear(res);
     546           0 :                 return false;
     547             :             }
     548             : 
     549             :             /*
     550             :              * The response to TIMELINE_HISTORY is a single row result set
     551             :              * with two fields: filename and content
     552             :              */
     553           6 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     554             :             {
     555           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     556             :                                PQntuples(res), PQnfields(res), 1, 2);
     557             :             }
     558             : 
     559             :             /* Write the history file to disk */
     560           6 :             writeTimeLineHistoryFile(stream,
     561             :                                      PQgetvalue(res, 0, 0),
     562             :                                      PQgetvalue(res, 0, 1));
     563             : 
     564           6 :             PQclear(res);
     565             :         }
     566             : 
     567             :         /*
     568             :          * Before we start streaming from the requested location, check if the
     569             :          * callback tells us to stop here.
     570             :          */
     571         258 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     572           0 :             return true;
     573             : 
     574             :         /* Initiate the replication stream at specified location */
     575         258 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
     576             :                  slotcmd,
     577         258 :                  LSN_FORMAT_ARGS(stream->startpos),
     578             :                  stream->timeline);
     579         258 :         res = PQexec(conn, query);
     580         258 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     581             :         {
     582           4 :             pg_log_error("could not send replication command \"%s\": %s",
     583             :                          "START_REPLICATION", PQresultErrorMessage(res));
     584           4 :             PQclear(res);
     585           4 :             return false;
     586             :         }
     587         254 :         PQclear(res);
     588             : 
     589             :         /* Stream the WAL */
     590         254 :         res = HandleCopyStream(conn, stream, &stoppos);
     591         254 :         if (res == NULL)
     592           0 :             goto error;
     593             : 
     594             :         /*
     595             :          * Streaming finished.
     596             :          *
     597             :          * There are two possible reasons for that: a controlled shutdown, or
     598             :          * we reached the end of the current timeline. In case of
     599             :          * end-of-timeline, the server sends a result set after Copy has
     600             :          * finished, containing information about the next timeline. Read
     601             :          * that, and restart streaming from the next timeline. In case of
     602             :          * controlled shutdown, stop here.
     603             :          */
     604         254 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     605             :         {
     606             :             /*
     607             :              * End-of-timeline. Read the next timeline's ID and starting
     608             :              * position. Usually, the starting position will match the end of
     609             :              * the previous timeline, but there are corner cases like if the
     610             :              * server had sent us half of a WAL record, when it was promoted.
     611             :              * The new timeline will begin at the end of the last complete
     612             :              * record in that case, overlapping the partial WAL record on the
     613             :              * old timeline.
     614             :              */
     615             :             uint32      newtimeline;
     616             :             bool        parsed;
     617             : 
     618           2 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     619           2 :             PQclear(res);
     620           2 :             if (!parsed)
     621           0 :                 goto error;
     622             : 
     623             :             /* Sanity check the values the server gave us */
     624           2 :             if (newtimeline <= stream->timeline)
     625             :             {
     626           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     627             :                              newtimeline, stream->timeline);
     628           0 :                 goto error;
     629             :             }
     630           2 :             if (stream->startpos > stoppos)
     631             :             {
     632           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
     633             :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
     634             :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     635           0 :                 goto error;
     636             :             }
     637             : 
     638             :             /* Read the final result, which should be CommandComplete. */
     639           2 :             res = PQgetResult(conn);
     640           2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     641             :             {
     642           0 :                 pg_log_error("unexpected termination of replication stream: %s",
     643             :                              PQresultErrorMessage(res));
     644           0 :                 PQclear(res);
     645           0 :                 goto error;
     646             :             }
     647           2 :             PQclear(res);
     648             : 
     649             :             /*
     650             :              * Loop back to start streaming from the new timeline. Always
     651             :              * start streaming at the beginning of a segment.
     652             :              */
     653           2 :             stream->timeline = newtimeline;
     654           2 :             stream->startpos = stream->startpos -
     655           2 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     656           2 :             continue;
     657             :         }
     658         252 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659             :         {
     660         250 :             PQclear(res);
     661             : 
     662             :             /*
     663             :              * End of replication (ie. controlled shut down of the server).
     664             :              *
     665             :              * Check if the callback thinks it's OK to stop here. If not,
     666             :              * complain.
     667             :              */
     668         250 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     669         250 :                 return true;
     670             :             else
     671             :             {
     672           0 :                 pg_log_error("replication stream was terminated before stop point");
     673           0 :                 goto error;
     674             :             }
     675             :         }
     676             :         else
     677             :         {
     678             :             /* Server returned an error. */
     679           2 :             pg_log_error("unexpected termination of replication stream: %s",
     680             :                          PQresultErrorMessage(res));
     681           2 :             PQclear(res);
     682           2 :             goto error;
     683             :         }
     684             :     }
     685             : 
     686           2 : error:
     687           2 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
     688           0 :         pg_log_error("could not close file \"%s\": %s",
     689             :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
     690           2 :     walfile = NULL;
     691           2 :     return false;
     692             : }
     693             : 
     694             : /*
     695             :  * Helper function to parse the result set returned by server after streaming
     696             :  * has finished. On failure, prints an error to stderr and returns false.
     697             :  */
     698             : static bool
     699           2 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     700             : {
     701             :     uint32      startpos_xlogid,
     702             :                 startpos_xrecoff;
     703             : 
     704             :     /*----------
     705             :      * The result set consists of one row and two columns, e.g:
     706             :      *
     707             :      *  next_tli | next_tli_startpos
     708             :      * ----------+-------------------
     709             :      *         4 | 0/9949AE0
     710             :      *
     711             :      * next_tli is the timeline ID of the next timeline after the one that
     712             :      * just finished streaming. next_tli_startpos is the WAL location where
     713             :      * the server switched to it.
     714             :      *----------
     715             :      */
     716           2 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     717             :     {
     718           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     719             :                      PQntuples(res), PQnfields(res), 1, 2);
     720           0 :         return false;
     721             :     }
     722             : 
     723           2 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     724           2 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
     725             :                &startpos_xrecoff) != 2)
     726             :     {
     727           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     728             :                      PQgetvalue(res, 0, 1));
     729           0 :         return false;
     730             :     }
     731           2 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     732             : 
     733           2 :     return true;
     734             : }
     735             : 
     736             : /*
     737             :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     738             :  * initiating streaming with the START_REPLICATION command.
     739             :  *
     740             :  * If the COPY ends (not necessarily successfully) due a message from the
     741             :  * server, returns a PGresult and sets *stoppos to the last byte written.
     742             :  * On any other sort of error, returns NULL.
     743             :  */
     744             : static PGresult *
     745         254 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     746             :                  XLogRecPtr *stoppos)
     747             : {
     748         254 :     char       *copybuf = NULL;
     749         254 :     TimestampTz last_status = -1;
     750         254 :     XLogRecPtr  blockpos = stream->startpos;
     751             : 
     752         254 :     still_sending = true;
     753             : 
     754             :     while (1)
     755        3410 :     {
     756             :         int         r;
     757             :         TimestampTz now;
     758             :         long        sleeptime;
     759             : 
     760             :         /*
     761             :          * Check if we should continue streaming, or abort at this point.
     762             :          */
     763        3664 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
     764           0 :             goto error;
     765             : 
     766        3664 :         now = feGetCurrentTimestamp();
     767             : 
     768             :         /*
     769             :          * If synchronous option is true, issue sync command as soon as there
     770             :          * are WAL data which has not been flushed yet.
     771             :          */
     772        3664 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     773             :         {
     774           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
     775           0 :                 pg_fatal("could not fsync file \"%s\": %s",
     776             :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
     777           0 :             lastFlushPosition = blockpos;
     778             : 
     779             :             /*
     780             :              * Send feedback so that the server sees the latest WAL locations
     781             :              * immediately.
     782             :              */
     783           0 :             if (!sendFeedback(conn, blockpos, now, false))
     784           0 :                 goto error;
     785           0 :             last_status = now;
     786             :         }
     787             : 
     788             :         /*
     789             :          * Potentially send a status message to the primary
     790             :          */
     791        7142 :         if (still_sending && stream->standby_message_timeout > 0 &&
     792        3478 :             feTimestampDifferenceExceeds(last_status, now,
     793             :                                          stream->standby_message_timeout))
     794             :         {
     795             :             /* Time to send feedback! */
     796         254 :             if (!sendFeedback(conn, blockpos, now, false))
     797           0 :                 goto error;
     798         254 :             last_status = now;
     799             :         }
     800             : 
     801             :         /*
     802             :          * Calculate how long send/receive loops should sleep
     803             :          */
     804        3664 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     805             :                                                  last_status);
     806             : 
     807        3664 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     808       11570 :         while (r != 0)
     809             :         {
     810        8160 :             if (r == -1)
     811           0 :                 goto error;
     812        8160 :             if (r == -2)
     813             :             {
     814         254 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     815             : 
     816         254 :                 if (res == NULL)
     817           0 :                     goto error;
     818             :                 else
     819         254 :                     return res;
     820             :             }
     821             : 
     822             :             /* Check the message type. */
     823        7906 :             if (copybuf[0] == 'k')
     824             :             {
     825           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     826             :                                          &last_status))
     827           0 :                     goto error;
     828             :             }
     829        7906 :             else if (copybuf[0] == 'w')
     830             :             {
     831        7906 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
     832           0 :                     goto error;
     833             : 
     834             :                 /*
     835             :                  * Check if we should continue streaming, or abort at this
     836             :                  * point.
     837             :                  */
     838        7906 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     839           0 :                     goto error;
     840             :             }
     841             :             else
     842             :             {
     843           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     844             :                              copybuf[0]);
     845           0 :                 goto error;
     846             :             }
     847             : 
     848             :             /*
     849             :              * Process the received data, and any subsequent data we can read
     850             :              * without blocking.
     851             :              */
     852        7906 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     853             :         }
     854             :     }
     855             : 
     856           0 : error:
     857           0 :     PQfreemem(copybuf);
     858           0 :     return NULL;
     859             : }
     860             : 
     861             : /*
     862             :  * Wait until we can read a CopyData message,
     863             :  * or timeout, or occurrence of a signal or input on the stop_socket.
     864             :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     865             :  *
     866             :  * Returns 1 if data has become available for reading, 0 if timed out
     867             :  * or interrupted by signal or stop_socket input, and -1 on an error.
     868             :  */
     869             : static int
     870       11264 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     871             : {
     872             :     int         ret;
     873             :     fd_set      input_mask;
     874             :     int         connsocket;
     875             :     int         maxfd;
     876             :     struct timeval timeout;
     877             :     struct timeval *timeoutptr;
     878             : 
     879       11264 :     connsocket = PQsocket(conn);
     880       11264 :     if (connsocket < 0)
     881             :     {
     882           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     883           0 :         return -1;
     884             :     }
     885             : 
     886       11264 :     FD_ZERO(&input_mask);
     887       11264 :     FD_SET(connsocket, &input_mask);
     888       11264 :     maxfd = connsocket;
     889       11264 :     if (stop_socket != PGINVALID_SOCKET)
     890             :     {
     891       10910 :         FD_SET(stop_socket, &input_mask);
     892       10910 :         maxfd = Max(maxfd, stop_socket);
     893             :     }
     894             : 
     895       11264 :     if (timeout_ms < 0)
     896         186 :         timeoutptr = NULL;
     897             :     else
     898             :     {
     899       11078 :         timeout.tv_sec = timeout_ms / 1000L;
     900       11078 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     901       11078 :         timeoutptr = &timeout;
     902             :     }
     903             : 
     904       11264 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     905             : 
     906       11264 :     if (ret < 0)
     907             :     {
     908           0 :         if (errno == EINTR)
     909           0 :             return 0;           /* Got a signal, so not an error */
     910           0 :         pg_log_error("%s() failed: %m", "select");
     911           0 :         return -1;
     912             :     }
     913       11264 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     914        8388 :         return 1;               /* Got input on connection socket */
     915             : 
     916        2876 :     return 0;                   /* Got timeout or input on stop_socket */
     917             : }
     918             : 
     919             : /*
     920             :  * Receive CopyData message available from XLOG stream, blocking for
     921             :  * maximum of 'timeout' ms.
     922             :  *
     923             :  * If data was received, returns the length of the data. *buffer is set to
     924             :  * point to a buffer holding the received message. The buffer is only valid
     925             :  * until the next CopyStreamReceive call.
     926             :  *
     927             :  * Returns 0 if no data was available within timeout, or if wait was
     928             :  * interrupted by signal or stop_socket input.
     929             :  * -1 on error. -2 if the server ended the COPY.
     930             :  */
     931             : static int
     932       11570 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     933             :                   char **buffer)
     934             : {
     935       11570 :     char       *copybuf = NULL;
     936             :     int         rawlen;
     937             : 
     938       11570 :     PQfreemem(*buffer);
     939       11570 :     *buffer = NULL;
     940             : 
     941             :     /* Try to receive a CopyData message */
     942       11570 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     943       11570 :     if (rawlen == 0)
     944             :     {
     945             :         int         ret;
     946             : 
     947             :         /*
     948             :          * No data available.  Wait for some to appear, but not longer than
     949             :          * the specified timeout, so that we can ping the server.  Also stop
     950             :          * waiting if input appears on stop_socket.
     951             :          */
     952       11264 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     953       11264 :         if (ret <= 0)
     954        2876 :             return ret;
     955             : 
     956             :         /* Now there is actually data on the socket */
     957        8388 :         if (PQconsumeInput(conn) == 0)
     958             :         {
     959           0 :             pg_log_error("could not receive data from WAL stream: %s",
     960             :                          PQerrorMessage(conn));
     961           0 :             return -1;
     962             :         }
     963             : 
     964             :         /* Now that we've consumed some input, try again */
     965        8388 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     966        8388 :         if (rawlen == 0)
     967         534 :             return 0;
     968             :     }
     969        8160 :     if (rawlen == -1)           /* end-of-streaming or error */
     970         254 :         return -2;
     971        7906 :     if (rawlen == -2)
     972             :     {
     973           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     974           0 :         return -1;
     975             :     }
     976             : 
     977             :     /* Return received messages to caller */
     978        7906 :     *buffer = copybuf;
     979        7906 :     return rawlen;
     980             : }
     981             : 
     982             : /*
     983             :  * Process the keepalive message.
     984             :  */
     985             : static bool
     986           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     987             :                     XLogRecPtr blockpos, TimestampTz *last_status)
     988             : {
     989             :     int         pos;
     990             :     bool        replyRequested;
     991             :     TimestampTz now;
     992             : 
     993             :     /*
     994             :      * Parse the keepalive message, enclosed in the CopyData message. We just
     995             :      * check if the server requested a reply, and ignore the rest.
     996             :      */
     997           0 :     pos = 1;                    /* skip msgtype 'k' */
     998           0 :     pos += 8;                   /* skip walEnd */
     999           0 :     pos += 8;                   /* skip sendTime */
    1000             : 
    1001           0 :     if (len < pos + 1)
    1002             :     {
    1003           0 :         pg_log_error("streaming header too small: %d", len);
    1004           0 :         return false;
    1005             :     }
    1006           0 :     replyRequested = copybuf[pos];
    1007             : 
    1008             :     /* If the server requested an immediate reply, send one. */
    1009           0 :     if (replyRequested && still_sending)
    1010             :     {
    1011           0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1012           0 :             walfile != NULL)
    1013             :         {
    1014             :             /*
    1015             :              * If a valid flush location needs to be reported, flush the
    1016             :              * current WAL file so that the latest flush location is sent back
    1017             :              * to the server. This is necessary to see whether the last WAL
    1018             :              * data has been successfully replicated or not, at the normal
    1019             :              * shutdown of the server.
    1020             :              */
    1021           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
    1022           0 :                 pg_fatal("could not fsync file \"%s\": %s",
    1023             :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
    1024           0 :             lastFlushPosition = blockpos;
    1025             :         }
    1026             : 
    1027           0 :         now = feGetCurrentTimestamp();
    1028           0 :         if (!sendFeedback(conn, blockpos, now, false))
    1029           0 :             return false;
    1030           0 :         *last_status = now;
    1031             :     }
    1032             : 
    1033           0 :     return true;
    1034             : }
    1035             : 
    1036             : /*
    1037             :  * Process XLogData message.
    1038             :  */
    1039             : static bool
    1040        7906 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1041             :                    XLogRecPtr *blockpos)
    1042             : {
    1043             :     int         xlogoff;
    1044             :     int         bytes_left;
    1045             :     int         bytes_written;
    1046             :     int         hdr_len;
    1047             : 
    1048             :     /*
    1049             :      * Once we've decided we don't want to receive any more, just ignore any
    1050             :      * subsequent XLogData messages.
    1051             :      */
    1052        7906 :     if (!(still_sending))
    1053         362 :         return true;
    1054             : 
    1055             :     /*
    1056             :      * Read the header of the XLogData message, enclosed in the CopyData
    1057             :      * message. We only need the WAL location field (dataStart), the rest of
    1058             :      * the header is ignored.
    1059             :      */
    1060        7544 :     hdr_len = 1;                /* msgtype 'w' */
    1061        7544 :     hdr_len += 8;               /* dataStart */
    1062        7544 :     hdr_len += 8;               /* walEnd */
    1063        7544 :     hdr_len += 8;               /* sendTime */
    1064        7544 :     if (len < hdr_len)
    1065             :     {
    1066           0 :         pg_log_error("streaming header too small: %d", len);
    1067           0 :         return false;
    1068             :     }
    1069        7544 :     *blockpos = fe_recvint64(&copybuf[1]);
    1070             : 
    1071             :     /* Extract WAL location for this block */
    1072        7544 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1073             : 
    1074             :     /*
    1075             :      * Verify that the initial location in the stream matches where we think
    1076             :      * we are.
    1077             :      */
    1078        7544 :     if (walfile == NULL)
    1079             :     {
    1080             :         /* No file open yet */
    1081         264 :         if (xlogoff != 0)
    1082             :         {
    1083           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1084             :                          xlogoff);
    1085           0 :             return false;
    1086             :         }
    1087             :     }
    1088             :     else
    1089             :     {
    1090             :         /* More data in existing segment */
    1091        7280 :         if (walfile->currpos != xlogoff)
    1092             :         {
    1093           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1094             :                          xlogoff, (int) walfile->currpos);
    1095           0 :             return false;
    1096             :         }
    1097             :     }
    1098             : 
    1099        7544 :     bytes_left = len - hdr_len;
    1100        7544 :     bytes_written = 0;
    1101             : 
    1102       15088 :     while (bytes_left)
    1103             :     {
    1104             :         int         bytes_to_write;
    1105             : 
    1106             :         /*
    1107             :          * If crossing a WAL boundary, only write up until we reach wal
    1108             :          * segment size.
    1109             :          */
    1110        7544 :         if (xlogoff + bytes_left > WalSegSz)
    1111           0 :             bytes_to_write = WalSegSz - xlogoff;
    1112             :         else
    1113        7544 :             bytes_to_write = bytes_left;
    1114             : 
    1115        7544 :         if (walfile == NULL)
    1116             :         {
    1117         264 :             if (!open_walfile(stream, *blockpos))
    1118             :             {
    1119             :                 /* Error logged by open_walfile */
    1120           0 :                 return false;
    1121             :             }
    1122             :         }
    1123             : 
    1124       15088 :         if (stream->walmethod->ops->write(walfile,
    1125        7544 :                                           copybuf + hdr_len + bytes_written,
    1126        7544 :                                           bytes_to_write) != bytes_to_write)
    1127             :         {
    1128           0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
    1129             :                          bytes_to_write, walfile->pathname,
    1130             :                          GetLastWalMethodError(stream->walmethod));
    1131           0 :             return false;
    1132             :         }
    1133             : 
    1134             :         /* Write was successful, advance our position */
    1135        7544 :         bytes_written += bytes_to_write;
    1136        7544 :         bytes_left -= bytes_to_write;
    1137        7544 :         *blockpos += bytes_to_write;
    1138        7544 :         xlogoff += bytes_to_write;
    1139             : 
    1140             :         /* Did we reach the end of a WAL segment? */
    1141        7544 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1142             :         {
    1143          38 :             if (!close_walfile(stream, *blockpos))
    1144             :                 /* Error message written in close_walfile() */
    1145           0 :                 return false;
    1146             : 
    1147          38 :             xlogoff = 0;
    1148             : 
    1149          38 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1150             :             {
    1151           0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1152             :                 {
    1153           0 :                     pg_log_error("could not send copy-end packet: %s",
    1154             :                                  PQerrorMessage(conn));
    1155           0 :                     return false;
    1156             :                 }
    1157           0 :                 still_sending = false;
    1158           0 :                 return true;    /* ignore the rest of this XLogData packet */
    1159             :             }
    1160             :         }
    1161             :     }
    1162             :     /* No more data left to write, receive next copy packet */
    1163             : 
    1164        7544 :     return true;
    1165             : }
    1166             : 
    1167             : /*
    1168             :  * Handle end of the copy stream.
    1169             :  */
    1170             : static PGresult *
    1171         254 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1172             :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1173             : {
    1174         254 :     PGresult   *res = PQgetResult(conn);
    1175             : 
    1176             :     /*
    1177             :      * The server closed its end of the copy stream.  If we haven't closed
    1178             :      * ours already, we need to do so now, unless the server threw an error,
    1179             :      * in which case we don't.
    1180             :      */
    1181         254 :     if (still_sending)
    1182             :     {
    1183           4 :         if (!close_walfile(stream, blockpos))
    1184             :         {
    1185             :             /* Error message written in close_walfile() */
    1186           0 :             PQclear(res);
    1187           0 :             return NULL;
    1188             :         }
    1189           4 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1190             :         {
    1191           2 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1192             :             {
    1193           0 :                 pg_log_error("could not send copy-end packet: %s",
    1194             :                              PQerrorMessage(conn));
    1195           0 :                 PQclear(res);
    1196           0 :                 return NULL;
    1197             :             }
    1198           2 :             res = PQgetResult(conn);
    1199             :         }
    1200           4 :         still_sending = false;
    1201             :     }
    1202         254 :     PQfreemem(copybuf);
    1203         254 :     *stoppos = blockpos;
    1204         254 :     return res;
    1205             : }
    1206             : 
    1207             : /*
    1208             :  * Check if we should continue streaming, or abort at this point.
    1209             :  */
    1210             : static bool
    1211       11570 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1212             : {
    1213       11570 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1214             :     {
    1215         250 :         if (!close_walfile(stream, blockpos))
    1216             :         {
    1217             :             /* Potential error message is written by close_walfile */
    1218           0 :             return false;
    1219             :         }
    1220         250 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1221             :         {
    1222           0 :             pg_log_error("could not send copy-end packet: %s",
    1223             :                          PQerrorMessage(conn));
    1224           0 :             return false;
    1225             :         }
    1226         250 :         still_sending = false;
    1227             :     }
    1228             : 
    1229       11570 :     return true;
    1230             : }
    1231             : 
    1232             : /*
    1233             :  * Calculate how long send/receive loops should sleep
    1234             :  */
    1235             : static long
    1236        3664 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1237             :                              TimestampTz last_status)
    1238             : {
    1239        3664 :     TimestampTz status_targettime = 0;
    1240             :     long        sleeptime;
    1241             : 
    1242        3664 :     if (standby_message_timeout && still_sending)
    1243        3478 :         status_targettime = last_status +
    1244        3478 :             (standby_message_timeout - 1) * ((int64) 1000);
    1245             : 
    1246        3664 :     if (status_targettime > 0)
    1247             :     {
    1248             :         long        secs;
    1249             :         int         usecs;
    1250             : 
    1251        3478 :         feTimestampDifference(now,
    1252             :                               status_targettime,
    1253             :                               &secs,
    1254             :                               &usecs);
    1255             :         /* Always sleep at least 1 sec */
    1256        3478 :         if (secs <= 0)
    1257             :         {
    1258           0 :             secs = 1;
    1259           0 :             usecs = 0;
    1260             :         }
    1261             : 
    1262        3478 :         sleeptime = secs * 1000 + usecs / 1000;
    1263             :     }
    1264             :     else
    1265         186 :         sleeptime = -1;
    1266             : 
    1267        3664 :     return sleeptime;
    1268             : }

Generated by: LCOV version 1.14