LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_lz4.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 88.2 % 119 105
Test Date: 2026-03-01 19:14:57 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_lz4.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using lz4.
       6              :  * astreamer_lz4_compressor applies lz4 compression to the input stream,
       7              :  * and astreamer_lz4_decompressor does the reverse.
       8              :  *
       9              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *        src/fe_utils/astreamer_lz4.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #ifdef USE_LZ4
      21              : #include <lz4frame.h>
      22              : #endif
      23              : 
      24              : #include "common/logging.h"
      25              : #include "fe_utils/astreamer.h"
      26              : 
      27              : #ifdef USE_LZ4
      28              : typedef struct astreamer_lz4_frame
      29              : {
      30              :     astreamer   base;
      31              : 
      32              :     LZ4F_compressionContext_t cctx;
      33              :     LZ4F_decompressionContext_t dctx;
      34              :     LZ4F_preferences_t prefs;
      35              : 
      36              :     size_t      bytes_written;
      37              :     bool        header_written;
      38              : } astreamer_lz4_frame;
      39              : 
      40              : static void astreamer_lz4_compressor_content(astreamer *streamer,
      41              :                                              astreamer_member *member,
      42              :                                              const char *data, int len,
      43              :                                              astreamer_archive_context context);
      44              : static void astreamer_lz4_compressor_finalize(astreamer *streamer);
      45              : static void astreamer_lz4_compressor_free(astreamer *streamer);
      46              : 
      47              : static const astreamer_ops astreamer_lz4_compressor_ops = {
      48              :     .content = astreamer_lz4_compressor_content,
      49              :     .finalize = astreamer_lz4_compressor_finalize,
      50              :     .free = astreamer_lz4_compressor_free
      51              : };
      52              : 
      53              : static void astreamer_lz4_decompressor_content(astreamer *streamer,
      54              :                                                astreamer_member *member,
      55              :                                                const char *data, int len,
      56              :                                                astreamer_archive_context context);
      57              : static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
      58              : static void astreamer_lz4_decompressor_free(astreamer *streamer);
      59              : 
      60              : static const astreamer_ops astreamer_lz4_decompressor_ops = {
      61              :     .content = astreamer_lz4_decompressor_content,
      62              :     .finalize = astreamer_lz4_decompressor_finalize,
      63              :     .free = astreamer_lz4_decompressor_free
      64              : };
      65              : #endif
      66              : 
      67              : /*
      68              :  * Create a new base backup streamer that performs lz4 compression of tar
      69              :  * blocks.
      70              :  */
      71              : astreamer *
      72            2 : astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
      73              : {
      74              : #ifdef USE_LZ4
      75              :     astreamer_lz4_frame *streamer;
      76              :     LZ4F_errorCode_t ctxError;
      77              :     LZ4F_preferences_t *prefs;
      78              : 
      79              :     Assert(next != NULL);
      80              : 
      81            2 :     streamer = palloc0_object(astreamer_lz4_frame);
      82            2 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
      83              :         &astreamer_lz4_compressor_ops;
      84              : 
      85            2 :     streamer->base.bbs_next = next;
      86            2 :     initStringInfo(&streamer->base.bbs_buffer);
      87            2 :     streamer->header_written = false;
      88              : 
      89              :     /* Initialize stream compression preferences */
      90            2 :     prefs = &streamer->prefs;
      91            2 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
      92            2 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
      93            2 :     prefs->compressionLevel = compress->level;
      94              : 
      95            2 :     ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
      96            2 :     if (LZ4F_isError(ctxError))
      97            0 :         pg_log_error("could not create lz4 compression context: %s",
      98              :                      LZ4F_getErrorName(ctxError));
      99              : 
     100            2 :     return &streamer->base;
     101              : #else
     102              :     pg_fatal("this build does not support compression with %s", "LZ4");
     103              :     return NULL;                /* keep compiler quiet */
     104              : #endif
     105              : }
     106              : 
     107              : #ifdef USE_LZ4
     108              : /*
     109              :  * Compress the input data to output buffer.
     110              :  *
     111              :  * Find out the compression bound based on input data length for each
     112              :  * invocation to make sure that output buffer has enough capacity to
     113              :  * accommodate the compressed data. In case if the output buffer
     114              :  * capacity falls short of compression bound then forward the content
     115              :  * of output buffer to next streamer and empty the buffer.
     116              :  */
     117              : static void
     118         5452 : astreamer_lz4_compressor_content(astreamer *streamer,
     119              :                                  astreamer_member *member,
     120              :                                  const char *data, int len,
     121              :                                  astreamer_archive_context context)
     122              : {
     123              :     astreamer_lz4_frame *mystreamer;
     124              :     const uint8 *next_in;
     125              :     uint8      *next_out;
     126              :     size_t      out_bound,
     127              :                 compressed_size,
     128              :                 avail_out;
     129              : 
     130         5452 :     mystreamer = (astreamer_lz4_frame *) streamer;
     131         5452 :     next_in = (const uint8 *) data;
     132              : 
     133              :     /* Write header before processing the first input chunk. */
     134         5452 :     if (!mystreamer->header_written)
     135              :     {
     136            2 :         compressed_size = LZ4F_compressBegin(mystreamer->cctx,
     137            2 :                                              (uint8 *) mystreamer->base.bbs_buffer.data,
     138            2 :                                              mystreamer->base.bbs_buffer.maxlen,
     139            2 :                                              &mystreamer->prefs);
     140              : 
     141            2 :         if (LZ4F_isError(compressed_size))
     142            0 :             pg_log_error("could not write lz4 header: %s",
     143              :                          LZ4F_getErrorName(compressed_size));
     144              : 
     145            2 :         mystreamer->bytes_written += compressed_size;
     146            2 :         mystreamer->header_written = true;
     147              :     }
     148              : 
     149              :     /*
     150              :      * Update the offset and capacity of output buffer based on number of
     151              :      * bytes written to output buffer.
     152              :      */
     153         5452 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     154         5452 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     155              : 
     156              :     /*
     157              :      * Find out the compression bound and make sure that output buffer has the
     158              :      * required capacity for the success of LZ4F_compressUpdate. If needed
     159              :      * forward the content to next streamer and empty the buffer.
     160              :      */
     161         5452 :     out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
     162         5452 :     if (avail_out < out_bound)
     163              :     {
     164           34 :         astreamer_content(mystreamer->base.bbs_next, member,
     165           34 :                           mystreamer->base.bbs_buffer.data,
     166           34 :                           mystreamer->bytes_written,
     167              :                           context);
     168              : 
     169              :         /* Enlarge buffer if it falls short of out bound. */
     170           34 :         if (mystreamer->base.bbs_buffer.maxlen < out_bound)
     171            2 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
     172              : 
     173           34 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     174           34 :         mystreamer->bytes_written = 0;
     175           34 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     176              :     }
     177              : 
     178              :     /*
     179              :      * This call compresses the data starting at next_in and generates the
     180              :      * output starting at next_out. It expects the caller to provide the size
     181              :      * of input buffer and capacity of output buffer by providing parameters
     182              :      * len and avail_out.
     183              :      *
     184              :      * It returns the number of bytes compressed to output buffer.
     185              :      */
     186         5452 :     compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
     187              :                                           next_out, avail_out,
     188              :                                           next_in, len, NULL);
     189              : 
     190         5452 :     if (LZ4F_isError(compressed_size))
     191            0 :         pg_log_error("could not compress data: %s",
     192              :                      LZ4F_getErrorName(compressed_size));
     193              : 
     194         5452 :     mystreamer->bytes_written += compressed_size;
     195         5452 : }
     196              : 
     197              : /*
     198              :  * End-of-stream processing.
     199              :  */
     200              : static void
     201            2 : astreamer_lz4_compressor_finalize(astreamer *streamer)
     202              : {
     203              :     astreamer_lz4_frame *mystreamer;
     204              :     uint8      *next_out;
     205              :     size_t      footer_bound,
     206              :                 compressed_size,
     207              :                 avail_out;
     208              : 
     209            2 :     mystreamer = (astreamer_lz4_frame *) streamer;
     210              : 
     211              :     /* Find out the footer bound and update the output buffer. */
     212            2 :     footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
     213            2 :     if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
     214              :         footer_bound)
     215              :     {
     216            0 :         astreamer_content(mystreamer->base.bbs_next, NULL,
     217            0 :                           mystreamer->base.bbs_buffer.data,
     218            0 :                           mystreamer->bytes_written,
     219              :                           ASTREAMER_UNKNOWN);
     220              : 
     221              :         /* Enlarge buffer if it falls short of footer bound. */
     222            0 :         if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
     223            0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
     224              : 
     225            0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     226            0 :         mystreamer->bytes_written = 0;
     227            0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     228              :     }
     229              :     else
     230              :     {
     231            2 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     232            2 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     233              :     }
     234              : 
     235              :     /*
     236              :      * Finalize the frame and flush whatever data remaining in compression
     237              :      * context.
     238              :      */
     239            2 :     compressed_size = LZ4F_compressEnd(mystreamer->cctx,
     240              :                                        next_out, avail_out, NULL);
     241              : 
     242            2 :     if (LZ4F_isError(compressed_size))
     243            0 :         pg_log_error("could not end lz4 compression: %s",
     244              :                      LZ4F_getErrorName(compressed_size));
     245              : 
     246            2 :     mystreamer->bytes_written += compressed_size;
     247              : 
     248            2 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     249            2 :                       mystreamer->base.bbs_buffer.data,
     250            2 :                       mystreamer->bytes_written,
     251              :                       ASTREAMER_UNKNOWN);
     252              : 
     253            2 :     astreamer_finalize(mystreamer->base.bbs_next);
     254            2 : }
     255              : 
     256              : /*
     257              :  * Free memory.
     258              :  */
     259              : static void
     260            2 : astreamer_lz4_compressor_free(astreamer *streamer)
     261              : {
     262              :     astreamer_lz4_frame *mystreamer;
     263              : 
     264            2 :     mystreamer = (astreamer_lz4_frame *) streamer;
     265            2 :     astreamer_free(streamer->bbs_next);
     266            2 :     LZ4F_freeCompressionContext(mystreamer->cctx);
     267            2 :     pfree(streamer->bbs_buffer.data);
     268            2 :     pfree(streamer);
     269            2 : }
     270              : #endif
     271              : 
     272              : /*
     273              :  * Create a new base backup streamer that performs decompression of lz4
     274              :  * compressed blocks.
     275              :  */
     276              : astreamer *
     277            7 : astreamer_lz4_decompressor_new(astreamer *next)
     278              : {
     279              : #ifdef USE_LZ4
     280              :     astreamer_lz4_frame *streamer;
     281              :     LZ4F_errorCode_t ctxError;
     282              : 
     283              :     Assert(next != NULL);
     284              : 
     285            7 :     streamer = palloc0_object(astreamer_lz4_frame);
     286            7 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     287              :         &astreamer_lz4_decompressor_ops;
     288              : 
     289            7 :     streamer->base.bbs_next = next;
     290            7 :     initStringInfo(&streamer->base.bbs_buffer);
     291              : 
     292              :     /* Initialize internal stream state for decompression */
     293            7 :     ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
     294            7 :     if (LZ4F_isError(ctxError))
     295            0 :         pg_fatal("could not initialize compression library: %s",
     296              :                  LZ4F_getErrorName(ctxError));
     297              : 
     298            7 :     return &streamer->base;
     299              : #else
     300              :     pg_fatal("this build does not support compression with %s", "LZ4");
     301              :     return NULL;                /* keep compiler quiet */
     302              : #endif
     303              : }
     304              : 
     305              : #ifdef USE_LZ4
     306              : /*
     307              :  * Decompress the input data to output buffer until we run out of input
     308              :  * data. Each time the output buffer is full, pass on the decompressed data
     309              :  * to the next streamer.
     310              :  */
     311              : static void
     312          252 : astreamer_lz4_decompressor_content(astreamer *streamer,
     313              :                                    astreamer_member *member,
     314              :                                    const char *data, int len,
     315              :                                    astreamer_archive_context context)
     316              : {
     317              :     astreamer_lz4_frame *mystreamer;
     318              :     const uint8 *next_in;
     319              :     uint8      *next_out;
     320              :     size_t      avail_in,
     321              :                 avail_out;
     322              : 
     323          252 :     mystreamer = (astreamer_lz4_frame *) streamer;
     324          252 :     next_in = (const uint8 *) data;
     325          252 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     326          252 :     avail_in = len;
     327          252 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     328              : 
     329       203706 :     while (avail_in > 0)
     330              :     {
     331              :         size_t      ret,
     332              :                     read_size,
     333              :                     out_size;
     334              : 
     335       203454 :         read_size = avail_in;
     336       203454 :         out_size = avail_out;
     337              : 
     338              :         /*
     339              :          * This call decompresses the data starting at next_in and generates
     340              :          * the output data starting at next_out. It expects the caller to
     341              :          * provide size of the input buffer and total capacity of the output
     342              :          * buffer by providing the read_size and out_size parameters
     343              :          * respectively.
     344              :          *
     345              :          * Per the documentation of LZ4, parameters read_size and out_size
     346              :          * behaves as dual parameters. On return, the number of bytes consumed
     347              :          * from the input buffer will be written back to read_size and the
     348              :          * number of bytes decompressed to output buffer will be written back
     349              :          * to out_size respectively.
     350              :          */
     351       203454 :         ret = LZ4F_decompress(mystreamer->dctx,
     352              :                               next_out, &out_size,
     353              :                               next_in, &read_size, NULL);
     354              : 
     355       203454 :         if (LZ4F_isError(ret))
     356            0 :             pg_log_error("could not decompress data: %s",
     357              :                          LZ4F_getErrorName(ret));
     358              : 
     359              :         /* Update input buffer based on number of bytes consumed */
     360       203454 :         avail_in -= read_size;
     361       203454 :         next_in += read_size;
     362              : 
     363       203454 :         mystreamer->bytes_written += out_size;
     364              : 
     365              :         /*
     366              :          * If output buffer is full then forward the content to next streamer
     367              :          * and update the output buffer.
     368              :          */
     369       203454 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     370              :         {
     371       203304 :             astreamer_content(mystreamer->base.bbs_next, member,
     372       203304 :                               mystreamer->base.bbs_buffer.data,
     373              :                               mystreamer->base.bbs_buffer.maxlen,
     374              :                               context);
     375              : 
     376       203304 :             avail_out = mystreamer->base.bbs_buffer.maxlen;
     377       203304 :             mystreamer->bytes_written = 0;
     378       203304 :             next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     379              :         }
     380              :         else
     381              :         {
     382          150 :             avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     383          150 :             next_out += mystreamer->bytes_written;
     384              :         }
     385              :     }
     386          252 : }
     387              : 
     388              : /*
     389              :  * End-of-stream processing.
     390              :  */
     391              : static void
     392            7 : astreamer_lz4_decompressor_finalize(astreamer *streamer)
     393              : {
     394              :     astreamer_lz4_frame *mystreamer;
     395              : 
     396            7 :     mystreamer = (astreamer_lz4_frame *) streamer;
     397              : 
     398              :     /*
     399              :      * End of the stream, if there is some pending data in output buffers then
     400              :      * we must forward it to next streamer.
     401              :      */
     402            7 :     astreamer_content(mystreamer->base.bbs_next, NULL,
     403            7 :                       mystreamer->base.bbs_buffer.data,
     404              :                       mystreamer->base.bbs_buffer.maxlen,
     405              :                       ASTREAMER_UNKNOWN);
     406              : 
     407            7 :     astreamer_finalize(mystreamer->base.bbs_next);
     408            7 : }
     409              : 
     410              : /*
     411              :  * Free memory.
     412              :  */
     413              : static void
     414            7 : astreamer_lz4_decompressor_free(astreamer *streamer)
     415              : {
     416              :     astreamer_lz4_frame *mystreamer;
     417              : 
     418            7 :     mystreamer = (astreamer_lz4_frame *) streamer;
     419            7 :     astreamer_free(streamer->bbs_next);
     420            7 :     LZ4F_freeDecompressionContext(mystreamer->dctx);
     421            7 :     pfree(streamer->bbs_buffer.data);
     422            7 :     pfree(streamer);
     423            7 : }
     424              : #endif
        

Generated by: LCOV version 2.0-1