LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 0.0 % 2 0
Test Date: 2026-03-03 02:14:47 Functions: 0.0 % 1 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_zstd.c
       4              :  *    Basebackup sink implementing zstd compression.
       5              :  *
       6              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/backup/basebackup_zstd.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #ifdef USE_ZSTD
      16              : #include <zstd.h>
      17              : #endif
      18              : 
      19              : #include "backup/basebackup_sink.h"
      20              : 
      21              : #ifdef USE_ZSTD
      22              : 
      23              : typedef struct bbsink_zstd
      24              : {
      25              :     /* Common information for all types of sink. */
      26              :     bbsink      base;
      27              : 
      28              :     /* Compression options */
      29              :     pg_compress_specification *compress;
      30              : 
      31              :     ZSTD_CCtx  *cctx;
      32              :     ZSTD_outBuffer zstd_outBuf;
      33              : } bbsink_zstd;
      34              : 
      35              : static void bbsink_zstd_begin_backup(bbsink *sink);
      36              : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
      37              : static void bbsink_zstd_archive_contents(bbsink *sink, size_t len);
      38              : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
      39              : static void bbsink_zstd_end_archive(bbsink *sink);
      40              : static void bbsink_zstd_cleanup(bbsink *sink);
      41              : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
      42              :                                    TimeLineID endtli);
      43              : 
      44              : static const bbsink_ops bbsink_zstd_ops = {
      45              :     .begin_backup = bbsink_zstd_begin_backup,
      46              :     .begin_archive = bbsink_zstd_begin_archive,
      47              :     .archive_contents = bbsink_zstd_archive_contents,
      48              :     .end_archive = bbsink_zstd_end_archive,
      49              :     .begin_manifest = bbsink_forward_begin_manifest,
      50              :     .manifest_contents = bbsink_zstd_manifest_contents,
      51              :     .end_manifest = bbsink_forward_end_manifest,
      52              :     .end_backup = bbsink_zstd_end_backup,
      53              :     .cleanup = bbsink_zstd_cleanup
      54              : };
      55              : #endif
      56              : 
      57              : /*
      58              :  * Create a new basebackup sink that performs zstd compression.
      59              :  */
      60              : bbsink *
      61            0 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
      62              : {
      63              : #ifndef USE_ZSTD
      64            0 :     ereport(ERROR,
      65              :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      66              :              errmsg("zstd compression is not supported by this build")));
      67              :     return NULL;                /* keep compiler quiet */
      68              : #else
      69              :     bbsink_zstd *sink;
      70              : 
      71              :     Assert(next != NULL);
      72              : 
      73              :     sink = palloc0_object(bbsink_zstd);
      74              :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
      75              :     sink->base.bbs_next = next;
      76              :     sink->compress = compress;
      77              : 
      78              :     return &sink->base;
      79              : #endif
      80              : }
      81              : 
      82              : #ifdef USE_ZSTD
      83              : 
      84              : /*
      85              :  * Begin backup.
      86              :  */
      87              : static void
      88              : bbsink_zstd_begin_backup(bbsink *sink)
      89              : {
      90              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
      91              :     size_t      output_buffer_bound;
      92              :     size_t      ret;
      93              :     pg_compress_specification *compress = mysink->compress;
      94              : 
      95              :     mysink->cctx = ZSTD_createCCtx();
      96              :     if (!mysink->cctx)
      97              :         elog(ERROR, "could not create zstd compression context");
      98              : 
      99              :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
     100              :                                  compress->level);
     101              :     if (ZSTD_isError(ret))
     102              :         elog(ERROR, "could not set zstd compression level to %d: %s",
     103              :              compress->level, ZSTD_getErrorName(ret));
     104              : 
     105              :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     106              :     {
     107              :         /*
     108              :          * On older versions of libzstd, this option does not exist, and
     109              :          * trying to set it will fail. Similarly for newer versions if they
     110              :          * are compiled without threading support.
     111              :          */
     112              :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
     113              :                                      compress->workers);
     114              :         if (ZSTD_isError(ret))
     115              :             ereport(ERROR,
     116              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117              :                     errmsg("could not set compression worker count to %d: %s",
     118              :                            compress->workers, ZSTD_getErrorName(ret)));
     119              :     }
     120              : 
     121              :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     122              :     {
     123              :         ret = ZSTD_CCtx_setParameter(mysink->cctx,
     124              :                                      ZSTD_c_enableLongDistanceMatching,
     125              :                                      compress->long_distance);
     126              :         if (ZSTD_isError(ret))
     127              :             ereport(ERROR,
     128              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     129              :                     errmsg("could not enable long-distance mode: %s",
     130              :                            ZSTD_getErrorName(ret)));
     131              :     }
     132              : 
     133              :     /*
     134              :      * We need our own buffer, because we're going to pass different data to
     135              :      * the next sink than what gets passed to us.
     136              :      */
     137              :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
     138              : 
     139              :     /*
     140              :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
     141              :      * the compressed input buffer.
     142              :      */
     143              :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
     144              : 
     145              :     /*
     146              :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
     147              :      */
     148              :     output_buffer_bound = output_buffer_bound + BLCKSZ -
     149              :         (output_buffer_bound % BLCKSZ);
     150              : 
     151              :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
     152              : }
     153              : 
     154              : /*
     155              :  * Prepare to compress the next archive.
     156              :  */
     157              : static void
     158              : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
     159              : {
     160              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     161              :     char       *zstd_archive_name;
     162              : 
     163              :     /*
     164              :      * At the start of each archive we reset the state to start a new
     165              :      * compression operation. The parameters are sticky and they will stick
     166              :      * around as we are resetting with option ZSTD_reset_session_only.
     167              :      */
     168              :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
     169              : 
     170              :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     171              :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
     172              :     mysink->zstd_outBuf.pos = 0;
     173              : 
     174              :     /* Add ".zst" to the archive name. */
     175              :     zstd_archive_name = psprintf("%s.zst", archive_name);
     176              :     Assert(sink->bbs_next != NULL);
     177              :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
     178              :     pfree(zstd_archive_name);
     179              : }
     180              : 
     181              : /*
     182              :  * Compress the input data to the output buffer until we run out of input
     183              :  * data. Each time the output buffer falls below the compression bound for
     184              :  * the input buffer, invoke the archive_contents() method for the next sink.
     185              :  *
     186              :  * Note that since we're compressing the input, it may very commonly happen
     187              :  * that we consume all the input data without filling the output buffer. In
     188              :  * that case, the compressed representation of the current input data won't
     189              :  * actually be sent to the next bbsink until a later call to this function,
     190              :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
     191              :  */
     192              : static void
     193              : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
     194              : {
     195              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     196              :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
     197              : 
     198              :     while (inBuf.pos < inBuf.size)
     199              :     {
     200              :         size_t      yet_to_flush;
     201              :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     202              : 
     203              :         /*
     204              :          * If the out buffer is not left with enough space, send the output
     205              :          * buffer to the next sink, and reset it.
     206              :          */
     207              :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     208              :         {
     209              :             bbsink_archive_contents(mysink->base.bbs_next,
     210              :                                     mysink->zstd_outBuf.pos);
     211              :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     212              :             mysink->zstd_outBuf.size =
     213              :                 mysink->base.bbs_next->bbs_buffer_length;
     214              :             mysink->zstd_outBuf.pos = 0;
     215              :         }
     216              : 
     217              :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
     218              :                                             &inBuf, ZSTD_e_continue);
     219              : 
     220              :         if (ZSTD_isError(yet_to_flush))
     221              :             elog(ERROR,
     222              :                  "could not compress data: %s",
     223              :                  ZSTD_getErrorName(yet_to_flush));
     224              :     }
     225              : }
     226              : 
     227              : /*
     228              :  * There might be some data inside zstd's internal buffers; we need to get that
     229              :  * flushed out, also end the zstd frame and then get that forwarded to the
     230              :  * successor sink as archive content.
     231              :  *
     232              :  * Then we can end processing for this archive.
     233              :  */
     234              : static void
     235              : bbsink_zstd_end_archive(bbsink *sink)
     236              : {
     237              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     238              :     size_t      yet_to_flush;
     239              : 
     240              :     do
     241              :     {
     242              :         ZSTD_inBuffer in = {NULL, 0, 0};
     243              :         size_t      max_needed = ZSTD_compressBound(0);
     244              : 
     245              :         /*
     246              :          * If the out buffer is not left with enough space, send the output
     247              :          * buffer to the next sink, and reset it.
     248              :          */
     249              :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     250              :         {
     251              :             bbsink_archive_contents(mysink->base.bbs_next,
     252              :                                     mysink->zstd_outBuf.pos);
     253              :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     254              :             mysink->zstd_outBuf.size =
     255              :                 mysink->base.bbs_next->bbs_buffer_length;
     256              :             mysink->zstd_outBuf.pos = 0;
     257              :         }
     258              : 
     259              :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
     260              :                                             &mysink->zstd_outBuf,
     261              :                                             &in, ZSTD_e_end);
     262              : 
     263              :         if (ZSTD_isError(yet_to_flush))
     264              :             elog(ERROR, "could not compress data: %s",
     265              :                  ZSTD_getErrorName(yet_to_flush));
     266              : 
     267              :     } while (yet_to_flush > 0);
     268              : 
     269              :     /* Make sure to pass any remaining bytes to the next sink. */
     270              :     if (mysink->zstd_outBuf.pos > 0)
     271              :         bbsink_archive_contents(mysink->base.bbs_next,
     272              :                                 mysink->zstd_outBuf.pos);
     273              : 
     274              :     /* Pass on the information that this archive has ended. */
     275              :     bbsink_forward_end_archive(sink);
     276              : }
     277              : 
     278              : /*
     279              :  * Free the resources and context.
     280              :  */
     281              : static void
     282              : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
     283              :                        TimeLineID endtli)
     284              : {
     285              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     286              : 
     287              :     /* Release the context. */
     288              :     if (mysink->cctx)
     289              :     {
     290              :         ZSTD_freeCCtx(mysink->cctx);
     291              :         mysink->cctx = NULL;
     292              :     }
     293              : 
     294              :     bbsink_forward_end_backup(sink, endptr, endtli);
     295              : }
     296              : 
     297              : /*
     298              :  * Manifest contents are not compressed, but we do need to copy them into
     299              :  * the successor sink's buffer, because we have our own.
     300              :  */
     301              : static void
     302              : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
     303              : {
     304              :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
     305              :     bbsink_manifest_contents(sink->bbs_next, len);
     306              : }
     307              : 
     308              : /*
     309              :  * In case the backup fails, make sure we free any compression context that
     310              :  * got allocated, so that we don't leak memory.
     311              :  */
     312              : static void
     313              : bbsink_zstd_cleanup(bbsink *sink)
     314              : {
     315              :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     316              : 
     317              :     /* Release the context if not already released. */
     318              :     if (mysink->cctx)
     319              :     {
     320              :         ZSTD_freeCCtx(mysink->cctx);
     321              :         mysink->cctx = NULL;
     322              :     }
     323              : }
     324              : 
     325              : #endif
        

Generated by: LCOV version 2.0-1