LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 0 2 0.0 %
Date: 2025-01-18 05:15:39 Functions: 0 1 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * basebackup_zstd.c
       4             :  *    Basebackup sink implementing zstd compression.
       5             :  *
       6             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/backup/basebackup_zstd.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #ifdef USE_ZSTD
      16             : #include <zstd.h>
      17             : #endif
      18             : 
      19             : #include "backup/basebackup_sink.h"
      20             : 
      21             : #ifdef USE_ZSTD
      22             : 
      23             : typedef struct bbsink_zstd      /* state for one zstd-compressing basebackup sink */
      24             : {
      25             :     /* Common information for all types of sink. Must stay the first member: the callbacks cast bbsink * to bbsink_zstd *. */
      26             :     bbsink      base;
      27             : 
      28             :     /* Compression options; the pointer passed to bbsink_zstd_new() is stored, not copied */
      29             :     pg_compress_specification *compress;
      30             : 
      31             :     ZSTD_CCtx  *cctx;           /* created in begin_backup; freed in end_backup or cleanup */
      32             :     ZSTD_outBuffer zstd_outBuf; /* write cursor over the next sink's bbs_buffer */
      33             : } bbsink_zstd;
      34             : 
      35             : static void bbsink_zstd_begin_backup(bbsink *sink);    /* forward declarations of the callbacks defined below */
      36             : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
      37             : static void bbsink_zstd_archive_contents(bbsink *sink, size_t len);
      38             : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
      39             : static void bbsink_zstd_end_archive(bbsink *sink);
      40             : static void bbsink_zstd_cleanup(bbsink *sink);
      41             : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
      42             :                                    TimeLineID endtli);
      43             : 
      44             : static const bbsink_ops bbsink_zstd_ops = {    /* callback table installed by bbsink_zstd_new() */
      45             :     .begin_backup = bbsink_zstd_begin_backup,
      46             :     .begin_archive = bbsink_zstd_begin_archive,
      47             :     .archive_contents = bbsink_zstd_archive_contents,
      48             :     .end_archive = bbsink_zstd_end_archive,
      49             :     .begin_manifest = bbsink_forward_begin_manifest,    /* helper defined outside this file; presumably just forwards to bbs_next */
      50             :     .manifest_contents = bbsink_zstd_manifest_contents, /* copies, does not compress */
      51             :     .end_manifest = bbsink_forward_end_manifest,        /* helper defined outside this file; presumably just forwards to bbs_next */
      52             :     .end_backup = bbsink_zstd_end_backup,
      53             :     .cleanup = bbsink_zstd_cleanup
      54             : };
      55             : #endif
      56             : 
      57             : /*
      58             :  * Create a new basebackup sink that performs zstd compression.
                     :  *
                     :  * 'next' is the successor sink; it must not be NULL (asserted). 'compress'
                     :  * is stored by pointer, not copied, and is read later by
                     :  * bbsink_zstd_begin_backup(). No zstd context is created here; that happens
                     :  * in bbsink_zstd_begin_backup().
                     :  *
                     :  * Returns a pointer to the bbsink embedded in the new sink. In a build
                     :  * without USE_ZSTD this reports an ERROR instead, and the return statement
                     :  * exists only to keep the compiler quiet.
      59             :  */
      60             : bbsink *
      61           0 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
      62             : {
      63             : #ifndef USE_ZSTD
      64           0 :     ereport(ERROR,
      65             :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      66             :              errmsg("zstd compression is not supported by this build")));
      67             :     return NULL;                /* keep compiler quiet */
      68             : #else
      69             :     bbsink_zstd *sink;
      70             : 
      71             :     Assert(next != NULL);
      72             : 
      73             :     sink = palloc0(sizeof(bbsink_zstd));        /* zero-filled, so cctx starts out NULL */
      74             :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;    /* NOTE(review): cast presumably needed because bbs_ops is const-qualified in bbsink -- confirm in basebackup_sink.h */
      75             :     sink->base.bbs_next = next;
      76             :     sink->compress = compress;
      77             : 
      78             :     return &sink->base;
      79             : #endif
      80             : }
      81             : 
      82             : #ifdef USE_ZSTD
      83             : 
      84             : /*
      85             :  * Begin backup.
                     :  *
                     :  * Creates the zstd compression context and applies the compression level
                     :  * and, when the corresponding bits are set in compress->options, the worker
                     :  * count and the long-distance matching setting. Then allocates this sink's
                     :  * own input buffer and starts the next sink with a buffer length derived
                     :  * from ZSTD_compressBound() of our buffer length.
                     :  *
                     :  * Every failure is reported at ERROR level through elog() or ereport().
                     :  *
                     :  * NOTE(review): bbs_buffer_length is read here but never assigned in this
                     :  * file; presumably the caller sets it before invoking this callback.
      86             :  */
      87             : static void
      88             : bbsink_zstd_begin_backup(bbsink *sink)
      89             : {
      90             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
      91             :     size_t      output_buffer_bound;
      92             :     size_t      ret;
      93             :     pg_compress_specification *compress = mysink->compress;
      94             : 
      95             :     mysink->cctx = ZSTD_createCCtx();
      96             :     if (!mysink->cctx)
      97             :         elog(ERROR, "could not create zstd compression context");
      98             : 
      99             :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
     100             :                                  compress->level);
     101             :     if (ZSTD_isError(ret))
     102             :         elog(ERROR, "could not set zstd compression level to %d: %s",
     103             :              compress->level, ZSTD_getErrorName(ret));
     104             : 
     105             :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     106             :     {
     107             :         /*
     108             :          * On older versions of libzstd, this option does not exist, and
     109             :          * trying to set it will fail. Similarly for newer versions if they
     110             :          * are compiled without threading support.
     111             :          */
     112             :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
     113             :                                      compress->workers);
     114             :         if (ZSTD_isError(ret))
     115             :             ereport(ERROR,
     116             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117             :                     errmsg("could not set compression worker count to %d: %s",
     118             :                            compress->workers, ZSTD_getErrorName(ret)));
     119             :     }
     120             : 
     121             :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     122             :     {
     123             :         ret = ZSTD_CCtx_setParameter(mysink->cctx,
     124             :                                      ZSTD_c_enableLongDistanceMatching,
     125             :                                      compress->long_distance);
     126             :         if (ZSTD_isError(ret))
     127             :             ereport(ERROR,
     128             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     129             :                     errmsg("could not enable long-distance mode: %s",
     130             :                            ZSTD_getErrorName(ret)));
     131             :     }
     132             : 
     133             :     /*
     134             :      * We need our own buffer, because we're going to pass different data to
     135             :      * the next sink than what gets passed to us.
     136             :      */
     137             :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
     138             : 
     139             :     /*
     140             :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
     141             :      * the compressed input buffer.
     142             :      */
     143             :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
     144             : 
     145             :     /*
     146             :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
                     :      * Note that a bound which is already a multiple of BLCKSZ still grows
                     :      * by one full BLCKSZ here; that only makes the buffer larger than
                     :      * strictly necessary.
     147             :      */
     148             :     output_buffer_bound = output_buffer_bound + BLCKSZ -
     149             :         (output_buffer_bound % BLCKSZ);
     150             : 
     151             :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
     152             : }
     153             : 
     154             : /*
     155             :  * Prepare to compress the next archive.
                     :  *
                     :  * Resets the compression session, points the output cursor at the start of
                     :  * the next sink's buffer, and begins the archive in the next sink under
                     :  * 'archive_name' with ".zst" appended. The temporary name string is freed
                     :  * before returning.
     156             :  */
     157             : static void
     158             : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
     159             : {
     160             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     161             :     char       *zstd_archive_name;
     162             : 
     163             :     /*
     164             :      * At the start of each archive we reset the state to start a new
     165             :      * compression operation. The parameters are sticky and they will stick
     166             :      * around as we are resetting with option ZSTD_reset_session_only.
                     :      *
                     :      * The return value of ZSTD_CCtx_reset() is not checked here.
     167             :      */
     168             :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
     169             : 
     170             :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     171             :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
     172             :     mysink->zstd_outBuf.pos = 0;
     173             : 
     174             :     /* Add ".zst" to the archive name. */
     175             :     zstd_archive_name = psprintf("%s.zst", archive_name);
     176             :     Assert(sink->bbs_next != NULL);
     177             :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
     178             :     pfree(zstd_archive_name);
     179             : }
     180             : 
     181             : /*
     182             :  * Compress the input data to the output buffer until we run out of input
     183             :  * data. Each time the output buffer falls below the compression bound for
     184             :  * the input buffer, invoke the archive_contents() method for the next sink.
     185             :  *
     186             :  * Note that since we're compressing the input, it may very commonly happen
     187             :  * that we consume all the input data without filling the output buffer. In
     188             :  * that case, the compressed representation of the current input data won't
     189             :  * actually be sent to the next bbsink until a later call to this function,
     190             :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
                     :  *
                     :  * 'len' is the number of input bytes, taken from the start of this sink's
                     :  * own bbs_buffer. A compression failure is reported with elog(ERROR).
     191             :  */
     192             : static void
     193             : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
     194             : {
     195             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     196             :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
     197             : 
     198             :     while (inBuf.pos < inBuf.size)
     199             :     {
     200             :         size_t      yet_to_flush;
     201             :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);    /* bound for the input not yet consumed */
     202             : 
     203             :         /*
     204             :          * If the out buffer is not left with enough space, send the output
     205             :          * buffer to the next sink, and reset it.
     206             :          */
     207             :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     208             :         {
     209             :             bbsink_archive_contents(mysink->base.bbs_next,
     210             :                                     mysink->zstd_outBuf.pos);
     211             :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     212             :             mysink->zstd_outBuf.size =
     213             :                 mysink->base.bbs_next->bbs_buffer_length;
     214             :             mysink->zstd_outBuf.pos = 0;
     215             :         }
     216             : 
     217             :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
     218             :                                             &inBuf, ZSTD_e_continue);
     219             : 
     220             :         if (ZSTD_isError(yet_to_flush)) /* the return value is used only for error detection here */
     221             :             elog(ERROR,
     222             :                  "could not compress data: %s",
     223             :                  ZSTD_getErrorName(yet_to_flush));
     224             :     }
     225             : }
     226             : 
     227             : /*
     228             :  * There might be some data inside zstd's internal buffers; we need to get that
     229             :  * flushed out, also end the zstd frame and then get that forwarded to the
     230             :  * successor sink as archive content.
     231             :  *
     232             :  * Then we can end processing for this archive.
                     :  *
                     :  * The loop calls ZSTD_compressStream2() with ZSTD_e_end and an empty input
                     :  * until it returns 0, handing the output buffer to the next sink whenever
                     :  * fewer than ZSTD_compressBound(0) bytes remain free in it. A compression
                     :  * failure is reported with elog(ERROR).
     233             :  */
     234             : static void
     235             : bbsink_zstd_end_archive(bbsink *sink)
     236             : {
     237             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     238             :     size_t      yet_to_flush;
     239             : 
     240             :     do
     241             :     {
     242             :         ZSTD_inBuffer in = {NULL, 0, 0};        /* no new input; only flushing and ending the frame */
     243             :         size_t      max_needed = ZSTD_compressBound(0);
     244             : 
     245             :         /*
     246             :          * If the out buffer is not left with enough space, send the output
     247             :          * buffer to the next sink, and reset it.
     248             :          */
     249             :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     250             :         {
     251             :             bbsink_archive_contents(mysink->base.bbs_next,
     252             :                                     mysink->zstd_outBuf.pos);
     253             :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     254             :             mysink->zstd_outBuf.size =
     255             :                 mysink->base.bbs_next->bbs_buffer_length;
     256             :             mysink->zstd_outBuf.pos = 0;
     257             :         }
     258             : 
     259             :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
     260             :                                             &mysink->zstd_outBuf,
     261             :                                             &in, ZSTD_e_end);
     262             : 
     263             :         if (ZSTD_isError(yet_to_flush))
     264             :             elog(ERROR, "could not compress data: %s",
     265             :                  ZSTD_getErrorName(yet_to_flush));
     266             : 
     267             :     } while (yet_to_flush > 0);
     268             : 
     269             :     /* Make sure to pass any remaining bytes to the next sink. */
     270             :     if (mysink->zstd_outBuf.pos > 0)
     271             :         bbsink_archive_contents(mysink->base.bbs_next,
     272             :                                 mysink->zstd_outBuf.pos);
     273             : 
     274             :     /* Pass on the information that this archive has ended. */
     275             :     bbsink_forward_end_archive(sink);
     276             : }
     277             : 
     278             : /*
     279             :  * Free the resources and context.
                     :  *
                     :  * Releases the zstd compression context if one exists and sets the pointer
                     :  * to NULL, so that a later bbsink_zstd_cleanup() call does not free it a
                     :  * second time. 'endptr' and 'endtli' are then passed on unchanged through
                     :  * bbsink_forward_end_backup().
                     :  *
                     :  * The input buffer allocated with palloc() in bbsink_zstd_begin_backup()
                     :  * is not freed explicitly here.
     280             :  */
     281             : static void
     282             : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
     283             :                        TimeLineID endtli)
     284             : {
     285             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     286             : 
     287             :     /* Release the context. */
     288             :     if (mysink->cctx)
     289             :     {
     290             :         ZSTD_freeCCtx(mysink->cctx);
     291             :         mysink->cctx = NULL;
     292             :     }
     293             : 
     294             :     bbsink_forward_end_backup(sink, endptr, endtli);
     295             : }
     296             : 
     297             : /*
     298             :  * Manifest contents are not compressed, but we do need to copy them into
     299             :  * the successor sink's buffer, because we have our own.
                     :  *
                     :  * 'len' is the number of bytes copied from the start of this sink's
                     :  * bbs_buffer and then reported to the next sink.
                     :  *
                     :  * NOTE(review): no bounds check is made here; this assumes 'len' never
                     :  * exceeds the next sink's buffer length -- confirm against the caller.
     300             :  */
     301             : static void
     302             : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
     303             : {
     304             :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
     305             :     bbsink_manifest_contents(sink->bbs_next, len);
     306             : }
     307             : 
     308             : /*
     309             :  * In case the backup fails, make sure we free any compression context that
     310             :  * got allocated, so that we don't leak memory.
                     :  *
                     :  * This does nothing when cctx is NULL, which is the case before
                     :  * bbsink_zstd_begin_backup() has created a context and after
                     :  * bbsink_zstd_end_backup() has released it.
     311             :  */
     312             : static void
     313             : bbsink_zstd_cleanup(bbsink *sink)
     314             : {
     315             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     316             : 
     317             :     /* Release the context if not already released. */
     318             :     if (mysink->cctx)
     319             :     {
     320             :         ZSTD_freeCCtx(mysink->cctx);
     321             :         mysink->cctx = NULL;
     322             :     }
     323             : }
     324             : 
     325             : #endif

Generated by: LCOV version 1.14