LCOV - code coverage report
Current view: top level - src/bin/pg_dump - compress_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 0 4 0.0 %
Date: 2025-04-24 13:15:39 Functions: 0 2 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * compress_zstd.c
       4             :  *   Routines for archivers to write a Zstd compressed data stream.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *     src/bin/pg_dump/compress_zstd.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include "compress_zstd.h"
      18             : #include "pg_backup_utils.h"
      19             : 
      20             : #ifndef USE_ZSTD
      21             : 
      22             : void
      23           0 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
      24             : {
      25           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      26             : }
      27             : 
      28             : void
      29           0 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
      30             : {
      31           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      32             : }
      33             : 
      34             : #else
      35             : 
      36             : #include <zstd.h>
      37             : 
/*
 * Private state shared by both APIs in this file: the CompressorState
 * callbacks (archive data blocks) and the CompressFileHandle callbacks
 * (stdio-backed files).  It is always allocated zero-filled, so members a
 * given mode does not set up stay NULL/0.
 */
typedef struct ZstdCompressorState
{
    /* This is a normal file to which we read/write compressed data */
    /* (set by Zstd_open(); not touched by the CompressorState callbacks) */
    FILE       *fp;

    /* compression stream; created only when writing */
    ZSTD_CStream *cstream;
    /* decompression stream; created only when reading */
    ZSTD_DStream *dstream;
    /* destination buffer descriptor handed to zstd */
    ZSTD_outBuffer output;
    /* source buffer descriptor handed to zstd */
    ZSTD_inBuffer input;

    /*
     * pointer to a static string like from strerror(), for Zstd_write();
     * Zstd_close() stores into it too, and Zstd_get_error() returns it
     */
    const char *zstderror;
} ZstdCompressorState;
      51             : 
/*
 * Forward declarations: the stream-setup helper, plus the three callbacks
 * that InitCompressorZstd() installs into a CompressorState.
 */
static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
                                   const void *data, size_t dLen);
