LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16devel Lines: 0 2 0.0 %
Date: 2023-03-26 14:11:19 Functions: 0 1 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * basebackup_zstd.c
       4             :  *    Basebackup sink implementing zstd compression.
       5             :  *
       6             :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/backup/basebackup_zstd.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #ifdef USE_ZSTD
      16             : #include <zstd.h>
      17             : #endif
      18             : 
      19             : #include "backup/basebackup_sink.h"
      20             : 
      21             : #ifdef USE_ZSTD
      22             : 
      23             : typedef struct bbsink_zstd
      24             : {
      25             :     /* Common information for all types of sink. */
      26             :     bbsink      base;
      27             : 
      28             :     /* Compression options */
      29             :     pg_compress_specification *compress;
      30             : 
      31             :     ZSTD_CCtx  *cctx;
      32             :     ZSTD_outBuffer zstd_outBuf;
      33             : } bbsink_zstd;
      34             : 
      35             : static void bbsink_zstd_begin_backup(bbsink *sink);
      36             : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
      37             : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
      38             : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
      39             : static void bbsink_zstd_end_archive(bbsink *sink);
      40             : static void bbsink_zstd_cleanup(bbsink *sink);
      41             : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
      42             :                                    TimeLineID endtli);
      43             : 
      44             : static const bbsink_ops bbsink_zstd_ops = {
      45             :     .begin_backup = bbsink_zstd_begin_backup,
      46             :     .begin_archive = bbsink_zstd_begin_archive,
      47             :     .archive_contents = bbsink_zstd_archive_contents,
      48             :     .end_archive = bbsink_zstd_end_archive,
      49             :     .begin_manifest = bbsink_forward_begin_manifest,
      50             :     .manifest_contents = bbsink_zstd_manifest_contents,
      51             :     .end_manifest = bbsink_forward_end_manifest,
      52             :     .end_backup = bbsink_zstd_end_backup,
      53             :     .cleanup = bbsink_zstd_cleanup
      54             : };
      55             : #endif
      56             : 
      57             : /*
      58             :  * Create a new basebackup sink that performs zstd compression.
      59             :  */
      60             : bbsink *
      61           0 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
      62             : {
      63             : #ifndef USE_ZSTD
      64           0 :     ereport(ERROR,
      65             :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      66             :              errmsg("zstd compression is not supported by this build")));
      67             :     return NULL;                /* keep compiler quiet */
      68             : #else
      69             :     bbsink_zstd *sink;
      70             : 
      71             :     Assert(next != NULL);
      72             : 
      73             :     sink = palloc0(sizeof(bbsink_zstd));
      74             :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
      75             :     sink->base.bbs_next = next;
      76             :     sink->compress = compress;
      77             : 
      78             :     return &sink->base;
      79             : #endif
      80             : }
      81             : 
      82             : #ifdef USE_ZSTD
      83             : 
      84             : /*
      85             :  * Begin backup.
      86             :  */
      87             : static void
      88             : bbsink_zstd_begin_backup(bbsink *sink)
      89             : {
      90             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
      91             :     size_t      output_buffer_bound;
      92             :     size_t      ret;
      93             :     pg_compress_specification *compress = mysink->compress;
      94             : 
      95             :     mysink->cctx = ZSTD_createCCtx();
      96             :     if (!mysink->cctx)
      97             :         elog(ERROR, "could not create zstd compression context");
      98             : 
      99             :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
     100             :                                  compress->level);
     101             :     if (ZSTD_isError(ret))
     102             :         elog(ERROR, "could not set zstd compression level to %d: %s",
     103             :              compress->level, ZSTD_getErrorName(ret));
     104             : 
     105             :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     106             :     {
     107             :         /*
     108             :          * On older versions of libzstd, this option does not exist, and
     109             :          * trying to set it will fail. Similarly for newer versions if they
     110             :          * are compiled without threading support.
     111             :          */
     112             :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
     113             :                                      compress->workers);
     114             :         if (ZSTD_isError(ret))
     115             :             ereport(ERROR,
     116             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117             :                     errmsg("could not set compression worker count to %d: %s",
     118             :                            compress->workers, ZSTD_getErrorName(ret)));
     119             :     }
     120             : 
     121             :     /*
     122             :      * We need our own buffer, because we're going to pass different data to
     123             :      * the next sink than what gets passed to us.
     124             :      */
     125             :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
     126             : 
     127             :     /*
     128             :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
     129             :      * the compressed input buffer.
     130             :      */
     131             :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
     132             : 
     133             :     /*
     134             :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
     135             :      */
     136             :     output_buffer_bound = output_buffer_bound + BLCKSZ -
     137             :         (output_buffer_bound % BLCKSZ);
     138             : 
     139             :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
     140             : }
     141             : 
     142             : /*
     143             :  * Prepare to compress the next archive.
     144             :  */
     145             : static void
     146             : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
     147             : {
     148             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     149             :     char       *zstd_archive_name;
     150             : 
     151             :     /*
     152             :      * At the start of each archive we reset the state to start a new
     153             :      * compression operation. The parameters are sticky and they will stick
     154             :      * around as we are resetting with option ZSTD_reset_session_only.
     155             :      */
     156             :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
     157             : 
     158             :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     159             :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
     160             :     mysink->zstd_outBuf.pos = 0;
     161             : 
     162             :     /* Add ".zst" to the archive name. */
     163             :     zstd_archive_name = psprintf("%s.zst", archive_name);
     164             :     Assert(sink->bbs_next != NULL);
     165             :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
     166             :     pfree(zstd_archive_name);
     167             : }
     168             : 
     169             : /*
     170             :  * Compress the input data to the output buffer until we run out of input
     171             :  * data. Each time the output buffer falls below the compression bound for
     172             :  * the input buffer, invoke the archive_contents() method for the next sink.
     173             :  *
     174             :  * Note that since we're compressing the input, it may very commonly happen
     175             :  * that we consume all the input data without filling the output buffer. In
     176             :  * that case, the compressed representation of the current input data won't
     177             :  * actually be sent to the next bbsink until a later call to this function,
     178             :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
     179             :  */
     180             : static void
     181             : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
     182             : {
     183             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     184             :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
     185             : 
     186             :     while (inBuf.pos < inBuf.size)
     187             :     {
     188             :         size_t      yet_to_flush;
     189             :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     190             : 
     191             :         /*
     192             :          * If the out buffer is not left with enough space, send the output
     193             :          * buffer to the next sink, and reset it.
     194             :          */
     195             :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     196             :         {
     197             :             bbsink_archive_contents(mysink->base.bbs_next,
     198             :                                     mysink->zstd_outBuf.pos);
     199             :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     200             :             mysink->zstd_outBuf.size =
     201             :                 mysink->base.bbs_next->bbs_buffer_length;
     202             :             mysink->zstd_outBuf.pos = 0;
     203             :         }
     204             : 
     205             :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
     206             :                                             &inBuf, ZSTD_e_continue);
     207             : 
     208             :         if (ZSTD_isError(yet_to_flush))
     209             :             elog(ERROR,
     210             :                  "could not compress data: %s",
     211             :                  ZSTD_getErrorName(yet_to_flush));
     212             :     }
     213             : }
     214             : 
     215             : /*
     216             :  * There might be some data inside zstd's internal buffers; we need to get that
     217             :  * flushed out, also end the zstd frame and then get that forwarded to the
     218             :  * successor sink as archive content.
     219             :  *
     220             :  * Then we can end processing for this archive.
     221             :  */
     222             : static void
     223             : bbsink_zstd_end_archive(bbsink *sink)
     224             : {
     225             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     226             :     size_t      yet_to_flush;
     227             : 
     228             :     do
     229             :     {
     230             :         ZSTD_inBuffer in = {NULL, 0, 0};
     231             :         size_t      max_needed = ZSTD_compressBound(0);
     232             : 
     233             :         /*
     234             :          * If the out buffer is not left with enough space, send the output
     235             :          * buffer to the next sink, and reset it.
     236             :          */
     237             :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     238             :         {
     239             :             bbsink_archive_contents(mysink->base.bbs_next,
     240             :                                     mysink->zstd_outBuf.pos);
     241             :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     242             :             mysink->zstd_outBuf.size =
     243             :                 mysink->base.bbs_next->bbs_buffer_length;
     244             :             mysink->zstd_outBuf.pos = 0;
     245             :         }
     246             : 
     247             :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
     248             :                                             &mysink->zstd_outBuf,
     249             :                                             &in, ZSTD_e_end);
     250             : 
     251             :         if (ZSTD_isError(yet_to_flush))
     252             :             elog(ERROR, "could not compress data: %s",
     253             :                  ZSTD_getErrorName(yet_to_flush));
     254             : 
     255             :     } while (yet_to_flush > 0);
     256             : 
     257             :     /* Make sure to pass any remaining bytes to the next sink. */
     258             :     if (mysink->zstd_outBuf.pos > 0)
     259             :         bbsink_archive_contents(mysink->base.bbs_next,
     260             :                                 mysink->zstd_outBuf.pos);
     261             : 
     262             :     /* Pass on the information that this archive has ended. */
     263             :     bbsink_forward_end_archive(sink);
     264             : }
     265             : 
     266             : /*
     267             :  * Free the resources and context.
     268             :  */
     269             : static void
     270             : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
     271             :                        TimeLineID endtli)
     272             : {
     273             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     274             : 
     275             :     /* Release the context. */
     276             :     if (mysink->cctx)
     277             :     {
     278             :         ZSTD_freeCCtx(mysink->cctx);
     279             :         mysink->cctx = NULL;
     280             :     }
     281             : 
     282             :     bbsink_forward_end_backup(sink, endptr, endtli);
     283             : }
     284             : 
     285             : /*
     286             :  * Manifest contents are not compressed, but we do need to copy them into
     287             :  * the successor sink's buffer, because we have our own.
     288             :  */
     289             : static void
     290             : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
     291             : {
     292             :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
     293             :     bbsink_manifest_contents(sink->bbs_next, len);
     294             : }
     295             : 
     296             : /*
     297             :  * In case the backup fails, make sure we free any compression context that
     298             :  * got allocated, so that we don't leak memory.
     299             :  */
     300             : static void
     301             : bbsink_zstd_cleanup(bbsink *sink)
     302             : {
     303             :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     304             : 
     305             :     /* Release the context if not already released. */
     306             :     if (mysink->cctx)
     307             :     {
     308             :         ZSTD_freeCCtx(mysink->cctx);
     309             :         mysink->cctx = NULL;
     310             :     }
     311             : }
     312             : 
     313             : #endif

Generated by: LCOV version 1.14