LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_copy.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 119 119 100.0 %
Date: 2024-02-28 23:11:08 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * basebackup_copy.c
       4             :  *    send basebackup archives using COPY OUT
       5             :  *
       6             :  * We send a result set with information about the tablespaces to be included
       7             :  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
       8             :  * operation and transmits all the archives and the manifest if present during
       9             :  * the course of that single COPY OUT. Each CopyData message begins with a
      10             :  * type byte, allowing us to signal the start of a new archive, or the
      11             :  * manifest, by some means other than ending the COPY stream. This also allows
      12             :  * for future protocol extensions, since we can include arbitrary information
      13             :  * in the message stream as long as we're certain that the client will know
      14             :  * what to do with it.
      15             :  *
      16             :  * An older method that sent each archive using a separate COPY OUT
      17             :  * operation is no longer supported.
      18             :  *
      19             :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
      20             :  *
      21             :  * IDENTIFICATION
      22             :  *    src/backend/backup/basebackup_copy.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : #include "postgres.h"
      27             : 
      28             : #include "access/tupdesc.h"
      29             : #include "backup/basebackup.h"
      30             : #include "backup/basebackup_sink.h"
      31             : #include "catalog/pg_type_d.h"
      32             : #include "executor/executor.h"
      33             : #include "libpq/libpq.h"
      34             : #include "libpq/pqformat.h"
      35             : #include "tcop/dest.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/timestamp.h"
      38             : 
      39             : typedef struct bbsink_copystream
      40             : {
      41             :     /* Common information for all types of sink. */
      42             :     bbsink      base;
      43             : 
      44             :     /* Are we sending the archives to the client, or somewhere else? */
      45             :     bool        send_to_client;
      46             : 
      47             :     /*
      48             :      * Protocol message buffer. We assemble CopyData protocol messages by
      49             :      * setting the first character of this buffer to 'd' (archive or manifest
      50             :      * data) and then making base.bbs_buffer point to the second character so
      51             :      * that the rest of the data gets copied into the message just where we
      52             :      * want it.
      53             :      */
      54             :     char       *msgbuffer;
      55             : 
      56             :     /*
      57             :      * When did we last report progress to the client, and how much progress
      58             :      * did we report?
      59             :      */
      60             :     TimestampTz last_progress_report_time;
      61             :     uint64      bytes_done_at_last_time_check;
      62             : } bbsink_copystream;
      63             : 
      64             : /*
      65             :  * We don't want to send progress messages to the client excessively
      66             :  * frequently. Ideally, we'd like to send a message when the time since the
      67             :  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
      68             :  * the system time every time we send a tiny bit of data seems too expensive.
      69             :  * So we only check it after the number of bytes sine the last check reaches
      70             :  * PROGRESS_REPORT_BYTE_INTERVAL.
      71             :  */
      72             : #define PROGRESS_REPORT_BYTE_INTERVAL               65536
      73             : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD       1000
      74             : 
      75             : static void bbsink_copystream_begin_backup(bbsink *sink);
      76             : static void bbsink_copystream_begin_archive(bbsink *sink,
      77             :                                             const char *archive_name);
      78             : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
      79             : static void bbsink_copystream_end_archive(bbsink *sink);
      80             : static void bbsink_copystream_begin_manifest(bbsink *sink);
      81             : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
      82             : static void bbsink_copystream_end_manifest(bbsink *sink);
      83             : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
      84             :                                          TimeLineID endtli);
      85             : static void bbsink_copystream_cleanup(bbsink *sink);
      86             : 
      87             : static void SendCopyOutResponse(void);
      88             : static void SendCopyDone(void);
      89             : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
      90             : static void SendTablespaceList(List *tablespaces);
      91             : 
      92             : static const bbsink_ops bbsink_copystream_ops = {
      93             :     .begin_backup = bbsink_copystream_begin_backup,
      94             :     .begin_archive = bbsink_copystream_begin_archive,
      95             :     .archive_contents = bbsink_copystream_archive_contents,
      96             :     .end_archive = bbsink_copystream_end_archive,
      97             :     .begin_manifest = bbsink_copystream_begin_manifest,
      98             :     .manifest_contents = bbsink_copystream_manifest_contents,
      99             :     .end_manifest = bbsink_copystream_end_manifest,
     100             :     .end_backup = bbsink_copystream_end_backup,
     101             :     .cleanup = bbsink_copystream_cleanup
     102             : };
     103             : 
     104             : /*
     105             :  * Create a new 'copystream' bbsink.
     106             :  */
     107             : bbsink *
     108         272 : bbsink_copystream_new(bool send_to_client)
     109             : {
     110         272 :     bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
     111             : 
     112         272 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
     113         272 :     sink->send_to_client = send_to_client;
     114             : 
     115             :     /* Set up for periodic progress reporting. */
     116         272 :     sink->last_progress_report_time = GetCurrentTimestamp();
     117         272 :     sink->bytes_done_at_last_time_check = UINT64CONST(0);
     118             : 
     119         272 :     return &sink->base;
     120             : }
     121             : 
     122             : /*
     123             :  * Send start-of-backup wire protocol messages.
     124             :  */
     125             : static void
     126         266 : bbsink_copystream_begin_backup(bbsink *sink)
     127             : {
     128         266 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     129         266 :     bbsink_state *state = sink->bbs_state;
     130             :     char       *buf;
     131             : 
     132             :     /*
     133             :      * Initialize buffer. We ultimately want to send the archive and manifest
     134             :      * data by means of CopyData messages where the payload portion of each
     135             :      * message begins with a type byte. However, basebackup.c expects the
     136             :      * buffer to be aligned, so we can't just allocate one extra byte for the
     137             :      * type byte. Instead, allocate enough extra bytes that the portion of the
     138             :      * buffer we reveal to our callers can be aligned, while leaving room to
     139             :      * slip the type byte in just beforehand.  That will allow us to ship the
     140             :      * data with a single call to pq_putmessage and without needing any extra
     141             :      * copying.
     142             :      */
     143         266 :     buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
     144         266 :     mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
     145         266 :     mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
     146         266 :     mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
     147             : 
     148             :     /* Tell client the backup start location. */
     149         266 :     SendXlogRecPtrResult(state->startptr, state->starttli);
     150             : 
     151             :     /* Send client a list of tablespaces. */
     152         266 :     SendTablespaceList(state->tablespaces);
     153             : 
     154             :     /* Send a CommandComplete message */
     155         266 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     156             : 
     157             :     /* Begin COPY stream. This will be used for all archives + manifest. */
     158         266 :     SendCopyOutResponse();
     159         266 : }
     160             : 
     161             : /*
     162             :  * Send a CopyData message announcing the beginning of a new archive.
     163             :  */
     164             : static void
     165         328 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
     166             : {
     167         328 :     bbsink_state *state = sink->bbs_state;
     168             :     tablespaceinfo *ti;
     169             :     StringInfoData buf;
     170             : 
     171         328 :     ti = list_nth(state->tablespaces, state->tablespace_num);
     172         328 :     pq_beginmessage(&buf, PqMsg_CopyData);
     173         328 :     pq_sendbyte(&buf, 'n');     /* New archive */
     174         328 :     pq_sendstring(&buf, archive_name);
     175         328 :     pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
     176         328 :     pq_endmessage(&buf);
     177         328 : }
     178             : 
     179             : /*
     180             :  * Send a CopyData message containing a chunk of archive content.
     181             :  */
     182             : static void
     183      562914 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
     184             : {
     185      562914 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     186      562914 :     bbsink_state *state = mysink->base.bbs_state;
     187             :     StringInfoData buf;
     188             :     uint64      targetbytes;
     189             : 
     190             :     /* Send the archive content to the client, if appropriate. */
     191      562914 :     if (mysink->send_to_client)
     192             :     {
     193             :         /* Add one because we're also sending a leading type byte. */
     194      533548 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
     195             :     }
     196             : 
     197             :     /* Consider whether to send a progress report to the client. */
     198      562914 :     targetbytes = mysink->bytes_done_at_last_time_check
     199             :         + PROGRESS_REPORT_BYTE_INTERVAL;
     200      562914 :     if (targetbytes <= state->bytes_done)
     201             :     {
     202       84782 :         TimestampTz now = GetCurrentTimestamp();
     203             :         long        ms;
     204             : 
     205             :         /*
     206             :          * OK, we've sent a decent number of bytes, so check the system time
     207             :          * to see whether we're due to send a progress report.
     208             :          */
     209       84782 :         mysink->bytes_done_at_last_time_check = state->bytes_done;
     210       84782 :         ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
     211             :                                              now);
     212             : 
     213             :         /*
     214             :          * Send a progress report if enough time has passed. Also send one if
     215             :          * the system clock was set backward, so that such occurrences don't
     216             :          * have the effect of suppressing further progress messages.
     217             :          */
     218       84782 :         if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
     219       84770 :             now < mysink->last_progress_report_time)
     220             :         {
     221          12 :             mysink->last_progress_report_time = now;
     222             : 
     223          12 :             pq_beginmessage(&buf, PqMsg_CopyData);
     224          12 :             pq_sendbyte(&buf, 'p'); /* Progress report */
     225          12 :             pq_sendint64(&buf, state->bytes_done);
     226          12 :             pq_endmessage(&buf);
     227          12 :             pq_flush_if_writable();
     228             :         }
     229             :     }
     230      562914 : }
     231             : 
     232             : /*
     233             :  * We don't need to explicitly signal the end of the archive; the client
     234             :  * will figure out that we've reached the end when we begin the next one,
     235             :  * or begin the manifest, or end the COPY stream. However, this seems like
     236             :  * a good time to force out a progress report. One reason for that is that
     237             :  * if this is the last archive, and we don't force a progress report now,
     238             :  * the client will never be told that we sent all the bytes.
     239             :  */
     240             : static void
     241         314 : bbsink_copystream_end_archive(bbsink *sink)
     242             : {
     243         314 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     244         314 :     bbsink_state *state = mysink->base.bbs_state;
     245             :     StringInfoData buf;
     246             : 
     247         314 :     mysink->bytes_done_at_last_time_check = state->bytes_done;
     248         314 :     mysink->last_progress_report_time = GetCurrentTimestamp();
     249         314 :     pq_beginmessage(&buf, PqMsg_CopyData);
     250         314 :     pq_sendbyte(&buf, 'p');     /* Progress report */
     251         314 :     pq_sendint64(&buf, state->bytes_done);
     252         314 :     pq_endmessage(&buf);
     253         314 :     pq_flush_if_writable();
     254         314 : }
     255             : 
     256             : /*
     257             :  * Send a CopyData message announcing the beginning of the backup manifest.
     258             :  */
     259             : static void
     260         250 : bbsink_copystream_begin_manifest(bbsink *sink)
     261             : {
     262             :     StringInfoData buf;
     263             : 
     264         250 :     pq_beginmessage(&buf, PqMsg_CopyData);
     265         250 :     pq_sendbyte(&buf, 'm');     /* Manifest */
     266         250 :     pq_endmessage(&buf);
     267         250 : }
     268             : 
     269             : /*
     270             :  * Each chunk of manifest data is sent using a CopyData message.
     271             :  */
     272             : static void
     273        1280 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
     274             : {
     275        1280 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     276             : 
     277        1280 :     if (mysink->send_to_client)
     278             :     {
     279             :         /* Add one because we're also sending a leading type byte. */
     280        1210 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
     281             :     }
     282        1280 : }
     283             : 
     284             : /*
     285             :  * We don't need an explicit terminator for the backup manifest.
     286             :  */
     287             : static void
     288         250 : bbsink_copystream_end_manifest(bbsink *sink)
     289             : {
     290             :     /* Do nothing. */
     291         250 : }
     292             : 
     293             : /*
     294             :  * Send end-of-backup wire protocol messages.
     295             :  */
     296             : static void
     297         252 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
     298             :                              TimeLineID endtli)
     299             : {
     300         252 :     SendCopyDone();
     301         252 :     SendXlogRecPtrResult(endptr, endtli);
     302         252 : }
     303             : 
     304             : /*
     305             :  * Cleanup.
     306             :  */
     307             : static void
     308         254 : bbsink_copystream_cleanup(bbsink *sink)
     309             : {
     310             :     /* Nothing to do. */
     311         254 : }
     312             : 
     313             : /*
     314             :  * Send a CopyOutResponse message.
     315             :  */
     316             : static void
     317         266 : SendCopyOutResponse(void)
     318             : {
     319             :     StringInfoData buf;
     320             : 
     321         266 :     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
     322         266 :     pq_sendbyte(&buf, 0);       /* overall format */
     323         266 :     pq_sendint16(&buf, 0);      /* natts */
     324         266 :     pq_endmessage(&buf);
     325         266 : }
     326             : 
     327             : /*
     328             :  * Send a CopyDone message.
     329             :  */
     330             : static void
     331         252 : SendCopyDone(void)
     332             : {
     333         252 :     pq_putemptymessage(PqMsg_CopyDone);
     334         252 : }
     335             : 
     336             : /*
     337             :  * Send a single resultset containing just a single
     338             :  * XLogRecPtr record (in text format)
     339             :  */
     340             : static void
     341         518 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
     342             : {
     343             :     DestReceiver *dest;
     344             :     TupOutputState *tstate;
     345             :     TupleDesc   tupdesc;
     346             :     Datum       values[2];
     347         518 :     bool        nulls[2] = {0};
     348             : 
     349         518 :     dest = CreateDestReceiver(DestRemoteSimple);
     350             : 
     351         518 :     tupdesc = CreateTemplateTupleDesc(2);
     352         518 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
     353             : 
     354             :     /*
     355             :      * int8 may seem like a surprising data type for this, but in theory int4
     356             :      * would not be wide enough for this, as TimeLineID is unsigned.
     357             :      */
     358         518 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
     359             : 
     360             :     /* send RowDescription */
     361         518 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     362             : 
     363             :     /* Data row */
     364         518 :     values[0] = CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
     365         518 :     values[1] = Int64GetDatum(tli);
     366         518 :     do_tup_output(tstate, values, nulls);
     367             : 
     368         518 :     end_tup_output(tstate);
     369             : 
     370             :     /* Send a CommandComplete message */
     371         518 :     pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
     372         518 : }
     373             : 
     374             : /*
     375             :  * Send a result set via libpq describing the tablespace list.
     376             :  */
     377             : static void
     378         266 : SendTablespaceList(List *tablespaces)
     379             : {
     380             :     DestReceiver *dest;
     381             :     TupOutputState *tstate;
     382             :     TupleDesc   tupdesc;
     383             :     ListCell   *lc;
     384             : 
     385         266 :     dest = CreateDestReceiver(DestRemoteSimple);
     386             : 
     387         266 :     tupdesc = CreateTemplateTupleDesc(3);
     388         266 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
     389         266 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
     390         266 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
     391             : 
     392             :     /* send RowDescription */
     393         266 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     394             : 
     395             :     /* Construct and send the directory information */
     396         594 :     foreach(lc, tablespaces)
     397             :     {
     398         328 :         tablespaceinfo *ti = lfirst(lc);
     399             :         Datum       values[3];
     400         328 :         bool        nulls[3] = {0};
     401             : 
     402             :         /* Send one datarow message */
     403         328 :         if (ti->path == NULL)
     404             :         {
     405         266 :             nulls[0] = true;
     406         266 :             nulls[1] = true;
     407             :         }
     408             :         else
     409             :         {
     410          62 :             values[0] = ObjectIdGetDatum(ti->oid);
     411          62 :             values[1] = CStringGetTextDatum(ti->path);
     412             :         }
     413         328 :         if (ti->size >= 0)
     414         326 :             values[2] = Int64GetDatum(ti->size / 1024);
     415             :         else
     416           2 :             nulls[2] = true;
     417             : 
     418         328 :         do_tup_output(tstate, values, nulls);
     419             :     }
     420             : 
     421         266 :     end_tup_output(tstate);
     422         266 : }

Generated by: LCOV version 1.14