LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_gzip.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 75.0 % 92 69
Test Date: 2026-03-01 16:14:42 Functions: 81.8 % 11 9
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_gzip.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using gzip.
       6              :  * astreamer_gzip_writer applies gzip compression to the input data
       7              :  * and writes the result to a file. astreamer_gzip_decompressor assumes
       8              :  * that the input stream is compressed using gzip and decompresses it.
       9              :  *
      10              :  * Note that the code in this file is asymmetric with what we do for
      11              :  * other compression types: for lz4 and zstd, there is a compressor and
      12              :  * a decompressor, rather than a writer and a decompressor. The approach
      13              :  * taken here is less flexible, because a writer can only write to a file,
      14              :  * while a compressor can write to a subsequent astreamer which is free
      15              :  * to do whatever it likes. The reason it's like this is because this
      16              :  * code was adapted from old, less-modular pg_basebackup code that used
      17              :  * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
      18              :  * necessary to change anything at the time.
      19              :  *
      20              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *        src/fe_utils/astreamer_gzip.c
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : 
      27              : #include "postgres_fe.h"
      28              : 
      29              : #include <unistd.h>
      30              : 
      31              : #ifdef HAVE_LIBZ
      32              : #include <zlib.h>
      33              : #endif
      34              : 
      35              : #include "common/logging.h"
      36              : #include "fe_utils/astreamer.h"
      37              : 
      38              : #ifdef HAVE_LIBZ
      39              : typedef struct astreamer_gzip_writer
      40              : {
      41              :     astreamer   base;
      42              :     char       *pathname;
      43              :     gzFile      gzfile;
      44              : } astreamer_gzip_writer;
      45              : 
      46              : typedef struct astreamer_gzip_decompressor
      47              : {
      48              :     astreamer   base;
      49              :     z_stream    zstream;
      50              :     size_t      bytes_written;
      51              : } astreamer_gzip_decompressor;
      52              : 
      53              : static void astreamer_gzip_writer_content(astreamer *streamer,
      54              :                                           astreamer_member *member,
      55              :                                           const char *data, int len,
      56              :                                           astreamer_archive_context context);
      57              : static void astreamer_gzip_writer_finalize(astreamer *streamer);
      58              : static void astreamer_gzip_writer_free(astreamer *streamer);
      59              : static const char *get_gz_error(gzFile gzf);
      60              : 
      61              : static const astreamer_ops astreamer_gzip_writer_ops = {
      62              :     .content = astreamer_gzip_writer_content,
      63              :     .finalize = astreamer_gzip_writer_finalize,
      64              :     .free = astreamer_gzip_writer_free
      65              : };
      66              : 
      67              : static void astreamer_gzip_decompressor_content(astreamer *streamer,
      68              :                                                 astreamer_member *member,
      69              :                                                 const char *data, int len,
      70              :                                                 astreamer_archive_context context);
      71              : static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
      72              : static void astreamer_gzip_decompressor_free(astreamer *streamer);
      73              : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
      74              : static void gzip_pfree(void *opaque, void *address);
      75              : 
      76              : static const astreamer_ops astreamer_gzip_decompressor_ops = {
      77              :     .content = astreamer_gzip_decompressor_content,
      78              :     .finalize = astreamer_gzip_decompressor_finalize,
      79              :     .free = astreamer_gzip_decompressor_free
      80              : };
      81              : #endif
      82              : 
      83              : /*
      84              :  * Create a astreamer that just compresses data using gzip, and then writes
      85              :  * it to a file.
      86              :  *
      87              :  * The caller must specify a pathname and may specify a file. The pathname is
      88              :  * used for error-reporting purposes either way. If file is NULL, the pathname
      89              :  * also identifies the file to which the data should be written: it is opened
      90              :  * for writing and closed when done. If file is not NULL, the data is written
      91              :  * there.
      92              :  *
      93              :  * Note that zlib does not use the FILE interface, but operates directly on
      94              :  * a duplicate of the underlying fd. Hence, callers must take care if they
      95              :  * plan to write any other data to the same FILE, either before or after using
      96              :  * this.
      97              :  */
      98              : astreamer *
      99            4 : astreamer_gzip_writer_new(char *pathname, FILE *file,
     100              :                           pg_compress_specification *compress)
     101              : {
     102              : #ifdef HAVE_LIBZ
     103              :     astreamer_gzip_writer *streamer;
     104              : 
     105            4 :     streamer = palloc0_object(astreamer_gzip_writer);
     106            4 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     107              :         &astreamer_gzip_writer_ops;
     108              : 
     109            4 :     streamer->pathname = pstrdup(pathname);
     110              : 
     111            4 :     if (file == NULL)
     112              :     {
     113            4 :         streamer->gzfile = gzopen(pathname, "wb");
     114            4 :         if (streamer->gzfile == NULL)
     115            0 :             pg_fatal("could not create compressed file \"%s\": %m",
     116              :                      pathname);
     117              :     }
     118              :     else
     119              :     {
     120              :         /*
     121              :          * We must dup the file handle so that gzclose doesn't break the
     122              :          * caller's FILE.  See comment for astreamer_gzip_writer_finalize.
     123              :          */
     124            0 :         int         fd = dup(fileno(file));
     125              : 
     126            0 :         if (fd < 0)
     127            0 :             pg_fatal("could not duplicate stdout: %m");
     128              : 
     129            0 :         streamer->gzfile = gzdopen(fd, "wb");
     130            0 :         if (streamer->gzfile == NULL)
     131            0 :             pg_fatal("could not open output file: %m");
     132              :     }
     133              : 
     134            4 :     if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
     135            0 :         pg_fatal("could not set compression level %d: %s",
     136              :                  compress->level, get_gz_error(streamer->gzfile));
     137              : 
     138            4 :     return &streamer->base;
     139              : #else
     140              :     pg_fatal("this build does not support compression with %s", "gzip");
     141              :     return NULL;                /* keep compiler quiet */
     142              : #endif
     143              : }
     144              : 
     145              : #ifdef HAVE_LIBZ
     146              : /*
     147              :  * Write archive content to gzip file.
     148              :  */
     149              : static void
     150         9311 : astreamer_gzip_writer_content(astreamer *streamer,
     151              :                               astreamer_member *member, const char *data,
     152              :                               int len, astreamer_archive_context context)
     153              : {
     154              :     astreamer_gzip_writer *mystreamer;
     155              : 
     156         9311 :     mystreamer = (astreamer_gzip_writer *) streamer;
     157              : 
     158         9311 :     if (len == 0)
     159            0 :         return;
     160              : 
     161         9311 :     errno = 0;
     162         9311 :     if (gzwrite(mystreamer->gzfile, data, len) != len)
     163              :     {
     164              :         /* if write didn't set errno, assume problem is no disk space */
     165            0 :         if (errno == 0)
     166            0 :             errno = ENOSPC;
     167            0 :         pg_fatal("could not write to compressed file \"%s\": %s",
     168              :                  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
     169              :     }
     170              : }
     171              : 
     172              : /*
     173              :  * End-of-archive processing when writing to a gzip file consists of just
     174              :  * calling gzclose.
     175              :  *
     176              :  * It makes no difference whether we opened the file or the caller did it,
     177              :  * because libz provides no way of avoiding a close on the underlying file
     178              :  * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
     179              :  * work around this issue, so that the behavior from the caller's viewpoint
     180              :  * is the same as for astreamer_plain_writer.
     181              :  */
     182              : static void
     183            4 : astreamer_gzip_writer_finalize(astreamer *streamer)
     184              : {
     185              :     astreamer_gzip_writer *mystreamer;
     186              : 
     187            4 :     mystreamer = (astreamer_gzip_writer *) streamer;
     188              : 
     189            4 :     errno = 0;                  /* in case gzclose() doesn't set it */
     190            4 :     if (gzclose(mystreamer->gzfile) != 0)
     191            0 :         pg_fatal("could not close compressed file \"%s\": %m",
     192              :                  mystreamer->pathname);
     193              : 
     194            4 :     mystreamer->gzfile = NULL;
     195            4 : }
     196              : 
     197              : /*
     198              :  * Free memory associated with this astreamer.
     199              :  */
     200              : static void
     201            4 : astreamer_gzip_writer_free(astreamer *streamer)
     202              : {
     203              :     astreamer_gzip_writer *mystreamer;
     204              : 
     205            4 :     mystreamer = (astreamer_gzip_writer *) streamer;
     206              : 
     207              :     Assert(mystreamer->base.bbs_next == NULL);
     208              :     Assert(mystreamer->gzfile == NULL);
     209              : 
     210            4 :     pfree(mystreamer->pathname);
     211            4 :     pfree(mystreamer);
     212            4 : }
     213              : 
     214              : /*
     215              :  * Helper function for libz error reporting.
     216              :  */
     217              : static const char *
     218            0 : get_gz_error(gzFile gzf)
     219              : {
     220              :     int         errnum;
     221              :     const char *errmsg;
     222              : 
     223            0 :     errmsg = gzerror(gzf, &errnum);
     224            0 :     if (errnum == Z_ERRNO)
     225            0 :         return strerror(errno);
     226              :     else
     227            0 :         return errmsg;
     228              : }
     229              : #endif
     230              : 
     231              : /*
     232              :  * Create a new base backup streamer that performs decompression of gzip
     233              :  * compressed blocks.
     234              :  */
     235              : astreamer *
     236            4 : astreamer_gzip_decompressor_new(astreamer *next)
     237              : {
     238              : #ifdef HAVE_LIBZ
     239              :     astreamer_gzip_decompressor *streamer;
     240              :     z_stream   *zs;
     241              : 
     242              :     Assert(next != NULL);
     243              : 
     244            4 :     streamer = palloc0_object(astreamer_gzip_decompressor);
     245            4 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     246              :         &astreamer_gzip_decompressor_ops;
     247              : 
     248            4 :     streamer->base.bbs_next = next;
     249            4 :     initStringInfo(&streamer->base.bbs_buffer);
     250              : 
     251              :     /* Initialize internal stream state for decompression */
     252            4 :     zs = &streamer->zstream;
     253            4 :     zs->zalloc = gzip_palloc;
     254            4 :     zs->zfree = gzip_pfree;
     255            4 :     zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
     256            4 :     zs->avail_out = streamer->base.bbs_buffer.maxlen;
     257              : 
     258              :     /*
     259              :      * Data compression was initialized using deflateInit2 to request a gzip
     260              :      * header. Similarly, we are using inflateInit2 to initialize data
     261              :      * decompression.
     262              :      *
     263              :      * Per the documentation for inflateInit2, the second argument is
     264              :      * "windowBits" and its value must be greater than or equal to the value
     265              :      * provided while compressing the data, so we are using the maximum
     266              :      * possible value for safety.
     267              :      */
     268            4 :     if (inflateInit2(zs, 15 + 16) != Z_OK)
     269            0 :         pg_fatal("could not initialize compression library");
     270              : 
     271            4 :     return &streamer->base;
     272              : #else
     273              :     pg_fatal("this build does not support compression with %s", "gzip");
     274              :     return NULL;                /* keep compiler quiet */
     275              : #endif
     276              : }
     277              : 
     278              : #ifdef HAVE_LIBZ
     279              : /*
     280              :  * Decompress the input data to output buffer until we run out of input
     281              :  * data. Each time the output buffer is full, pass on the decompressed data
     282              :  * to the next streamer.
     283              :  */
     284              : static void
     285          157 : astreamer_gzip_decompressor_content(astreamer *streamer,
     286              :                                     astreamer_member *member,
     287              :                                     const char *data, int len,
     288              :                                     astreamer_archive_context context)
     289              : {
     290              :     astreamer_gzip_decompressor *mystreamer;
     291              :     z_stream   *zs;
     292              : 
     293          157 :     mystreamer = (astreamer_gzip_decompressor *) streamer;
     294              : 
     295          157 :     zs = &mystreamer->zstream;
     296          157 :     zs->next_in = (const uint8 *) data;
     297          157 :     zs->avail_in = len;
     298              : 
     299              :     /* Process the current chunk */
     300       122190 :     while (zs->avail_in > 0)
     301              :     {
     302              :         int         res;
     303              : 
     304              :         Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
     305              : 
     306       121876 :         zs->next_out = (uint8 *)
     307       121876 :             mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     308       121876 :         zs->avail_out =
     309       121876 :             mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     310              : 
     311              :         /*
     312              :          * This call decompresses data starting at zs->next_in and updates
     313              :          * zs->next_in * and zs->avail_in. It generates output data starting
     314              :          * at zs->next_out and updates zs->next_out and zs->avail_out
     315              :          * accordingly.
     316              :          */
     317       121876 :         res = inflate(zs, Z_NO_FLUSH);
     318              : 
     319       121876 :         if (res == Z_STREAM_ERROR)
     320            0 :             pg_log_error("could not decompress data: %s", zs->msg);
     321              : 
     322       121876 :         mystreamer->bytes_written =
     323       121876 :             mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
     324              : 
     325              :         /* If output buffer is full then pass data to next streamer */
     326       121876 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     327              :         {
     328       121724 :             astreamer_content(mystreamer->base.bbs_next, member,
     329       121724 :                               mystreamer->base.bbs_buffer.data,
     330              :                               mystreamer->base.bbs_buffer.maxlen, context);
     331       121724 :             mystreamer->bytes_written = 0;
     332              :         }
     333              :     }
     334          157 : }
     335              : 
     336              : /*
     337              :  * End-of-stream processing.
     338              :  */
     339              : static void
     340            4 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
     341              : {
     342              :     astreamer_gzip_decompressor *mystreamer;
     343              : 
     344            4 :     mystreamer = (astreamer_gzip_decompressor *) streamer;
     345              : 
     346              :     /*
     347              :      * End of the stream, if there is some pending data in output buffers then
     348              :      * we must forward it to next streamer.
     349              :      */
     350            4 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     351            4 :                       mystreamer->base.bbs_buffer.data,
     352              :                       mystreamer->base.bbs_buffer.maxlen,
     353              :                       ASTREAMER_UNKNOWN);
     354              : 
     355            4 :     astreamer_finalize(mystreamer->base.bbs_next);
     356            4 : }
     357              : 
     358              : /*
     359              :  * Free memory.
     360              :  */
     361              : static void
     362            4 : astreamer_gzip_decompressor_free(astreamer *streamer)
     363              : {
     364            4 :     astreamer_free(streamer->bbs_next);
     365            4 :     pfree(streamer->bbs_buffer.data);
     366            4 :     pfree(streamer);
     367            4 : }
     368              : 
     369              : /*
     370              :  * Wrapper function to adjust the signature of palloc to match what libz
     371              :  * expects.
     372              :  */
     373              : static void *
     374            8 : gzip_palloc(void *opaque, unsigned items, unsigned size)
     375              : {
     376            8 :     return palloc(items * size);
     377              : }
     378              : 
     379              : /*
     380              :  * Wrapper function to adjust the signature of pfree to match what libz
     381              :  * expects.
     382              :  */
     383              : static void
     384            0 : gzip_pfree(void *opaque, void *address)
     385              : {
     386            0 :     pfree(address);
     387            0 : }
     388              : #endif
        

Generated by: LCOV version 2.0-1