LCOV - code coverage report
Current view: top level - src/bin/pg_dump - compress_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 0 4 0.0 %
Date: 2025-08-31 01:17:28 Functions: 0 2 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * compress_zstd.c
       4             :  *   Routines for archivers to write a Zstd compressed data stream.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *     src/bin/pg_dump/compress_zstd.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : #include <unistd.h>
      17             : 
      18             : #include "compress_zstd.h"
      19             : #include "pg_backup_utils.h"
      20             : 
      21             : #ifndef USE_ZSTD
      22             : 
      23             : void
      24           0 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
      25             : {
      26           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      27             : }
      28             : 
      29             : void
      30           0 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
      31             : {
      32           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      33             : }
      34             : 
      35             : #else
      36             : 
      37             : #include <zstd.h>
      38             : 
/*
 * Private Zstd state.  The same structure is hung off
 * CompressorState->private_data (compressor API) and
 * CompressFileHandle->private_data (compressed stream API).
 */
typedef struct ZstdCompressorState
{
    /* This is a normal file to which we read/write compressed data */
    FILE       *fp;

    /* compression stream; only created on the write paths */
    ZSTD_CStream *cstream;
    /* decompression stream; only created on the read paths */
    ZSTD_DStream *dstream;
    /* output buffer descriptor passed to the zstd streaming calls */
    ZSTD_outBuffer output;
    /* input buffer descriptor passed to the zstd streaming calls */
    ZSTD_inBuffer input;

    /*
     * pointer to a static string like from strerror(); recorded by
     * Zstd_close() when it fails and handed out by Zstd_get_error()
     */
    const char *zstderror;
} ZstdCompressorState;
      52             : 
      53             : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
      54             : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
      55             : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
      56             :                                    const void *data, size_t dLen);
      57             : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
      58             : 
      59             : static void
      60             : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
      61             :                            ZSTD_cParameter param, int value, char *paramname)
      62             : {
      63             :     size_t      res;
      64             : 
      65             :     res = ZSTD_CCtx_setParameter(cstream, param, value);
      66             :     if (ZSTD_isError(res))
      67             :         pg_fatal("could not set compression parameter \"%s\": %s",
      68             :                  paramname, ZSTD_getErrorName(res));
      69             : }
      70             : 
      71             : /* Return a compression stream with parameters set per argument */
      72             : static ZSTD_CStream *
      73             : _ZstdCStreamParams(pg_compress_specification compress)
      74             : {
      75             :     ZSTD_CStream *cstream;
      76             : 
      77             :     cstream = ZSTD_createCStream();
      78             :     if (cstream == NULL)
      79             :         pg_fatal("could not initialize compression library");
      80             : 
      81             :     _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
      82             :                                compress.level, "level");
      83             : 
      84             :     if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
      85             :         _Zstd_CCtx_setParam_or_die(cstream,
      86             :                                    ZSTD_c_enableLongDistanceMatching,
      87             :                                    compress.long_distance, "long");
      88             : 
      89             :     return cstream;
      90             : }
      91             : 
      92             : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
      93             : static void
      94             : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
      95             : {
      96             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
      97             :     ZSTD_inBuffer *input = &zstdcs->input;
      98             :     ZSTD_outBuffer *output = &zstdcs->output;
      99             : 
     100             :     /* Loop while there's any input or until flushed */
     101             :     while (input->pos != input->size || flush)
     102             :     {
     103             :         size_t      res;
     104             : 
     105             :         output->pos = 0;
     106             :         res = ZSTD_compressStream2(zstdcs->cstream, output,
     107             :                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);
     108             : 
     109             :         if (ZSTD_isError(res))
     110             :             pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
     111             : 
     112             :         /*
     113             :          * Extra paranoia: avoid zero-length chunks, since a zero length chunk
     114             :          * is the EOF marker in the custom format. This should never happen
     115             :          * but...
     116             :          */
     117             :         if (output->pos > 0)
     118             :             cs->writeF(AH, output->dst, output->pos);
     119             : 
     120             :         if (res == 0)
     121             :             break;              /* End of frame or all input consumed */
     122             :     }
     123             : }
     124             : 
     125             : static void
     126             : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
     127             : {
     128             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     129             : 
     130             :     if (cs->readF != NULL)
     131             :     {
     132             :         Assert(zstdcs->cstream == NULL);
     133             :         ZSTD_freeDStream(zstdcs->dstream);
     134             :         pg_free(unconstify(void *, zstdcs->input.src));
     135             :     }
     136             :     else if (cs->writeF != NULL)
     137             :     {
     138             :         Assert(zstdcs->dstream == NULL);
     139             :         _ZstdWriteCommon(AH, cs, true);
     140             :         ZSTD_freeCStream(zstdcs->cstream);
     141             :     }
     142             : 
     143             :     /* output buffer may be allocated in either mode */
     144             :     pg_free(zstdcs->output.dst);
     145             :     pg_free(zstdcs);
     146             :     cs->private_data = NULL;
     147             : }
     148             : 
     149             : static void
     150             : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
     151             :                        const void *data, size_t dLen)
     152             : {
     153             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     154             : 
     155             :     zstdcs->input.src = data;
     156             :     zstdcs->input.size = dLen;
     157             :     zstdcs->input.pos = 0;
     158             : 
     159             :     _ZstdWriteCommon(AH, cs, false);
     160             : }
     161             : 
     162             : static void
     163             : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
     164             : {
     165             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     166             :     ZSTD_outBuffer *output = &zstdcs->output;
     167             :     ZSTD_inBuffer *input = &zstdcs->input;
     168             :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     169             :     size_t      res;
     170             : 
     171             :     for (;;)
     172             :     {
     173             :         size_t      cnt;
     174             : 
     175             :         /*
     176             :          * Read compressed data.  Note that readF can resize the buffer; the
     177             :          * new size is tracked and used for future loops.
     178             :          */
     179             :         input->size = input_allocated_size;
     180             :         cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
     181             : 
     182             :         /* ensure that readF didn't *shrink* the buffer */
     183             :         Assert(input->size >= input_allocated_size);
     184             :         input_allocated_size = input->size;
     185             :         input->size = cnt;
     186             :         input->pos = 0;
     187             : 
     188             :         if (cnt == 0)
     189             :             break;
     190             : 
     191             :         /* Now decompress */
     192             :         while (input->pos < input->size)
     193             :         {
     194             :             output->pos = 0;
     195             :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     196             :             if (ZSTD_isError(res))
     197             :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     198             : 
     199             :             /*
     200             :              * then write the decompressed data to the output handle
     201             :              */
     202             :             ((char *) output->dst)[output->pos] = '\0';
     203             :             ahwrite(output->dst, 1, output->pos, AH);
     204             : 
     205             :             if (res == 0)
     206             :                 break;          /* End of frame */
     207             :         }
     208             :     }
     209             : }
     210             : 
     211             : /* Public routine that supports Zstd compressed data I/O */
     212             : void
     213             : InitCompressorZstd(CompressorState *cs,
     214             :                    const pg_compress_specification compression_spec)
     215             : {
     216             :     ZstdCompressorState *zstdcs;
     217             : 
     218             :     cs->readData = ReadDataFromArchiveZstd;
     219             :     cs->writeData = WriteDataToArchiveZstd;
     220             :     cs->end = EndCompressorZstd;
     221             : 
     222             :     cs->compression_spec = compression_spec;
     223             : 
     224             :     zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
     225             :     cs->private_data = zstdcs;
     226             : 
     227             :     /* We expect that exactly one of readF/writeF is specified */
     228             :     Assert((cs->readF == NULL) != (cs->writeF == NULL));
     229             : 
     230             :     if (cs->readF != NULL)
     231             :     {
     232             :         zstdcs->dstream = ZSTD_createDStream();
     233             :         if (zstdcs->dstream == NULL)
     234             :             pg_fatal("could not initialize compression library");
     235             : 
     236             :         zstdcs->input.size = ZSTD_DStreamInSize();
     237             :         zstdcs->input.src = pg_malloc(zstdcs->input.size);
     238             : 
     239             :         /*
     240             :          * output.size is the buffer size we tell zstd it can output to.
     241             :          * Allocate an additional byte such that ReadDataFromArchiveZstd() can
     242             :          * call ahwrite() with a null-terminated string, which is an optimized
     243             :          * case in ExecuteSqlCommandBuf().
     244             :          */
     245             :         zstdcs->output.size = ZSTD_DStreamOutSize();
     246             :         zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
     247             :     }
     248             :     else if (cs->writeF != NULL)
     249             :     {
     250             :         zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
     251             : 
     252             :         zstdcs->output.size = ZSTD_CStreamOutSize();
     253             :         zstdcs->output.dst = pg_malloc(zstdcs->output.size);
     254             :         zstdcs->output.pos = 0;
     255             :     }
     256             : }
     257             : 
     258             : /*
     259             :  * Compressed stream API
     260             :  */
     261             : 
     262             : static size_t
     263             : Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on_error)
     264             : {
     265             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     266             :     ZSTD_inBuffer *input = &zstdcs->input;
     267             :     ZSTD_outBuffer *output = &zstdcs->output;
     268             :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     269             :     size_t      res,
     270             :                 cnt;
     271             : 
     272             :     /*
     273             :      * If this is the first call to the reading function, initialize the
     274             :      * required datastructures.
     275             :      */
     276             :     if (zstdcs->dstream == NULL)
     277             :     {
     278             :         zstdcs->input.src = pg_malloc0(input_allocated_size);
     279             :         zstdcs->dstream = ZSTD_createDStream();
     280             :         if (zstdcs->dstream == NULL)
     281             :         {
     282             :             if (exit_on_error)
     283             :                 pg_fatal("could not initialize compression library");
     284             :             return -1;
     285             :         }
     286             :     }
     287             : 
     288             :     output->size = size;
     289             :     output->dst = ptr;
     290             :     output->pos = 0;
     291             : 
     292             :     for (;;)
     293             :     {
     294             :         Assert(input->pos <= input->size);
     295             :         Assert(input->size <= input_allocated_size);
     296             : 
     297             :         /*
     298             :          * If the input is completely consumed, start back at the beginning
     299             :          */
     300             :         if (input->pos == input->size)
     301             :         {
     302             :             /* input->size is size produced by "fread" */
     303             :             input->size = 0;
     304             :             /* input->pos is position consumed by decompress */
     305             :             input->pos = 0;
     306             :         }
     307             : 
     308             :         /* read compressed data if we must produce more input */
     309             :         if (input->pos == input->size)
     310             :         {
     311             :             cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
     312             :             if (ferror(zstdcs->fp))
     313             :             {
     314             :                 if (exit_on_error)
     315             :                     pg_fatal("could not read from input file: %m");
     316             :                 return -1;
     317             :             }
     318             : 
     319             :             input->size = cnt;
     320             : 
     321             :             Assert(cnt <= input_allocated_size);
     322             : 
     323             :             /* If we have no more input to consume, we're done */
     324             :             if (cnt == 0)
     325             :                 break;
     326             :         }
     327             : 
     328             :         while (input->pos < input->size)
     329             :         {
     330             :             /* now decompress */
     331             :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     332             : 
     333             :             if (ZSTD_isError(res))
     334             :             {
     335             :                 if (exit_on_error)
     336             :                     pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     337             :                 return -1;
     338             :             }
     339             : 
     340             :             if (output->pos == output->size)
     341             :                 break;          /* No more room for output */
     342             : 
     343             :             if (res == 0)
     344             :                 break;          /* End of frame */
     345             :         }
     346             : 
     347             :         if (output->pos == output->size)
     348             :             break;              /* We read all the data that fits */
     349             :     }
     350             : 
     351             :     return output->pos;
     352             : }
     353             : 
     354             : static void
     355             : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     356             : {
     357             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     358             :     ZSTD_inBuffer *input = &zstdcs->input;
     359             :     ZSTD_outBuffer *output = &zstdcs->output;
     360             :     size_t      res,
     361             :                 cnt;
     362             : 
     363             :     input->src = ptr;
     364             :     input->size = size;
     365             :     input->pos = 0;
     366             : 
     367             :     if (zstdcs->cstream == NULL)
     368             :     {
     369             :         zstdcs->output.size = ZSTD_CStreamOutSize();
     370             :         zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
     371             :         zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
     372             :         if (zstdcs->cstream == NULL)
     373             :             pg_fatal("could not initialize compression library");
     374             :     }
     375             : 
     376             :     /* Consume all input, to be flushed later */
     377             :     while (input->pos != input->size)
     378             :     {
     379             :         output->pos = 0;
     380             :         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
     381             :         if (ZSTD_isError(res))
     382             :             pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
     383             : 
     384             :         errno = 0;
     385             :         cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     386             :         if (cnt != output->pos)
     387             :         {
     388             :             errno = (errno) ? errno : ENOSPC;
     389             :             pg_fatal("could not write to file: %m");
     390             :         }
     391             :     }
     392             : }
     393             : 
     394             : static int
     395             : Zstd_getc(CompressFileHandle *CFH)
     396             : {
     397             :     unsigned char ret;
     398             : 
     399             :     if (CFH->read_func(&ret, 1, CFH) != 1)
     400             :         pg_fatal("could not read from input file: end of file");
     401             :     return ret;
     402             : }
     403             : 
     404             : static char *
     405             : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
     406             : {
     407             :     int         i;
     408             : 
     409             :     Assert(len > 0);
     410             : 
     411             :     /*
     412             :      * Read one byte at a time until newline or EOF. This is only used to read
     413             :      * the list of LOs, and the I/O is buffered anyway.
     414             :      */
     415             :     for (i = 0; i < len - 1; ++i)
     416             :     {
     417             :         if (Zstd_read_internal(&buf[i], 1, CFH, false) != 1)
     418             :             break;
     419             :         if (buf[i] == '\n')
     420             :         {
     421             :             ++i;
     422             :             break;
     423             :         }
     424             :     }
     425             :     buf[i] = '\0';
     426             :     return i > 0 ? buf : NULL;
     427             : }
     428             : 
     429             : static size_t
     430             : Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
     431             : {
     432             :     return Zstd_read_internal(ptr, size, CFH, true);
     433             : }
     434             : 
     435             : static bool
     436             : Zstd_close(CompressFileHandle *CFH)
     437             : {
     438             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     439             :     bool        success = true;
     440             : 
     441             :     if (zstdcs->cstream)
     442             :     {
     443             :         size_t      res,
     444             :                     cnt;
     445             :         ZSTD_inBuffer *input = &zstdcs->input;
     446             :         ZSTD_outBuffer *output = &zstdcs->output;
     447             : 
     448             :         /* Loop until the compression buffers are fully consumed */
     449             :         for (;;)
     450             :         {
     451             :             output->pos = 0;
     452             :             res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
     453             :             if (ZSTD_isError(res))
     454             :             {
     455             :                 zstdcs->zstderror = ZSTD_getErrorName(res);
     456             :                 success = false;
     457             :                 break;
     458             :             }
     459             : 
     460             :             errno = 0;
     461             :             cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     462             :             if (cnt != output->pos)
     463             :             {
     464             :                 errno = (errno) ? errno : ENOSPC;
     465             :                 zstdcs->zstderror = strerror(errno);
     466             :                 success = false;
     467             :                 break;
     468             :             }
     469             : 
     470             :             if (res == 0)
     471             :                 break;          /* End of frame */
     472             :         }
     473             : 
     474             :         ZSTD_freeCStream(zstdcs->cstream);
     475             :         pg_free(zstdcs->output.dst);
     476             :     }
     477             : 
     478             :     if (zstdcs->dstream)
     479             :     {
     480             :         ZSTD_freeDStream(zstdcs->dstream);
     481             :         pg_free(unconstify(void *, zstdcs->input.src));
     482             :     }
     483             : 
     484             :     errno = 0;
     485             :     if (fclose(zstdcs->fp) != 0)
     486             :     {
     487             :         zstdcs->zstderror = strerror(errno);
     488             :         success = false;
     489             :     }
     490             : 
     491             :     pg_free(zstdcs);
     492             :     CFH->private_data = NULL;
     493             :     return success;
     494             : }
     495             : 
     496             : static bool
     497             : Zstd_eof(CompressFileHandle *CFH)
     498             : {
     499             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     500             : 
     501             :     return feof(zstdcs->fp);
     502             : }
     503             : 
     504             : static bool
     505             : Zstd_open(const char *path, int fd, const char *mode,
     506             :           CompressFileHandle *CFH)
     507             : {
     508             :     FILE       *fp;
     509             :     ZstdCompressorState *zstdcs;
     510             : 
     511             :     /*
     512             :      * Clear state storage to avoid having the fd point to non-NULL memory on
     513             :      * error return.
     514             :      */
     515             :     CFH->private_data = NULL;
     516             : 
     517             :     zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
     518             :                                                         MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
     519             :     if (!zstdcs)
     520             :     {
     521             :         errno = ENOMEM;
     522             :         return false;
     523             :     }
     524             : 
     525             :     if (fd >= 0)
     526             :         fp = fdopen(dup(fd), mode);
     527             :     else
     528             :         fp = fopen(path, mode);
     529             : 
     530             :     if (fp == NULL)
     531             :     {
     532             :         pg_free(zstdcs);
     533             :         return false;
     534             :     }
     535             : 
     536             :     zstdcs->fp = fp;
     537             :     CFH->private_data = zstdcs;
     538             : 
     539             :     return true;
     540             : }
     541             : 
     542             : static bool
     543             : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     544             : {
     545             :     char        fname[MAXPGPATH];
     546             : 
     547             :     sprintf(fname, "%s.zst", path);
     548             :     return CFH->open_func(fname, -1, mode, CFH);
     549             : }
     550             : 
     551             : static const char *
     552             : Zstd_get_error(CompressFileHandle *CFH)
     553             : {
     554             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     555             : 
     556             :     return zstdcs->zstderror;
     557             : }
     558             : 
     559             : void
     560             : InitCompressFileHandleZstd(CompressFileHandle *CFH,
     561             :                            const pg_compress_specification compression_spec)
     562             : {
     563             :     CFH->open_func = Zstd_open;
     564             :     CFH->open_write_func = Zstd_open_write;
     565             :     CFH->read_func = Zstd_read;
     566             :     CFH->write_func = Zstd_write;
     567             :     CFH->gets_func = Zstd_gets;
     568             :     CFH->getc_func = Zstd_getc;
     569             :     CFH->close_func = Zstd_close;
     570             :     CFH->eof_func = Zstd_eof;
     571             :     CFH->get_error_func = Zstd_get_error;
     572             : 
     573             :     CFH->compression_spec = compression_spec;
     574             : 
     575             :     CFH->private_data = NULL;
     576             : }
     577             : 
     578             : #endif                          /* USE_ZSTD */

Generated by: LCOV version 1.16