LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_gzip.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 69 92 75.0 %
Date: 2024-11-21 08:14:44 Functions: 9 11 81.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * astreamer_gzip.c
       4             :  *
       5             :  * Archive streamers that deal with data compressed using gzip.
       6             :  * astreamer_gzip_writer applies gzip compression to the input data
       7             :  * and writes the result to a file. astreamer_gzip_decompressor assumes
       8             :  * that the input stream is compressed using gzip and decompresses it.
       9             :  *
      10             :  * Note that the code in this file is asymmetric with what we do for
      11             :  * other compression types: for lz4 and zstd, there is a compressor and
      12             :  * a decompressor, rather than a writer and a decompressor. The approach
      13             :  * taken here is less flexible, because a writer can only write to a file,
      14             :  * while a compressor can write to a subsequent astreamer which is free
      15             :  * to do whatever it likes. The reason it's like this is because this
      16             :  * code was adapted from old, less-modular pg_basebackup code that used
      17             :  * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
      18             :  * necessary to change anything at the time.
      19             :  *
      20             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *        src/fe_utils/astreamer_gzip.c
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : 
      27             : #include "postgres_fe.h"
      28             : 
      29             : #include <unistd.h>
      30             : 
      31             : #ifdef HAVE_LIBZ
      32             : #include <zlib.h>
      33             : #endif
      34             : 
      35             : #include "common/logging.h"
      36             : #include "fe_utils/astreamer.h"
      37             : 
      38             : #ifdef HAVE_LIBZ
      39             : typedef struct astreamer_gzip_writer
      40             : {
      41             :     astreamer   base;
      42             :     char       *pathname;
      43             :     gzFile      gzfile;
      44             : } astreamer_gzip_writer;
      45             : 
      46             : typedef struct astreamer_gzip_decompressor
      47             : {
      48             :     astreamer   base;
      49             :     z_stream    zstream;
      50             :     size_t      bytes_written;
      51             : } astreamer_gzip_decompressor;
      52             : 
      53             : static void astreamer_gzip_writer_content(astreamer *streamer,
      54             :                                           astreamer_member *member,
      55             :                                           const char *data, int len,
      56             :                                           astreamer_archive_context context);
      57             : static void astreamer_gzip_writer_finalize(astreamer *streamer);
      58             : static void astreamer_gzip_writer_free(astreamer *streamer);
      59             : static const char *get_gz_error(gzFile gzf);
      60             : 
      61             : static const astreamer_ops astreamer_gzip_writer_ops = {
      62             :     .content = astreamer_gzip_writer_content,
      63             :     .finalize = astreamer_gzip_writer_finalize,
      64             :     .free = astreamer_gzip_writer_free
      65             : };
      66             : 
      67             : static void astreamer_gzip_decompressor_content(astreamer *streamer,
      68             :                                                 astreamer_member *member,
      69             :                                                 const char *data, int len,
      70             :                                                 astreamer_archive_context context);
      71             : static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
      72             : static void astreamer_gzip_decompressor_free(astreamer *streamer);
      73             : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
      74             : static void gzip_pfree(void *opaque, void *address);
      75             : 
      76             : static const astreamer_ops astreamer_gzip_decompressor_ops = {
      77             :     .content = astreamer_gzip_decompressor_content,
      78             :     .finalize = astreamer_gzip_decompressor_finalize,
      79             :     .free = astreamer_gzip_decompressor_free
      80             : };
      81             : #endif
      82             : 
      83             : /*
      84             :  * Create an astreamer that just compresses data using gzip, and then writes
      85             :  * it to a file.
      86             :  *
      87             :  * The caller must specify a pathname and may specify a file. The pathname is
      88             :  * used for error-reporting purposes either way. If file is NULL, the pathname
      89             :  * also identifies the file to which the data should be written: it is opened
      90             :  * for writing and closed when done. If file is not NULL, the data is written
      91             :  * there.
      92             :  *
      93             :  * Note that zlib does not use the FILE interface, but operates directly on
      94             :  * a duplicate of the underlying fd. Hence, callers must take care if they
      95             :  * plan to write any other data to the same FILE, either before or after using
      96             :  * this.
      97             :  */
      98             : astreamer *
      99           8 : astreamer_gzip_writer_new(char *pathname, FILE *file,
     100             :                           pg_compress_specification *compress)
     101             : {
     102             : #ifdef HAVE_LIBZ
     103             :     astreamer_gzip_writer *streamer;
     104             : 
     105           8 :     streamer = palloc0(sizeof(astreamer_gzip_writer));
     106           8 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     107             :         &astreamer_gzip_writer_ops;
     108             : 
     109           8 :     streamer->pathname = pstrdup(pathname);
     110             : 
     111           8 :     if (file == NULL)
     112             :     {
     113           8 :         streamer->gzfile = gzopen(pathname, "wb");
     114           8 :         if (streamer->gzfile == NULL)
     115           0 :             pg_fatal("could not create compressed file \"%s\": %m",
     116             :                      pathname);
     117             :     }
     118             :     else
     119             :     {
     120             :         /*
     121             :          * We must dup the file handle so that gzclose doesn't break the
     122             :          * caller's FILE.  See comment for astreamer_gzip_writer_finalize.
     123             :          */
     124           0 :         int         fd = dup(fileno(file));
     125             : 
     126           0 :         if (fd < 0)
     127           0 :             pg_fatal("could not duplicate stdout: %m");
     128             : 
     129           0 :         streamer->gzfile = gzdopen(fd, "wb");
     130           0 :         if (streamer->gzfile == NULL)
     131           0 :             pg_fatal("could not open output file: %m");
     132             :     }
     133             : 
     134           8 :     if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
     135           0 :         pg_fatal("could not set compression level %d: %s",
     136             :                  compress->level, get_gz_error(streamer->gzfile));
     137             : 
     138           8 :     return &streamer->base;
     139             : #else
     140             :     pg_fatal("this build does not support compression with %s", "gzip");
     141             :     return NULL;                /* keep compiler quiet */
     142             : #endif
     143             : }
     144             : 
     145             : #ifdef HAVE_LIBZ
     146             : /*
     147             :  * Write archive content to gzip file.
     148             :  */
     149             : static void
     150       18572 : astreamer_gzip_writer_content(astreamer *streamer,
     151             :                               astreamer_member *member, const char *data,
     152             :                               int len, astreamer_archive_context context)
     153             : {
     154             :     astreamer_gzip_writer *mystreamer;
     155             : 
     156       18572 :     mystreamer = (astreamer_gzip_writer *) streamer;
     157             : 
     158       18572 :     if (len == 0)
     159           0 :         return;
     160             : 
     161       18572 :     errno = 0;
     162       18572 :     if (gzwrite(mystreamer->gzfile, data, len) != len)
     163             :     {
     164             :         /* if write didn't set errno, assume problem is no disk space */
     165           0 :         if (errno == 0)
     166           0 :             errno = ENOSPC;
     167           0 :         pg_fatal("could not write to compressed file \"%s\": %s",
     168             :                  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
     169             :     }
     170             : }
     171             : 
     172             : /*
     173             :  * End-of-archive processing when writing to a gzip file consists of just
     174             :  * calling gzclose.
     175             :  *
     176             :  * It makes no difference whether we opened the file or the caller did it,
     177             :  * because libz provides no way of avoiding a close on the underlying file
     178             :  * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
     179             :  * work around this issue, so that the behavior from the caller's viewpoint
     180             :  * is the same as for astreamer_plain_writer.
     181             :  */
     182             : static void
     183           8 : astreamer_gzip_writer_finalize(astreamer *streamer)
     184             : {
     185             :     astreamer_gzip_writer *mystreamer;
     186             : 
     187           8 :     mystreamer = (astreamer_gzip_writer *) streamer;
     188             : 
     189           8 :     errno = 0;                  /* in case gzclose() doesn't set it */
     190           8 :     if (gzclose(mystreamer->gzfile) != 0)
     191           0 :         pg_fatal("could not close compressed file \"%s\": %m",
     192             :                  mystreamer->pathname);
     193             : 
     194           8 :     mystreamer->gzfile = NULL;
     195           8 : }
     196             : 
     197             : /*
     198             :  * Free memory associated with this astreamer.
     199             :  */
     200             : static void
     201           8 : astreamer_gzip_writer_free(astreamer *streamer)
     202             : {
     203             :     astreamer_gzip_writer *mystreamer;
     204             : 
     205           8 :     mystreamer = (astreamer_gzip_writer *) streamer;
     206             : 
     207             :     Assert(mystreamer->base.bbs_next == NULL);
     208             :     Assert(mystreamer->gzfile == NULL);
     209             : 
     210           8 :     pfree(mystreamer->pathname);
     211           8 :     pfree(mystreamer);
     212           8 : }
     213             : 
     214             : /*
     215             :  * Helper function for libz error reporting.
     216             :  */
     217             : static const char *
     218           0 : get_gz_error(gzFile gzf)
     219             : {
     220             :     int         errnum;
     221             :     const char *errmsg;
     222             : 
     223           0 :     errmsg = gzerror(gzf, &errnum);
     224           0 :     if (errnum == Z_ERRNO)
     225           0 :         return strerror(errno);
     226             :     else
     227           0 :         return errmsg;
     228             : }
     229             : #endif
     230             : 
     231             : /*
     232             :  * Create a new archive streamer that performs decompression of gzip
     233             :  * compressed blocks.
     234             :  */
     235             : astreamer *
     236           8 : astreamer_gzip_decompressor_new(astreamer *next)
     237             : {
     238             : #ifdef HAVE_LIBZ
     239             :     astreamer_gzip_decompressor *streamer;
     240             :     z_stream   *zs;
     241             : 
     242             :     Assert(next != NULL);
     243             : 
     244           8 :     streamer = palloc0(sizeof(astreamer_gzip_decompressor));
     245           8 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     246             :         &astreamer_gzip_decompressor_ops;
     247             : 
     248           8 :     streamer->base.bbs_next = next;
     249           8 :     initStringInfo(&streamer->base.bbs_buffer);
     250             : 
     251             :     /* Initialize internal stream state for decompression */
     252           8 :     zs = &streamer->zstream;
     253           8 :     zs->zalloc = gzip_palloc;
     254           8 :     zs->zfree = gzip_pfree;
     255           8 :     zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
     256           8 :     zs->avail_out = streamer->base.bbs_buffer.maxlen;
     257             : 
     258             :     /*
     259             :      * Data compression was initialized using deflateInit2 to request a gzip
     260             :      * header. Similarly, we are using inflateInit2 to initialize data
     261             :      * decompression.
     262             :      *
     263             :      * Per the documentation for inflateInit2, the second argument is
     264             :      * "windowBits" and its value must be greater than or equal to the value
     265             :      * provided while compressing the data, so we are using the maximum
     266             :      * possible value for safety.
     267             :      */
     268           8 :     if (inflateInit2(zs, 15 + 16) != Z_OK)
     269           0 :         pg_fatal("could not initialize compression library");
     270             : 
     271           8 :     return &streamer->base;
     272             : #else
     273             :     pg_fatal("this build does not support compression with %s", "gzip");
     274             :     return NULL;                /* keep compiler quiet */
     275             : #endif
     276             : }
     277             : 
     278             : #ifdef HAVE_LIBZ
     279             : /*
     280             :  * Decompress the input data to output buffer until we run out of input
     281             :  * data. Each time the output buffer is full, pass on the decompressed data
     282             :  * to the next streamer.
     283             :  */
     284             : static void
     285         294 : astreamer_gzip_decompressor_content(astreamer *streamer,
     286             :                                     astreamer_member *member,
     287             :                                     const char *data, int len,
     288             :                                     astreamer_archive_context context)
     289             : {
     290             :     astreamer_gzip_decompressor *mystreamer;
     291             :     z_stream   *zs;
     292             : 
     293         294 :     mystreamer = (astreamer_gzip_decompressor *) streamer;
     294             : 
     295         294 :     zs = &mystreamer->zstream;
     296         294 :     zs->next_in = (const uint8 *) data;
     297         294 :     zs->avail_in = len;
     298             : 
     299             :     /* Process the current chunk */
     300      239288 :     while (zs->avail_in > 0)
     301             :     {
     302             :         int         res;
     303             : 
     304             :         Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
     305             : 
     306      238994 :         zs->next_out = (uint8 *)
     307      238994 :             mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     308      238994 :         zs->avail_out =
     309      238994 :             mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     310             : 
     311             :         /*
     312             :          * This call decompresses data starting at zs->next_in and updates
     313             :          * zs->next_in and zs->avail_in. It generates output data starting
     314             :          * at zs->next_out and updates zs->next_out and zs->avail_out
     315             :          * accordingly.
     316             :          */
     317      238994 :         res = inflate(zs, Z_NO_FLUSH);
     318             : 
     319      238994 :         if (res == Z_STREAM_ERROR)
     320           0 :             pg_log_error("could not decompress data: %s", zs->msg);
     321             : 
     322      238994 :         mystreamer->bytes_written =
     323      238994 :             mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
     324             : 
     325             :         /* If output buffer is full then pass data to next streamer */
     326      238994 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     327             :         {
     328      238712 :             astreamer_content(mystreamer->base.bbs_next, member,
     329      238712 :                               mystreamer->base.bbs_buffer.data,
     330             :                               mystreamer->base.bbs_buffer.maxlen, context);
     331      238712 :             mystreamer->bytes_written = 0;
     332             :         }
     333             :     }
     334         294 : }
     335             : 
     336             : /*
     337             :  * End-of-stream processing.
     338             :  */
     339             : static void
     340           8 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
     341             : {
     342             :     astreamer_gzip_decompressor *mystreamer;
     343             : 
     344           8 :     mystreamer = (astreamer_gzip_decompressor *) streamer;
     345             : 
     346             :     /*
     347             :      * End of the stream, if there is some pending data in output buffers then
     348             :      * we must forward it to next streamer.
     349             :      */
     350           8 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     351           8 :                       mystreamer->base.bbs_buffer.data,
     352             :                       mystreamer->base.bbs_buffer.maxlen,
     353             :                       ASTREAMER_UNKNOWN);
     354             : 
     355           8 :     astreamer_finalize(mystreamer->base.bbs_next);
     356           8 : }
     357             : 
     358             : /*
     359             :  * Free memory.
     360             :  */
     361             : static void
     362           8 : astreamer_gzip_decompressor_free(astreamer *streamer)
     363             : {
     364           8 :     astreamer_free(streamer->bbs_next);
     365           8 :     pfree(streamer->bbs_buffer.data);
     366           8 :     pfree(streamer);
     367           8 : }
     368             : 
     369             : /*
     370             :  * Wrapper function to adjust the signature of palloc to match what libz
     371             :  * expects.
     372             :  */
     373             : static void *
     374          16 : gzip_palloc(void *opaque, unsigned items, unsigned size)
     375             : {
     376          16 :     return palloc(items * size);
     377             : }
     378             : 
     379             : /*
     380             :  * Wrapper function to adjust the signature of pfree to match what libz
     381             :  * expects.
     382             :  */
     383             : static void
     384           0 : gzip_pfree(void *opaque, void *address)
     385             : {
     386           0 :     pfree(address);
     387           0 : }
     388             : #endif

Generated by: LCOV version 1.14