LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_lz4.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 105 119 88.2 %
Date: 2024-11-21 08:14:44 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * astreamer_lz4.c
       4             :  *
       5             :  * Archive streamers that deal with data compressed using lz4.
       6             :  * astreamer_lz4_compressor applies lz4 compression to the input stream,
       7             :  * and astreamer_lz4_decompressor does the reverse.
       8             :  *
       9             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *        src/fe_utils/astreamer_lz4.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #ifdef USE_LZ4
      21             : #include <lz4frame.h>
      22             : #endif
      23             : 
      24             : #include "common/logging.h"
      25             : #include "fe_utils/astreamer.h"
      26             : 
      27             : #ifdef USE_LZ4
      28             : typedef struct astreamer_lz4_frame
      29             : {
      30             :     astreamer   base;
      31             : 
      32             :     LZ4F_compressionContext_t cctx;
      33             :     LZ4F_decompressionContext_t dctx;
      34             :     LZ4F_preferences_t prefs;
      35             : 
      36             :     size_t      bytes_written;
      37             :     bool        header_written;
      38             : } astreamer_lz4_frame;
      39             : 
      40             : static void astreamer_lz4_compressor_content(astreamer *streamer,
      41             :                                              astreamer_member *member,
      42             :                                              const char *data, int len,
      43             :                                              astreamer_archive_context context);
      44             : static void astreamer_lz4_compressor_finalize(astreamer *streamer);
      45             : static void astreamer_lz4_compressor_free(astreamer *streamer);
      46             : 
      47             : static const astreamer_ops astreamer_lz4_compressor_ops = {
      48             :     .content = astreamer_lz4_compressor_content,
      49             :     .finalize = astreamer_lz4_compressor_finalize,
      50             :     .free = astreamer_lz4_compressor_free
      51             : };
      52             : 
      53             : static void astreamer_lz4_decompressor_content(astreamer *streamer,
      54             :                                                astreamer_member *member,
      55             :                                                const char *data, int len,
      56             :                                                astreamer_archive_context context);
      57             : static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
      58             : static void astreamer_lz4_decompressor_free(astreamer *streamer);
      59             : 
      60             : static const astreamer_ops astreamer_lz4_decompressor_ops = {
      61             :     .content = astreamer_lz4_decompressor_content,
      62             :     .finalize = astreamer_lz4_decompressor_finalize,
      63             :     .free = astreamer_lz4_decompressor_free
      64             : };
      65             : #endif
      66             : 
      67             : /*
      68             :  * Create a new base backup streamer that performs lz4 compression of tar
      69             :  * blocks.
      70             :  */
      71             : astreamer *
      72           2 : astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
      73             : {
      74             : #ifdef USE_LZ4
      75             :     astreamer_lz4_frame *streamer;
      76             :     LZ4F_errorCode_t ctxError;
      77             :     LZ4F_preferences_t *prefs;
      78             : 
      79             :     Assert(next != NULL);
      80             : 
      81           2 :     streamer = palloc0(sizeof(astreamer_lz4_frame));
      82           2 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      83             :         &astreamer_lz4_compressor_ops;
      84             : 
      85           2 :     streamer->base.bbs_next = next;
      86           2 :     initStringInfo(&streamer->base.bbs_buffer);
      87           2 :     streamer->header_written = false;
      88             : 
      89             :     /* Initialize stream compression preferences */
      90           2 :     prefs = &streamer->prefs;
      91           2 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
      92           2 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
      93           2 :     prefs->compressionLevel = compress->level;
      94             : 
      95           2 :     ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
      96           2 :     if (LZ4F_isError(ctxError))
      97           0 :         pg_log_error("could not create lz4 compression context: %s",
      98             :                      LZ4F_getErrorName(ctxError));
      99             : 
     100           2 :     return &streamer->base;
     101             : #else
     102             :     pg_fatal("this build does not support compression with %s", "LZ4");
     103             :     return NULL;                /* keep compiler quiet */
     104             : #endif
     105             : }
     106             : 
     107             : #ifdef USE_LZ4
     108             : /*
     109             :  * Compress the input data to output buffer.
     110             :  *
     111             :  * Find out the compression bound based on input data length for each
     112             :  * invocation to make sure that output buffer has enough capacity to
     113             :  * accommodate the compressed data. In case if the output buffer
     114             :  * capacity falls short of compression bound then forward the content
     115             :  * of output buffer to next streamer and empty the buffer.
     116             :  */
     117             : static void
     118        5408 : astreamer_lz4_compressor_content(astreamer *streamer,
     119             :                                  astreamer_member *member,
     120             :                                  const char *data, int len,
     121             :                                  astreamer_archive_context context)
     122             : {
     123             :     astreamer_lz4_frame *mystreamer;
     124             :     uint8      *next_in,
     125             :                *next_out;
     126             :     size_t      out_bound,
     127             :                 compressed_size,
     128             :                 avail_out;
     129             : 
     130        5408 :     mystreamer = (astreamer_lz4_frame *) streamer;
     131        5408 :     next_in = (uint8 *) data;
     132             : 
     133             :     /* Write header before processing the first input chunk. */
     134        5408 :     if (!mystreamer->header_written)
     135             :     {
     136           2 :         compressed_size = LZ4F_compressBegin(mystreamer->cctx,
     137           2 :                                              (uint8 *) mystreamer->base.bbs_buffer.data,
     138           2 :                                              mystreamer->base.bbs_buffer.maxlen,
     139           2 :                                              &mystreamer->prefs);
     140             : 
     141           2 :         if (LZ4F_isError(compressed_size))
     142           0 :             pg_log_error("could not write lz4 header: %s",
     143             :                          LZ4F_getErrorName(compressed_size));
     144             : 
     145           2 :         mystreamer->bytes_written += compressed_size;
     146           2 :         mystreamer->header_written = true;
     147             :     }
     148             : 
     149             :     /*
     150             :      * Update the offset and capacity of output buffer based on number of
     151             :      * bytes written to output buffer.
     152             :      */
     153        5408 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     154        5408 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     155             : 
     156             :     /*
     157             :      * Find out the compression bound and make sure that output buffer has the
     158             :      * required capacity for the success of LZ4F_compressUpdate. If needed
     159             :      * forward the content to next streamer and empty the buffer.
     160             :      */
     161        5408 :     out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
     162        5408 :     if (avail_out < out_bound)
     163             :     {
     164          28 :         astreamer_content(mystreamer->base.bbs_next, member,
     165          28 :                           mystreamer->base.bbs_buffer.data,
     166          28 :                           mystreamer->bytes_written,
     167             :                           context);
     168             : 
     169             :         /* Enlarge buffer if it falls short of out bound. */
     170          28 :         if (mystreamer->base.bbs_buffer.maxlen < out_bound)
     171           2 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
     172             : 
     173          28 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     174          28 :         mystreamer->bytes_written = 0;
     175          28 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     176             :     }
     177             : 
     178             :     /*
     179             :      * This call compresses the data starting at next_in and generates the
     180             :      * output starting at next_out. It expects the caller to provide the size
     181             :      * of input buffer and capacity of output buffer by providing parameters
     182             :      * len and avail_out.
     183             :      *
     184             :      * It returns the number of bytes compressed to output buffer.
     185             :      */
     186        5408 :     compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
     187             :                                           next_out, avail_out,
     188             :                                           next_in, len, NULL);
     189             : 
     190        5408 :     if (LZ4F_isError(compressed_size))
     191           0 :         pg_log_error("could not compress data: %s",
     192             :                      LZ4F_getErrorName(compressed_size));
     193             : 
     194        5408 :     mystreamer->bytes_written += compressed_size;
     195        5408 : }
     196             : 
     197             : /*
     198             :  * End-of-stream processing.
     199             :  */
     200             : static void
     201           2 : astreamer_lz4_compressor_finalize(astreamer *streamer)
     202             : {
     203             :     astreamer_lz4_frame *mystreamer;
     204             :     uint8      *next_out;
     205             :     size_t      footer_bound,
     206             :                 compressed_size,
     207             :                 avail_out;
     208             : 
     209           2 :     mystreamer = (astreamer_lz4_frame *) streamer;
     210             : 
     211             :     /* Find out the footer bound and update the output buffer. */
     212           2 :     footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
     213           2 :     if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
     214             :         footer_bound)
     215             :     {
     216           0 :         astreamer_content(mystreamer->base.bbs_next, NULL,
     217           0 :                           mystreamer->base.bbs_buffer.data,
     218           0 :                           mystreamer->bytes_written,
     219             :                           ASTREAMER_UNKNOWN);
     220             : 
     221             :         /* Enlarge buffer if it falls short of footer bound. */
     222           0 :         if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
     223           0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
     224             : 
     225           0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     226           0 :         mystreamer->bytes_written = 0;
     227           0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     228             :     }
     229             :     else
     230             :     {
     231           2 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     232           2 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     233             :     }
     234             : 
     235             :     /*
     236             :      * Finalize the frame and flush whatever data remaining in compression
     237             :      * context.
     238             :      */
     239           2 :     compressed_size = LZ4F_compressEnd(mystreamer->cctx,
     240             :                                        next_out, avail_out, NULL);
     241             : 
     242           2 :     if (LZ4F_isError(compressed_size))
     243           0 :         pg_log_error("could not end lz4 compression: %s",
     244             :                      LZ4F_getErrorName(compressed_size));
     245             : 
     246           2 :     mystreamer->bytes_written += compressed_size;
     247             : 
     248           2 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     249           2 :                       mystreamer->base.bbs_buffer.data,
     250           2 :                       mystreamer->bytes_written,
     251             :                       ASTREAMER_UNKNOWN);
     252             : 
     253           2 :     astreamer_finalize(mystreamer->base.bbs_next);
     254           2 : }
     255             : 
     256             : /*
     257             :  * Free memory.
     258             :  */
     259             : static void
     260           2 : astreamer_lz4_compressor_free(astreamer *streamer)
     261             : {
     262             :     astreamer_lz4_frame *mystreamer;
     263             : 
     264           2 :     mystreamer = (astreamer_lz4_frame *) streamer;
     265           2 :     astreamer_free(streamer->bbs_next);
     266           2 :     LZ4F_freeCompressionContext(mystreamer->cctx);
     267           2 :     pfree(streamer->bbs_buffer.data);
     268           2 :     pfree(streamer);
     269           2 : }
     270             : #endif
     271             : 
     272             : /*
     273             :  * Create a new base backup streamer that performs decompression of lz4
     274             :  * compressed blocks.
     275             :  */
     276             : astreamer *
     277           8 : astreamer_lz4_decompressor_new(astreamer *next)
     278             : {
     279             : #ifdef USE_LZ4
     280             :     astreamer_lz4_frame *streamer;
     281             :     LZ4F_errorCode_t ctxError;
     282             : 
     283             :     Assert(next != NULL);
     284             : 
     285           8 :     streamer = palloc0(sizeof(astreamer_lz4_frame));
     286           8 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     287             :         &astreamer_lz4_decompressor_ops;
     288             : 
     289           8 :     streamer->base.bbs_next = next;
     290           8 :     initStringInfo(&streamer->base.bbs_buffer);
     291             : 
     292             :     /* Initialize internal stream state for decompression */
     293           8 :     ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
     294           8 :     if (LZ4F_isError(ctxError))
     295           0 :         pg_fatal("could not initialize compression library: %s",
     296             :                  LZ4F_getErrorName(ctxError));
     297             : 
     298           8 :     return &streamer->base;
     299             : #else
     300             :     pg_fatal("this build does not support compression with %s", "LZ4");
     301             :     return NULL;                /* keep compiler quiet */
     302             : #endif
     303             : }
     304             : 
     305             : #ifdef USE_LZ4
     306             : /*
     307             :  * Decompress the input data to output buffer until we run out of input
     308             :  * data. Each time the output buffer is full, pass on the decompressed data
     309             :  * to the next streamer.
     310             :  */
     311             : static void
     312         328 : astreamer_lz4_decompressor_content(astreamer *streamer,
     313             :                                    astreamer_member *member,
     314             :                                    const char *data, int len,
     315             :                                    astreamer_archive_context context)
     316             : {
     317             :     astreamer_lz4_frame *mystreamer;
     318             :     uint8      *next_in,
     319             :                *next_out;
     320             :     size_t      avail_in,
     321             :                 avail_out;
     322             : 
     323         328 :     mystreamer = (astreamer_lz4_frame *) streamer;
     324         328 :     next_in = (uint8 *) data;
     325         328 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     326         328 :     avail_in = len;
     327         328 :     avail_out = mystreamer->base.bbs_buffer.maxlen;
     328             : 
     329      239168 :     while (avail_in > 0)
     330             :     {
     331             :         size_t      ret,
     332             :                     read_size,
     333             :                     out_size;
     334             : 
     335      238840 :         read_size = avail_in;
     336      238840 :         out_size = avail_out;
     337             : 
     338             :         /*
     339             :          * This call decompresses the data starting at next_in and generates
     340             :          * the output data starting at next_out. It expects the caller to
     341             :          * provide size of the input buffer and total capacity of the output
     342             :          * buffer by providing the read_size and out_size parameters
     343             :          * respectively.
     344             :          *
     345             :          * Per the documentation of LZ4, parameters read_size and out_size
     346             :          * behaves as dual parameters. On return, the number of bytes consumed
     347             :          * from the input buffer will be written back to read_size and the
     348             :          * number of bytes decompressed to output buffer will be written back
     349             :          * to out_size respectively.
     350             :          */
     351      238840 :         ret = LZ4F_decompress(mystreamer->dctx,
     352             :                               next_out, &out_size,
     353             :                               next_in, &read_size, NULL);
     354             : 
     355      238840 :         if (LZ4F_isError(ret))
     356           0 :             pg_log_error("could not decompress data: %s",
     357             :                          LZ4F_getErrorName(ret));
     358             : 
     359             :         /* Update input buffer based on number of bytes consumed */
     360      238840 :         avail_in -= read_size;
     361      238840 :         next_in += read_size;
     362             : 
     363      238840 :         mystreamer->bytes_written += out_size;
     364             : 
     365             :         /*
     366             :          * If output buffer is full then forward the content to next streamer
     367             :          * and update the output buffer.
     368             :          */
     369      238840 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     370             :         {
     371      238712 :             astreamer_content(mystreamer->base.bbs_next, member,
     372      238712 :                               mystreamer->base.bbs_buffer.data,
     373             :                               mystreamer->base.bbs_buffer.maxlen,
     374             :                               context);
     375             : 
     376      238712 :             avail_out = mystreamer->base.bbs_buffer.maxlen;
     377      238712 :             mystreamer->bytes_written = 0;
     378      238712 :             next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     379             :         }
     380             :         else
     381             :         {
     382         128 :             avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     383         128 :             next_out += mystreamer->bytes_written;
     384             :         }
     385             :     }
     386         328 : }
     387             : 
     388             : /*
     389             :  * End-of-stream processing.
     390             :  */
     391             : static void
     392           8 : astreamer_lz4_decompressor_finalize(astreamer *streamer)
     393             : {
     394             :     astreamer_lz4_frame *mystreamer;
     395             : 
     396           8 :     mystreamer = (astreamer_lz4_frame *) streamer;
     397             : 
     398             :     /*
     399             :      * End of the stream, if there is some pending data in output buffers then
     400             :      * we must forward it to next streamer.
     401             :      */
     402           8 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     403           8 :                       mystreamer->base.bbs_buffer.data,
     404             :                       mystreamer->base.bbs_buffer.maxlen,
     405             :                       ASTREAMER_UNKNOWN);
     406             : 
     407           8 :     astreamer_finalize(mystreamer->base.bbs_next);
     408           8 : }
     409             : 
     410             : /*
     411             :  * Free memory.
     412             :  */
     413             : static void
     414           8 : astreamer_lz4_decompressor_free(astreamer *streamer)
     415             : {
     416             :     astreamer_lz4_frame *mystreamer;
     417             : 
     418           8 :     mystreamer = (astreamer_lz4_frame *) streamer;
     419           8 :     astreamer_free(streamer->bbs_next);
     420           8 :     LZ4F_freeDecompressionContext(mystreamer->dctx);
     421           8 :     pfree(streamer->bbs_buffer.data);
     422           8 :     pfree(streamer);
     423           8 : }
     424             : #endif

Generated by: LCOV version 1.14