LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 269 425 63.3 %
Date: 2025-02-22 07:14:56 Functions: 16 17 94.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * receivelog.c - receive WAL files using the streaming
       4             :  *                replication protocol.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/receivelog.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/select.h>
      18             : #include <sys/stat.h>
      19             : #include <unistd.h>
      20             : 
      21             : #include "access/xlog_internal.h"
      22             : #include "common/logging.h"
      23             : #include "libpq-fe.h"
      24             : #include "receivelog.h"
      25             : #include "streamutil.h"
      26             : 
      27             : /* currently open WAL file */
      28             : static Walfile *walfile = NULL;
      29             : static bool reportFlushPosition = false;
      30             : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
      31             : 
      32             : static bool still_sending = true;   /* feedback still needs to be sent? */
      33             : 
      34             : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      35             :                                   XLogRecPtr *stoppos);
      36             : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      37             : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      38             :                               char **buffer);
      39             : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      40             :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      41             : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      42             :                                XLogRecPtr *blockpos);
      43             : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      44             :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      45             : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      46             : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      47             :                                          TimestampTz last_status);
      48             : 
      49             : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      50             :                                      uint32 *timeline);
      51             : 
      52             : static bool
      53          30 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      54             : {
      55             :     Walfile    *f;
      56             :     static char tmppath[MAXPGPATH];
      57             : 
      58          30 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      59             :              fname);
      60             : 
      61          30 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
      62             :                                                NULL, 0);
      63          30 :     if (f == NULL)
      64             :     {
      65           0 :         pg_log_error("could not create archive status file \"%s\": %s",
      66             :                      tmppath, GetLastWalMethodError(stream->walmethod));
      67           0 :         return false;
      68             :     }
      69             : 
      70          30 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
      71             :     {
      72           0 :         pg_log_error("could not close archive status file \"%s\": %s",
      73             :                      tmppath, GetLastWalMethodError(stream->walmethod));
      74           0 :         return false;
      75             :     }
      76             : 
      77          30 :     return true;
      78             : }
      79             : 
      80             : /*
      81             :  * Open a new WAL file in the specified directory.
      82             :  *
      83             :  * Returns true if OK; on failure, returns false after printing an error msg.
      84             :  * On success, 'walfile' is set to the opened WAL file.
      85             :  *
      86             :  * The file will be padded to 16Mb with zeroes.
      87             :  */
      88             : static bool
      89         286 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      90             : {
      91             :     Walfile    *f;
      92             :     char       *fn;
      93             :     ssize_t     size;
      94             :     XLogSegNo   segno;
      95             :     char        walfile_name[MAXPGPATH];
      96             : 
      97         286 :     XLByteToSeg(startpoint, segno, WalSegSz);
      98         286 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
      99             : 
     100             :     /* Note that this considers the compression used if necessary */
     101         286 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     102             :                                                walfile_name,
     103         286 :                                                stream->partial_suffix);
     104             : 
     105             :     /*
     106             :      * When streaming to files, if an existing file exists we verify that it's
     107             :      * either empty (just created), or a complete WalSegSz segment (in which
     108             :      * case it has been created and padded). Anything else indicates a corrupt
     109             :      * file. Compressed files have no need for padding, so just ignore this
     110             :      * case.
     111             :      *
     112             :      * When streaming to tar, no file with this name will exist before, so we
     113             :      * never have to verify a size.
     114             :      */
     115         558 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
     116         272 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
     117             :     {
     118           0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
     119           0 :         if (size < 0)
     120             :         {
     121           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     122             :                          fn, GetLastWalMethodError(stream->walmethod));
     123           0 :             pg_free(fn);
     124           0 :             return false;
     125             :         }
     126           0 :         if (size == WalSegSz)
     127             :         {
     128             :             /* Already padded file. Open it for use */
     129           0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
     130           0 :             if (f == NULL)
     131             :             {
     132           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     133             :                              fn, GetLastWalMethodError(stream->walmethod));
     134           0 :                 pg_free(fn);
     135           0 :                 return false;
     136             :             }
     137             : 
     138             :             /* fsync file in case of a previous crash */
     139           0 :             if (stream->walmethod->ops->sync(f) != 0)
     140             :             {
     141           0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     142             :                              fn, GetLastWalMethodError(stream->walmethod));
     143           0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
     144           0 :                 exit(1);
     145             :             }
     146             : 
     147           0 :             walfile = f;
     148           0 :             pg_free(fn);
     149           0 :             return true;
     150             :         }
     151           0 :         if (size != 0)
     152             :         {
     153             :             /* if write didn't set errno, assume problem is no disk space */
     154           0 :             if (errno == 0)
     155           0 :                 errno = ENOSPC;
     156           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
     157             :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
     158             :                                   size),
     159             :                          fn, size, WalSegSz);
     160           0 :             pg_free(fn);
     161           0 :             return false;
     162             :         }
     163             :         /* File existed and was empty, so fall through and open */
     164             :     }
     165             : 
     166             :     /* No file existed, so create one */
     167             : 
     168         286 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     169             :                                                walfile_name,
     170         286 :                                                stream->partial_suffix,
     171             :                                                WalSegSz);
     172         286 :     if (f == NULL)
     173             :     {
     174           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     175             :                      fn, GetLastWalMethodError(stream->walmethod));
     176           0 :         pg_free(fn);
     177           0 :         return false;
     178             :     }
     179             : 
     180         286 :     pg_free(fn);
     181         286 :     walfile = f;
     182         286 :     return true;
     183             : }
     184             : 
     185             : /*
     186             :  * Close the current WAL file (if open), and rename it to the correct
     187             :  * filename if it's complete. On failure, prints an error message to stderr
     188             :  * and returns false, otherwise returns true.
     189             :  */
     190             : static bool
     191         312 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     192             : {
     193             :     char       *fn;
     194             :     pgoff_t     currpos;
     195             :     int         r;
     196             :     char        walfile_name[MAXPGPATH];
     197             : 
     198         312 :     if (walfile == NULL)
     199          26 :         return true;
     200             : 
     201         286 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
     202         286 :     currpos = walfile->currpos;
     203             : 
     204             :     /* Note that this considers the compression used if necessary */
     205         286 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     206             :                                                walfile_name,
     207         286 :                                                stream->partial_suffix);
     208             : 
     209         286 :     if (stream->partial_suffix)
     210             :     {
     211          24 :         if (currpos == WalSegSz)
     212          12 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     213             :         else
     214             :         {
     215          12 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
     216          12 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
     217             :         }
     218             :     }
     219             :     else
     220         262 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     221             : 
     222         286 :     walfile = NULL;
     223             : 
     224         286 :     if (r != 0)
     225             :     {
     226           0 :         pg_log_error("could not close file \"%s\": %s",
     227             :                      fn, GetLastWalMethodError(stream->walmethod));
     228             : 
     229           0 :         pg_free(fn);
     230           0 :         return false;
     231             :     }
     232             : 
     233         286 :     pg_free(fn);
     234             : 
     235             :     /*
     236             :      * Mark file as archived if requested by the caller - pg_basebackup needs
     237             :      * to do so as files can otherwise get archived again after promotion of a
     238             :      * new node. This is in line with walreceiver.c always doing a
     239             :      * XLogArchiveForceDone() after a complete segment.
     240             :      */
     241         286 :     if (currpos == WalSegSz && stream->mark_done)
     242             :     {
     243             :         /* writes error message if failed */
     244          26 :         if (!mark_file_as_archived(stream, walfile_name))
     245           0 :             return false;
     246             :     }
     247             : 
     248         286 :     lastFlushPosition = pos;
     249         286 :     return true;
     250             : }
     251             : 
     252             : 
     253             : /*
     254             :  * Check if a timeline history file exists.
     255             :  */
     256             : static bool
     257         276 : existsTimeLineHistoryFile(StreamCtl *stream)
     258             : {
     259             :     char        histfname[MAXFNAMELEN];
     260             : 
     261             :     /*
     262             :      * Timeline 1 never has a history file. We treat that as if it existed,
     263             :      * since we never need to stream it.
     264             :      */
     265         276 :     if (stream->timeline == 1)
     266         270 :         return true;
     267             : 
     268           6 :     TLHistoryFileName(histfname, stream->timeline);
     269             : 
     270           6 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
     271             : }
     272             : 
     273             : static bool
     274           6 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     275             : {
     276           6 :     int         size = strlen(content);
     277             :     char        histfname[MAXFNAMELEN];
     278             :     Walfile    *f;
     279             : 
     280             :     /*
     281             :      * Check that the server's idea of how timeline history files should be
     282             :      * named matches ours.
     283             :      */
     284           6 :     TLHistoryFileName(histfname, stream->timeline);
     285           6 :     if (strcmp(histfname, filename) != 0)
     286             :     {
     287           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     288             :                      stream->timeline, filename);
     289           0 :         return false;
     290             :     }
     291             : 
     292           6 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     293             :                                                histfname, ".tmp", 0);
     294           6 :     if (f == NULL)
     295             :     {
     296           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     297             :                      histfname, GetLastWalMethodError(stream->walmethod));
     298           0 :         return false;
     299             :     }
     300             : 
     301           6 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
     302             :     {
     303           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     304             :                      histfname, GetLastWalMethodError(stream->walmethod));
     305             : 
     306             :         /*
     307             :          * If we fail to make the file, delete it to release disk space
     308             :          */
     309           0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
     310             : 
     311           0 :         return false;
     312             :     }
     313             : 
     314           6 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
     315             :     {
     316           0 :         pg_log_error("could not close file \"%s\": %s",
     317             :                      histfname, GetLastWalMethodError(stream->walmethod));
     318           0 :         return false;
     319             :     }
     320             : 
     321             :     /* Maintain archive_status, check close_walfile() for details. */
     322           6 :     if (stream->mark_done)
     323             :     {
     324             :         /* writes error message if failed */
     325           4 :         if (!mark_file_as_archived(stream, histfname))
     326           0 :             return false;
     327             :     }
     328             : 
     329           6 :     return true;
     330             : }
     331             : 
     332             : /*
     333             :  * Send a Standby Status Update message to server.
     334             :  */
     335             : static bool
     336         274 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     337             : {
     338             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     339         274 :     int         len = 0;
     340             : 
     341         274 :     replybuf[len] = 'r';
     342         274 :     len += 1;
     343         274 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     344         274 :     len += 8;
     345         274 :     if (reportFlushPosition)
     346         266 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     347             :     else
     348           8 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     349         274 :     len += 8;
     350         274 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     351         274 :     len += 8;
     352         274 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     353         274 :     len += 8;
     354         274 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     355         274 :     len += 1;
     356             : 
     357         274 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     358             :     {
     359           0 :         pg_log_error("could not send feedback packet: %s",
     360             :                      PQerrorMessage(conn));
     361           0 :         return false;
     362             :     }
     363             : 
     364         274 :     return true;
     365             : }
     366             : 
     367             : /*
     368             :  * Check that the server version we're connected to is supported by
     369             :  * ReceiveXlogStream().
     370             :  *
     371             :  * If it's not, an error message is printed to stderr, and false is returned.
     372             :  */
     373             : bool
     374         590 : CheckServerVersionForStreaming(PGconn *conn)
     375             : {
     376             :     int         minServerMajor,
     377             :                 maxServerMajor;
     378             :     int         serverMajor;
     379             : 
     380             :     /*
     381             :      * The message format used in streaming replication changed in 9.3, so we
     382             :      * cannot stream from older servers. And we don't support servers newer
     383             :      * than the client; it might work, but we don't know, so err on the safe
     384             :      * side.
     385             :      */
     386         590 :     minServerMajor = 903;
     387         590 :     maxServerMajor = PG_VERSION_NUM / 100;
     388         590 :     serverMajor = PQserverVersion(conn) / 100;
     389         590 :     if (serverMajor < minServerMajor)
     390             :     {
     391           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     392             : 
     393           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     394             :                      serverver ? serverver : "'unknown'",
     395             :                      "9.3");
     396           0 :         return false;
     397             :     }
     398         590 :     else if (serverMajor > maxServerMajor)
     399             :     {
     400           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     401             : 
     402           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     403             :                      serverver ? serverver : "'unknown'",
     404             :                      PG_VERSION);
     405           0 :         return false;
     406             :     }
     407         590 :     return true;
     408             : }
     409             : 
     410             : /*
     411             :  * Receive a log stream starting at the specified position.
     412             :  *
     413             :  * Individual parameters are passed through the StreamCtl structure.
     414             :  *
     415             :  * If sysidentifier is specified, validate that both the system
     416             :  * identifier and the timeline matches the specified ones
     417             :  * (by sending an extra IDENTIFY_SYSTEM command)
     418             :  *
     419             :  * All received segments will be written to the directory
     420             :  * specified by basedir. This will also fetch any missing timeline history
     421             :  * files.
     422             :  *
     423             :  * The stream_stop callback will be called every time data
     424             :  * is received, and whenever a segment is completed. If it returns
     425             :  * true, the streaming will stop and the function
     426             :  * return. As long as it returns false, streaming will continue
     427             :  * indefinitely.
     428             :  *
     429             :  * If stream_stop() checks for external input, stop_socket should be set to
     430             :  * the FD it checks.  This will allow such input to be detected promptly
     431             :  * rather than after standby_message_timeout (which might be indefinite).
     432             :  * Note that signals will interrupt waits for input as well, but that is
     433             :  * race-y since a signal received while busy won't interrupt the wait.
     434             :  *
     435             :  * standby_message_timeout controls how often we send a message
     436             :  * back to the primary letting it know our progress, in milliseconds.
     437             :  * Zero means no messages are sent.
     438             :  * This message will only contain the write location, and never
     439             :  * flush or replay.
     440             :  *
     441             :  * If 'partial_suffix' is not NULL, files are initially created with the
     442             :  * given suffix, and the suffix is removed once the file is finished. That
     443             :  * allows you to tell the difference between partial and completed files,
     444             :  * so that you can continue later where you left off.
     445             :  *
     446             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     447             :  * otherwise only when the WAL file is closed.
     448             :  *
     449             :  * Note: The WAL location *must* be at a log segment start!
     450             :  */
     451             : bool
     452         274 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     453             : {
     454             :     char        query[128];
     455             :     char        slotcmd[128];
     456             :     PGresult   *res;
     457             :     XLogRecPtr  stoppos;
     458             : 
     459             :     /*
     460             :      * The caller should've checked the server version already, but doesn't do
     461             :      * any harm to check it here too.
     462             :      */
     463         274 :     if (!CheckServerVersionForStreaming(conn))
     464           0 :         return false;
     465             : 
     466             :     /*
     467             :      * Decide whether we want to report the flush position. If we report the
     468             :      * flush position, the primary will know what WAL we'll possibly
     469             :      * re-request, and it can then remove older WAL safely. We must always do
     470             :      * that when we are using slots.
     471             :      *
     472             :      * Reporting the flush position makes one eligible as a synchronous
     473             :      * replica. People shouldn't include generic names in
     474             :      * synchronous_standby_names, but we've protected them against it so far,
     475             :      * so let's continue to do so unless specifically requested.
     476             :      */
     477         274 :     if (stream->replication_slot != NULL)
     478             :     {
     479         264 :         reportFlushPosition = true;
     480         264 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     481             :     }
     482             :     else
     483             :     {
     484          10 :         if (stream->synchronous)
     485           2 :             reportFlushPosition = true;
     486             :         else
     487           8 :             reportFlushPosition = false;
     488          10 :         slotcmd[0] = 0;
     489             :     }
     490             : 
     491         274 :     if (stream->sysidentifier != NULL)
     492             :     {
     493         274 :         char       *sysidentifier = NULL;
     494             :         TimeLineID  servertli;
     495             : 
     496             :         /*
     497             :          * Get the server system identifier and timeline, and validate them.
     498             :          */
     499         274 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
     500             :         {
     501           0 :             pg_free(sysidentifier);
     502           0 :             return false;
     503             :         }
     504             : 
     505         274 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
     506             :         {
     507           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     508           0 :             pg_free(sysidentifier);
     509           0 :             return false;
     510             :         }
     511         274 :         pg_free(sysidentifier);
     512             : 
     513         274 :         if (stream->timeline > servertli)
     514             :         {
     515           0 :             pg_log_error("starting timeline %u is not present in the server",
     516             :                          stream->timeline);
     517           0 :             return false;
     518             :         }
     519             :     }
     520             : 
     521             :     /*
     522             :      * initialize flush position to starting point, it's the caller's
     523             :      * responsibility that that's sane.
     524             :      */
     525         274 :     lastFlushPosition = stream->startpos;
     526             : 
     527             :     while (1)
     528             :     {
     529             :         /*
     530             :          * Fetch the timeline history file for this timeline, if we don't have
     531             :          * it already. When streaming log to tar, this will always return
     532             :          * false, as we are never streaming into an existing file and
     533             :          * therefore there can be no pre-existing timeline history file.
     534             :          */
     535         276 :         if (!existsTimeLineHistoryFile(stream))
     536             :         {
     537           6 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     538           6 :             res = PQexec(conn, query);
     539           6 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     540             :             {
     541             :                 /* FIXME: we might send it ok, but get an error */
     542           0 :                 pg_log_error("could not send replication command \"%s\": %s",
     543             :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     544           0 :                 PQclear(res);
     545           0 :                 return false;
     546             :             }
     547             : 
     548             :             /*
     549             :              * The response to TIMELINE_HISTORY is a single row result set
     550             :              * with two fields: filename and content
     551             :              */
     552           6 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     553             :             {
     554           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     555             :                                PQntuples(res), PQnfields(res), 1, 2);
     556             :             }
     557             : 
     558             :             /* Write the history file to disk */
     559           6 :             writeTimeLineHistoryFile(stream,
     560             :                                      PQgetvalue(res, 0, 0),
     561             :                                      PQgetvalue(res, 0, 1));
     562             : 
     563           6 :             PQclear(res);
     564             :         }
     565             : 
     566             :         /*
     567             :          * Before we start streaming from the requested location, check if the
     568             :          * callback tells us to stop here.
     569             :          */
     570         276 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     571           0 :             return true;
     572             : 
     573             :         /* Initiate the replication stream at specified location */
     574         276 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
     575             :                  slotcmd,
     576         276 :                  LSN_FORMAT_ARGS(stream->startpos),
     577             :                  stream->timeline);
     578         276 :         res = PQexec(conn, query);
     579         276 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     580             :         {
     581           2 :             pg_log_error("could not send replication command \"%s\": %s",
     582             :                          "START_REPLICATION", PQresultErrorMessage(res));
     583           2 :             PQclear(res);
     584           2 :             return false;
     585             :         }
     586         274 :         PQclear(res);
     587             : 
     588             :         /* Stream the WAL */
     589         274 :         res = HandleCopyStream(conn, stream, &stoppos);
     590         274 :         if (res == NULL)
     591           0 :             goto error;
     592             : 
     593             :         /*
     594             :          * Streaming finished.
     595             :          *
     596             :          * There are two possible reasons for that: a controlled shutdown, or
     597             :          * we reached the end of the current timeline. In case of
     598             :          * end-of-timeline, the server sends a result set after Copy has
     599             :          * finished, containing information about the next timeline. Read
     600             :          * that, and restart streaming from the next timeline. In case of
     601             :          * controlled shutdown, stop here.
     602             :          */
     603         274 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     604             :         {
     605             :             /*
     606             :              * End-of-timeline. Read the next timeline's ID and starting
     607             :              * position. Usually, the starting position will match the end of
     608             :              * the previous timeline, but there are corner cases like if the
     609             :              * server had sent us half of a WAL record, when it was promoted.
     610             :              * The new timeline will begin at the end of the last complete
     611             :              * record in that case, overlapping the partial WAL record on the
     612             :              * old timeline.
     613             :              */
     614             :             uint32      newtimeline;
     615             :             bool        parsed;
     616             : 
     617           2 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     618           2 :             PQclear(res);
     619           2 :             if (!parsed)
     620           0 :                 goto error;
     621             : 
     622             :             /* Sanity check the values the server gave us */
     623           2 :             if (newtimeline <= stream->timeline)
     624             :             {
     625           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     626             :                              newtimeline, stream->timeline);
     627           0 :                 goto error;
     628             :             }
     629           2 :             if (stream->startpos > stoppos)
     630             :             {
     631           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
     632             :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
     633             :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     634           0 :                 goto error;
     635             :             }
     636             : 
     637             :             /* Read the final result, which should be CommandComplete. */
     638           2 :             res = PQgetResult(conn);
     639           2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     640             :             {
     641           0 :                 pg_log_error("unexpected termination of replication stream: %s",
     642             :                              PQresultErrorMessage(res));
     643           0 :                 PQclear(res);
     644           0 :                 goto error;
     645             :             }
     646           2 :             PQclear(res);
     647             : 
     648             :             /*
     649             :              * Loop back to start streaming from the new timeline. Always
     650             :              * start streaming at the beginning of a segment.
     651             :              */
     652           2 :             stream->timeline = newtimeline;
     653           2 :             stream->startpos = stream->startpos -
     654           2 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     655           2 :             continue;
     656             :         }
     657         272 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     658             :         {
     659         270 :             PQclear(res);
     660             : 
     661             :             /*
     662             :              * End of replication (ie. controlled shut down of the server).
     663             :              *
     664             :              * Check if the callback thinks it's OK to stop here. If not,
     665             :              * complain.
     666             :              */
     667         270 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     668         270 :                 return true;
     669             :             else
     670             :             {
     671           0 :                 pg_log_error("replication stream was terminated before stop point");
     672           0 :                 goto error;
     673             :             }
     674             :         }
     675             :         else
     676             :         {
     677             :             /* Server returned an error. */
     678           2 :             pg_log_error("unexpected termination of replication stream: %s",
     679             :                          PQresultErrorMessage(res));
     680           2 :             PQclear(res);
     681           2 :             goto error;
     682             :         }
     683             :     }
     684             : 
     685           2 : error:
     686           2 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
     687           0 :         pg_log_error("could not close file \"%s\": %s",
     688             :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
     689           2 :     walfile = NULL;
     690           2 :     return false;
     691             : }
     692             : 
     693             : /*
     694             :  * Helper function to parse the result set returned by server after streaming
     695             :  * has finished. On failure, prints an error to stderr and returns false.
     696             :  */
     697             : static bool
     698           2 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     699             : {
     700             :     uint32      startpos_xlogid,
     701             :                 startpos_xrecoff;
     702             : 
     703             :     /*----------
     704             :      * The result set consists of one row and two columns, e.g:
     705             :      *
     706             :      *  next_tli | next_tli_startpos
     707             :      * ----------+-------------------
     708             :      *         4 | 0/9949AE0
     709             :      *
     710             :      * next_tli is the timeline ID of the next timeline after the one that
     711             :      * just finished streaming. next_tli_startpos is the WAL location where
     712             :      * the server switched to it.
     713             :      *----------
     714             :      */
     715           2 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     716             :     {
     717           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     718             :                      PQntuples(res), PQnfields(res), 1, 2);
     719           0 :         return false;
     720             :     }
     721             : 
     722           2 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     723           2 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
     724             :                &startpos_xrecoff) != 2)
     725             :     {
     726           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     727             :                      PQgetvalue(res, 0, 1));
     728           0 :         return false;
     729             :     }
     730           2 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     731             : 
     732           2 :     return true;
     733             : }
     734             : 
     735             : /*
     736             :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     737             :  * initiating streaming with the START_REPLICATION command.
     738             :  *
     739             :  * If the COPY ends (not necessarily successfully) due a message from the
     740             :  * server, returns a PGresult and sets *stoppos to the last byte written.
     741             :  * On any other sort of error, returns NULL.
     742             :  */
     743             : static PGresult *
     744         274 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     745             :                  XLogRecPtr *stoppos)
     746             : {
     747         274 :     char       *copybuf = NULL;
     748         274 :     TimestampTz last_status = -1;
     749         274 :     XLogRecPtr  blockpos = stream->startpos;
     750             : 
     751         274 :     still_sending = true;
     752             : 
     753             :     while (1)
     754        3308 :     {
     755             :         int         r;
     756             :         TimestampTz now;
     757             :         long        sleeptime;
     758             : 
     759             :         /*
     760             :          * Check if we should continue streaming, or abort at this point.
     761             :          */
     762        3582 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
     763           0 :             goto error;
     764             : 
     765        3582 :         now = feGetCurrentTimestamp();
     766             : 
     767             :         /*
     768             :          * If synchronous option is true, issue sync command as soon as there
     769             :          * are WAL data which has not been flushed yet.
     770             :          */
     771        3582 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     772             :         {
     773           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
     774           0 :                 pg_fatal("could not fsync file \"%s\": %s",
     775             :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
     776           0 :             lastFlushPosition = blockpos;
     777             : 
     778             :             /*
     779             :              * Send feedback so that the server sees the latest WAL locations
     780             :              * immediately.
     781             :              */
     782           0 :             if (!sendFeedback(conn, blockpos, now, false))
     783           0 :                 goto error;
     784           0 :             last_status = now;
     785             :         }
     786             : 
     787             :         /*
     788             :          * Potentially send a status message to the primary
     789             :          */
     790        6954 :         if (still_sending && stream->standby_message_timeout > 0 &&
     791        3372 :             feTimestampDifferenceExceeds(last_status, now,
     792             :                                          stream->standby_message_timeout))
     793             :         {
     794             :             /* Time to send feedback! */
     795         274 :             if (!sendFeedback(conn, blockpos, now, false))
     796           0 :                 goto error;
     797         274 :             last_status = now;
     798             :         }
     799             : 
     800             :         /*
     801             :          * Calculate how long send/receive loops should sleep
     802             :          */
     803        3582 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     804             :                                                  last_status);
     805             : 
     806             :         /* Done with any prior message */
     807        3582 :         PQfreemem(copybuf);
     808        3582 :         copybuf = NULL;
     809             : 
     810        3582 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     811       10642 :         while (r != 0)
     812             :         {
     813        7334 :             if (r == -1)
     814           0 :                 goto error;
     815        7334 :             if (r == -2)
     816             :             {
     817         274 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     818             : 
     819         274 :                 if (res == NULL)
     820           0 :                     goto error;
     821         274 :                 PQfreemem(copybuf);
     822         274 :                 return res;
     823             :             }
     824             : 
     825             :             /* Check the message type. */
     826        7060 :             if (copybuf[0] == 'k')
     827             :             {
     828           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     829             :                                          &last_status))
     830           0 :                     goto error;
     831             :             }
     832        7060 :             else if (copybuf[0] == 'w')
     833             :             {
     834        7060 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
     835           0 :                     goto error;
     836             : 
     837             :                 /*
     838             :                  * Check if we should continue streaming, or abort at this
     839             :                  * point.
     840             :                  */
     841        7060 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     842           0 :                     goto error;
     843             :             }
     844             :             else
     845             :             {
     846           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     847             :                              copybuf[0]);
     848           0 :                 goto error;
     849             :             }
     850             : 
     851             :             /* Done with that message */
     852        7060 :             PQfreemem(copybuf);
     853        7060 :             copybuf = NULL;
     854             : 
     855             :             /*
     856             :              * Process the received data, and any subsequent data we can read
     857             :              * without blocking.
     858             :              */
     859        7060 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     860             :         }
     861             :     }
     862             : 
     863           0 : error:
     864           0 :     PQfreemem(copybuf);
     865           0 :     return NULL;
     866             : }
     867             : 
     868             : /*
     869             :  * Wait until we can read a CopyData message,
     870             :  * or timeout, or occurrence of a signal or input on the stop_socket.
     871             :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     872             :  *
     873             :  * Returns 1 if data has become available for reading, 0 if timed out
     874             :  * or interrupted by signal or stop_socket input, and -1 on an error.
     875             :  */
     876             : static int
     877       10276 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     878             : {
     879             :     int         ret;
     880             :     fd_set      input_mask;
     881             :     int         connsocket;
     882             :     int         maxfd;
     883             :     struct timeval timeout;
     884             :     struct timeval *timeoutptr;
     885             : 
     886       10276 :     connsocket = PQsocket(conn);
     887       10276 :     if (connsocket < 0)
     888             :     {
     889           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     890           0 :         return -1;
     891             :     }
     892             : 
     893       10276 :     FD_ZERO(&input_mask);
     894       10276 :     FD_SET(connsocket, &input_mask);
     895       10276 :     maxfd = connsocket;
     896       10276 :     if (stop_socket != PGINVALID_SOCKET)
     897             :     {
     898       10028 :         FD_SET(stop_socket, &input_mask);
     899       10028 :         maxfd = Max(maxfd, stop_socket);
     900             :     }
     901             : 
     902       10276 :     if (timeout_ms < 0)
     903         210 :         timeoutptr = NULL;
     904             :     else
     905             :     {
     906       10066 :         timeout.tv_sec = timeout_ms / 1000L;
     907       10066 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     908       10066 :         timeoutptr = &timeout;
     909             :     }
     910             : 
     911       10276 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     912             : 
     913       10276 :     if (ret < 0)
     914             :     {
     915           0 :         if (errno == EINTR)
     916           0 :             return 0;           /* Got a signal, so not an error */
     917           0 :         pg_log_error("%s() failed: %m", "select");
     918           0 :         return -1;
     919             :     }
     920       10276 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     921        7482 :         return 1;               /* Got input on connection socket */
     922             : 
     923        2794 :     return 0;                   /* Got timeout or input on stop_socket */
     924             : }
     925             : 
     926             : /*
     927             :  * Receive CopyData message available from XLOG stream, blocking for
     928             :  * maximum of 'timeout' ms.
     929             :  *
     930             :  * If data was received, returns the length of the data. *buffer is set to
     931             :  * point to a buffer holding the received message. The caller must eventually
     932             :  * free the buffer with PQfreemem().
     933             :  *
     934             :  * Returns 0 if no data was available within timeout, or if wait was
     935             :  * interrupted by signal or stop_socket input.
     936             :  * -1 on error. -2 if the server ended the COPY.
     937             :  */
     938             : static int
     939       10642 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     940             :                   char **buffer)
     941             : {
     942       10642 :     char       *copybuf = NULL;
     943             :     int         rawlen;
     944             : 
     945             :     /* Caller should have cleared any prior buffer */
     946             :     Assert(*buffer == NULL);
     947             : 
     948             :     /* Try to receive a CopyData message */
     949       10642 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     950       10642 :     if (rawlen == 0)
     951             :     {
     952             :         int         ret;
     953             : 
     954             :         /*
     955             :          * No data available.  Wait for some to appear, but not longer than
     956             :          * the specified timeout, so that we can ping the server.  Also stop
     957             :          * waiting if input appears on stop_socket.
     958             :          */
     959       10276 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     960       10276 :         if (ret <= 0)
     961        2794 :             return ret;
     962             : 
     963             :         /* Now there is actually data on the socket */
     964        7482 :         if (PQconsumeInput(conn) == 0)
     965             :         {
     966           0 :             pg_log_error("could not receive data from WAL stream: %s",
     967             :                          PQerrorMessage(conn));
     968           0 :             return -1;
     969             :         }
     970             : 
     971             :         /* Now that we've consumed some input, try again */
     972        7482 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     973        7482 :         if (rawlen == 0)
     974         514 :             return 0;
     975             :     }
     976        7334 :     if (rawlen == -1)           /* end-of-streaming or error */
     977         274 :         return -2;
     978        7060 :     if (rawlen == -2)
     979             :     {
     980           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     981           0 :         return -1;
     982             :     }
     983             : 
     984             :     /* Return received messages to caller */
     985        7060 :     *buffer = copybuf;
     986        7060 :     return rawlen;
     987             : }
     988             : 
     989             : /*
     990             :  * Process the keepalive message.
     991             :  */
     992             : static bool
     993           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     994             :                     XLogRecPtr blockpos, TimestampTz *last_status)
     995             : {
     996             :     int         pos;
     997             :     bool        replyRequested;
     998             :     TimestampTz now;
     999             : 
    1000             :     /*
    1001             :      * Parse the keepalive message, enclosed in the CopyData message. We just
    1002             :      * check if the server requested a reply, and ignore the rest.
    1003             :      */
    1004           0 :     pos = 1;                    /* skip msgtype 'k' */
    1005           0 :     pos += 8;                   /* skip walEnd */
    1006           0 :     pos += 8;                   /* skip sendTime */
    1007             : 
    1008           0 :     if (len < pos + 1)
    1009             :     {
    1010           0 :         pg_log_error("streaming header too small: %d", len);
    1011           0 :         return false;
    1012             :     }
    1013           0 :     replyRequested = copybuf[pos];
    1014             : 
    1015             :     /* If the server requested an immediate reply, send one. */
    1016           0 :     if (replyRequested && still_sending)
    1017             :     {
    1018           0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1019           0 :             walfile != NULL)
    1020             :         {
    1021             :             /*
    1022             :              * If a valid flush location needs to be reported, flush the
    1023             :              * current WAL file so that the latest flush location is sent back
    1024             :              * to the server. This is necessary to see whether the last WAL
    1025             :              * data has been successfully replicated or not, at the normal
    1026             :              * shutdown of the server.
    1027             :              */
    1028           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
    1029           0 :                 pg_fatal("could not fsync file \"%s\": %s",
    1030             :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
    1031           0 :             lastFlushPosition = blockpos;
    1032             :         }
    1033             : 
    1034           0 :         now = feGetCurrentTimestamp();
    1035           0 :         if (!sendFeedback(conn, blockpos, now, false))
    1036           0 :             return false;
    1037           0 :         *last_status = now;
    1038             :     }
    1039             : 
    1040           0 :     return true;
    1041             : }
    1042             : 
    1043             : /*
    1044             :  * Process XLogData message.
    1045             :  */
    1046             : static bool
    1047        7060 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1048             :                    XLogRecPtr *blockpos)
    1049             : {
    1050             :     int         xlogoff;
    1051             :     int         bytes_left;
    1052             :     int         bytes_written;
    1053             :     int         hdr_len;
    1054             : 
    1055             :     /*
    1056             :      * Once we've decided we don't want to receive any more, just ignore any
    1057             :      * subsequent XLogData messages.
    1058             :      */
    1059        7060 :     if (!(still_sending))
    1060         362 :         return true;
    1061             : 
    1062             :     /*
    1063             :      * Read the header of the XLogData message, enclosed in the CopyData
    1064             :      * message. We only need the WAL location field (dataStart), the rest of
    1065             :      * the header is ignored.
    1066             :      */
    1067        6698 :     hdr_len = 1;                /* msgtype 'w' */
    1068        6698 :     hdr_len += 8;               /* dataStart */
    1069        6698 :     hdr_len += 8;               /* walEnd */
    1070        6698 :     hdr_len += 8;               /* sendTime */
    1071        6698 :     if (len < hdr_len)
    1072             :     {
    1073           0 :         pg_log_error("streaming header too small: %d", len);
    1074           0 :         return false;
    1075             :     }
    1076        6698 :     *blockpos = fe_recvint64(&copybuf[1]);
    1077             : 
    1078             :     /* Extract WAL location for this block */
    1079        6698 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1080             : 
    1081             :     /*
    1082             :      * Verify that the initial location in the stream matches where we think
    1083             :      * we are.
    1084             :      */
    1085        6698 :     if (walfile == NULL)
    1086             :     {
    1087             :         /* No file open yet */
    1088         286 :         if (xlogoff != 0)
    1089             :         {
    1090           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1091             :                          xlogoff);
    1092           0 :             return false;
    1093             :         }
    1094             :     }
    1095             :     else
    1096             :     {
    1097             :         /* More data in existing segment */
    1098        6412 :         if (walfile->currpos != xlogoff)
    1099             :         {
    1100           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1101             :                          xlogoff, (int) walfile->currpos);
    1102           0 :             return false;
    1103             :         }
    1104             :     }
    1105             : 
    1106        6698 :     bytes_left = len - hdr_len;
    1107        6698 :     bytes_written = 0;
    1108             : 
    1109       13396 :     while (bytes_left)
    1110             :     {
    1111             :         int         bytes_to_write;
    1112             : 
    1113             :         /*
    1114             :          * If crossing a WAL boundary, only write up until we reach wal
    1115             :          * segment size.
    1116             :          */
    1117        6698 :         if (xlogoff + bytes_left > WalSegSz)
    1118           0 :             bytes_to_write = WalSegSz - xlogoff;
    1119             :         else
    1120        6698 :             bytes_to_write = bytes_left;
    1121             : 
    1122        6698 :         if (walfile == NULL)
    1123             :         {
    1124         286 :             if (!open_walfile(stream, *blockpos))
    1125             :             {
    1126             :                 /* Error logged by open_walfile */
    1127           0 :                 return false;
    1128             :             }
    1129             :         }
    1130             : 
    1131       13396 :         if (stream->walmethod->ops->write(walfile,
    1132        6698 :                                           copybuf + hdr_len + bytes_written,
    1133        6698 :                                           bytes_to_write) != bytes_to_write)
    1134             :         {
    1135           0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
    1136             :                          bytes_to_write, walfile->pathname,
    1137             :                          GetLastWalMethodError(stream->walmethod));
    1138           0 :             return false;
    1139             :         }
    1140             : 
    1141             :         /* Write was successful, advance our position */
    1142        6698 :         bytes_written += bytes_to_write;
    1143        6698 :         bytes_left -= bytes_to_write;
    1144        6698 :         *blockpos += bytes_to_write;
    1145        6698 :         xlogoff += bytes_to_write;
    1146             : 
    1147             :         /* Did we reach the end of a WAL segment? */
    1148        6698 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1149             :         {
    1150          38 :             if (!close_walfile(stream, *blockpos))
    1151             :                 /* Error message written in close_walfile() */
    1152           0 :                 return false;
    1153             : 
    1154          38 :             xlogoff = 0;
    1155             : 
    1156          38 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1157             :             {
    1158           0 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1159             :                 {
    1160           0 :                     pg_log_error("could not send copy-end packet: %s",
    1161             :                                  PQerrorMessage(conn));
    1162           0 :                     return false;
    1163             :                 }
    1164           0 :                 still_sending = false;
    1165           0 :                 return true;    /* ignore the rest of this XLogData packet */
    1166             :             }
    1167             :         }
    1168             :     }
    1169             :     /* No more data left to write, receive next copy packet */
    1170             : 
    1171        6698 :     return true;
    1172             : }
    1173             : 
    1174             : /*
    1175             :  * Handle end of the copy stream.
    1176             :  */
    1177             : static PGresult *
    1178         274 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1179             :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1180             : {
    1181         274 :     PGresult   *res = PQgetResult(conn);
    1182             : 
    1183             :     /*
    1184             :      * The server closed its end of the copy stream.  If we haven't closed
    1185             :      * ours already, we need to do so now, unless the server threw an error,
    1186             :      * in which case we don't.
    1187             :      */
    1188         274 :     if (still_sending)
    1189             :     {
    1190           4 :         if (!close_walfile(stream, blockpos))
    1191             :         {
    1192             :             /* Error message written in close_walfile() */
    1193           0 :             PQclear(res);
    1194           0 :             return NULL;
    1195             :         }
    1196           4 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1197             :         {
    1198           2 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1199             :             {
    1200           0 :                 pg_log_error("could not send copy-end packet: %s",
    1201             :                              PQerrorMessage(conn));
    1202           0 :                 PQclear(res);
    1203           0 :                 return NULL;
    1204             :             }
    1205           2 :             res = PQgetResult(conn);
    1206             :         }
    1207           4 :         still_sending = false;
    1208             :     }
    1209         274 :     *stoppos = blockpos;
    1210         274 :     return res;
    1211             : }
    1212             : 
    1213             : /*
    1214             :  * Check if we should continue streaming, or abort at this point.
    1215             :  */
    1216             : static bool
    1217       10642 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1218             : {
    1219       10642 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1220             :     {
    1221         270 :         if (!close_walfile(stream, blockpos))
    1222             :         {
    1223             :             /* Potential error message is written by close_walfile */
    1224           0 :             return false;
    1225             :         }
    1226         270 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1227             :         {
    1228           0 :             pg_log_error("could not send copy-end packet: %s",
    1229             :                          PQerrorMessage(conn));
    1230           0 :             return false;
    1231             :         }
    1232         270 :         still_sending = false;
    1233             :     }
    1234             : 
    1235       10642 :     return true;
    1236             : }
    1237             : 
    1238             : /*
    1239             :  * Calculate how long send/receive loops should sleep
    1240             :  */
    1241             : static long
    1242        3582 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1243             :                              TimestampTz last_status)
    1244             : {
    1245        3582 :     TimestampTz status_targettime = 0;
    1246             :     long        sleeptime;
    1247             : 
    1248        3582 :     if (standby_message_timeout && still_sending)
    1249        3372 :         status_targettime = last_status +
    1250        3372 :             (standby_message_timeout - 1) * ((int64) 1000);
    1251             : 
    1252        3582 :     if (status_targettime > 0)
    1253             :     {
    1254             :         long        secs;
    1255             :         int         usecs;
    1256             : 
    1257        3372 :         feTimestampDifference(now,
    1258             :                               status_targettime,
    1259             :                               &secs,
    1260             :                               &usecs);
    1261             :         /* Always sleep at least 1 sec */
    1262        3372 :         if (secs <= 0)
    1263             :         {
    1264           0 :             secs = 1;
    1265           0 :             usecs = 0;
    1266             :         }
    1267             : 
    1268        3372 :         sleeptime = secs * 1000 + usecs / 1000;
    1269             :     }
    1270             :     else
    1271         210 :         sleeptime = -1;
    1272             : 
    1273        3582 :     return sleeptime;
    1274             : }

Generated by: LCOV version 1.14