LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - bbstreamer_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17beta1 Lines: 0 4 0.0 %
Date: 2024-06-14 17:13:31 Functions: 0 2 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * bbstreamer_zstd.c
       4             :  *
       5             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *        src/bin/pg_basebackup/bbstreamer_zstd.c
       9             :  *-------------------------------------------------------------------------
      10             :  */
      11             : 
      12             : #include "postgres_fe.h"
      13             : 
      14             : #include <unistd.h>
      15             : 
      16             : #ifdef USE_ZSTD
      17             : #include <zstd.h>
      18             : #endif
      19             : 
      20             : #include "bbstreamer.h"
      21             : #include "common/logging.h"
      22             : 
      23             : #ifdef USE_ZSTD
      24             : 
      25             : typedef struct bbstreamer_zstd_frame
      26             : {
      27             :     bbstreamer  base;
      28             : 
      29             :     ZSTD_CCtx  *cctx;
      30             :     ZSTD_DCtx  *dctx;
      31             :     ZSTD_outBuffer zstd_outBuf;
      32             : } bbstreamer_zstd_frame;
      33             : 
      34             : static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
      35             :                                                bbstreamer_member *member,
      36             :                                                const char *data, int len,
      37             :                                                bbstreamer_archive_context context);
      38             : static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
      39             : static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
      40             : 
      41             : const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
      42             :     .content = bbstreamer_zstd_compressor_content,
      43             :     .finalize = bbstreamer_zstd_compressor_finalize,
      44             :     .free = bbstreamer_zstd_compressor_free
      45             : };
      46             : 
      47             : static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
      48             :                                                  bbstreamer_member *member,
      49             :                                                  const char *data, int len,
      50             :                                                  bbstreamer_archive_context context);
      51             : static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
      52             : static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
      53             : 
      54             : const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
      55             :     .content = bbstreamer_zstd_decompressor_content,
      56             :     .finalize = bbstreamer_zstd_decompressor_finalize,
      57             :     .free = bbstreamer_zstd_decompressor_free
      58             : };
      59             : #endif
      60             : 
      61             : /*
      62             :  * Create a new base backup streamer that performs zstd compression of tar
      63             :  * blocks.
      64             :  */
      65             : bbstreamer *
      66           0 : bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
      67             : {
      68             : #ifdef USE_ZSTD
      69             :     bbstreamer_zstd_frame *streamer;
      70             :     size_t      ret;
      71             : 
      72             :     Assert(next != NULL);
      73             : 
      74             :     streamer = palloc0(sizeof(bbstreamer_zstd_frame));
      75             : 
      76             :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
      77             :         &bbstreamer_zstd_compressor_ops;
      78             : 
      79             :     streamer->base.bbs_next = next;
      80             :     initStringInfo(&streamer->base.bbs_buffer);
      81             :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
      82             : 
      83             :     streamer->cctx = ZSTD_createCCtx();
      84             :     if (!streamer->cctx)
      85             :         pg_fatal("could not create zstd compression context");
      86             : 
      87             :     /* Set compression level */
      88             :     ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
      89             :                                  compress->level);
      90             :     if (ZSTD_isError(ret))
      91             :         pg_fatal("could not set zstd compression level to %d: %s",
      92             :                  compress->level, ZSTD_getErrorName(ret));
      93             : 
      94             :     /* Set # of workers, if specified */
      95             :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
      96             :     {
      97             :         /*
      98             :          * On older versions of libzstd, this option does not exist, and
      99             :          * trying to set it will fail. Similarly for newer versions if they
     100             :          * are compiled without threading support.
     101             :          */
     102             :         ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
     103             :                                      compress->workers);
     104             :         if (ZSTD_isError(ret))
     105             :             pg_fatal("could not set compression worker count to %d: %s",
     106             :                      compress->workers, ZSTD_getErrorName(ret));
     107             :     }
     108             : 
     109             :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     110             :     {
     111             :         ret = ZSTD_CCtx_setParameter(streamer->cctx,
     112             :                                      ZSTD_c_enableLongDistanceMatching,
     113             :                                      compress->long_distance);
     114             :         if (ZSTD_isError(ret))
     115             :         {
     116             :             pg_log_error("could not enable long-distance mode: %s",
     117             :                          ZSTD_getErrorName(ret));
     118             :             exit(1);
     119             :         }
     120             :     }
     121             : 
     122             :     /* Initialize the ZSTD output buffer. */
     123             :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     124             :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     125             :     streamer->zstd_outBuf.pos = 0;
     126             : 
     127             :     return &streamer->base;
     128             : #else
     129           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     130             :     return NULL;                /* keep compiler quiet */
     131             : #endif
     132             : }
     133             : 
     134             : #ifdef USE_ZSTD
     135             : /*
     136             :  * Compress the input data to output buffer.
     137             :  *
     138             :  * Find out the compression bound based on input data length for each
     139             :  * invocation to make sure that output buffer has enough capacity to
     140             :  * accommodate the compressed data. In case if the output buffer
     141             :  * capacity falls short of compression bound then forward the content
     142             :  * of output buffer to next streamer and empty the buffer.
     143             :  */
     144             : static void
     145             : bbstreamer_zstd_compressor_content(bbstreamer *streamer,
     146             :                                    bbstreamer_member *member,
     147             :                                    const char *data, int len,
     148             :                                    bbstreamer_archive_context context)
     149             : {
     150             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     151             :     ZSTD_inBuffer inBuf = {data, len, 0};
     152             : 
     153             :     while (inBuf.pos < inBuf.size)
     154             :     {
     155             :         size_t      yet_to_flush;
     156             :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     157             : 
     158             :         /*
     159             :          * If the output buffer is not left with enough space, send the
     160             :          * compressed bytes to the next streamer, and empty the buffer.
     161             :          */
     162             :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     163             :             max_needed)
     164             :         {
     165             :             bbstreamer_content(mystreamer->base.bbs_next, member,
     166             :                                mystreamer->zstd_outBuf.dst,
     167             :                                mystreamer->zstd_outBuf.pos,
     168             :                                context);
     169             : 
     170             :             /* Reset the ZSTD output buffer. */
     171             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     172             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     173             :             mystreamer->zstd_outBuf.pos = 0;
     174             :         }
     175             : 
     176             :         yet_to_flush =
     177             :             ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
     178             :                                  &inBuf, ZSTD_e_continue);
     179             : 
     180             :         if (ZSTD_isError(yet_to_flush))
     181             :             pg_log_error("could not compress data: %s",
     182             :                          ZSTD_getErrorName(yet_to_flush));
     183             :     }
     184             : }
     185             : 
     186             : /*
     187             :  * End-of-stream processing.
     188             :  */
     189             : static void
     190             : bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
     191             : {
     192             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     193             :     size_t      yet_to_flush;
     194             : 
     195             :     do
     196             :     {
     197             :         ZSTD_inBuffer in = {NULL, 0, 0};
     198             :         size_t      max_needed = ZSTD_compressBound(0);
     199             : 
     200             :         /*
     201             :          * If the output buffer is not left with enough space, send the
     202             :          * compressed bytes to the next streamer, and empty the buffer.
     203             :          */
     204             :         if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     205             :             max_needed)
     206             :         {
     207             :             bbstreamer_content(mystreamer->base.bbs_next, NULL,
     208             :                                mystreamer->zstd_outBuf.dst,
     209             :                                mystreamer->zstd_outBuf.pos,
     210             :                                BBSTREAMER_UNKNOWN);
     211             : 
     212             :             /* Reset the ZSTD output buffer. */
     213             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     214             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     215             :             mystreamer->zstd_outBuf.pos = 0;
     216             :         }
     217             : 
     218             :         yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
     219             :                                             &mystreamer->zstd_outBuf,
     220             :                                             &in, ZSTD_e_end);
     221             : 
     222             :         if (ZSTD_isError(yet_to_flush))
     223             :             pg_log_error("could not compress data: %s",
     224             :                          ZSTD_getErrorName(yet_to_flush));
     225             : 
     226             :     } while (yet_to_flush > 0);
     227             : 
     228             :     /* Make sure to pass any remaining bytes to the next streamer. */
     229             :     if (mystreamer->zstd_outBuf.pos > 0)
     230             :         bbstreamer_content(mystreamer->base.bbs_next, NULL,
     231             :                            mystreamer->zstd_outBuf.dst,
     232             :                            mystreamer->zstd_outBuf.pos,
     233             :                            BBSTREAMER_UNKNOWN);
     234             : 
     235             :     bbstreamer_finalize(mystreamer->base.bbs_next);
     236             : }
     237             : 
     238             : /*
     239             :  * Free memory.
     240             :  */
     241             : static void
     242             : bbstreamer_zstd_compressor_free(bbstreamer *streamer)
     243             : {
     244             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     245             : 
     246             :     bbstreamer_free(streamer->bbs_next);
     247             :     ZSTD_freeCCtx(mystreamer->cctx);
     248             :     pfree(streamer->bbs_buffer.data);
     249             :     pfree(streamer);
     250             : }
     251             : #endif
     252             : 
     253             : /*
     254             :  * Create a new base backup streamer that performs decompression of zstd
     255             :  * compressed blocks.
     256             :  */
     257             : bbstreamer *
     258           0 : bbstreamer_zstd_decompressor_new(bbstreamer *next)
     259             : {
     260             : #ifdef USE_ZSTD
     261             :     bbstreamer_zstd_frame *streamer;
     262             : 
     263             :     Assert(next != NULL);
     264             : 
     265             :     streamer = palloc0(sizeof(bbstreamer_zstd_frame));
     266             :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
     267             :         &bbstreamer_zstd_decompressor_ops;
     268             : 
     269             :     streamer->base.bbs_next = next;
     270             :     initStringInfo(&streamer->base.bbs_buffer);
     271             :     enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
     272             : 
     273             :     streamer->dctx = ZSTD_createDCtx();
     274             :     if (!streamer->dctx)
     275             :         pg_fatal("could not create zstd decompression context");
     276             : 
     277             :     /* Initialize the ZSTD output buffer. */
     278             :     streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     279             :     streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     280             :     streamer->zstd_outBuf.pos = 0;
     281             : 
     282             :     return &streamer->base;
     283             : #else
     284           0 :     pg_fatal("this build does not support compression with %s", "ZSTD");
     285             :     return NULL;                /* keep compiler quiet */
     286             : #endif
     287             : }
     288             : 
     289             : #ifdef USE_ZSTD
     290             : /*
     291             :  * Decompress the input data to output buffer until we run out of input
     292             :  * data. Each time the output buffer is full, pass on the decompressed data
     293             :  * to the next streamer.
     294             :  */
     295             : static void
     296             : bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
     297             :                                      bbstreamer_member *member,
     298             :                                      const char *data, int len,
     299             :                                      bbstreamer_archive_context context)
     300             : {
     301             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     302             :     ZSTD_inBuffer inBuf = {data, len, 0};
     303             : 
     304             :     while (inBuf.pos < inBuf.size)
     305             :     {
     306             :         size_t      ret;
     307             : 
     308             :         /*
     309             :          * If output buffer is full then forward the content to next streamer
     310             :          * and update the output buffer.
     311             :          */
     312             :         if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
     313             :         {
     314             :             bbstreamer_content(mystreamer->base.bbs_next, member,
     315             :                                mystreamer->zstd_outBuf.dst,
     316             :                                mystreamer->zstd_outBuf.pos,
     317             :                                context);
     318             : 
     319             :             /* Reset the ZSTD output buffer. */
     320             :             mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     321             :             mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     322             :             mystreamer->zstd_outBuf.pos = 0;
     323             :         }
     324             : 
     325             :         ret = ZSTD_decompressStream(mystreamer->dctx,
     326             :                                     &mystreamer->zstd_outBuf, &inBuf);
     327             : 
     328             :         if (ZSTD_isError(ret))
     329             :             pg_log_error("could not decompress data: %s",
     330             :                          ZSTD_getErrorName(ret));
     331             :     }
     332             : }
     333             : 
     334             : /*
     335             :  * End-of-stream processing.
     336             :  */
     337             : static void
     338             : bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
     339             : {
     340             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     341             : 
     342             :     /*
     343             :      * End of the stream, if there is some pending data in output buffers then
     344             :      * we must forward it to next streamer.
     345             :      */
     346             :     if (mystreamer->zstd_outBuf.pos > 0)
     347             :         bbstreamer_content(mystreamer->base.bbs_next, NULL,
     348             :                            mystreamer->base.bbs_buffer.data,
     349             :                            mystreamer->base.bbs_buffer.maxlen,
     350             :                            BBSTREAMER_UNKNOWN);
     351             : 
     352             :     bbstreamer_finalize(mystreamer->base.bbs_next);
     353             : }
     354             : 
     355             : /*
     356             :  * Free memory.
     357             :  */
     358             : static void
     359             : bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
     360             : {
     361             :     bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
     362             : 
     363             :     bbstreamer_free(streamer->bbs_next);
     364             :     ZSTD_freeDCtx(mystreamer->dctx);
     365             :     pfree(streamer->bbs_buffer.data);
     366             :     pfree(streamer);
     367             : }
     368             : #endif

Generated by: LCOV version 1.14