LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - bbstreamer_lz4.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 42 119 35.3 %
Date: 2024-04-24 02:11:02 Functions: 4 8 50.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * bbstreamer_lz4.c
       4             :  *
       5             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *        src/bin/pg_basebackup/bbstreamer_lz4.c
       9             :  *-------------------------------------------------------------------------
      10             :  */
      11             : 
      12             : #include "postgres_fe.h"
      13             : 
      14             : #include <unistd.h>
      15             : 
      16             : #ifdef USE_LZ4
      17             : #include <lz4frame.h>
      18             : #endif
      19             : 
      20             : #include "bbstreamer.h"
      21             : #include "common/file_perm.h"
      22             : #include "common/logging.h"
      23             : #include "common/string.h"
      24             : 
      25             : #ifdef USE_LZ4
/*
 * Per-streamer state used by both the lz4 compressor and the lz4
 * decompressor in this file.
 *
 * "base" has to stay the first member: every callback below receives a
 * bbstreamer pointer and casts it straight back to bbstreamer_lz4_frame.
 */
typedef struct bbstreamer_lz4_frame
{
    bbstreamer  base;

    /* Compression context; touched only by the compressor callbacks. */
    LZ4F_compressionContext_t cctx;
    /* Decompression context; touched only by the decompressor callbacks. */
    LZ4F_decompressionContext_t dctx;
    /* Frame preferences filled in by the compressor constructor. */
    LZ4F_preferences_t prefs;

    /* Bytes currently held in base.bbs_buffer and not yet forwarded. */
    size_t      bytes_written;
    /* Set once the compressor has emitted the lz4 frame header. */
    bool        header_written;
} bbstreamer_lz4_frame;
      37             : 
/* Callbacks that implement the lz4 compressor streamer. */
static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
                                              bbstreamer_member *member,
                                              const char *data, int len,
                                              bbstreamer_archive_context context);
static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);

/* Callback table installed by bbstreamer_lz4_compressor_new(). */
const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
    .content = bbstreamer_lz4_compressor_content,
    .finalize = bbstreamer_lz4_compressor_finalize,
    .free = bbstreamer_lz4_compressor_free
};
      50             : 
/* Callbacks that implement the lz4 decompressor streamer. */
static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
                                                bbstreamer_member *member,
                                                const char *data, int len,
                                                bbstreamer_archive_context context);
static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);

