LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_zstd.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 0.0 % 4 0
Test Date: 2026-04-03 11:15:52 Functions: 0.0 % 2 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_zstd.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using zstd.
       6              :  * astreamer_zstd_compressor applies zstd compression to the input stream,
       7              :  * and astreamer_zstd_decompressor does the reverse.
       8              :  *
       9              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *        src/fe_utils/astreamer_zstd.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #ifdef USE_ZSTD
      21              : #include <zstd.h>
      22              : #endif
      23              : 
      24              : #include "common/logging.h"
      25              : #include "fe_utils/astreamer.h"
      26              : 
      27              : #ifdef USE_ZSTD
      28              : 
      29              : typedef struct astreamer_zstd_frame
      30              : {
      31              :     astreamer   base;
      32              : 
      33              :     ZSTD_CCtx  *cctx;
      34              :     ZSTD_DCtx  *dctx;
      35              :     ZSTD_outBuffer zstd_outBuf;
      36              : } astreamer_zstd_frame;
      37              : 
      38              : static void astreamer_zstd_compressor_content(astreamer *streamer,
      39              :                                               astreamer_member *member,
      40              :                                               const char *data, int len,
      41              :                                               astreamer_archive_context context);
      42              : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
      43              : static void astreamer_zstd_compressor_free(astreamer *streamer);
      44              : 
      45              : static const astreamer_ops astreamer_zstd_compressor_ops = {
      46              :     .content = astreamer_zstd_compressor_content,
      47              :     .finalize = astreamer_zstd_compressor_finalize,
      48              :     .free = astreamer_zstd_compressor_free
      49              : };
      50              : 
      51              : static void astreamer_zstd_decompressor_content(astreamer *streamer,
      52              :                                                 astreamer_member *member,
      53              :                                                 const char *data, int len,
      54              :                                                 astreamer_archive_context context);
      55              : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
      56              : static void astreamer_zstd_decompressor_free(astreamer *streamer);
      57              : 
      58              : static const astreamer_ops astreamer_zstd_decompressor_ops = {
      59              :     .content = astreamer_zstd_decompressor_content,
      60              :     .finalize = astreamer_zstd_decompressor_finalize,
      61              :     .free = astreamer_zstd_decompressor_free
      62              : };
      63              : #endif
      64              : 
      65              : /*
      66              :  * Create a new base backup streamer that performs zstd compression of tar
      67              :  * blocks.
      68              :  */
      69              : astreamer *
      70            0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
      71              : {
      72              : #ifdef USE_ZSTD
      73              :     astreamer_zstd_frame *streamer;
      74              :     size_t      ret;
      75              : 
      76              :     Assert(next != NULL);
      77              : 
      78              :     streamer = palloc0_object(astreamer_zstd_frame);
      79              : 
      80              :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      81              :         &astreamer_zstd_compressor_ops;
      82              : 
      83              :     streamer->base.bbs_next = next;
      84              :     initStringInfo(&streamer->base.bbs_buffer);
      85              :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_CStreamOutSize());
      86              : 
      87              :     streamer->cctx = ZSTD_createCCtx();
      88              :     if (!streamer->cctx)
      89              :         pg_fatal("could not create zstd compression context");
      90              : 
      91              :     /* Set compression level */
      92              :     ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
      93              :                                  compress->level);
      94              :     if (ZSTD_isError(ret))
      95              :         pg_fatal("could not set zstd compression level to %d: %s",
      96              :                  compress->level, ZSTD_getErrorName(ret));
      97              : 
      98              :     /* Set # of workers, if specified */
      99              :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     100              :     {
     101              :         /*
     102              :          * On older versions of libzstd, this option does not exist, and
     103              :          * trying to set it will fail. Similarly for newer versions if they
     104              :          * are compiled without threading support.
     105              :          */
     106              :         ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
     107              :                                      compress->workers);
     108              :         if (ZSTD_isError(ret))
     109              :             pg_fatal("could not set compression worker count to %d: %s",
     110              :                      compress->workers, ZSTD_getErrorName(ret));
     111              :     }
     112              : 
     113              :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     114              :     {
     115              :         ret = ZSTD_CCtx_setParameter(streamer->cctx,
     116              :                                      ZSTD_c_enableLongDistanceMatching,
     117              :                                      compress->long_distance);
     118              :         if (ZSTD_isError(ret))
     119              :             pg_fatal("could not enable long-distance mode: %s",
     120              :                      ZSTD_getErrorName(ret));
     121              :     }
     122              : 
     123              :     /* Initialize the ZSTD output buffer. */
     124              :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     125              :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     126              :     streamer->zstd_outBuf.pos = 0;
     127              : 
     128              :     return &streamer->base;
     129              : #else
     130            0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     131              :     return NULL;                /* keep compiler quiet */
     132              : #endif
     133              : }
     134              : 
     135              : #ifdef USE_ZSTD
     136              : /*
     137              :  * Compress the input data to output buffer.
     138              :  *
     139              :  * Find out the compression bound based on input data length for each
     140              :  * invocation to make sure that output buffer has enough capacity to
     141              :  * accommodate the compressed data. In case if the output buffer
     142              :  * capacity falls short of compression bound then forward the content
     143              :  * of output buffer to next streamer and empty the buffer.
     144              :  */
     145              : static void
     146              : astreamer_zstd_compressor_content(astreamer *streamer,
     147              :                                   astreamer_member *member,
     148              :                                   const char *data, int len,
     149              :                                   astreamer_archive_context context)
     150              : {
     151              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     152              :     ZSTD_inBuffer inBuf = {data, len, 0};
     153              : 
     154              :     while (inBuf.pos < inBuf.size)
     155              :     {
     156              :         size_t      yet_to_flush;
     157              :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     158              : 
     159              :         /*
     160              :          * If the output buffer is not left with enough space, send the
     161              :          * compressed bytes to the next streamer, and empty the buffer.
     162              :          */
     163              :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     164              :             max_needed)
     165              :         {
     166              :             astreamer_content(mystreamer->base.bbs_next, member,
     167              :                               mystreamer->zstd_outBuf.dst,
     168              :                               mystreamer->zstd_outBuf.pos,
     169              :                               context);
     170              : 
     171              :             /* Reset the ZSTD output buffer. */
     172              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     173              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     174              :             mystreamer->zstd_outBuf.pos = 0;
     175              :         }
     176              : 
     177              :         yet_to_flush =
     178              :             ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
     179              :                                  &inBuf, ZSTD_e_continue);
     180              : 
     181              :         if (ZSTD_isError(yet_to_flush))
     182              :             pg_fatal("could not compress data: %s",
     183              :                      ZSTD_getErrorName(yet_to_flush));
     184              :     }
     185              : }
     186              : 
     187              : /*
     188              :  * End-of-stream processing.
     189              :  */
     190              : static void
     191              : astreamer_zstd_compressor_finalize(astreamer *streamer)
     192              : {
     193              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     194              :     size_t      yet_to_flush;
     195              : 
     196              :     do
     197              :     {
     198              :         ZSTD_inBuffer in = {NULL, 0, 0};
     199              :         size_t      max_needed = ZSTD_compressBound(0);
     200              : 
     201              :         /*
     202              :          * If the output buffer is not left with enough space, send the
     203              :          * compressed bytes to the next streamer, and empty the buffer.
     204              :          */
     205              :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     206              :             max_needed)
     207              :         {
     208              :             astreamer_content(mystreamer->base.bbs_next, NULL,
     209              :                               mystreamer->zstd_outBuf.dst,
     210              :                               mystreamer->zstd_outBuf.pos,
     211              :                               ASTREAMER_UNKNOWN);
     212              : 
     213              :             /* Reset the ZSTD output buffer. */
     214              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     215              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     216              :             mystreamer->zstd_outBuf.pos = 0;
     217              :         }
     218              : 
     219              :         yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
     220              :                                             &mystreamer->zstd_outBuf,
     221              :                                             &in, ZSTD_e_end);
     222              : 
     223              :         if (ZSTD_isError(yet_to_flush))
     224              :             pg_fatal("could not compress data: %s",
     225              :                      ZSTD_getErrorName(yet_to_flush));
     226              : 
     227              :     } while (yet_to_flush > 0);
     228              : 
     229              :     /* Make sure to pass any remaining bytes to the next streamer. */
     230              :     if (mystreamer->zstd_outBuf.pos > 0)
     231              :         astreamer_content(mystreamer->base.bbs_next, NULL,
     232              :                           mystreamer->zstd_outBuf.dst,
     233              :                           mystreamer->zstd_outBuf.pos,
     234              :                           ASTREAMER_UNKNOWN);
     235              : 
     236              :     astreamer_finalize(mystreamer->base.bbs_next);
     237              : }
     238              : 
     239              : /*
     240              :  * Free memory.
     241              :  */
     242              : static void
     243              : astreamer_zstd_compressor_free(astreamer *streamer)
     244              : {
     245              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     246              : 
     247              :     astreamer_free(streamer->bbs_next);
     248              :     ZSTD_freeCCtx(mystreamer->cctx);
     249              :     pfree(streamer->bbs_buffer.data);
     250              :     pfree(streamer);
     251              : }
     252              : #endif
     253              : 
     254              : /*
     255              :  * Create a new base backup streamer that performs decompression of zstd
     256              :  * compressed blocks.
     257              :  */
     258              : astreamer *
     259            0 : astreamer_zstd_decompressor_new(astreamer *next)
     260              : {
     261              : #ifdef USE_ZSTD
     262              :     astreamer_zstd_frame *streamer;
     263              : 
     264              :     Assert(next != NULL);
     265              : 
     266              :     streamer = palloc0_object(astreamer_zstd_frame);
     267              :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     268              :         &astreamer_zstd_decompressor_ops;
     269              : 
     270              :     streamer->base.bbs_next = next;
     271              :     initStringInfo(&streamer->base.bbs_buffer);
     272              :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
     273              : 
     274              :     streamer->dctx = ZSTD_createDCtx();
     275              :     if (!streamer->dctx)
     276              :         pg_fatal("could not create zstd decompression context");
     277              : 
     278              :     /* Initialize the ZSTD output buffer. */
     279              :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     280              :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     281              :     streamer->zstd_outBuf.pos = 0;
     282              : 
     283              :     return &streamer->base;
     284              : #else
     285            0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     286              :     return NULL;                /* keep compiler quiet */
     287              : #endif
     288              : }
     289              : 
     290              : #ifdef USE_ZSTD
     291              : /*
     292              :  * Decompress the input data to output buffer until we run out of input
     293              :  * data. Each time the output buffer is full, pass on the decompressed data
     294              :  * to the next streamer.
     295              :  */
     296              : static void
     297              : astreamer_zstd_decompressor_content(astreamer *streamer,
     298              :                                     astreamer_member *member,
     299              :                                     const char *data, int len,
     300              :                                     astreamer_archive_context context)
     301              : {
     302              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     303              :     ZSTD_inBuffer inBuf = {data, len, 0};
     304              : 
     305              :     while (inBuf.pos < inBuf.size)
     306              :     {
     307              :         size_t      ret;
     308              : 
     309              :         /*
     310              :          * If output buffer is full then forward the content to next streamer
     311              :          * and update the output buffer.
     312              :          */
     313              :         if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
     314              :         {
     315              :             astreamer_content(mystreamer->base.bbs_next, member,
     316              :                               mystreamer->zstd_outBuf.dst,
     317              :                               mystreamer->zstd_outBuf.pos,
     318              :                               context);
     319              : 
     320              :             /* Reset the ZSTD output buffer. */
     321              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     322              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     323              :             mystreamer->zstd_outBuf.pos = 0;
     324              :         }
     325              : 
     326              :         ret = ZSTD_decompressStream(mystreamer->dctx,
     327              :                                     &mystreamer->zstd_outBuf, &inBuf);
     328              : 
     329              :         if (ZSTD_isError(ret))
     330              :             pg_fatal("could not decompress data: %s",
     331              :                      ZSTD_getErrorName(ret));
     332              :     }
     333              : }
     334              : 
     335              : /*
     336              :  * End-of-stream processing.
     337              :  */
     338              : static void
     339              : astreamer_zstd_decompressor_finalize(astreamer *streamer)
     340              : {
     341              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     342              : 
     343              :     /*
     344              :      * End of the stream, if there is some pending data in output buffers then
     345              :      * we must forward it to next streamer.
     346              :      */
     347              :     if (mystreamer->zstd_outBuf.pos > 0)
     348              :         astreamer_content(mystreamer->base.bbs_next, NULL,
     349              :                           mystreamer->base.bbs_buffer.data,
     350              :                           mystreamer->zstd_outBuf.pos,
     351              :                           ASTREAMER_UNKNOWN);
     352              : 
     353              :     astreamer_finalize(mystreamer->base.bbs_next);
     354              : }
     355              : 
     356              : /*
     357              :  * Free memory.
     358              :  */
     359              : static void
     360              : astreamer_zstd_decompressor_free(astreamer *streamer)
     361              : {
     362              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     363              : 
     364              :     astreamer_free(streamer->bbs_next);
     365              :     ZSTD_freeDCtx(mystreamer->dctx);
     366              :     pfree(streamer->bbs_buffer.data);
     367              :     pfree(streamer);
     368              : }
     369              : #endif
        

Generated by: LCOV version 2.0-1