LCOV - code coverage report
Current view: top level - src/bin/pg_dump - compress_zstd.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 0.0 % 4 0
Test Date: 2026-03-03 18:14:56 Functions: 0.0 % 2 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * compress_zstd.c
       4              :  *   Routines for archivers to write a Zstd compressed data stream.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *     src/bin/pg_dump/compress_zstd.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : #include <unistd.h>
      17              : 
      18              : #include "compress_zstd.h"
      19              : #include "pg_backup_utils.h"
      20              : 
      21              : #ifndef USE_ZSTD
      22              : 
/*
 * Stand-in for InitCompressorZstd() in builds where USE_ZSTD is not defined:
 * any attempt to set up a Zstd compressor is reported through pg_fatal().
 * Neither argument is examined.
 */
void
InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
{
    pg_fatal("this build does not support compression with %s", "ZSTD");
}
      28              : 
/*
 * Stand-in for InitCompressFileHandleZstd() in builds where USE_ZSTD is not
 * defined: any attempt to set up a Zstd file handle is reported through
 * pg_fatal().  Neither argument is examined.
 */
void
InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
{
    pg_fatal("this build does not support compression with %s", "ZSTD");
}
      34              : 
      35              : #else
      36              : 
      37              : #include <zstd.h>
      38              : 
/*
 * Private state used both by the archive compressor callbacks and by the
 * compressed file handle callbacks in this file.
 */
typedef struct ZstdCompressorState
{
    /*
     * This is a normal file to which we read/write compressed data.  Only
     * the compressed file handle functions (Zstd_open and friends) use it.
     */
    FILE       *fp;

    /* compression stream, created when the state is used for writing */
    ZSTD_CStream *cstream;
    /* decompression stream, created when the state is used for reading */
    ZSTD_DStream *dstream;
    /* buffer descriptors passed to the zstd streaming functions */
    ZSTD_outBuffer output;
    ZSTD_inBuffer input;

    /*
     * pointer to a static string like from strerror(); recorded by
     * Zstd_close() on failure and returned by Zstd_get_error()
     */
    const char *zstderror;
} ZstdCompressorState;
      52              : 
/* Forward declarations for static functions defined later in this file */
static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
                                   const void *data, size_t dLen);