/* Callback table installed by bbstreamer_lz4_decompressor_new(). */
const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
    .content = bbstreamer_lz4_decompressor_content,
    .finalize = bbstreamer_lz4_decompressor_finalize,
    .free = bbstreamer_lz4_decompressor_free
};
      63             : #endif
      64             : 
      65             : /*
      66             :  * Create a new base backup streamer that performs lz4 compression of tar
      67             :  * blocks.
      68             :  */
      69             : bbstreamer *
      70           0 : bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
      71             : {
      72             : #ifdef USE_LZ4
      73             :     bbstreamer_lz4_frame *streamer;
      74             :     LZ4F_errorCode_t ctxError;
      75             :     LZ4F_preferences_t *prefs;
      76             : 
      77             :     Assert(next != NULL);
      78             : 
      79           0 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
      80           0 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
      81             :         &bbstreamer_lz4_compressor_ops;
      82             : 
      83           0 :     streamer->base.bbs_next = next;
      84           0 :     initStringInfo(&streamer->base.bbs_buffer);
      85           0 :     streamer->header_written = false;
      86             : 
      87             :     /* Initialize stream compression preferences */
      88           0 :     prefs = &streamer->prefs;
      89           0 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
      90           0 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
      91           0 :     prefs->compressionLevel = compress->level;
      92             : 
      93           0 :     ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
      94           0 :     if (LZ4F_isError(ctxError))
      95           0 :         pg_log_error("could not create lz4 compression context: %s",
      96             :                      LZ4F_getErrorName(ctxError));
      97             : 
      98           0 :     return &streamer->base;
      99             : #else
     100             :     pg_fatal("this build does not support compression with %s", "LZ4");
     101             :     return NULL;                /* keep compiler quiet */
     102             : #endif
     103             : }
     104             : 
     105             : #ifdef USE_LZ4
     106             : /*
     107             :  * Compress the input data to output buffer.
     108             :  *
     109             :  * Find out the compression bound based on input data length for each
     110             :  * invocation to make sure that output buffer has enough capacity to
     111             :  * accommodate the compressed data. In case if the output buffer
     112             :  * capacity falls short of compression bound then forward the content
     113             :  * of output buffer to next streamer and empty the buffer.
     114             :  */
     115             : static void
     116           0 : bbstreamer_lz4_compressor_content(bbstreamer *streamer,
     117             :                                   bbstreamer_member *member,
     118             :                                   const char *data, int len,
     119             :                                   bbstreamer_archive_context context)
     120             : {
     121             :     bbstreamer_lz4_frame *mystreamer;
     122             :     uint8      *next_in,
     123             :                *next_out;
     124             :     size_t      out_bound,
     125             :                 compressed_size,
     126             :                 avail_out;
     127             : 
     128           0 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     129           0 :     next_in = (uint8 *) data;
     130             : 
     131             :     /* Write header before processing the first input chunk. */
     132           0 :     if (!mystreamer->header_written)
     133             :     {
     134           0 :         compressed_size = LZ4F_compressBegin(mystreamer->cctx,
     135           0 :                                              (uint8 *) mystreamer->base.bbs_buffer.data,
     136           0 :                                              mystreamer->base.bbs_buffer.maxlen,
     137           0 :                                              &mystreamer->prefs);
     138             : 
     139           0 :         if (LZ4F_isError(compressed_size))
     140           0 :             pg_log_error("could not write lz4 header: %s",
     141             :                          LZ4F_getErrorName(compressed_size));
     142             : 
     143           0 :         mystreamer->bytes_written += compressed_size;
     144           0 :         mystreamer->header_written = true;
     145             :     }
     146             : 
     147             :     /*
     148             :      * Update the offset and capacity of output buffer based on number of
     149             :      * bytes written to output buffer.
     150             :      */
     151           0 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     152           0 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     153             : 
     154             :     /*
     155             :      * Find out the compression bound and make sure that output buffer has the
     156             :      * required capacity for the success of LZ4F_compressUpdate. If needed
     157             :      * forward the content to next streamer and empty the buffer.
     158             :      */
     159           0 :     out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
     160           0 :     if (avail_out < out_bound)
     161             :     {
     162           0 :         bbstreamer_content(mystreamer->base.bbs_next, member,
     163           0 :                            mystreamer->base.bbs_buffer.data,
     164           0 :                            mystreamer->bytes_written,
     165             :                            context);
     166             : 
     167             :         /* Enlarge buffer if it falls short of out bound. */
     168           0 :         if (mystreamer->base.bbs_buffer.maxlen < out_bound)
     169           0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
     170             : 
     171           0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     172           0 :         mystreamer->bytes_written = 0;
     173           0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     174             :     }
     175             : 
     176             :     /*
     177             :      * This call compresses the data starting at next_in and generates the
     178             :      * output starting at next_out. It expects the caller to provide the size
     179             :      * of input buffer and capacity of output buffer by providing parameters
     180             :      * len and avail_out.
     181             :      *
     182             :      * It returns the number of bytes compressed to output buffer.
     183             :      */
     184           0 :     compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
     185             :                                           next_out, avail_out,
     186             :                                           next_in, len, NULL);
     187             : 
     188           0 :     if (LZ4F_isError(compressed_size))
     189           0 :         pg_log_error("could not compress data: %s",
     190             :                      LZ4F_getErrorName(compressed_size));
     191             : 
     192           0 :     mystreamer->bytes_written += compressed_size;
     193           0 : }
     194             : 
     195             : /*
     196             :  * End-of-stream processing.
     197             :  */
     198             : static void
     199           0 : bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
     200             : {
     201             :     bbstreamer_lz4_frame *mystreamer;
     202             :     uint8      *next_out;
     203             :     size_t      footer_bound,
     204             :                 compressed_size,
     205             :                 avail_out;
     206             : 
     207           0 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     208             : 
     209             :     /* Find out the footer bound and update the output buffer. */
     210           0 :     footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
     211           0 :     if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
     212             :         footer_bound)
     213             :     {
     214           0 :         bbstreamer_content(mystreamer->base.bbs_next, NULL,
     215           0 :                            mystreamer->base.bbs_buffer.data,
     216           0 :                            mystreamer->bytes_written,
     217             :                            BBSTREAMER_UNKNOWN);
     218             : 
     219             :         /* Enlarge buffer if it falls short of footer bound. */
     220           0 :         if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
     221           0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
     222             : 
     223           0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     224           0 :         mystreamer->bytes_written = 0;
     225           0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     226             :     }
     227             :     else
     228             :     {
     229           0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     230           0 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     231             :     }
     232             : 
     233             :     /*
     234             :      * Finalize the frame and flush whatever data remaining in compression
     235             :      * context.
     236             :      */
     237           0 :     compressed_size = LZ4F_compressEnd(mystreamer->cctx,
     238             :                                        next_out, avail_out, NULL);
     239             : 
     240           0 :     if (LZ4F_isError(compressed_size))
     241           0 :         pg_log_error("could not end lz4 compression: %s",
     242             :                      LZ4F_getErrorName(compressed_size));
     243             : 
     244           0 :     mystreamer->bytes_written += compressed_size;
     245             : 
     246           0 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
     247           0 :                        mystreamer->base.bbs_buffer.data,
     248           0 :                        mystreamer->bytes_written,
     249             :                        BBSTREAMER_UNKNOWN);
     250             : 
     251           0 :     bbstreamer_finalize(mystreamer->base.bbs_next);
     252           0 : }
     253             : 
     254             : /*
     255             :  * Free memory.
     256             :  */
     257             : static void
     258           0 : bbstreamer_lz4_compressor_free(bbstreamer *streamer)
     259             : {
     260             :     bbstreamer_lz4_frame *mystreamer;
     261             : 
     262           0 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     263           0 :     bbstreamer_free(streamer->bbs_next);
     264           0 :     LZ4F_freeCompressionContext(mystreamer->cctx);
     265           0 :     pfree(streamer->bbs_buffer.data);
     266           0 :     pfree(streamer);
     267           0 : }
     268             : #endif
     269             : 
     270             : /*
     271             :  * Create a new base backup streamer that performs decompression of lz4
     272             :  * compressed blocks.
     273             :  */
     274             : bbstreamer *
     275           2 : bbstreamer_lz4_decompressor_new(bbstreamer *next)
     276             : {
     277             : #ifdef USE_LZ4
     278             :     bbstreamer_lz4_frame *streamer;
     279             :     LZ4F_errorCode_t ctxError;
     280             : 
     281             :     Assert(next != NULL);
     282             : 
     283           2 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
     284           2 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
     285             :         &bbstreamer_lz4_decompressor_ops;
     286             : 
     287           2 :     streamer->base.bbs_next = next;
     288           2 :     initStringInfo(&streamer->base.bbs_buffer);
     289             : 
     290             :     /* Initialize internal stream state for decompression */
     291           2 :     ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
     292           2 :     if (LZ4F_isError(ctxError))
     293           0 :         pg_fatal("could not initialize compression library: %s",
     294             :                  LZ4F_getErrorName(ctxError));
     295             : 
     296           2 :     return &streamer->base;
     297             : #else
     298             :     pg_fatal("this build does not support compression with %s", "LZ4");
     299             :     return NULL;                /* keep compiler quiet */
     300             : #endif
     301             : }
     302             : 
     303             : #ifdef USE_LZ4
     304             : /*
     305             :  * Decompress the input data to output buffer until we run out of input
     306             :  * data. Each time the output buffer is full, pass on the decompressed data
     307             :  * to the next streamer.
     308             :  */
     309             : static void
     310         196 : bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
     311             :                                     bbstreamer_member *member,
     312             :                                     const char *data, int len,
     313             :                                     bbstreamer_archive_context context)
     314             : {
     315             :     bbstreamer_lz4_frame *mystreamer;
     316             :     uint8      *next_in,
     317             :                *next_out;
     318             :     size_t      avail_in,
     319             :                 avail_out;
     320             : 
     321         196 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     322         196 :     next_in = (uint8 *) data;
     323         196 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     324         196 :     avail_in = len;
     325         196 :     avail_out = mystreamer->base.bbs_buffer.maxlen;
     326             : 
     327       79046 :     while (avail_in > 0)
     328             :     {
     329             :         size_t      ret,
     330             :                     read_size,
     331             :                     out_size;
     332             : 
     333       78850 :         read_size = avail_in;
     334       78850 :         out_size = avail_out;
     335             : 
     336             :         /*
     337             :          * This call decompresses the data starting at next_in and generates
     338             :          * the output data starting at next_out. It expects the caller to
     339             :          * provide size of the input buffer and total capacity of the output
     340             :          * buffer by providing the read_size and out_size parameters
     341             :          * respectively.
     342             :          *
     343             :          * Per the documentation of LZ4, parameters read_size and out_size
     344             :          * behaves as dual parameters. On return, the number of bytes consumed
     345             :          * from the input buffer will be written back to read_size and the
     346             :          * number of bytes decompressed to output buffer will be written back
     347             :          * to out_size respectively.
     348             :          */
     349       78850 :         ret = LZ4F_decompress(mystreamer->dctx,
     350             :                               next_out, &out_size,
     351             :                               next_in, &read_size, NULL);
     352             : 
     353       78850 :         if (LZ4F_isError(ret))
     354           0 :             pg_log_error("could not decompress data: %s",
     355             :                          LZ4F_getErrorName(ret));
     356             : 
     357             :         /* Update input buffer based on number of bytes consumed */
     358       78850 :         avail_in -= read_size;
     359       78850 :         next_in += read_size;
     360             : 
     361       78850 :         mystreamer->bytes_written += out_size;
     362             : 
     363             :         /*
     364             :          * If output buffer is full then forward the content to next streamer
     365             :          * and update the output buffer.
     366             :          */
     367       78850 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     368             :         {
     369       78850 :             bbstreamer_content(mystreamer->base.bbs_next, member,
     370       78850 :                                mystreamer->base.bbs_buffer.data,
     371             :                                mystreamer->base.bbs_buffer.maxlen,
     372             :                                context);
     373             : 
     374       78850 :             avail_out = mystreamer->base.bbs_buffer.maxlen;
     375       78850 :             mystreamer->bytes_written = 0;
     376       78850 :             next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     377             :         }
     378             :         else
     379             :         {
     380           0 :             avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     381           0 :             next_out += mystreamer->bytes_written;
     382             :         }
     383             :     }
     384         196 : }
     385             : 
     386             : /*
     387             :  * End-of-stream processing.
     388             :  */
     389             : static void
     390           2 : bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
     391             : {
     392             :     bbstreamer_lz4_frame *mystreamer;
     393             : 
     394           2 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     395             : 
     396             :     /*
     397             :      * End of the stream, if there is some pending data in output buffers then
     398             :      * we must forward it to next streamer.
     399             :      */
     400           2 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
     401           2 :                        mystreamer->base.bbs_buffer.data,
     402             :                        mystreamer->base.bbs_buffer.maxlen,
     403             :                        BBSTREAMER_UNKNOWN);
     404             : 
     405           2 :     bbstreamer_finalize(mystreamer->base.bbs_next);
     406           2 : }
     407             : 
     408             : /*
     409             :  * Free memory.
     410             :  */
     411             : static void
     412           2 : bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
     413             : {
     414             :     bbstreamer_lz4_frame *mystreamer;
     415             : 
     416           2 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     417           2 :     bbstreamer_free(streamer->bbs_next);
     418           2 :     LZ4F_freeDecompressionContext(mystreamer->dctx);
     419           2 :     pfree(streamer->bbs_buffer.data);
     420           2 :     pfree(streamer);
     421           2 : }
     422             : #endif

Generated by: LCOV version 1.14