LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_lz4.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 89.6 % 77 69
Test Date: 2026-03-14 04:15:00 Functions: 100.0 % 7 7
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_lz4.c
       4              :  *    Basebackup sink implementing lz4 compression.
       5              :  *
       6              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/backup/basebackup_lz4.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #ifdef USE_LZ4
      16              : #include <lz4frame.h>
      17              : #endif
      18              : 
      19              : #include "backup/basebackup_sink.h"
      20              : 
      21              : #ifdef USE_LZ4
      22              : 
      23              : typedef struct bbsink_lz4
      24              : {
      25              :     /* Common information for all types of sink; callbacks cast bbsink * to bbsink_lz4 *. */
      26              :     bbsink      base;
      27              : 
      28              :     /* Compression level, taken from the compression specification at creation. */
      29              :     int         compresslevel;
      30              : 
      31              :     LZ4F_compressionContext_t ctx;  /* created in begin_archive, freed in end_archive or cleanup */
      32              :     LZ4F_preferences_t prefs;   /* frame preferences, filled in by begin_backup */
      33              : 
      34              :     /* Number of bytes staged in the next sink's buffer but not yet forwarded. */
      35              :     size_t      bytes_written;
      36              : } bbsink_lz4;
      37              : 
      38              : static void bbsink_lz4_begin_backup(bbsink *sink);
      39              : static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
      40              : static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
      41              : static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
      42              : static void bbsink_lz4_end_archive(bbsink *sink);
      43              : static void bbsink_lz4_cleanup(bbsink *sink);
      44              : 
      45              : static const bbsink_ops bbsink_lz4_ops = {  /* installed by bbsink_lz4_new() */
      46              :     .begin_backup = bbsink_lz4_begin_backup,
      47              :     .begin_archive = bbsink_lz4_begin_archive,
      48              :     .archive_contents = bbsink_lz4_archive_contents,
      49              :     .end_archive = bbsink_lz4_end_archive,
      50              :     .begin_manifest = bbsink_forward_begin_manifest,    /* no lz4-specific step */
      51              :     .manifest_contents = bbsink_lz4_manifest_contents,
      52              :     .end_manifest = bbsink_forward_end_manifest,    /* no lz4-specific step */
      53              :     .end_backup = bbsink_forward_end_backup,    /* no lz4-specific step */
      54              :     .cleanup = bbsink_lz4_cleanup
      55              : };
      56              : #endif
      57              : 
      58              : /*
      59              :  * Create a new basebackup sink that lz4-compresses archive data before passing it to 'next'.
      60              :  */
      61              : bbsink *
      62            3 : bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
      63              : {
      64              : #ifndef USE_LZ4
      65              :     ereport(ERROR,
      66              :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      67              :              errmsg("lz4 compression is not supported by this build")));
      68              :     return NULL;                /* keep compiler quiet */
      69              : #else
      70              :     bbsink_lz4 *sink;
      71              :     int         compresslevel;
      72              : 
      73              :     Assert(next != NULL);
      74              : 
      75            3 :     compresslevel = compress->level;
      76              :     Assert(compresslevel >= 0 && compresslevel <= 12);  /* NOTE(review): 12 is presumably lz4's maximum level; confirm */
      77              : 
      78            3 :     sink = palloc0_object(bbsink_lz4);  /* palloc0: presumably zero-filled, so ctx starts NULL and bytes_written 0 */
      79            3 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
      80            3 :     sink->base.bbs_next = next;
      81            3 :     sink->compresslevel = compresslevel;
      82              : 
      83            3 :     return &sink->base;
      84              : #endif
      85              : }
      86              : 
      87              : #ifdef USE_LZ4
      88              : 
      89              : /*
      90              :  * Begin backup: set up lz4 preferences, allocate our input buffer, size the next sink's buffer.
      91              :  */
      92              : static void
      93            3 : bbsink_lz4_begin_backup(bbsink *sink)
      94              : {
      95            3 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
      96              :     size_t      output_buffer_bound;
      97            3 :     LZ4F_preferences_t *prefs = &mysink->prefs;
      98              : 
      99              :     /* Initialize the lz4 frame preferences that later compression calls use. */
     100            3 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
     101            3 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
     102            3 :     prefs->compressionLevel = mysink->compresslevel;
     103              : 
     104              :     /*
     105              :      * We need our own buffer, because we're going to pass different data to
     106              :      * the next sink than what gets passed to us.
     107              :      */
     108            3 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
     109              : 
     110              :     /*
     111              :      * LZ4F_compressUpdate() requires an output buffer at least as large as
     112              :      * the size reported by LZ4F_compressBound(), so make sure the next
     113              :      * sink's bbs_buffer is long enough to accommodate the compressed form
     114              :      * of a full input buffer.
     115              :      */
     116            3 :     output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
     117            3 :                                              &mysink->prefs);
     118              : 
     119              :     /*
     120              :      * The buffer length is expected to be a multiple of BLCKSZ, so round up (an exact multiple still gains one BLCKSZ).
     121              :      */
     122            3 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
     123              :         (output_buffer_bound % BLCKSZ);
     124              : 
     125            3 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
     126            3 : }
     127              : 
     128              : /*
     129              :  * Prepare to compress the next archive: create the lz4 context, stage the frame header, begin "<name>.lz4" downstream.
     130              :  */
     131              : static void
     132            5 : bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
     133              : {
     134            5 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
     135              :     char       *lz4_archive_name;
     136              :     LZ4F_errorCode_t ctxError;
     137              :     size_t      headerSize;
     138              : 
     139            5 :     ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
     140            5 :     if (LZ4F_isError(ctxError))
     141            0 :         elog(ERROR, "could not create lz4 compression context: %s",
     142              :              LZ4F_getErrorName(ctxError));
     143              : 
     144              :     /* First of all, write the lz4 frame header at the start of the next sink's buffer. */
     145            5 :     headerSize = LZ4F_compressBegin(mysink->ctx,
     146            5 :                                     mysink->base.bbs_next->bbs_buffer,
     147            5 :                                     mysink->base.bbs_next->bbs_buffer_length,
     148            5 :                                     &mysink->prefs);
     149              : 
     150            5 :     if (LZ4F_isError(headerSize))
     151            0 :         elog(ERROR, "could not write lz4 header: %s",
     152              :              LZ4F_getErrorName(headerSize));
     153              : 
     154              :     /*
     155              :      * The compressed data must follow the header in the output buffer, so
     156              :      * count the header among the bytes staged there; it gets forwarded
     157              :      * later, together with the compressed data.
     158              :      */
     159            5 :     mysink->bytes_written += headerSize;
     160              : 
     161              :     /* Begin the archive in the next sink, with ".lz4" added to its name. */
     162            5 :     lz4_archive_name = psprintf("%s.lz4", archive_name);
     163              :     Assert(sink->bbs_next != NULL);
     164            5 :     bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
     165            5 :     pfree(lz4_archive_name);
     166            5 : }
     167              : 
     168              : /*
     169              :  * Compress avail_in bytes from our input buffer into the next sink's buffer.
     170              :  * If the free space there has fallen below the compression bound for this
     171              :  * input, first invoke the archive_contents() method of the next sink.
     172              :  *
     173              :  * Note that since we're compressing the input, it may very commonly happen
     174              :  * that we consume all the input data without filling the output buffer. In
     175              :  * that case, the compressed representation of the current input data won't
     176              :  * actually be sent to the next bbsink until a later call to this function,
     177              :  * or perhaps not until bbsink_lz4_end_archive() is invoked.
     178              :  */
     179              : static void
     180         8161 : bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
     181              : {
     182         8161 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
     183              :     size_t      compressedSize;
     184              :     size_t      avail_in_bound;
     185              : 
     186         8161 :     avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
     187              : 
     188              :     /*
     189              :      * If the free space in the next sink's buffer has fallen below the value
     190              :      * computed by LZ4F_compressBound(), ask the next sink to process the
     191              :      * staged data so that we can start over with an empty buffer.
     192              :      */
     193         8161 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
     194              :         avail_in_bound)
     195              :     {
     196          300 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
     197          300 :         mysink->bytes_written = 0;
     198              :     }
     199              : 
     200              :     /*
     201              :      * Compress the input buffer, appending the result after the staged bytes.
     202              :      */
     203         8161 :     compressedSize = LZ4F_compressUpdate(mysink->ctx,
     204         8161 :                                          mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
     205         8161 :                                          mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
     206         8161 :                                          (uint8 *) mysink->base.bbs_buffer,
     207              :                                          avail_in,
     208              :                                          NULL);
     209              : 
     210         8161 :     if (LZ4F_isError(compressedSize))
     211            0 :         elog(ERROR, "could not compress data: %s",
     212              :              LZ4F_getErrorName(compressedSize));
     213              : 
     214              :     /*
     215              :      * Update our notion of how many bytes we've written into the output buffer.
     216              :      */
     217         8161 :     mysink->bytes_written += compressedSize;
     218         8161 : }
     219              : 
     220              : /*
     221              :  * There might be some data left in lz4's internal buffers; flush that out,
     222              :  * finalize the lz4 frame, and forward everything staged to the successor
     223              :  * sink as archive content.
     224              :  *
     225              :  * Then free the compression context and end processing for this archive.
     226              :  */
     227              : static void
     228            5 : bbsink_lz4_end_archive(bbsink *sink)
     229              : {
     230            5 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
     231              :     size_t      compressedSize;
     232              :     size_t      lz4_footer_bound;
     233              : 
     234            5 :     lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);   /* space to reserve for LZ4F_compressEnd() output */
     235              : 
     236              :     Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
     237              : 
     238            5 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
     239              :         lz4_footer_bound)
     240              :     {
     241            0 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
     242            0 :         mysink->bytes_written = 0;
     243              :     }
     244              : 
     245            5 :     compressedSize = LZ4F_compressEnd(mysink->ctx,
     246            5 :                                       mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
     247            5 :                                       mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
     248              :                                       NULL);
     249              : 
     250            5 :     if (LZ4F_isError(compressedSize))
     251            0 :         elog(ERROR, "could not end lz4 compression: %s",
     252              :              LZ4F_getErrorName(compressedSize));
     253              : 
     254              :     /* Update our notion of how many bytes we've written into the output buffer. */
     255            5 :     mysink->bytes_written += compressedSize;
     256              : 
     257              :     /* Send whatever accumulated output bytes we have, leaving the buffer empty. */
     258            5 :     bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
     259            5 :     mysink->bytes_written = 0;
     260              : 
     261              :     /* Release the context; reset the pointer so bbsink_lz4_cleanup() won't free it again. */
     262            5 :     LZ4F_freeCompressionContext(mysink->ctx);
     263            5 :     mysink->ctx = NULL;
     264              : 
     265              :     /* Pass on the information that this archive has ended. */
     266            5 :     bbsink_forward_end_archive(sink);
     267            5 : }
     268              : 
     269              : /*
     270              :  * Manifest contents are not compressed, but the 'len' bytes do need to be
     271              :  * copied into the successor sink's buffer, because we have our own.
     272              :  */
     273              : static void
     274           15 : bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
     275              : {
     276           15 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
     277           15 :     bbsink_manifest_contents(sink->bbs_next, len);
     278           15 : }
     279              : 
     280              : /*
     281              :  * In case the backup fails mid-archive, free any compression context still
     282              :  * held by calling LZ4F_freeCompressionContext(), to avoid a memory leak.
     283              :  */
     284              : static void
     285            3 : bbsink_lz4_cleanup(bbsink *sink)
     286              : {
     287            3 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
     288              : 
     289            3 :     if (mysink->ctx)    /* still set only when an archive was begun but never ended */
     290              :     {
     291            0 :         LZ4F_freeCompressionContext(mysink->ctx);
     292            0 :         mysink->ctx = NULL;
     293              :     }
     294            3 : }
     295              : 
     296              : #endif
        

Generated by: LCOV version 2.0-1