LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_tar.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 84 143 58.7 %
Date: 2024-11-21 08:14:44 Functions: 5 13 38.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * astreamer_tar.c
       4             :  *
       5             :  * This module implements three types of tar processing. A tar parser
       6             :  * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits
       7             :  * it into labelled chunks (any other value of astreamer_archive_context).
       8             :  * A tar archiver does the reverse: it takes a bunch of labelled chunks
       9             :  * and produces a tarfile, optionally replacing member headers and trailers
      10             :  * so that upstream astreamer objects can perform surgery on the tarfile
      11             :  * contents without knowing the details of the tar format. A tar terminator
      12             :  * just adds two blocks of NUL bytes to the end of the file, since older
      13             :  * server versions produce files with this terminator omitted.
      14             :  *
      15             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      16             :  *
      17             :  * IDENTIFICATION
      18             :  *        src/bin/pg_basebackup/astreamer_tar.c
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : 
      22             : #include "postgres_fe.h"
      23             : 
      24             : #include <time.h>
      25             : 
      26             : #include "common/logging.h"
      27             : #include "fe_utils/astreamer.h"
      28             : #include "pgtar.h"
      29             : 
      30             : typedef struct astreamer_tar_parser
      31             : {
      32             :     astreamer   base;
      33             :     astreamer_archive_context next_context;
      34             :     astreamer_member member;
      35             :     size_t      file_bytes_sent;
      36             :     size_t      pad_bytes_expected;
      37             : } astreamer_tar_parser;
      38             : 
      39             : typedef struct astreamer_tar_archiver
      40             : {
      41             :     astreamer   base;
      42             :     bool        rearchive_member;
      43             : } astreamer_tar_archiver;
      44             : 
      45             : static void astreamer_tar_parser_content(astreamer *streamer,
      46             :                                          astreamer_member *member,
      47             :                                          const char *data, int len,
      48             :                                          astreamer_archive_context context);
      49             : static void astreamer_tar_parser_finalize(astreamer *streamer);
      50             : static void astreamer_tar_parser_free(astreamer *streamer);
      51             : static bool astreamer_tar_header(astreamer_tar_parser *mystreamer);
      52             : 
      53             : static const astreamer_ops astreamer_tar_parser_ops = {
      54             :     .content = astreamer_tar_parser_content,
      55             :     .finalize = astreamer_tar_parser_finalize,
      56             :     .free = astreamer_tar_parser_free
      57             : };
      58             : 
      59             : static void astreamer_tar_archiver_content(astreamer *streamer,
      60             :                                            astreamer_member *member,
      61             :                                            const char *data, int len,
      62             :                                            astreamer_archive_context context);
      63             : static void astreamer_tar_archiver_finalize(astreamer *streamer);
      64             : static void astreamer_tar_archiver_free(astreamer *streamer);
      65             : 
      66             : static const astreamer_ops astreamer_tar_archiver_ops = {
      67             :     .content = astreamer_tar_archiver_content,
      68             :     .finalize = astreamer_tar_archiver_finalize,
      69             :     .free = astreamer_tar_archiver_free
      70             : };
      71             : 
      72             : static void astreamer_tar_terminator_content(astreamer *streamer,
      73             :                                              astreamer_member *member,
      74             :                                              const char *data, int len,
      75             :                                              astreamer_archive_context context);
      76             : static void astreamer_tar_terminator_finalize(astreamer *streamer);
      77             : static void astreamer_tar_terminator_free(astreamer *streamer);
      78             : 
      79             : static const astreamer_ops astreamer_tar_terminator_ops = {
      80             :     .content = astreamer_tar_terminator_content,
      81             :     .finalize = astreamer_tar_terminator_finalize,
      82             :     .free = astreamer_tar_terminator_free
      83             : };
      84             : 
      85             : /*
      86             :  * Create a astreamer that can parse a stream of content as tar data.
      87             :  *
      88             :  * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer
      89             :  * specified by 'next' will receive a series of typed chunks, as per the
      90             :  * conventions described in astreamer.h.
      91             :  */
      92             : astreamer *
      93         376 : astreamer_tar_parser_new(astreamer *next)
      94             : {
      95             :     astreamer_tar_parser *streamer;
      96             : 
      97         376 :     streamer = palloc0(sizeof(astreamer_tar_parser));
      98         376 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      99             :         &astreamer_tar_parser_ops;
     100         376 :     streamer->base.bbs_next = next;
     101         376 :     initStringInfo(&streamer->base.bbs_buffer);
     102         376 :     streamer->next_context = ASTREAMER_MEMBER_HEADER;
     103             : 
     104         376 :     return &streamer->base;
     105             : }
     106             : 
     107             : /*
     108             :  * Parse unknown content as tar data.
     109             :  */
     110             : static void
     111     1058966 : astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
     112             :                              const char *data, int len,
     113             :                              astreamer_archive_context context)
     114             : {
     115     1058966 :     astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
     116             :     size_t      nbytes;
     117             : 
     118             :     /* Expect unparsed input. */
     119             :     Assert(member == NULL);
     120             :     Assert(context == ASTREAMER_UNKNOWN);
     121             : 
     122     2183258 :     while (len > 0)
     123             :     {
     124     1124670 :         switch (mystreamer->next_context)
     125             :         {
     126      300428 :             case ASTREAMER_MEMBER_HEADER:
     127             : 
     128             :                 /*
     129             :                  * If we're expecting an archive member header, accumulate a
     130             :                  * full block of data before doing anything further.
     131             :                  */
     132      300428 :                 if (!astreamer_buffer_until(streamer, &data, &len,
     133             :                                             TAR_BLOCK_SIZE))
     134           0 :                     return;
     135             : 
     136             :                 /*
     137             :                  * Now we can process the header and get ready to process the
     138             :                  * file contents; however, we might find out that what we
     139             :                  * thought was the next file header is actually the start of
     140             :                  * the archive trailer. Switch modes accordingly.
     141             :                  */
     142      300428 :                 if (astreamer_tar_header(mystreamer))
     143             :                 {
     144      300060 :                     if (mystreamer->member.size == 0)
     145             :                     {
     146             :                         /* No content; trailer is zero-length. */
     147       59176 :                         astreamer_content(mystreamer->base.bbs_next,
     148             :                                           &mystreamer->member,
     149             :                                           NULL, 0,
     150             :                                           ASTREAMER_MEMBER_TRAILER);
     151             : 
     152             :                         /* Expect next header. */
     153       59176 :                         mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
     154             :                     }
     155             :                     else
     156             :                     {
     157             :                         /* Expect contents. */
     158      240884 :                         mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS;
     159             :                     }
     160      300060 :                     mystreamer->base.bbs_buffer.len = 0;
     161      300060 :                     mystreamer->file_bytes_sent = 0;
     162             :                 }
     163             :                 else
     164         368 :                     mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER;
     165      300428 :                 break;
     166             : 
     167      807644 :             case ASTREAMER_MEMBER_CONTENTS:
     168             : 
     169             :                 /*
     170             :                  * Send as much content as we have, but not more than the
     171             :                  * remaining file length.
     172             :                  */
     173             :                 Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
     174      807644 :                 nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
     175      807644 :                 nbytes = Min(nbytes, len);
     176             :                 Assert(nbytes > 0);
     177      807644 :                 astreamer_content(mystreamer->base.bbs_next,
     178             :                                   &mystreamer->member,
     179             :                                   data, nbytes,
     180             :                                   ASTREAMER_MEMBER_CONTENTS);
     181      807644 :                 mystreamer->file_bytes_sent += nbytes;
     182      807644 :                 data += nbytes;
     183      807644 :                 len -= nbytes;
     184             : 
     185             :                 /*
     186             :                  * If we've not yet sent the whole file, then there's more
     187             :                  * content to come; otherwise, it's time to expect the file
     188             :                  * trailer.
     189             :                  */
     190             :                 Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
     191      807644 :                 if (mystreamer->file_bytes_sent == mystreamer->member.size)
     192             :                 {
     193      240878 :                     if (mystreamer->pad_bytes_expected == 0)
     194             :                     {
     195             :                         /* Trailer is zero-length. */
     196      224656 :                         astreamer_content(mystreamer->base.bbs_next,
     197             :                                           &mystreamer->member,
     198             :                                           NULL, 0,
     199             :                                           ASTREAMER_MEMBER_TRAILER);
     200             : 
     201             :                         /* Expect next header. */
     202      224654 :                         mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
     203             :                     }
     204             :                     else
     205             :                     {
     206             :                         /* Trailer is not zero-length. */
     207       16222 :                         mystreamer->next_context = ASTREAMER_MEMBER_TRAILER;
     208             :                     }
     209      240876 :                     mystreamer->base.bbs_buffer.len = 0;
     210             :                 }
     211      807642 :                 break;
     212             : 
     213       16222 :             case ASTREAMER_MEMBER_TRAILER:
     214             : 
     215             :                 /*
     216             :                  * If we're expecting an archive member trailer, accumulate
     217             :                  * the expected number of padding bytes before sending
     218             :                  * anything onward.
     219             :                  */
     220       16222 :                 if (!astreamer_buffer_until(streamer, &data, &len,
     221       16222 :                                             mystreamer->pad_bytes_expected))
     222           0 :                     return;
     223             : 
     224             :                 /* OK, now we can send it. */
     225       16222 :                 astreamer_content(mystreamer->base.bbs_next,
     226             :                                   &mystreamer->member,
     227       16222 :                                   data, mystreamer->pad_bytes_expected,
     228             :                                   ASTREAMER_MEMBER_TRAILER);
     229             : 
     230             :                 /* Expect next file header. */
     231       16222 :                 mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
     232       16222 :                 mystreamer->base.bbs_buffer.len = 0;
     233       16222 :                 break;
     234             : 
     235         376 :             case ASTREAMER_ARCHIVE_TRAILER:
     236             : 
     237             :                 /*
     238             :                  * We've seen an end-of-archive indicator, so anything more is
     239             :                  * buffered and sent as part of the archive trailer. But we
     240             :                  * don't expect more than 2 blocks.
     241             :                  */
     242         376 :                 astreamer_buffer_bytes(streamer, &data, &len, len);
     243         376 :                 if (len > 2 * TAR_BLOCK_SIZE)
     244           0 :                     pg_fatal("tar file trailer exceeds 2 blocks");
     245         376 :                 return;
     246             : 
     247           0 :             default:
     248             :                 /* Shouldn't happen. */
     249           0 :                 pg_fatal("unexpected state while parsing tar archive");
     250             :         }
     251             :     }
     252             : }
     253             : 
     254             : /*
     255             :  * Parse a file header within a tar stream.
     256             :  *
     257             :  * The return value is true if we found a file header and passed it on to the
     258             :  * next astreamer; it is false if we have reached the archive trailer.
     259             :  */
     260             : static bool
     261      300428 : astreamer_tar_header(astreamer_tar_parser *mystreamer)
     262             : {
     263      300428 :     bool        has_nonzero_byte = false;
     264             :     int         i;
     265      300428 :     astreamer_member *member = &mystreamer->member;
     266      300428 :     char       *buffer = mystreamer->base.bbs_buffer.data;
     267             : 
     268             :     Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
     269             : 
     270             :     /* Check whether we've got a block of all zero bytes. */
     271      488844 :     for (i = 0; i < TAR_BLOCK_SIZE; ++i)
     272             :     {
     273      488476 :         if (buffer[i] != '\0')
     274             :         {
     275      300060 :             has_nonzero_byte = true;
     276      300060 :             break;
     277             :         }
     278             :     }
     279             : 
     280             :     /*
     281             :      * If the entire block was zeros, this is the end of the archive, not the
     282             :      * start of the next file.
     283             :      */
     284      300428 :     if (!has_nonzero_byte)
     285         368 :         return false;
     286             : 
     287             :     /*
     288             :      * Parse key fields out of the header.
     289             :      */
     290      300060 :     strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH);
     291      300060 :     if (member->pathname[0] == '\0')
     292           0 :         pg_fatal("tar member has empty name");
     293      300060 :     member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12);
     294      300060 :     member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8);
     295      300060 :     member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8);
     296      300060 :     member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8);
     297      300060 :     member->is_directory =
     298      300060 :         (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY);
     299      300060 :     member->is_link =
     300      300060 :         (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK);
     301      300060 :     if (member->is_link)
     302          32 :         strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100);
     303             : 
     304             :     /* Compute number of padding bytes. */
     305      300060 :     mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
     306             : 
     307             :     /* Forward the entire header to the next astreamer. */
     308      300060 :     astreamer_content(mystreamer->base.bbs_next, member,
     309             :                       buffer, TAR_BLOCK_SIZE,
     310             :                       ASTREAMER_MEMBER_HEADER);
     311             : 
     312      300060 :     return true;
     313             : }
     314             : 
     315             : /*
     316             :  * End-of-stream processing for a tar parser.
     317             :  */
     318             : static void
     319         368 : astreamer_tar_parser_finalize(astreamer *streamer)
     320             : {
     321         368 :     astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
     322             : 
     323         368 :     if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER &&
     324           0 :         (mystreamer->next_context != ASTREAMER_MEMBER_HEADER ||
     325           0 :          mystreamer->base.bbs_buffer.len > 0))
     326           0 :         pg_fatal("COPY stream ended before last file was finished");
     327             : 
     328             :     /* Send the archive trailer, even if empty. */
     329         368 :     astreamer_content(streamer->bbs_next, NULL,
     330         368 :                       streamer->bbs_buffer.data, streamer->bbs_buffer.len,
     331             :                       ASTREAMER_ARCHIVE_TRAILER);
     332             : 
     333             :     /* Now finalize successor. */
     334         368 :     astreamer_finalize(streamer->bbs_next);
     335         368 : }
     336             : 
     337             : /*
     338             :  * Free memory associated with a tar parser.
     339             :  */
     340             : static void
     341         368 : astreamer_tar_parser_free(astreamer *streamer)
     342             : {
     343         368 :     pfree(streamer->bbs_buffer.data);
     344         368 :     astreamer_free(streamer->bbs_next);
     345         368 : }
     346             : 
     347             : /*
     348             :  * Create a astreamer that can generate a tar archive.
     349             :  *
     350             :  * This is intended to be usable either for generating a brand-new tar archive
     351             :  * or for modifying one on the fly. The input should be a series of typed
     352             :  * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for
     353             :  * astreamer_tar_parser_content.
     354             :  */
     355             : astreamer *
     356           0 : astreamer_tar_archiver_new(astreamer *next)
     357             : {
     358             :     astreamer_tar_archiver *streamer;
     359             : 
     360           0 :     streamer = palloc0(sizeof(astreamer_tar_archiver));
     361           0 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     362             :         &astreamer_tar_archiver_ops;
     363           0 :     streamer->base.bbs_next = next;
     364             : 
     365           0 :     return &streamer->base;
     366             : }
     367             : 
     368             : /*
     369             :  * Fix up the stream of input chunks to create a valid tar file.
     370             :  *
     371             :  * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
     372             :  * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
     373             :  * passed through without change. Any other size is a fatal error (and
     374             :  * indicates a bug).
     375             :  *
     376             :  * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the
     377             :  * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from
     378             :  * scratch. Specifically, we construct a block of zero bytes sufficient to
     379             :  * pad out to a block boundary, as required by the tar format. Other
     380             :  * ASTREAMER_MEMBER_TRAILER chunks are passed through without change.
     381             :  *
     382             :  * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change.
     383             :  *
     384             :  * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two
     385             :  * blocks of zero bytes. Not all tar programs require this, but apparently
     386             :  * some do. The server does not supply this trailer. If no archive trailer is
     387             :  * present, one will be added by astreamer_tar_parser_finalize.
     388             :  */
     389             : static void
     390           0 : astreamer_tar_archiver_content(astreamer *streamer,
     391             :                                astreamer_member *member,
     392             :                                const char *data, int len,
     393             :                                astreamer_archive_context context)
     394             : {
     395           0 :     astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer;
     396             :     char        buffer[2 * TAR_BLOCK_SIZE];
     397             : 
     398             :     Assert(context != ASTREAMER_UNKNOWN);
     399             : 
     400           0 :     if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
     401             :     {
     402             :         Assert(len == 0);
     403             : 
     404             :         /* Replace zero-length tar header with a newly constructed one. */
     405           0 :         tarCreateHeader(buffer, member->pathname, NULL,
     406             :                         member->size, member->mode, member->uid, member->gid,
     407             :                         time(NULL));
     408           0 :         data = buffer;
     409           0 :         len = TAR_BLOCK_SIZE;
     410             : 
     411             :         /* Also make a note to replace padding, in case size changed. */
     412           0 :         mystreamer->rearchive_member = true;
     413             :     }
     414           0 :     else if (context == ASTREAMER_MEMBER_TRAILER &&
     415           0 :              mystreamer->rearchive_member)
     416           0 :     {
     417           0 :         int         pad_bytes = tarPaddingBytesRequired(member->size);
     418             : 
     419             :         /* Also replace padding, if we regenerated the header. */
     420           0 :         memset(buffer, 0, pad_bytes);
     421           0 :         data = buffer;
     422           0 :         len = pad_bytes;
     423             : 
     424             :         /* Don't do this again unless we replace another header. */
     425           0 :         mystreamer->rearchive_member = false;
     426             :     }
     427           0 :     else if (context == ASTREAMER_ARCHIVE_TRAILER)
     428             :     {
     429             :         /* Trailer should always be two blocks of zero bytes. */
     430           0 :         memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
     431           0 :         data = buffer;
     432           0 :         len = 2 * TAR_BLOCK_SIZE;
     433             :     }
     434             : 
     435           0 :     astreamer_content(streamer->bbs_next, member, data, len, context);
     436           0 : }
     437             : 
     438             : /*
     439             :  * End-of-stream processing for a tar archiver.
     440             :  */
     441             : static void
     442           0 : astreamer_tar_archiver_finalize(astreamer *streamer)
     443             : {
     444           0 :     astreamer_finalize(streamer->bbs_next);
     445           0 : }
     446             : 
     447             : /*
     448             :  * Free memory associated with a tar archiver.
     449             :  */
     450             : static void
     451           0 : astreamer_tar_archiver_free(astreamer *streamer)
     452             : {
     453           0 :     astreamer_free(streamer->bbs_next);
     454           0 :     pfree(streamer);
     455           0 : }
     456             : 
     457             : /*
     458             :  * Create a astreamer that blindly adds two blocks of NUL bytes to the
     459             :  * end of an incomplete tarfile that the server might send us.
     460             :  */
     461             : astreamer *
     462           0 : astreamer_tar_terminator_new(astreamer *next)
     463             : {
     464             :     astreamer  *streamer;
     465             : 
     466           0 :     streamer = palloc0(sizeof(astreamer));
     467           0 :     *((const astreamer_ops **) &streamer->bbs_ops) =
     468             :         &astreamer_tar_terminator_ops;
     469           0 :     streamer->bbs_next = next;
     470             : 
     471           0 :     return streamer;
     472             : }
     473             : 
     474             : /*
     475             :  * Pass all the content through without change.
     476             :  */
     477             : static void
     478           0 : astreamer_tar_terminator_content(astreamer *streamer,
     479             :                                  astreamer_member *member,
     480             :                                  const char *data, int len,
     481             :                                  astreamer_archive_context context)
     482             : {
     483             :     /* Expect unparsed input. */
     484             :     Assert(member == NULL);
     485             :     Assert(context == ASTREAMER_UNKNOWN);
     486             : 
     487             :     /* Just forward it. */
     488           0 :     astreamer_content(streamer->bbs_next, member, data, len, context);
     489           0 : }
     490             : 
     491             : /*
     492             :  * At the end, blindly add the two blocks of NUL bytes which the server fails
     493             :  * to supply.
     494             :  */
     495             : static void
     496           0 : astreamer_tar_terminator_finalize(astreamer *streamer)
     497             : {
     498             :     char        buffer[2 * TAR_BLOCK_SIZE];
     499             : 
     500           0 :     memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
     501           0 :     astreamer_content(streamer->bbs_next, NULL, buffer,
     502             :                       2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN);
     503           0 :     astreamer_finalize(streamer->bbs_next);
     504           0 : }
     505             : 
     506             : /*
     507             :  * Free memory associated with a tar terminator.
     508             :  */
     509             : static void
     510           0 : astreamer_tar_terminator_free(astreamer *streamer)
     511             : {
     512           0 :     astreamer_free(streamer->bbs_next);
     513           0 :     pfree(streamer);
     514           0 : }

Generated by: LCOV version 1.14