static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
      57             : 
      58             : static void
      59             : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
      60             :                            ZSTD_cParameter param, int value, char *paramname)
      61             : {
      62             :     size_t      res;
      63             : 
      64             :     res = ZSTD_CCtx_setParameter(cstream, param, value);
      65             :     if (ZSTD_isError(res))
      66             :         pg_fatal("could not set compression parameter \"%s\": %s",
      67             :                  paramname, ZSTD_getErrorName(res));
      68             : }
      69             : 
      70             : /* Return a compression stream with parameters set per argument */
      71             : static ZSTD_CStream *
      72             : _ZstdCStreamParams(pg_compress_specification compress)
      73             : {
      74             :     ZSTD_CStream *cstream;
      75             : 
      76             :     cstream = ZSTD_createCStream();
      77             :     if (cstream == NULL)
      78             :         pg_fatal("could not initialize compression library");
      79             : 
      80             :     _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
      81             :                                compress.level, "level");
      82             : 
      83             :     if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
      84             :         _Zstd_CCtx_setParam_or_die(cstream,
      85             :                                    ZSTD_c_enableLongDistanceMatching,
      86             :                                    compress.long_distance, "long");
      87             : 
      88             :     return cstream;
      89             : }
      90             : 
      91             : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
      92             : static void
      93             : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
      94             : {
      95             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
      96             :     ZSTD_inBuffer *input = &zstdcs->input;
      97             :     ZSTD_outBuffer *output = &zstdcs->output;
      98             : 
      99             :     /* Loop while there's any input or until flushed */
     100             :     while (input->pos != input->size || flush)
     101             :     {
     102             :         size_t      res;
     103             : 
     104             :         output->pos = 0;
     105             :         res = ZSTD_compressStream2(zstdcs->cstream, output,
     106             :                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);
     107             : 
     108             :         if (ZSTD_isError(res))
     109             :             pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
     110             : 
     111             :         /*
     112             :          * Extra paranoia: avoid zero-length chunks, since a zero length chunk
     113             :          * is the EOF marker in the custom format. This should never happen
     114             :          * but...
     115             :          */
     116             :         if (output->pos > 0)
     117             :             cs->writeF(AH, output->dst, output->pos);
     118             : 
     119             :         if (res == 0)
     120             :             break;              /* End of frame or all input consumed */
     121             :     }
     122             : }
     123             : 
     124             : static void
     125             : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
     126             : {
     127             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     128             : 
     129             :     if (cs->readF != NULL)
     130             :     {
     131             :         Assert(zstdcs->cstream == NULL);
     132             :         ZSTD_freeDStream(zstdcs->dstream);
     133             :         pg_free(unconstify(void *, zstdcs->input.src));
     134             :     }
     135             :     else if (cs->writeF != NULL)
     136             :     {
     137             :         Assert(zstdcs->dstream == NULL);
     138             :         _ZstdWriteCommon(AH, cs, true);
     139             :         ZSTD_freeCStream(zstdcs->cstream);
     140             :     }
     141             : 
     142             :     /* output buffer may be allocated in either mode */
     143             :     pg_free(zstdcs->output.dst);
     144             :     pg_free(zstdcs);
     145             :     cs->private_data = NULL;
     146             : }
     147             : 
     148             : static void
     149             : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
     150             :                        const void *data, size_t dLen)
     151             : {
     152             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     153             : 
     154             :     zstdcs->input.src = data;
     155             :     zstdcs->input.size = dLen;
     156             :     zstdcs->input.pos = 0;
     157             : 
     158             :     _ZstdWriteCommon(AH, cs, false);
     159             : }
     160             : 
     161             : static void
     162             : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
     163             : {
     164             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     165             :     ZSTD_outBuffer *output = &zstdcs->output;
     166             :     ZSTD_inBuffer *input = &zstdcs->input;
     167             :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     168             :     size_t      res;
     169             : 
     170             :     for (;;)
     171             :     {
     172             :         size_t      cnt;
     173             : 
     174             :         /*
     175             :          * Read compressed data.  Note that readF can resize the buffer; the
     176             :          * new size is tracked and used for future loops.
     177             :          */
     178             :         input->size = input_allocated_size;
     179             :         cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
     180             : 
     181             :         /* ensure that readF didn't *shrink* the buffer */
     182             :         Assert(input->size >= input_allocated_size);
     183             :         input_allocated_size = input->size;
     184             :         input->size = cnt;
     185             :         input->pos = 0;
     186             : 
     187             :         if (cnt == 0)
     188             :             break;
     189             : 
     190             :         /* Now decompress */
     191             :         while (input->pos < input->size)
     192             :         {
     193             :             output->pos = 0;
     194             :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     195             :             if (ZSTD_isError(res))
     196             :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     197             : 
     198             :             /*
     199             :              * then write the decompressed data to the output handle
     200             :              */
     201             :             ((char *) output->dst)[output->pos] = '\0';
     202             :             ahwrite(output->dst, 1, output->pos, AH);
     203             : 
     204             :             if (res == 0)
     205             :                 break;          /* End of frame */
     206             :         }
     207             :     }
     208             : }
     209             : 
     210             : /* Public routine that supports Zstd compressed data I/O */
     211             : void
     212             : InitCompressorZstd(CompressorState *cs,
     213             :                    const pg_compress_specification compression_spec)
     214             : {
     215             :     ZstdCompressorState *zstdcs;
     216             : 
     217             :     cs->readData = ReadDataFromArchiveZstd;
     218             :     cs->writeData = WriteDataToArchiveZstd;
     219             :     cs->end = EndCompressorZstd;
     220             : 
     221             :     cs->compression_spec = compression_spec;
     222             : 
     223             :     zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
     224             :     cs->private_data = zstdcs;
     225             : 
     226             :     /* We expect that exactly one of readF/writeF is specified */
     227             :     Assert((cs->readF == NULL) != (cs->writeF == NULL));
     228             : 
     229             :     if (cs->readF != NULL)
     230             :     {
     231             :         zstdcs->dstream = ZSTD_createDStream();
     232             :         if (zstdcs->dstream == NULL)
     233             :             pg_fatal("could not initialize compression library");
     234             : 
     235             :         zstdcs->input.size = ZSTD_DStreamInSize();
     236             :         zstdcs->input.src = pg_malloc(zstdcs->input.size);
     237             : 
     238             :         /*
     239             :          * output.size is the buffer size we tell zstd it can output to.
     240             :          * Allocate an additional byte such that ReadDataFromArchiveZstd() can
     241             :          * call ahwrite() with a null-terminated string, which is an optimized
     242             :          * case in ExecuteSqlCommandBuf().
     243             :          */
     244             :         zstdcs->output.size = ZSTD_DStreamOutSize();
     245             :         zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
     246             :     }
     247             :     else if (cs->writeF != NULL)
     248             :     {
     249             :         zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
     250             : 
     251             :         zstdcs->output.size = ZSTD_CStreamOutSize();
     252             :         zstdcs->output.dst = pg_malloc(zstdcs->output.size);
     253             :         zstdcs->output.pos = 0;
     254             :     }
     255             : }
     256             : 
     257             : /*
     258             :  * Compressed stream API
     259             :  */
     260             : 
     261             : static bool
     262             : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
     263             : {
     264             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     265             :     ZSTD_inBuffer *input = &zstdcs->input;
     266             :     ZSTD_outBuffer *output = &zstdcs->output;
     267             :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     268             :     size_t      res,
     269             :                 cnt;
     270             : 
     271             :     output->size = size;
     272             :     output->dst = ptr;
     273             :     output->pos = 0;
     274             : 
     275             :     for (;;)
     276             :     {
     277             :         Assert(input->pos <= input->size);
     278             :         Assert(input->size <= input_allocated_size);
     279             : 
     280             :         /*
     281             :          * If the input is completely consumed, start back at the beginning
     282             :          */
     283             :         if (input->pos == input->size)
     284             :         {
     285             :             /* input->size is size produced by "fread" */
     286             :             input->size = 0;
     287             :             /* input->pos is position consumed by decompress */
     288             :             input->pos = 0;
     289             :         }
     290             : 
     291             :         /* read compressed data if we must produce more input */
     292             :         if (input->pos == input->size)
     293             :         {
     294             :             cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
     295             :             input->size = cnt;
     296             : 
     297             :             Assert(cnt <= input_allocated_size);
     298             : 
     299             :             /* If we have no more input to consume, we're done */
     300             :             if (cnt == 0)
     301             :                 break;
     302             :         }
     303             : 
     304             :         while (input->pos < input->size)
     305             :         {
     306             :             /* now decompress */
     307             :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     308             : 
     309             :             if (ZSTD_isError(res))
     310             :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     311             : 
     312             :             if (output->pos == output->size)
     313             :                 break;          /* No more room for output */
     314             : 
     315             :             if (res == 0)
     316             :                 break;          /* End of frame */
     317             :         }
     318             : 
     319             :         if (output->pos == output->size)
     320             :             break;              /* We read all the data that fits */
     321             :     }
     322             : 
     323             :     if (rdsize != NULL)
     324             :         *rdsize = output->pos;
     325             : 
     326             :     return true;
     327             : }
     328             : 
     329             : static bool
     330             : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     331             : {
     332             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     333             :     ZSTD_inBuffer *input = &zstdcs->input;
     334             :     ZSTD_outBuffer *output = &zstdcs->output;
     335             :     size_t      res,
     336             :                 cnt;
     337             : 
     338             :     input->src = ptr;
     339             :     input->size = size;
     340             :     input->pos = 0;
     341             : 
     342             :     /* Consume all input, to be flushed later */
     343             :     while (input->pos != input->size)
     344             :     {
     345             :         output->pos = 0;
     346             :         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
     347             :         if (ZSTD_isError(res))
     348             :         {
     349             :             zstdcs->zstderror = ZSTD_getErrorName(res);
     350             :             return false;
     351             :         }
     352             : 
     353             :         cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     354             :         if (cnt != output->pos)
     355             :         {
     356             :             zstdcs->zstderror = strerror(errno);
     357             :             return false;
     358             :         }
     359             :     }
     360             : 
     361             :     return size;
     362             : }
     363             : 
     364             : static int
     365             : Zstd_getc(CompressFileHandle *CFH)
     366             : {
     367             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     368             :     int         ret;
     369             : 
     370             :     if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
     371             :     {
     372             :         if (feof(zstdcs->fp))
     373             :             pg_fatal("could not read from input file: end of file");
     374             :         else
     375             :             pg_fatal("could not read from input file: %m");
     376             :     }
     377             :     return ret;
     378             : }
     379             : 
     380             : static char *
     381             : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
     382             : {
     383             :     int         i;
     384             : 
     385             :     Assert(len > 0);
     386             : 
     387             :     /*
     388             :      * Read one byte at a time until newline or EOF. This is only used to read
     389             :      * the list of LOs, and the I/O is buffered anyway.
     390             :      */
     391             :     for (i = 0; i < len - 1; ++i)
     392             :     {
     393             :         size_t      readsz;
     394             : 
     395             :         if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
     396             :             break;
     397             :         if (readsz != 1)
     398             :             break;
     399             :         if (buf[i] == '\n')
     400             :         {
     401             :             ++i;
     402             :             break;
     403             :         }
     404             :     }
     405             :     buf[i] = '\0';
     406             :     return i > 0 ? buf : NULL;
     407             : }
     408             : 
     409             : static bool
     410             : Zstd_close(CompressFileHandle *CFH)
     411             : {
     412             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     413             : 
     414             :     if (zstdcs->cstream)
     415             :     {
     416             :         size_t      res,
     417             :                     cnt;
     418             :         ZSTD_inBuffer *input = &zstdcs->input;
     419             :         ZSTD_outBuffer *output = &zstdcs->output;
     420             : 
     421             :         /* Loop until the compression buffers are fully consumed */
     422             :         for (;;)
     423             :         {
     424             :             output->pos = 0;
     425             :             res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
     426             :             if (ZSTD_isError(res))
     427             :             {
     428             :                 zstdcs->zstderror = ZSTD_getErrorName(res);
     429             :                 return false;
     430             :             }
     431             : 
     432             :             cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     433             :             if (cnt != output->pos)
     434             :             {
     435             :                 zstdcs->zstderror = strerror(errno);
     436             :                 return false;
     437             :             }
     438             : 
     439             :             if (res == 0)
     440             :                 break;          /* End of frame */
     441             :         }
     442             : 
     443             :         ZSTD_freeCStream(zstdcs->cstream);
     444             :         pg_free(zstdcs->output.dst);
     445             :     }
     446             : 
     447             :     if (zstdcs->dstream)
     448             :     {
     449             :         ZSTD_freeDStream(zstdcs->dstream);
     450             :         pg_free(unconstify(void *, zstdcs->input.src));
     451             :     }
     452             : 
     453             :     if (fclose(zstdcs->fp) != 0)
     454             :         return false;
     455             : 
     456             :     pg_free(zstdcs);
     457             :     return true;
     458             : }
     459             : 
     460             : static bool
     461             : Zstd_eof(CompressFileHandle *CFH)
     462             : {
     463             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     464             : 
     465             :     return feof(zstdcs->fp);
     466             : }
     467             : 
     468             : static bool
     469             : Zstd_open(const char *path, int fd, const char *mode,
     470             :           CompressFileHandle *CFH)
     471             : {
     472             :     FILE       *fp;
     473             :     ZstdCompressorState *zstdcs;
     474             : 
     475             :     if (fd >= 0)
     476             :         fp = fdopen(fd, mode);
     477             :     else
     478             :         fp = fopen(path, mode);
     479             : 
     480             :     if (fp == NULL)
     481             :         return false;
     482             : 
     483             :     zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
     484             :     CFH->private_data = zstdcs;
     485             :     zstdcs->fp = fp;
     486             : 
     487             :     if (mode[0] == 'r')
     488             :     {
     489             :         zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
     490             :         zstdcs->dstream = ZSTD_createDStream();
     491             :         if (zstdcs->dstream == NULL)
     492             :             pg_fatal("could not initialize compression library");
     493             :     }
     494             :     else if (mode[0] == 'w' || mode[0] == 'a')
     495             :     {
     496             :         zstdcs->output.size = ZSTD_CStreamOutSize();
     497             :         zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
     498             :         zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
     499             :         if (zstdcs->cstream == NULL)
     500             :             pg_fatal("could not initialize compression library");
     501             :     }
     502             :     else
     503             :         pg_fatal("unhandled mode \"%s\"", mode);
     504             : 
     505             :     return true;
     506             : }
     507             : 
     508             : static bool
     509             : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     510             : {
     511             :     char        fname[MAXPGPATH];
     512             : 
     513             :     sprintf(fname, "%s.zst", path);
     514             :     return CFH->open_func(fname, -1, mode, CFH);
     515             : }
     516             : 
     517             : static const char *
     518             : Zstd_get_error(CompressFileHandle *CFH)
     519             : {
     520             :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     521             : 
     522             :     return zstdcs->zstderror;
     523             : }
     524             : 
     525             : void
     526             : InitCompressFileHandleZstd(CompressFileHandle *CFH,
     527             :                            const pg_compress_specification compression_spec)
     528             : {
     529             :     CFH->open_func = Zstd_open;
     530             :     CFH->open_write_func = Zstd_open_write;
     531             :     CFH->read_func = Zstd_read;
     532             :     CFH->write_func = Zstd_write;
     533             :     CFH->gets_func = Zstd_gets;
     534             :     CFH->getc_func = Zstd_getc;
     535             :     CFH->close_func = Zstd_close;
     536             :     CFH->eof_func = Zstd_eof;
     537             :     CFH->get_error_func = Zstd_get_error;
     538             : 
     539             :     CFH->compression_spec = compression_spec;
     540             : 
     541             :     CFH->private_data = NULL;
     542             : }
     543             : 
     544             : #endif                          /* USE_ZSTD */

Generated by: LCOV version 1.14