LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_copy.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.2 % 121 120
Test Date: 2026-03-24 02:15:55 Functions: 100.0 % 14 14
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_copy.c
       4              :  *    send basebackup archives using COPY OUT
       5              :  *
       6              :  * We send a result set with information about the tablespaces to be included
       7              :  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
       8              :  * operation and transmits all the archives and the manifest if present during
       9              :  * the course of that single COPY OUT. Each CopyData message begins with a
      10              :  * type byte, allowing us to signal the start of a new archive, or the
      11              :  * manifest, by some means other than ending the COPY stream. This also allows
      12              :  * for future protocol extensions, since we can include arbitrary information
      13              :  * in the message stream as long as we're certain that the client will know
      14              :  * what to do with it.
      15              :  *
      16              :  * An older method that sent each archive using a separate COPY OUT
      17              :  * operation is no longer supported.
      18              :  *
      19              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      20              :  *
      21              :  * IDENTIFICATION
      22              :  *    src/backend/backup/basebackup_copy.c
      23              :  *
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : #include "postgres.h"
      27              : 
      28              : #include "access/tupdesc.h"
      29              : #include "backup/basebackup.h"
      30              : #include "backup/basebackup_sink.h"
      31              : #include "catalog/pg_type_d.h"
      32              : #include "executor/executor.h"
      33              : #include "libpq/libpq.h"
      34              : #include "libpq/pqformat.h"
      35              : #include "tcop/dest.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/timestamp.h"
      38              : 
      39              : typedef struct bbsink_copystream
      40              : {
      41              :     /* Common information for all types of sink. */
      42              :     bbsink      base;
      43              : 
      44              :     /* Are we sending the archives to the client, or somewhere else? */
      45              :     bool        send_to_client;
      46              : 
      47              :     /*
      48              :      * Protocol message buffer. We assemble CopyData protocol messages by
      49              :      * setting the first character of this buffer to 'd' (archive or manifest
      50              :      * data) and then making base.bbs_buffer point to the second character so
      51              :      * that the rest of the data gets copied into the message just where we
      52              :      * want it.
      53              :      */
      54              :     char       *msgbuffer;
      55              : 
      56              :     /*
      57              :      * When did we last report progress to the client, and how much progress
      58              :      * did we report?
      59              :      */
      60              :     TimestampTz last_progress_report_time;
      61              :     uint64      bytes_done_at_last_time_check;
      62              : } bbsink_copystream;
      63              : 
      64              : /*
      65              :  * We don't want to send progress messages to the client excessively
      66              :  * frequently. Ideally, we'd like to send a message when the time since the
      67              :  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
      68              :  * the system time every time we send a tiny bit of data seems too expensive.
      69              :  * So we only check it after the number of bytes since the last check reaches
      70              :  * PROGRESS_REPORT_BYTE_INTERVAL.
      71              :  */
      72              : #define PROGRESS_REPORT_BYTE_INTERVAL               65536
      73              : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD       1000
      74              : 
      75              : static void bbsink_copystream_begin_backup(bbsink *sink);
      76              : static void bbsink_copystream_begin_archive(bbsink *sink,
      77              :                                             const char *archive_name);
      78              : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
      79              : static void bbsink_copystream_end_archive(bbsink *sink);
      80              : static void bbsink_copystream_begin_manifest(bbsink *sink);
      81              : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
      82              : static void bbsink_copystream_end_manifest(bbsink *sink);
      83              : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
      84              :                                          TimeLineID endtli);
      85              : static void bbsink_copystream_cleanup(bbsink *sink);
      86              : 
      87              : static void SendCopyOutResponse(void);
      88              : static void SendCopyDone(void);
      89              : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
      90              : static void SendTablespaceList(List *tablespaces);
      91              : 
      92              : static const bbsink_ops bbsink_copystream_ops = {
      93              :     .begin_backup = bbsink_copystream_begin_backup,
      94              :     .begin_archive = bbsink_copystream_begin_archive,
      95              :     .archive_contents = bbsink_copystream_archive_contents,
      96              :     .end_archive = bbsink_copystream_end_archive,
      97              :     .begin_manifest = bbsink_copystream_begin_manifest,
      98              :     .manifest_contents = bbsink_copystream_manifest_contents,
      99              :     .end_manifest = bbsink_copystream_end_manifest,
     100              :     .end_backup = bbsink_copystream_end_backup,
     101              :     .cleanup = bbsink_copystream_cleanup
     102              : };
     103              : 
     104              : /*
     105              :  * Create a new 'copystream' bbsink.
     106              :  */
     107              : bbsink *
     108          174 : bbsink_copystream_new(bool send_to_client)
     109              : {
     110          174 :     bbsink_copystream *sink = palloc0_object(bbsink_copystream);
     111              : 
     112          174 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
     113          174 :     sink->send_to_client = send_to_client;
     114              : 
     115              :     /* Set up for periodic progress reporting. */
     116          174 :     sink->last_progress_report_time = GetCurrentTimestamp();
     117          174 :     sink->bytes_done_at_last_time_check = UINT64CONST(0);
     118              : 
     119          174 :     return &sink->base;
     120              : }
     121              : 
     122              : /*
     123              :  * Send start-of-backup wire protocol messages.
     124              :  */
     125              : static void
     126          171 : bbsink_copystream_begin_backup(bbsink *sink)
     127              : {
     128          171 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     129          171 :     bbsink_state *state = sink->bbs_state;
     130              :     char       *buf;
     131              : 
     132              :     /*
     133              :      * Initialize buffer. We ultimately want to send the archive and manifest
     134              :      * data by means of CopyData messages where the payload portion of each
     135              :      * message begins with a type byte. However, basebackup.c expects the
     136              :      * buffer to be aligned, so we can't just allocate one extra byte for the
     137              :      * type byte. Instead, allocate enough extra bytes that the portion of the
     138              :      * buffer we reveal to our callers can be aligned, while leaving room to
     139              :      * slip the type byte in just beforehand.  That will allow us to ship the
     140              :      * data with a single call to pq_putmessage and without needing any extra
     141              :      * copying.
     142              :      */
     143          171 :     buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
     144          171 :     mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
     145          171 :     mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
     146          171 :     mysink->msgbuffer[0] = PqMsg_CopyData;   /* archive or manifest data */
     147              : 
     148              :     /* Tell client the backup start location. */
     149          171 :     SendXlogRecPtrResult(state->startptr, state->starttli);
     150              : 
     151              :     /* Send client a list of tablespaces. */
     152          171 :     SendTablespaceList(state->tablespaces);
     153              : 
     154              :     /* Send a CommandComplete message */
     155          171 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     156              : 
     157              :     /* Begin COPY stream. This will be used for all archives + manifest. */
     158          171 :     SendCopyOutResponse();
     159          171 : }
     160              : 
     161              : /*
     162              :  * Send a CopyData message announcing the beginning of a new archive.
     163              :  */
     164              : static void
     165          209 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
     166              : {
     167          209 :     bbsink_state *state = sink->bbs_state;
     168              :     tablespaceinfo *ti;
     169              :     StringInfoData buf;
     170              : 
     171          209 :     ti = list_nth(state->tablespaces, state->tablespace_num);
     172          209 :     pq_beginmessage(&buf, PqMsg_CopyData);
     173          209 :     pq_sendbyte(&buf, PqBackupMsg_NewArchive);
     174          209 :     pq_sendstring(&buf, archive_name);
     175          209 :     pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
     176          209 :     pq_endmessage(&buf);
     177          209 : }
     178              : 
     179              : /*
     180              :  * Send a CopyData message containing a chunk of archive content.
     181              :  */
     182              : static void
     183       393721 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
     184              : {
     185       393721 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     186       393721 :     bbsink_state *state = mysink->base.bbs_state;
     187              :     StringInfoData buf;
     188              :     uint64      targetbytes;
     189              : 
     190              :     /* Send the archive content to the client, if appropriate. */
     191       393721 :     if (mysink->send_to_client)
     192              :     {
     193              :         /* Add one because we're also sending a leading type byte. */
     194       378005 :         pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
     195              :     }
     196              : 
     197              :     /* Consider whether to send a progress report to the client. */
     198       393721 :     targetbytes = mysink->bytes_done_at_last_time_check
     199              :         + PROGRESS_REPORT_BYTE_INTERVAL;
     200       393721 :     if (targetbytes <= state->bytes_done)
     201              :     {
     202        58467 :         TimestampTz now = GetCurrentTimestamp();
     203              :         long        ms;
     204              : 
     205              :         /*
     206              :          * OK, we've sent a decent number of bytes, so check the system time
     207              :          * to see whether we're due to send a progress report.
     208              :          */
     209        58467 :         mysink->bytes_done_at_last_time_check = state->bytes_done;
     210        58467 :         ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
     211              :                                              now);
     212              : 
     213              :         /*
     214              :          * Send a progress report if enough time has passed. Also send one if
     215              :          * the system clock was set backward, so that such occurrences don't
     216              :          * have the effect of suppressing further progress messages.
     217              :          */
     218        58467 :         if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
     219        58459 :             now < mysink->last_progress_report_time)
     220              :         {
     221            8 :             mysink->last_progress_report_time = now;
     222              : 
     223            8 :             pq_beginmessage(&buf, PqMsg_CopyData);
     224            8 :             pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
     225            8 :             pq_sendint64(&buf, state->bytes_done);
     226            8 :             pq_endmessage(&buf);
     227            8 :             pq_flush_if_writable();
     228              :         }
     229              :     }
     230       393721 : }
     231              : 
     232              : /*
     233              :  * We don't need to explicitly signal the end of the archive; the client
     234              :  * will figure out that we've reached the end when we begin the next one,
     235              :  * or begin the manifest, or end the COPY stream. However, this seems like
     236              :  * a good time to force out a progress report. One reason for that is that
     237              :  * if this is the last archive, and we don't force a progress report now,
     238              :  * the client will never be told that we sent all the bytes.
     239              :  */
     240              : static void
     241          204 : bbsink_copystream_end_archive(bbsink *sink)
     242              : {
     243          204 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     244          204 :     bbsink_state *state = mysink->base.bbs_state;
     245              :     StringInfoData buf;
     246              : 
     247          204 :     mysink->bytes_done_at_last_time_check = state->bytes_done;
     248          204 :     mysink->last_progress_report_time = GetCurrentTimestamp();
     249          204 :     pq_beginmessage(&buf, PqMsg_CopyData);
     250          204 :     pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
     251          204 :     pq_sendint64(&buf, state->bytes_done);
     252          204 :     pq_endmessage(&buf);
     253          204 :     pq_flush_if_writable();
     254          204 : }
     255              : 
     256              : /*
     257              :  * Send a CopyData message announcing the beginning of the backup manifest.
     258              :  */
     259              : static void
     260          165 : bbsink_copystream_begin_manifest(bbsink *sink)
     261              : {
     262              :     StringInfoData buf;
     263              : 
     264          165 :     pq_beginmessage(&buf, PqMsg_CopyData);
     265          165 :     pq_sendbyte(&buf, PqBackupMsg_Manifest);
     266          165 :     pq_endmessage(&buf);
     267          165 : }
     268              : 
     269              : /*
     270              :  * Each chunk of manifest data is sent using a CopyData message.
     271              :  */
     272              : static void
     273          868 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
     274              : {
     275          868 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     276              : 
     277          868 :     if (mysink->send_to_client)
     278              :     {
     279              :         /* Add one because we're also sending a leading type byte. */
     280          823 :         pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
     281              :     }
     282          868 : }
     283              : 
     284              : /*
     285              :  * We don't need an explicit terminator for the backup manifest.
     286              :  */
     287              : static void
     288          165 : bbsink_copystream_end_manifest(bbsink *sink)
     289              : {
     290              :     /* Do nothing. */
     291          165 : }
     292              : 
     293              : /*
     294              :  * Send end-of-backup wire protocol messages.
     295              :  */
     296              : static void
     297          166 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
     298              :                              TimeLineID endtli)
     299              : {
     300          166 :     SendCopyDone();
     301          166 :     SendXlogRecPtrResult(endptr, endtli);
     302          166 : }
     303              : 
     304              : /*
     305              :  * Cleanup.
     306              :  */
     307              : static void
     308          162 : bbsink_copystream_cleanup(bbsink *sink)
     309              : {
     310              :     /* Nothing to do. */
     311          162 : }
     312              : 
     313              : /*
     314              :  * Send a CopyOutResponse message.
     315              :  */
     316              : static void
     317          171 : SendCopyOutResponse(void)
     318              : {
     319              :     StringInfoData buf;
     320              : 
     321          171 :     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
     322          171 :     pq_sendbyte(&buf, 0);       /* overall format */
     323          171 :     pq_sendint16(&buf, 0);      /* natts */
     324          171 :     pq_endmessage(&buf);
     325          171 : }
     326              : 
     327              : /*
     328              :  * Send a CopyDone message.
     329              :  */
     330              : static void
     331          166 : SendCopyDone(void)
     332              : {
     333          166 :     pq_putemptymessage(PqMsg_CopyDone);
     334          166 : }
     335              : 
     336              : /*
     337              :  * Send a single resultset containing just a single
     338              :  * XLogRecPtr record (in text format)
     339              :  */
     340              : static void
     341          337 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
     342              : {
     343              :     DestReceiver *dest;
     344              :     TupOutputState *tstate;
     345              :     TupleDesc   tupdesc;
     346              :     Datum       values[2];
     347          337 :     bool        nulls[2] = {0};
     348              : 
     349          337 :     dest = CreateDestReceiver(DestRemoteSimple);
     350              : 
     351          337 :     tupdesc = CreateTemplateTupleDesc(2);
     352          337 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
     353              : 
     354              :     /*
     355              :      * int8 may seem like a surprising data type for this, but in theory int4
     356              :      * would not be wide enough for this, as TimeLineID is unsigned.
     357              :      */
     358          337 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
     359              : 
     360          337 :     TupleDescFinalize(tupdesc);
     361              : 
     362              :     /* send RowDescription */
     363          337 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     364              : 
     365              :     /* Data row */
     366          337 :     values[0] = CStringGetTextDatum(psprintf("%X/%08X", LSN_FORMAT_ARGS(ptr)));
     367          337 :     values[1] = Int64GetDatum(tli);
     368          337 :     do_tup_output(tstate, values, nulls);
     369              : 
     370          337 :     end_tup_output(tstate);
     371              : 
     372              :     /* Send a CommandComplete message */
     373          337 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     374          337 : }
     375              : 
     376              : /*
     377              :  * Send a result set via libpq describing the tablespace list.
     378              :  */
     379              : static void
     380          171 : SendTablespaceList(List *tablespaces)
     381              : {
     382              :     DestReceiver *dest;
     383              :     TupOutputState *tstate;
     384              :     TupleDesc   tupdesc;
     385              :     ListCell   *lc;
     386              : 
     387          171 :     dest = CreateDestReceiver(DestRemoteSimple);
     388              : 
     389          171 :     tupdesc = CreateTemplateTupleDesc(3);
     390          171 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
     391          171 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
     392          171 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
     393          171 :     TupleDescFinalize(tupdesc);
     394              : 
     395              :     /* send RowDescription */
     396          171 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     397              : 
     398              :     /* Construct and send the directory information */
     399          380 :     foreach(lc, tablespaces)
     400              :     {
     401          209 :         tablespaceinfo *ti = lfirst(lc);
     402              :         Datum       values[3];
     403          209 :         bool        nulls[3] = {0};
     404              : 
     405              :         /* Send one datarow message */
     406          209 :         if (ti->path == NULL)
     407              :         {
     408          171 :             nulls[0] = true;
     409          171 :             nulls[1] = true;
     410              :         }
     411              :         else
     412              :         {
     413           38 :             values[0] = ObjectIdGetDatum(ti->oid);
     414           38 :             values[1] = CStringGetTextDatum(ti->path);
     415              :         }
     416          209 :         if (ti->size >= 0)
     417          209 :             values[2] = Int64GetDatum(ti->size / 1024);
     418              :         else
     419            0 :             nulls[2] = true;
     420              : 
     421          209 :         do_tup_output(tstate, values, nulls);
     422              :     }
     423              : 
     424          171 :     end_tup_output(tstate);
     425          171 : }
        

Generated by: LCOV version 2.0-1