LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 0 4 0.0 %
Date: 2025-01-18 03:14:54 Functions: 0 2 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * astreamer_zstd.c
       4             :  *
       5             :  * Archive streamers that deal with data compressed using zstd.
       6             :  * astreamer_zstd_compressor applies lz4 compression to the input stream,
       7             :  * and astreamer_zstd_decompressor does the reverse.
       8             :  *
       9             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *        src/fe_utils/astreamer_zstd.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #ifdef USE_ZSTD
      21             : #include <zstd.h>
      22             : #endif
      23             : 
      24             : #include "common/logging.h"
      25             : #include "fe_utils/astreamer.h"
      26             : 
      27             : #ifdef USE_ZSTD
      28             : 
      29             : typedef struct astreamer_zstd_frame
      30             : {
      31             :     astreamer   base;
      32             : 
      33             :     ZSTD_CCtx  *cctx;
      34             :     ZSTD_DCtx  *dctx;
      35             :     ZSTD_outBuffer zstd_outBuf;
      36             : } astreamer_zstd_frame;
      37             : 
      38             : static void astreamer_zstd_compressor_content(astreamer *streamer,
      39             :                                               astreamer_member *member,
      40             :                                               const char *data, int len,
      41             :                                               astreamer_archive_context context);
      42             : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
      43             : static void astreamer_zstd_compressor_free(astreamer *streamer);
      44             : 
      45             : static const astreamer_ops astreamer_zstd_compressor_ops = {
      46             :     .content = astreamer_zstd_compressor_content,
      47             :     .finalize = astreamer_zstd_compressor_finalize,
      48             :     .free = astreamer_zstd_compressor_free
      49             : };
      50             : 
      51             : static void astreamer_zstd_decompressor_content(astreamer *streamer,
      52             :                                                 astreamer_member *member,
      53             :                                                 const char *data, int len,
      54             :                                                 astreamer_archive_context context);
      55             : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
      56             : static void astreamer_zstd_decompressor_free(astreamer *streamer);
      57             : 
      58             : static const astreamer_ops astreamer_zstd_decompressor_ops = {
      59             :     .content = astreamer_zstd_decompressor_content,
      60             :     .finalize = astreamer_zstd_decompressor_finalize,
      61             :     .free = astreamer_zstd_decompressor_free
      62             : };
      63             : #endif
      64             : 
      65             : /*
      66             :  * Create a new base backup streamer that performs zstd compression of tar
      67             :  * blocks.
      68             :  */
      69             : astreamer *
      70           0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
      71             : {
      72             : #ifdef USE_ZSTD
      73             :     astreamer_zstd_frame *streamer;
      74             :     size_t      ret;
      75             : 
      76             :     Assert(next != NULL);
      77             : 
      78             :     streamer = palloc0(sizeof(astreamer_zstd_frame));
      79             : 
      80             :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      81             :         &astreamer_zstd_compressor_ops;
      82             : 
      83             :     streamer->base.bbs_next = next;
      84             :     initStringInfo(&streamer->base.bbs_buffer);
      85             :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
      86             : 
      87             :     streamer->cctx = ZSTD_createCCtx();
      88             :     if (!streamer->cctx)
      89             :         pg_fatal("could not create zstd compression context");
      90             : 
      91             :     /* Set compression level */
      92             :     ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
      93             :                                  compress->level);
      94             :     if (ZSTD_isError(ret))
      95             :         pg_fatal("could not set zstd compression level to %d: %s",
      96             :                  compress->level, ZSTD_getErrorName(ret));
      97             : 
      98             :     /* Set # of workers, if specified */
      99             :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     100             :     {
     101             :         /*
     102             :          * On older versions of libzstd, this option does not exist, and
     103             :          * trying to set it will fail. Similarly for newer versions if they
     104             :          * are compiled without threading support.
     105             :          */
     106             :         ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
     107             :                                      compress->workers);
     108             :         if (ZSTD_isError(ret))
     109             :             pg_fatal("could not set compression worker count to %d: %s",
     110             :                      compress->workers, ZSTD_getErrorName(ret));
     111             :     }
     112             : 
     113             :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     114             :     {
     115             :         ret = ZSTD_CCtx_setParameter(streamer->cctx,
     116             :                                      ZSTD_c_enableLongDistanceMatching,
     117             :                                      compress->long_distance);
     118             :         if (ZSTD_isError(ret))
     119             :         {
     120             :             pg_log_error("could not enable long-distance mode: %s",
     121             :                          ZSTD_getErrorName(ret));
     122             :             exit(1);
     123             :         }
     124             :     }
     125             : 
     126             :     /* Initialize the ZSTD output buffer. */
     127             :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     128             :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     129             :     streamer->zstd_outBuf.pos = 0;
     130             : 
     131             :     return &streamer->base;
     132             : #else
     133           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     134             :     return NULL;                /* keep compiler quiet */
     135             : #endif
     136             : }
     137             : 
     138             : #ifdef USE_ZSTD
     139             : /*
     140             :  * Compress the input data to output buffer.
     141             :  *
     142             :  * Find out the compression bound based on input data length for each
     143             :  * invocation to make sure that output buffer has enough capacity to
     144             :  * accommodate the compressed data. In case if the output buffer
     145             :  * capacity falls short of compression bound then forward the content
     146             :  * of output buffer to next streamer and empty the buffer.
     147             :  */
     148             : static void
     149             : astreamer_zstd_compressor_content(astreamer *streamer,
     150             :                                   astreamer_member *member,
     151             :                                   const char *data, int len,
     152             :                                   astreamer_archive_context context)
     153             : {
     154             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     155             :     ZSTD_inBuffer inBuf = {data, len, 0};
     156             : 
     157             :     while (inBuf.pos < inBuf.size)
     158             :     {
     159             :         size_t      yet_to_flush;
     160             :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     161             : 
     162             :         /*
     163             :          * If the output buffer is not left with enough space, send the
     164             :          * compressed bytes to the next streamer, and empty the buffer.
     165             :          */
     166             :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     167             :             max_needed)
     168             :         {
     169             :             astreamer_content(mystreamer->base.bbs_next, member,
     170             :                               mystreamer->zstd_outBuf.dst,
     171             :                               mystreamer->zstd_outBuf.pos,
     172             :                               context);
     173             : 
     174             :             /* Reset the ZSTD output buffer. */
     175             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     176             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     177             :             mystreamer->zstd_outBuf.pos = 0;
     178             :         }
     179             : 
     180             :         yet_to_flush =
     181             :             ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
     182             :                                  &inBuf, ZSTD_e_continue);
     183             : 
     184             :         if (ZSTD_isError(yet_to_flush))
     185             :             pg_log_error("could not compress data: %s",
     186             :                          ZSTD_getErrorName(yet_to_flush));
     187             :     }
     188             : }
     189             : 
     190             : /*
     191             :  * End-of-stream processing.
     192             :  */
     193             : static void
     194             : astreamer_zstd_compressor_finalize(astreamer *streamer)
     195             : {
     196             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     197             :     size_t      yet_to_flush;
     198             : 
     199             :     do
     200             :     {
     201             :         ZSTD_inBuffer in = {NULL, 0, 0};
     202             :         size_t      max_needed = ZSTD_compressBound(0);
     203             : 
     204             :         /*
     205             :          * If the output buffer is not left with enough space, send the
     206             :          * compressed bytes to the next streamer, and empty the buffer.
     207             :          */
     208             :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     209             :             max_needed)
     210             :         {
     211             :             astreamer_content(mystreamer->base.bbs_next, NULL,
     212             :                               mystreamer->zstd_outBuf.dst,
     213             :                               mystreamer->zstd_outBuf.pos,
     214             :                               ASTREAMER_UNKNOWN);
     215             : 
     216             :             /* Reset the ZSTD output buffer. */
     217             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     218             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     219             :             mystreamer->zstd_outBuf.pos = 0;
     220             :         }
     221             : 
     222             :         yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
     223             :                                             &mystreamer->zstd_outBuf,
     224             :                                             &in, ZSTD_e_end);
     225             : 
     226             :         if (ZSTD_isError(yet_to_flush))
     227             :             pg_log_error("could not compress data: %s",
     228             :                          ZSTD_getErrorName(yet_to_flush));
     229             : 
     230             :     } while (yet_to_flush > 0);
     231             : 
     232             :     /* Make sure to pass any remaining bytes to the next streamer. */
     233             :     if (mystreamer->zstd_outBuf.pos > 0)
     234             :         astreamer_content(mystreamer->base.bbs_next, NULL,
     235             :                           mystreamer->zstd_outBuf.dst,
     236             :                           mystreamer->zstd_outBuf.pos,
     237             :                           ASTREAMER_UNKNOWN);
     238             : 
     239             :     astreamer_finalize(mystreamer->base.bbs_next);
     240             : }
     241             : 
     242             : /*
     243             :  * Free memory.
     244             :  */
     245             : static void
     246             : astreamer_zstd_compressor_free(astreamer *streamer)
     247             : {
     248             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     249             : 
     250             :     astreamer_free(streamer->bbs_next);
     251             :     ZSTD_freeCCtx(mystreamer->cctx);
     252             :     pfree(streamer->bbs_buffer.data);
     253             :     pfree(streamer);
     254             : }
     255             : #endif
     256             : 
     257             : /*
     258             :  * Create a new base backup streamer that performs decompression of zstd
     259             :  * compressed blocks.
     260             :  */
     261             : astreamer *
     262           0 : astreamer_zstd_decompressor_new(astreamer *next)
     263             : {
     264             : #ifdef USE_ZSTD
     265             :     astreamer_zstd_frame *streamer;
     266             : 
     267             :     Assert(next != NULL);
     268             : 
     269             :     streamer = palloc0(sizeof(astreamer_zstd_frame));
     270             :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     271             :         &astreamer_zstd_decompressor_ops;
     272             : 
     273             :     streamer->base.bbs_next = next;
     274             :     initStringInfo(&streamer->base.bbs_buffer);
     275             :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
     276             : 
     277             :     streamer->dctx = ZSTD_createDCtx();
     278             :     if (!streamer->dctx)
     279             :         pg_fatal("could not create zstd decompression context");
     280             : 
     281             :     /* Initialize the ZSTD output buffer. */
     282             :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     283             :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     284             :     streamer->zstd_outBuf.pos = 0;
     285             : 
     286             :     return &streamer->base;
     287             : #else
     288           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     289             :     return NULL;                /* keep compiler quiet */
     290             : #endif
     291             : }
     292             : 
     293             : #ifdef USE_ZSTD
     294             : /*
     295             :  * Decompress the input data to output buffer until we run out of input
     296             :  * data. Each time the output buffer is full, pass on the decompressed data
     297             :  * to the next streamer.
     298             :  */
     299             : static void
     300             : astreamer_zstd_decompressor_content(astreamer *streamer,
     301             :                                     astreamer_member *member,
     302             :                                     const char *data, int len,
     303             :                                     astreamer_archive_context context)
     304             : {
     305             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     306             :     ZSTD_inBuffer inBuf = {data, len, 0};
     307             : 
     308             :     while (inBuf.pos < inBuf.size)
     309             :     {
     310             :         size_t      ret;
     311             : 
     312             :         /*
     313             :          * If output buffer is full then forward the content to next streamer
     314             :          * and update the output buffer.
     315             :          */
     316             :         if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
     317             :         {
     318             :             astreamer_content(mystreamer->base.bbs_next, member,
     319             :                               mystreamer->zstd_outBuf.dst,
     320             :                               mystreamer->zstd_outBuf.pos,
     321             :                               context);
     322             : 
     323             :             /* Reset the ZSTD output buffer. */
     324             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     325             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     326             :             mystreamer->zstd_outBuf.pos = 0;
     327             :         }
     328             : 
     329             :         ret = ZSTD_decompressStream(mystreamer->dctx,
     330             :                                     &mystreamer->zstd_outBuf, &inBuf);
     331             : 
     332             :         if (ZSTD_isError(ret))
     333             :             pg_log_error("could not decompress data: %s",
     334             :                          ZSTD_getErrorName(ret));
     335             :     }
     336             : }
     337             : 
     338             : /*
     339             :  * End-of-stream processing.
     340             :  */
     341             : static void
     342             : astreamer_zstd_decompressor_finalize(astreamer *streamer)
     343             : {
     344             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     345             : 
     346             :     /*
     347             :      * End of the stream, if there is some pending data in output buffers then
     348             :      * we must forward it to next streamer.
     349             :      */
     350             :     if (mystreamer->zstd_outBuf.pos > 0)
     351             :         astreamer_content(mystreamer->base.bbs_next, NULL,
     352             :                           mystreamer->base.bbs_buffer.data,
     353             :                           mystreamer->base.bbs_buffer.maxlen,
     354             :                           ASTREAMER_UNKNOWN);
     355             : 
     356             :     astreamer_finalize(mystreamer->base.bbs_next);
     357             : }
     358             : 
     359             : /*
     360             :  * Free memory.
     361             :  */
     362             : static void
     363             : astreamer_zstd_decompressor_free(astreamer *streamer)
     364             : {
     365             :     astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     366             : 
     367             :     astreamer_free(streamer->bbs_next);
     368             :     ZSTD_freeDCtx(mystreamer->dctx);
     369             :     pfree(streamer->bbs_buffer.data);
     370             :     pfree(streamer);
     371             : }
     372             : #endif

Generated by: LCOV version 1.14