LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_zstd.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 0.0 % 4 0
Test Date: 2026-03-01 18:15:11 Functions: 0.0 % 2 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_zstd.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using zstd.
       6              :  * astreamer_zstd_compressor applies zstd compression to the input stream,
       7              :  * and astreamer_zstd_decompressor does the reverse.
       8              :  *
       9              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *        src/fe_utils/astreamer_zstd.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #ifdef USE_ZSTD
      21              : #include <zstd.h>
      22              : #endif
      23              : 
      24              : #include "common/logging.h"
      25              : #include "fe_utils/astreamer.h"
      26              : 
      27              : #ifdef USE_ZSTD
      28              : 
/*
 * Per-streamer state used by both the zstd compressor and the zstd
 * decompressor in this file.
 *
 * The compressor constructor creates only cctx and the decompressor
 * constructor creates only dctx; the struct is allocated with
 * palloc0_object(), so the other context pointer is left untouched.
 */
typedef struct astreamer_zstd_frame
{
    astreamer   base;           /* generic streamer state; kept first because
                                 * the callbacks cast astreamer * to this
                                 * struct */

    ZSTD_CCtx  *cctx;           /* zstd compression context */
    ZSTD_DCtx  *dctx;           /* zstd decompression context */
    ZSTD_outBuffer zstd_outBuf; /* write cursor over base.bbs_buffer */
} astreamer_zstd_frame;
      37              : 
      38              : static void astreamer_zstd_compressor_content(astreamer *streamer,
      39              :                                               astreamer_member *member,
      40              :                                               const char *data, int len,
      41              :                                               astreamer_archive_context context);
      42              : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
      43              : static void astreamer_zstd_compressor_free(astreamer *streamer);
      44              : 
      45              : static const astreamer_ops astreamer_zstd_compressor_ops = {
      46              :     .content = astreamer_zstd_compressor_content,
      47              :     .finalize = astreamer_zstd_compressor_finalize,
      48              :     .free = astreamer_zstd_compressor_free
      49              : };
      50              : 
      51              : static void astreamer_zstd_decompressor_content(astreamer *streamer,
      52              :                                                 astreamer_member *member,
      53              :                                                 const char *data, int len,
      54              :                                                 astreamer_archive_context context);
      55              : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
      56              : static void astreamer_zstd_decompressor_free(astreamer *streamer);
      57              : 
      58              : static const astreamer_ops astreamer_zstd_decompressor_ops = {
      59              :     .content = astreamer_zstd_decompressor_content,
      60              :     .finalize = astreamer_zstd_decompressor_finalize,
      61              :     .free = astreamer_zstd_decompressor_free
      62              : };
      63              : #endif
      64              : 
      65              : /*
      66              :  * Create a new base backup streamer that performs zstd compression of tar
      67              :  * blocks.
      68              :  */
      69              : astreamer *
      70            0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
      71              : {
      72              : #ifdef USE_ZSTD
      73              :     astreamer_zstd_frame *streamer;
      74              :     size_t      ret;
      75              : 
      76              :     Assert(next != NULL);
      77              : 
      78              :     streamer = palloc0_object(astreamer_zstd_frame);
      79              : 
      80              :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      81              :         &astreamer_zstd_compressor_ops;
      82              : 
      83              :     streamer->base.bbs_next = next;
      84              :     initStringInfo(&streamer->base.bbs_buffer);
      85              :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
      86              : 
      87              :     streamer->cctx = ZSTD_createCCtx();
      88              :     if (!streamer->cctx)
      89              :         pg_fatal("could not create zstd compression context");
      90              : 
      91              :     /* Set compression level */
      92              :     ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
      93              :                                  compress->level);
      94              :     if (ZSTD_isError(ret))
      95              :         pg_fatal("could not set zstd compression level to %d: %s",
      96              :                  compress->level, ZSTD_getErrorName(ret));
      97              : 
      98              :     /* Set # of workers, if specified */
      99              :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     100              :     {
     101              :         /*
     102              :          * On older versions of libzstd, this option does not exist, and
     103              :          * trying to set it will fail. Similarly for newer versions if they
     104              :          * are compiled without threading support.
     105              :          */
     106              :         ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
     107              :                                      compress->workers);
     108              :         if (ZSTD_isError(ret))
     109              :             pg_fatal("could not set compression worker count to %d: %s",
     110              :                      compress->workers, ZSTD_getErrorName(ret));
     111              :     }
     112              : 
     113              :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     114              :     {
     115              :         ret = ZSTD_CCtx_setParameter(streamer->cctx,
     116              :                                      ZSTD_c_enableLongDistanceMatching,
     117              :                                      compress->long_distance);
     118              :         if (ZSTD_isError(ret))
     119              :         {
     120              :             pg_log_error("could not enable long-distance mode: %s",
     121              :                          ZSTD_getErrorName(ret));
     122              :             exit(1);
     123              :         }
     124              :     }
     125              : 
     126              :     /* Initialize the ZSTD output buffer. */
     127              :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     128              :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     129              :     streamer->zstd_outBuf.pos = 0;
     130              : 
     131              :     return &streamer->base;
     132              : #else
     133            0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     134              :     return NULL;                /* keep compiler quiet */
     135              : #endif
     136              : }
     137              : 
     138              : #ifdef USE_ZSTD
     139              : /*
     140              :  * Compress the input data to output buffer.
     141              :  *
     142              :  * Find out the compression bound based on input data length for each
     143              :  * invocation to make sure that output buffer has enough capacity to
     144              :  * accommodate the compressed data. In case if the output buffer
     145              :  * capacity falls short of compression bound then forward the content
     146              :  * of output buffer to next streamer and empty the buffer.
     147              :  */
     148              : static void
     149              : astreamer_zstd_compressor_content(astreamer *streamer,
     150              :                                   astreamer_member *member,
     151              :                                   const char *data, int len,
     152              :                                   astreamer_archive_context context)
     153              : {
     154              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     155              :     ZSTD_inBuffer inBuf = {data, len, 0};
     156              : 
     157              :     while (inBuf.pos < inBuf.size)
     158              :     {
     159              :         size_t      yet_to_flush;
     160              :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     161              : 
     162              :         /*
     163              :          * If the output buffer is not left with enough space, send the
     164              :          * compressed bytes to the next streamer, and empty the buffer.
     165              :          */
     166              :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     167              :             max_needed)
     168              :         {
     169              :             astreamer_content(mystreamer->base.bbs_next, member,
     170              :                               mystreamer->zstd_outBuf.dst,
     171              :                               mystreamer->zstd_outBuf.pos,
     172              :                               context);
     173              : 
     174              :             /* Reset the ZSTD output buffer. */
     175              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     176              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     177              :             mystreamer->zstd_outBuf.pos = 0;
     178              :         }
     179              : 
     180              :         yet_to_flush =
     181              :             ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
     182              :                                  &inBuf, ZSTD_e_continue);
     183              : 
     184              :         if (ZSTD_isError(yet_to_flush))
     185              :             pg_log_error("could not compress data: %s",
     186              :                          ZSTD_getErrorName(yet_to_flush));
     187              :     }
     188              : }
     189              : 
     190              : /*
     191              :  * End-of-stream processing.
     192              :  */
     193              : static void
     194              : astreamer_zstd_compressor_finalize(astreamer *streamer)
     195              : {
     196              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     197              :     size_t      yet_to_flush;
     198              : 
     199              :     do
     200              :     {
     201              :         ZSTD_inBuffer in = {NULL, 0, 0};
     202              :         size_t      max_needed = ZSTD_compressBound(0);
     203              : 
     204              :         /*
     205              :          * If the output buffer is not left with enough space, send the
     206              :          * compressed bytes to the next streamer, and empty the buffer.
     207              :          */
     208              :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     209              :             max_needed)
     210              :         {
     211              :             astreamer_content(mystreamer->base.bbs_next, NULL,
     212              :                               mystreamer->zstd_outBuf.dst,
     213              :                               mystreamer->zstd_outBuf.pos,
     214              :                               ASTREAMER_UNKNOWN);
     215              : 
     216              :             /* Reset the ZSTD output buffer. */
     217              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     218              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     219              :             mystreamer->zstd_outBuf.pos = 0;
     220              :         }
     221              : 
     222              :         yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
     223              :                                             &mystreamer->zstd_outBuf,
     224              :                                             &in, ZSTD_e_end);
     225              : 
     226              :         if (ZSTD_isError(yet_to_flush))
     227              :             pg_log_error("could not compress data: %s",
     228              :                          ZSTD_getErrorName(yet_to_flush));
     229              : 
     230              :     } while (yet_to_flush > 0);
     231              : 
     232              :     /* Make sure to pass any remaining bytes to the next streamer. */
     233              :     if (mystreamer->zstd_outBuf.pos > 0)
     234              :         astreamer_content(mystreamer->base.bbs_next, NULL,
     235              :                           mystreamer->zstd_outBuf.dst,
     236              :                           mystreamer->zstd_outBuf.pos,
     237              :                           ASTREAMER_UNKNOWN);
     238              : 
     239              :     astreamer_finalize(mystreamer->base.bbs_next);
     240              : }
     241              : 
     242              : /*
     243              :  * Free memory.
     244              :  */
     245              : static void
     246              : astreamer_zstd_compressor_free(astreamer *streamer)
     247              : {
     248              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     249              : 
     250              :     astreamer_free(streamer->bbs_next);
     251              :     ZSTD_freeCCtx(mystreamer->cctx);
     252              :     pfree(streamer->bbs_buffer.data);
     253              :     pfree(streamer);
     254              : }
     255              : #endif
     256              : 
     257              : /*
     258              :  * Create a new base backup streamer that performs decompression of zstd
     259              :  * compressed blocks.
     260              :  */
     261              : astreamer *
     262            0 : astreamer_zstd_decompressor_new(astreamer *next)
     263              : {
     264              : #ifdef USE_ZSTD
     265              :     astreamer_zstd_frame *streamer;
     266              : 
     267              :     Assert(next != NULL);
     268              : 
     269              :     streamer = palloc0_object(astreamer_zstd_frame);
     270              :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     271              :         &astreamer_zstd_decompressor_ops;
     272              : 
     273              :     streamer->base.bbs_next = next;
     274              :     initStringInfo(&streamer->base.bbs_buffer);
     275              :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
     276              : 
     277              :     streamer->dctx = ZSTD_createDCtx();
     278              :     if (!streamer->dctx)
     279              :         pg_fatal("could not create zstd decompression context");
     280              : 
     281              :     /* Initialize the ZSTD output buffer. */
     282              :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     283              :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     284              :     streamer->zstd_outBuf.pos = 0;
     285              : 
     286              :     return &streamer->base;
     287              : #else
     288            0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     289              :     return NULL;                /* keep compiler quiet */
     290              : #endif
     291              : }
     292              : 
     293              : #ifdef USE_ZSTD
     294              : /*
     295              :  * Decompress the input data to output buffer until we run out of input
     296              :  * data. Each time the output buffer is full, pass on the decompressed data
     297              :  * to the next streamer.
     298              :  */
     299              : static void
     300              : astreamer_zstd_decompressor_content(astreamer *streamer,
     301              :                                     astreamer_member *member,
     302              :                                     const char *data, int len,
     303              :                                     astreamer_archive_context context)
     304              : {
     305              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     306              :     ZSTD_inBuffer inBuf = {data, len, 0};
     307              : 
     308              :     while (inBuf.pos < inBuf.size)
     309              :     {
     310              :         size_t      ret;
     311              : 
     312              :         /*
     313              :          * If output buffer is full then forward the content to next streamer
     314              :          * and update the output buffer.
     315              :          */
     316              :         if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
     317              :         {
     318              :             astreamer_content(mystreamer->base.bbs_next, member,
     319              :                               mystreamer->zstd_outBuf.dst,
     320              :                               mystreamer->zstd_outBuf.pos,
     321              :                               context);
     322              : 
     323              :             /* Reset the ZSTD output buffer. */
     324              :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     325              :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     326              :             mystreamer->zstd_outBuf.pos = 0;
     327              :         }
     328              : 
     329              :         ret = ZSTD_decompressStream(mystreamer->dctx,
     330              :                                     &mystreamer->zstd_outBuf, &inBuf);
     331              : 
     332              :         if (ZSTD_isError(ret))
     333              :             pg_log_error("could not decompress data: %s",
     334              :                          ZSTD_getErrorName(ret));
     335              :     }
     336              : }
     337              : 
     338              : /*
     339              :  * End-of-stream processing.
     340              :  */
     341              : static void
     342              : astreamer_zstd_decompressor_finalize(astreamer *streamer)
     343              : {
     344              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     345              : 
     346              :     /*
     347              :      * End of the stream, if there is some pending data in output buffers then
     348              :      * we must forward it to next streamer.
     349              :      */
     350              :     if (mystreamer->zstd_outBuf.pos > 0)
     351              :         astreamer_content(mystreamer->base.bbs_next, NULL,
     352              :                           mystreamer->base.bbs_buffer.data,
     353              :                           mystreamer->base.bbs_buffer.maxlen,
     354              :                           ASTREAMER_UNKNOWN);
     355              : 
     356              :     astreamer_finalize(mystreamer->base.bbs_next);
     357              : }
     358              : 
     359              : /*
     360              :  * Free memory.
     361              :  */
     362              : static void
     363              : astreamer_zstd_decompressor_free(astreamer *streamer)
     364              : {
     365              :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     366              : 
     367              :     astreamer_free(streamer->bbs_next);
     368              :     ZSTD_freeDCtx(mystreamer->dctx);
     369              :     pfree(streamer->bbs_buffer.data);
     370              :     pfree(streamer);
     371              : }
     372              : #endif
        

Generated by: LCOV version 2.0-1