static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
      58              : 
      59              : static void
      60              : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
      61              :                            ZSTD_cParameter param, int value, char *paramname)
      62              : {
      63              :     size_t      res;
      64              : 
      65              :     res = ZSTD_CCtx_setParameter(cstream, param, value);
      66              :     if (ZSTD_isError(res))
      67              :         pg_fatal("could not set compression parameter \"%s\": %s",
      68              :                  paramname, ZSTD_getErrorName(res));
      69              : }
      70              : 
      71              : /* Return a compression stream with parameters set per argument */
      72              : static ZSTD_CStream *
      73              : _ZstdCStreamParams(pg_compress_specification compress)
      74              : {
      75              :     ZSTD_CStream *cstream;
      76              : 
      77              :     cstream = ZSTD_createCStream();
      78              :     if (cstream == NULL)
      79              :         pg_fatal("could not initialize compression library");
      80              : 
      81              :     _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
      82              :                                compress.level, "level");
      83              : 
      84              :     if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
      85              :         _Zstd_CCtx_setParam_or_die(cstream,
      86              :                                    ZSTD_c_enableLongDistanceMatching,
      87              :                                    compress.long_distance, "long");
      88              : 
      89              :     return cstream;
      90              : }
      91              : 
      92              : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
      93              : static void
      94              : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
      95              : {
      96              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
      97              :     ZSTD_inBuffer *input = &zstdcs->input;
      98              :     ZSTD_outBuffer *output = &zstdcs->output;
      99              : 
     100              :     /* Loop while there's any input or until flushed */
     101              :     while (input->pos < input->size || flush)
     102              :     {
     103              :         size_t      res;
     104              : 
     105              :         res = ZSTD_compressStream2(zstdcs->cstream, output,
     106              :                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);
     107              : 
     108              :         if (ZSTD_isError(res))
     109              :             pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
     110              : 
     111              :         /* Dump output buffer if full, or if we're told to flush */
     112              :         if (output->pos >= output->size || flush)
     113              :         {
     114              :             cs->writeF(AH, output->dst, output->pos);
     115              :             output->pos = 0;
     116              :         }
     117              : 
     118              :         if (res == 0)
     119              :             break;              /* End of frame or all input consumed */
     120              :     }
     121              : }
     122              : 
     123              : static void
     124              : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
     125              : {
     126              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     127              : 
     128              :     if (cs->readF != NULL)
     129              :     {
     130              :         Assert(zstdcs->cstream == NULL);
     131              :         ZSTD_freeDStream(zstdcs->dstream);
     132              :         pg_free(unconstify(void *, zstdcs->input.src));
     133              :     }
     134              :     else if (cs->writeF != NULL)
     135              :     {
     136              :         Assert(zstdcs->dstream == NULL);
     137              :         _ZstdWriteCommon(AH, cs, true);
     138              :         ZSTD_freeCStream(zstdcs->cstream);
     139              :     }
     140              : 
     141              :     /* output buffer may be allocated in either mode */
     142              :     pg_free(zstdcs->output.dst);
     143              :     pg_free(zstdcs);
     144              :     cs->private_data = NULL;
     145              : }
     146              : 
     147              : static void
     148              : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
     149              :                        const void *data, size_t dLen)
     150              : {
     151              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     152              : 
     153              :     zstdcs->input.src = data;
     154              :     zstdcs->input.size = dLen;
     155              :     zstdcs->input.pos = 0;
     156              : 
     157              :     _ZstdWriteCommon(AH, cs, false);
     158              : }
     159              : 
     160              : static void
     161              : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
     162              : {
     163              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     164              :     ZSTD_outBuffer *output = &zstdcs->output;
     165              :     ZSTD_inBuffer *input = &zstdcs->input;
     166              :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     167              :     size_t      res;
     168              : 
     169              :     for (;;)
     170              :     {
     171              :         size_t      cnt;
     172              : 
     173              :         /*
     174              :          * Read compressed data.  Note that readF can resize the buffer; the
     175              :          * new size is tracked and used for future loops.
     176              :          */
     177              :         input->size = input_allocated_size;
     178              :         cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
     179              : 
     180              :         /* ensure that readF didn't *shrink* the buffer */
     181              :         Assert(input->size >= input_allocated_size);
     182              :         input_allocated_size = input->size;
     183              :         input->size = cnt;
     184              :         input->pos = 0;
     185              : 
     186              :         if (cnt == 0)
     187              :             break;
     188              : 
     189              :         /* Now decompress */
     190              :         while (input->pos < input->size)
     191              :         {
     192              :             output->pos = 0;
     193              :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     194              :             if (ZSTD_isError(res))
     195              :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     196              : 
     197              :             /*
     198              :              * then write the decompressed data to the output handle
     199              :              */
     200              :             ((char *) output->dst)[output->pos] = '\0';
     201              :             ahwrite(output->dst, 1, output->pos, AH);
     202              : 
     203              :             if (res == 0)
     204              :                 break;          /* End of frame */
     205              :         }
     206              :     }
     207              : }
     208              : 
     209              : /* Public routine that supports Zstd compressed data I/O */
     210              : void
     211              : InitCompressorZstd(CompressorState *cs,
     212              :                    const pg_compress_specification compression_spec)
     213              : {
     214              :     ZstdCompressorState *zstdcs;
     215              : 
     216              :     cs->readData = ReadDataFromArchiveZstd;
     217              :     cs->writeData = WriteDataToArchiveZstd;
     218              :     cs->end = EndCompressorZstd;
     219              : 
     220              :     cs->compression_spec = compression_spec;
     221              : 
     222              :     zstdcs = pg_malloc0_object(ZstdCompressorState);
     223              :     cs->private_data = zstdcs;
     224              : 
     225              :     /* We expect that exactly one of readF/writeF is specified */
     226              :     Assert((cs->readF == NULL) != (cs->writeF == NULL));
     227              : 
     228              :     if (cs->readF != NULL)
     229              :     {
     230              :         zstdcs->dstream = ZSTD_createDStream();
     231              :         if (zstdcs->dstream == NULL)
     232              :             pg_fatal("could not initialize compression library");
     233              : 
     234              :         zstdcs->input.size = ZSTD_DStreamInSize();
     235              :         zstdcs->input.src = pg_malloc(zstdcs->input.size);
     236              : 
     237              :         /*
     238              :          * output.size is the buffer size we tell zstd it can output to.
     239              :          * Allocate an additional byte such that ReadDataFromArchiveZstd() can
     240              :          * call ahwrite() with a null-terminated string, which is an optimized
     241              :          * case in ExecuteSqlCommandBuf().
     242              :          */
     243              :         zstdcs->output.size = ZSTD_DStreamOutSize();
     244              :         zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
     245              :     }
     246              :     else if (cs->writeF != NULL)
     247              :     {
     248              :         zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
     249              : 
     250              :         zstdcs->output.size = ZSTD_CStreamOutSize();
     251              :         zstdcs->output.dst = pg_malloc(zstdcs->output.size);
     252              :         zstdcs->output.pos = 0;
     253              :     }
     254              : }
     255              : 
     256              : /*
     257              :  * Compressed stream API
     258              :  */
     259              : 
     260              : static size_t
     261              : Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on_error)
     262              : {
     263              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     264              :     ZSTD_inBuffer *input = &zstdcs->input;
     265              :     ZSTD_outBuffer *output = &zstdcs->output;
     266              :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     267              :     size_t      res,
     268              :                 cnt;
     269              : 
     270              :     /*
     271              :      * If this is the first call to the reading function, initialize the
     272              :      * required datastructures.
     273              :      */
     274              :     if (zstdcs->dstream == NULL)
     275              :     {
     276              :         zstdcs->input.src = pg_malloc0(input_allocated_size);
     277              :         zstdcs->dstream = ZSTD_createDStream();
     278              :         if (zstdcs->dstream == NULL)
     279              :         {
     280              :             if (exit_on_error)
     281              :                 pg_fatal("could not initialize compression library");
     282              :             return -1;
     283              :         }
     284              :     }
     285              : 
     286              :     output->size = size;
     287              :     output->dst = ptr;
     288              :     output->pos = 0;
     289              : 
     290              :     while (output->pos < output->size)
     291              :     {
     292              :         Assert(input->pos <= input->size);
     293              :         Assert(input->size <= input_allocated_size);
     294              : 
     295              :         /*
     296              :          * If the input is completely consumed, start back at the beginning
     297              :          */
     298              :         if (input->pos == input->size)
     299              :         {
     300              :             /* input->size is size produced by "fread" */
     301              :             input->size = 0;
     302              :             /* input->pos is position consumed by decompress */
     303              :             input->pos = 0;
     304              :         }
     305              : 
     306              :         /* read compressed data if we must produce more input */
     307              :         if (input->pos == input->size)
     308              :         {
     309              :             cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
     310              :             if (ferror(zstdcs->fp))
     311              :             {
     312              :                 if (exit_on_error)
     313              :                     pg_fatal("could not read from input file: %m");
     314              :                 return -1;
     315              :             }
     316              : 
     317              :             input->size = cnt;
     318              : 
     319              :             Assert(cnt <= input_allocated_size);
     320              : 
     321              :             /* If we have no more input to consume, we're done */
     322              :             if (cnt == 0)
     323              :                 break;
     324              :         }
     325              : 
     326              :         while (input->pos < input->size)
     327              :         {
     328              :             /* now decompress */
     329              :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     330              : 
     331              :             if (ZSTD_isError(res))
     332              :             {
     333              :                 if (exit_on_error)
     334              :                     pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     335              :                 return -1;
     336              :             }
     337              : 
     338              :             if (output->pos == output->size)
     339              :                 break;          /* No more room for output */
     340              : 
     341              :             if (res == 0)
     342              :                 break;          /* End of frame */
     343              :         }
     344              :     }
     345              : 
     346              :     return output->pos;
     347              : }
     348              : 
     349              : static void
     350              : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     351              : {
     352              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     353              :     ZSTD_inBuffer *input = &zstdcs->input;
     354              :     ZSTD_outBuffer *output = &zstdcs->output;
     355              :     size_t      res,
     356              :                 cnt;
     357              : 
     358              :     input->src = ptr;
     359              :     input->size = size;
     360              :     input->pos = 0;
     361              : 
     362              :     if (zstdcs->cstream == NULL)
     363              :     {
     364              :         zstdcs->output.size = ZSTD_CStreamOutSize();
     365              :         zstdcs->output.dst = pg_malloc(zstdcs->output.size);
     366              :         zstdcs->output.pos = 0;
     367              :         zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
     368              :         if (zstdcs->cstream == NULL)
     369              :             pg_fatal("could not initialize compression library");
     370              :     }
     371              : 
     372              :     /* Consume all input, to be flushed later */
     373              :     while (input->pos < input->size)
     374              :     {
     375              :         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
     376              :         if (ZSTD_isError(res))
     377              :             pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
     378              : 
     379              :         /* Dump output buffer if full */
     380              :         if (output->pos >= output->size)
     381              :         {
     382              :             errno = 0;
     383              :             cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     384              :             if (cnt != output->pos)
     385              :             {
     386              :                 errno = (errno) ? errno : ENOSPC;
     387              :                 pg_fatal("could not write to file: %m");
     388              :             }
     389              :             output->pos = 0;
     390              :         }
     391              :     }
     392              : }
     393              : 
     394              : static int
     395              : Zstd_getc(CompressFileHandle *CFH)
     396              : {
     397              :     unsigned char ret;
     398              : 
     399              :     if (CFH->read_func(&ret, 1, CFH) != 1)
     400              :         pg_fatal("could not read from input file: end of file");
     401              :     return ret;
     402              : }
     403              : 
     404              : static char *
     405              : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
     406              : {
     407              :     int         i;
     408              : 
     409              :     Assert(len > 0);
     410              : 
     411              :     /*
     412              :      * Read one byte at a time until newline or EOF. This is only used to read
     413              :      * the list of LOs, and the I/O is buffered anyway.
     414              :      */
     415              :     for (i = 0; i < len - 1; ++i)
     416              :     {
     417              :         if (Zstd_read_internal(&buf[i], 1, CFH, false) != 1)
     418              :             break;
     419              :         if (buf[i] == '\n')
     420              :         {
     421              :             ++i;
     422              :             break;
     423              :         }
     424              :     }
     425              :     buf[i] = '\0';
     426              :     return i > 0 ? buf : NULL;
     427              : }
     428              : 
     429              : static size_t
     430              : Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
     431              : {
     432              :     return Zstd_read_internal(ptr, size, CFH, true);
     433              : }
     434              : 
     435              : static bool
     436              : Zstd_close(CompressFileHandle *CFH)
     437              : {
     438              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     439              :     bool        success = true;
     440              : 
     441              :     if (zstdcs->cstream)
     442              :     {
     443              :         size_t      res,
     444              :                     cnt;
     445              :         ZSTD_inBuffer *input = &zstdcs->input;
     446              :         ZSTD_outBuffer *output = &zstdcs->output;
     447              : 
     448              :         /* Loop until the compression buffers are fully consumed */
     449              :         for (;;)
     450              :         {
     451              :             res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
     452              :             if (ZSTD_isError(res))
     453              :             {
     454              :                 zstdcs->zstderror = ZSTD_getErrorName(res);
     455              :                 success = false;
     456              :                 break;
     457              :             }
     458              : 
     459              :             errno = 0;
     460              :             cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     461              :             if (cnt != output->pos)
     462              :             {
     463              :                 errno = (errno) ? errno : ENOSPC;
     464              :                 zstdcs->zstderror = strerror(errno);
     465              :                 success = false;
     466              :                 break;
     467              :             }
     468              :             output->pos = 0;
     469              : 
     470              :             if (res == 0)
     471              :                 break;          /* End of frame */
     472              :         }
     473              : 
     474              :         ZSTD_freeCStream(zstdcs->cstream);
     475              :         pg_free(zstdcs->output.dst);
     476              :     }
     477              : 
     478              :     if (zstdcs->dstream)
     479              :     {
     480              :         ZSTD_freeDStream(zstdcs->dstream);
     481              :         pg_free(unconstify(void *, zstdcs->input.src));
     482              :     }
     483              : 
     484              :     errno = 0;
     485              :     if (fclose(zstdcs->fp) != 0)
     486              :     {
     487              :         zstdcs->zstderror = strerror(errno);
     488              :         success = false;
     489              :     }
     490              : 
     491              :     pg_free(zstdcs);
     492              :     CFH->private_data = NULL;
     493              :     return success;
     494              : }
     495              : 
     496              : static bool
     497              : Zstd_eof(CompressFileHandle *CFH)
     498              : {
     499              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     500              : 
     501              :     return feof(zstdcs->fp);
     502              : }
     503              : 
     504              : static bool
     505              : Zstd_open(const char *path, int fd, const char *mode,
     506              :           CompressFileHandle *CFH)
     507              : {
     508              :     FILE       *fp;
     509              :     ZstdCompressorState *zstdcs;
     510              : 
     511              :     /*
     512              :      * Clear state storage to avoid having the fd point to non-NULL memory on
     513              :      * error return.
     514              :      */
     515              :     CFH->private_data = NULL;
     516              : 
     517              :     zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
     518              :                                                         MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
     519              :     if (!zstdcs)
     520              :     {
     521              :         errno = ENOMEM;
     522              :         return false;
     523              :     }
     524              : 
     525              :     if (fd >= 0)
     526              :         fp = fdopen(dup(fd), mode);
     527              :     else
     528              :         fp = fopen(path, mode);
     529              : 
     530              :     if (fp == NULL)
     531              :     {
     532              :         pg_free(zstdcs);
     533              :         return false;
     534              :     }
     535              : 
     536              :     zstdcs->fp = fp;
     537              :     CFH->private_data = zstdcs;
     538              : 
     539              :     return true;
     540              : }
     541              : 
     542              : static bool
     543              : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     544              : {
     545              :     char        fname[MAXPGPATH];
     546              : 
     547              :     sprintf(fname, "%s.zst", path);
     548              :     return CFH->open_func(fname, -1, mode, CFH);
     549              : }
     550              : 
     551              : static const char *
     552              : Zstd_get_error(CompressFileHandle *CFH)
     553              : {
     554              :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     555              : 
     556              :     return zstdcs->zstderror;
     557              : }
     558              : 
     559              : void
     560              : InitCompressFileHandleZstd(CompressFileHandle *CFH,
     561              :                            const pg_compress_specification compression_spec)
     562              : {
     563              :     CFH->open_func = Zstd_open;
     564              :     CFH->open_write_func = Zstd_open_write;
     565              :     CFH->read_func = Zstd_read;
     566              :     CFH->write_func = Zstd_write;
     567              :     CFH->gets_func = Zstd_gets;
     568              :     CFH->getc_func = Zstd_getc;
     569              :     CFH->close_func = Zstd_close;
     570              :     CFH->eof_func = Zstd_eof;
     571              :     CFH->get_error_func = Zstd_get_error;
     572              : 
     573              :     CFH->compression_spec = compression_spec;
     574              : 
     575              :     CFH->private_data = NULL;
     576              : }
     577              : 
     578              : #endif                          /* USE_ZSTD */
        

Generated by: LCOV version 2.0-1