LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_copy.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.2 % 119 118
Test Date: 2026-03-03 02:14:47 Functions: 100.0 % 14 14
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_copy.c
       4              :  *    send basebackup archives using COPY OUT
       5              :  *
       6              :  * We send a result set with information about the tablespaces to be included
       7              :  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
       8              :  * operation and transmit all the archives and the manifest, if present, during
       9              :  * the course of that single COPY OUT. Each CopyData message begins with a
      10              :  * type byte, allowing us to signal the start of a new archive, or the
      11              :  * manifest, by some means other than ending the COPY stream. This also allows
      12              :  * for future protocol extensions, since we can include arbitrary information
      13              :  * in the message stream as long as we're certain that the client will know
      14              :  * what to do with it.
      15              :  *
      16              :  * An older method that sent each archive using a separate COPY OUT
      17              :  * operation is no longer supported.
      18              :  *
      19              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      20              :  *
      21              :  * IDENTIFICATION
      22              :  *    src/backend/backup/basebackup_copy.c
      23              :  *
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : #include "postgres.h"
      27              : 
      28              : #include "access/tupdesc.h"
      29              : #include "backup/basebackup.h"
      30              : #include "backup/basebackup_sink.h"
      31              : #include "catalog/pg_type_d.h"
      32              : #include "executor/executor.h"
      33              : #include "libpq/libpq.h"
      34              : #include "libpq/pqformat.h"
      35              : #include "tcop/dest.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/timestamp.h"
      38              : 
      39              : typedef struct bbsink_copystream
      40              : {
      41              :     /* Common information for all types of sink. */
      42              :     bbsink      base;
      43              : 
      44              :     /* Are we sending the archive and manifest contents to the client, or somewhere else? Progress reports are sent either way. */
      45              :     bool        send_to_client;
      46              : 
      47              :     /*
      48              :      * Protocol message buffer. We assemble CopyData protocol messages by
      49              :      * setting the first character of this buffer to 'd' (archive or manifest
      50              :      * data) and then making base.bbs_buffer point to the second character so
      51              :      * that the rest of the data gets copied into the message just where we
      52              :      * want it.
      53              :      */
      54              :     char       *msgbuffer;
      55              : 
      56              :     /*
      57              :      * When did we last send a progress report to the client, and what was
      58              :      * bytes_done the last time we checked the clock (report sent or not)?
      59              :      */
      60              :     TimestampTz last_progress_report_time;
      61              :     uint64      bytes_done_at_last_time_check;
      62              : } bbsink_copystream;
      63              : 
      64              : /*
      65              :  * We don't want to send progress messages to the client excessively
      66              :  * frequently. Ideally, we'd like to send a message when the time since the
      67              :  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
      68              :  * the system time every time we send a tiny bit of data seems too expensive.
      69              :  * So we only check it after the number of bytes since the last check reaches
      70              :  * PROGRESS_REPORT_BYTE_INTERVAL.
      71              :  */
      72              : #define PROGRESS_REPORT_BYTE_INTERVAL               65536   /* bytes between clock checks */
      73              : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD       1000    /* milliseconds between periodic reports */
      74              : 
      75              : static void bbsink_copystream_begin_backup(bbsink *sink);
      76              : static void bbsink_copystream_begin_archive(bbsink *sink,
      77              :                                             const char *archive_name);
      78              : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
      79              : static void bbsink_copystream_end_archive(bbsink *sink);
      80              : static void bbsink_copystream_begin_manifest(bbsink *sink);
      81              : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
      82              : static void bbsink_copystream_end_manifest(bbsink *sink);
      83              : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
      84              :                                          TimeLineID endtli);
      85              : static void bbsink_copystream_cleanup(bbsink *sink);
      86              : 
      87              : static void SendCopyOutResponse(void);
      88              : static void SendCopyDone(void);
      89              : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
      90              : static void SendTablespaceList(List *tablespaces);
      91              : 
      92              : static const bbsink_ops bbsink_copystream_ops = {   /* callback table installed by bbsink_copystream_new() */
      93              :     .begin_backup = bbsink_copystream_begin_backup,
      94              :     .begin_archive = bbsink_copystream_begin_archive,
      95              :     .archive_contents = bbsink_copystream_archive_contents,
      96              :     .end_archive = bbsink_copystream_end_archive,
      97              :     .begin_manifest = bbsink_copystream_begin_manifest,
      98              :     .manifest_contents = bbsink_copystream_manifest_contents,
      99              :     .end_manifest = bbsink_copystream_end_manifest,
     100              :     .end_backup = bbsink_copystream_end_backup,
     101              :     .cleanup = bbsink_copystream_cleanup
     102              : };
     103              : 
     104              : /*
     105              :  * Create a new 'copystream' bbsink. send_to_client is stored in the sink and later decides whether archive and manifest contents are sent to the client; returns a pointer to the embedded base bbsink.
     106              :  */
     107              : bbsink *
     108          172 : bbsink_copystream_new(bool send_to_client)
     109              : {
     110          172 :     bbsink_copystream *sink = palloc0_object(bbsink_copystream);
     111              : 
     112          172 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;  /* NOTE(review): cast presumably needed because bbs_ops is declared const -- confirm in basebackup_sink.h */
     113          172 :     sink->send_to_client = send_to_client;
     114              : 
     115              :     /* Set up for periodic progress reporting: start the clock now, with no bytes counted yet. */
     116          172 :     sink->last_progress_report_time = GetCurrentTimestamp();
     117          172 :     sink->bytes_done_at_last_time_check = UINT64CONST(0);
     118              : 
     119          172 :     return &sink->base;
     120              : }
     121              : 
     122              : /*
     123              :  * Send start-of-backup wire protocol messages: set up the message buffer, send the start location and tablespace list result sets plus a CommandComplete, then begin the COPY OUT stream.
     124              :  */
     125              : static void
     126          169 : bbsink_copystream_begin_backup(bbsink *sink)
     127              : {
     128          169 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     129          169 :     bbsink_state *state = sink->bbs_state;
     130              :     char       *buf;
     131              : 
     132              :     /*
     133              :      * Initialize buffer. We ultimately want to send the archive and manifest
     134              :      * data by means of CopyData messages where the payload portion of each
     135              :      * message begins with a type byte. However, basebackup.c expects the
     136              :      * buffer to be aligned, so we can't just allocate one extra byte for the
     137              :      * type byte. Instead, allocate enough extra bytes that the portion of the
     138              :      * buffer we reveal to our callers can be aligned, while leaving room to
     139              :      * slip the type byte in just beforehand.  That will allow us to ship the
     140              :      * data with a single call to pq_putmessage and without needing any extra
     141              :      * copying.
     142              :      */
     143          169 :     buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
     144          169 :     mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);    /* one byte before the caller-visible buffer */
     145          169 :     mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;    /* offset from buf by MAXIMUM_ALIGNOF, preserving buf's alignment */
     146          169 :     mysink->msgbuffer[0] = PqMsg_CopyData;   /* payload type byte: archive or manifest data */
     147              : 
     148              :     /* Tell client the backup start location. */
     149          169 :     SendXlogRecPtrResult(state->startptr, state->starttli);
     150              : 
     151              :     /* Send client a list of tablespaces. */
     152          169 :     SendTablespaceList(state->tablespaces);
     153              : 
     154              :     /* Send a CommandComplete message; SendTablespaceList() does not send one itself. */
     155          169 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     156              : 
     157              :     /* Begin COPY stream. This will be used for all archives + manifest. */
     158          169 :     SendCopyOutResponse();
     159          169 : }
     160              : 
     161              : /*
     162              :  * Send a CopyData message announcing the beginning of a new archive. The payload is the PqBackupMsg_NewArchive type byte, the archive name, and the path of the current tablespace (an empty string if its path is NULL).
     163              :  */
     164              : static void
     165          206 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
     166              : {
     167          206 :     bbsink_state *state = sink->bbs_state;
     168              :     tablespaceinfo *ti;
     169              :     StringInfoData buf;
     170              : 
     171          206 :     ti = list_nth(state->tablespaces, state->tablespace_num);   /* entry for the tablespace currently being archived */
     172          206 :     pq_beginmessage(&buf, PqMsg_CopyData);
     173          206 :     pq_sendbyte(&buf, PqBackupMsg_NewArchive);
     174          206 :     pq_sendstring(&buf, archive_name);
     175          206 :     pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
     176          206 :     pq_endmessage(&buf);
     177          206 : }
     178              : 
     179              : /*
     180              :  * Send a CopyData message containing a chunk of archive content (only if send_to_client is set), then possibly a progress report. The message is msgbuffer's leading type byte followed by len bytes of data.
     181              :  */
     182              : static void
     183       370416 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
     184              : {
     185       370416 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     186       370416 :     bbsink_state *state = mysink->base.bbs_state;
     187              :     StringInfoData buf;
     188              :     uint64      targetbytes;
     189              : 
     190              :     /* Send the archive content to the client, if appropriate. */
     191       370416 :     if (mysink->send_to_client)
     192              :     {
     193              :         /* Add one because we're also sending a leading type byte. */
     194       355369 :         pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
     195              :     }
     196              : 
     197              :     /* Consider whether to send a progress report to the client; this happens regardless of send_to_client. */
     198       370416 :     targetbytes = mysink->bytes_done_at_last_time_check
     199              :         + PROGRESS_REPORT_BYTE_INTERVAL;
     200       370416 :     if (targetbytes <= state->bytes_done)
     201              :     {
     202        55600 :         TimestampTz now = GetCurrentTimestamp();
     203              :         long        ms;
     204              : 
     205              :         /*
     206              :          * OK, we've sent a decent number of bytes, so check the system time
     207              :          * to see whether we're due to send a progress report.
     208              :          */
     209        55600 :         mysink->bytes_done_at_last_time_check = state->bytes_done;
     210        55600 :         ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
     211              :                                              now);
     212              : 
     213              :         /*
     214              :          * Send a progress report if enough time has passed. Also send one if
     215              :          * the system clock was set backward, so that such occurrences don't
     216              :          * have the effect of suppressing further progress messages.
     217              :          */
     218        55600 :         if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
     219        55589 :             now < mysink->last_progress_report_time)
     220              :         {
     221           11 :             mysink->last_progress_report_time = now;
     222              : 
     223           11 :             pq_beginmessage(&buf, PqMsg_CopyData);
     224           11 :             pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
     225           11 :             pq_sendint64(&buf, state->bytes_done);
     226           11 :             pq_endmessage(&buf);
     227           11 :             pq_flush_if_writable();
     228              :         }
     229              :     }
     230       370416 : }
     231              : 
     232              : /*
     233              :  * We don't need to explicitly signal the end of the archive; the client
     234              :  * will figure out that we've reached the end when we begin the next one,
     235              :  * or begin the manifest, or end the COPY stream. However, this seems like
     236              :  * a good time to force out a progress report. One reason for that is that
     237              :  * if this is the last archive, and we don't force a progress report now,
     238              :  * the client will never be told that we sent all the bytes. The report is sent unconditionally, and the byte-count and timestamp baselines used to pace periodic reports are reset.
     239              :  */
     240              : static void
     241          201 : bbsink_copystream_end_archive(bbsink *sink)
     242              : {
     243          201 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     244          201 :     bbsink_state *state = mysink->base.bbs_state;
     245              :     StringInfoData buf;
     246              : 
     247          201 :     mysink->bytes_done_at_last_time_check = state->bytes_done;
     248          201 :     mysink->last_progress_report_time = GetCurrentTimestamp();
     249          201 :     pq_beginmessage(&buf, PqMsg_CopyData);
     250          201 :     pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
     251          201 :     pq_sendint64(&buf, state->bytes_done);
     252          201 :     pq_endmessage(&buf);
     253          201 :     pq_flush_if_writable();
     254          201 : }
     255              : 
     256              : /*
     257              :  * Send a CopyData message announcing the beginning of the backup manifest. The payload consists solely of the PqBackupMsg_Manifest type byte.
     258              :  */
     259              : static void
     260          163 : bbsink_copystream_begin_manifest(bbsink *sink)
     261              : {
     262              :     StringInfoData buf;
     263              : 
     264          163 :     pq_beginmessage(&buf, PqMsg_CopyData);
     265          163 :     pq_sendbyte(&buf, PqBackupMsg_Manifest);
     266          163 :     pq_endmessage(&buf);
     267          163 : }
     268              : 
     269              : /*
     270              :  * Each chunk of manifest data is sent using a CopyData message, but only when send_to_client is set; otherwise this callback sends nothing.
     271              :  */
     272              : static void
     273          843 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
     274              : {
     275          843 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     276              : 
     277          843 :     if (mysink->send_to_client)
     278              :     {
     279              :         /* Add one because we're also sending a leading type byte (msgbuffer[0]). */
     280          798 :         pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
     281              :     }
     282          843 : }
     283              : 
     284              : /*
     285              :  * We don't need an explicit terminator for the backup manifest, so this callback sends nothing.
     286              :  */
     287              : static void
     288          163 : bbsink_copystream_end_manifest(bbsink *sink)
     289              : {
     290              :     /* Do nothing. */
     291          163 : }
     292              : 
     293              : /*
     294              :  * Send end-of-backup wire protocol messages: a CopyDone to end the COPY stream, then a result set carrying the backup end location and timeline.
     295              :  */
     296              : static void
     297          164 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
     298              :                              TimeLineID endtli)
     299              : {
     300          164 :     SendCopyDone();
     301          164 :     SendXlogRecPtrResult(endptr, endtli);
     302          164 : }
     303              : 
     304              : /*
     305              :  * Cleanup callback. This sink performs no cleanup actions of its own.
     306              :  */
     307              : static void
     308          162 : bbsink_copystream_cleanup(bbsink *sink)
     309              : {
     310              :     /* Nothing to do. */
     311          162 : }
     312              : 
     313              : /*
     314              :  * Send a CopyOutResponse message with an overall format of 0 and an attribute count of 0.
     315              :  */
     316              : static void
     317          169 : SendCopyOutResponse(void)
     318              : {
     319              :     StringInfoData buf;
     320              : 
     321          169 :     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
     322          169 :     pq_sendbyte(&buf, 0);       /* overall format */
     323          169 :     pq_sendint16(&buf, 0);      /* natts */
     324          169 :     pq_endmessage(&buf);
     325          169 : }
     326              : 
     327              : /*
     328              :  * Send a CopyDone message, which carries no payload.
     329              :  */
     330              : static void
     331          164 : SendCopyDone(void)
     332              : {
     333          164 :     pq_putemptymessage(PqMsg_CopyDone);
     334          164 : }
     335              : 
     336              : /*
     337              :  * Send a single resultset containing just a single row: the XLogRecPtr
     338              :  * (in text format, column "recptr") and the timeline ID (int8, column "tli"), followed by a CommandComplete message.
     339              :  */
     340              : static void
     341          333 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
     342              : {
     343              :     DestReceiver *dest;
     344              :     TupOutputState *tstate;
     345              :     TupleDesc   tupdesc;
     346              :     Datum       values[2];
     347          333 :     bool        nulls[2] = {0};
     348              : 
     349          333 :     dest = CreateDestReceiver(DestRemoteSimple);
     350              : 
     351          333 :     tupdesc = CreateTemplateTupleDesc(2);
     352          333 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
     353              : 
     354              :     /*
     355              :      * int8 may seem like a surprising data type for this, but in theory int4
     356              :      * would not be wide enough for this, as TimeLineID is unsigned.
     357              :      */
     358          333 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
     359              : 
     360              :     /* send RowDescription */
     361          333 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     362              : 
     363              :     /* Data row: neither column is ever NULL (nulls[] stays all-false) */
     364          333 :     values[0] = CStringGetTextDatum(psprintf("%X/%08X", LSN_FORMAT_ARGS(ptr)));
     365          333 :     values[1] = Int64GetDatum(tli);
     366          333 :     do_tup_output(tstate, values, nulls);
     367              : 
     368          333 :     end_tup_output(tstate);
     369              : 
     370              :     /* Send a CommandComplete message */
     371          333 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     372          333 : }
     373              : 
     374              : /*
     375              :  * Send a result set via libpq describing the tablespace list: one row per list entry, with columns spcoid, spclocation and size. No CommandComplete message is sent here.
     376              :  */
     377              : static void
     378          169 : SendTablespaceList(List *tablespaces)
     379              : {
     380              :     DestReceiver *dest;
     381              :     TupOutputState *tstate;
     382              :     TupleDesc   tupdesc;
     383              :     ListCell   *lc;
     384              : 
     385          169 :     dest = CreateDestReceiver(DestRemoteSimple);
     386              : 
     387          169 :     tupdesc = CreateTemplateTupleDesc(3);
     388          169 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
     389          169 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
     390          169 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
     391              : 
     392              :     /* send RowDescription */
     393          169 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     394              : 
     395              :     /* Construct and send the directory information */
     396          375 :     foreach(lc, tablespaces)
     397              :     {
     398          206 :         tablespaceinfo *ti = lfirst(lc);
     399              :         Datum       values[3];
     400          206 :         bool        nulls[3] = {0};
     401              : 
     402              :         /* Send one datarow message; OID and location are NULL for an entry whose path is NULL */
     403          206 :         if (ti->path == NULL)
     404              :         {
     405          169 :             nulls[0] = true;
     406          169 :             nulls[1] = true;
     407              :         }
     408              :         else
     409              :         {
     410           37 :             values[0] = ObjectIdGetDatum(ti->oid);
     411           37 :             values[1] = CStringGetTextDatum(ti->path);
     412              :         }
     413          206 :         if (ti->size >= 0)     /* a negative size is reported as NULL */
     414          206 :             values[2] = Int64GetDatum(ti->size / 1024);    /* NOTE(review): presumably bytes converted to kB -- confirm units of ti->size */
     415              :         else
     416            0 :             nulls[2] = true;
     417              : 
     418          206 :         do_tup_output(tstate, values, nulls);
     419              :     }
     420              : 
     421          169 :     end_tup_output(tstate);
     422          169 : }
        

Generated by: LCOV version 2.0-1