LCOV - code coverage report
Current view: top level - src/bin/pg_dump - compress_lz4.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 217 288 75.3 %
Date: 2025-08-31 01:17:28 Functions: 14 18 77.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * compress_lz4.c
       4             :  *   Routines for archivers to write a LZ4 compressed data stream.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *     src/bin/pg_dump/compress_lz4.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : #include "postgres_fe.h"
      15             : #include <unistd.h>
      16             : 
      17             : #include "compress_lz4.h"
      18             : #include "pg_backup_utils.h"
      19             : 
      20             : #ifdef USE_LZ4
      21             : #include <lz4frame.h>
      22             : 
      23             : /*
      24             :  * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
      25             :  * Redefine it for installations with a lesser version.
      26             :  */
      27             : #ifndef LZ4F_HEADER_SIZE_MAX
      28             : #define LZ4F_HEADER_SIZE_MAX    32
      29             : #endif
      30             : 
      31             : /*---------------------------------
      32             :  * Common to both compression APIs
      33             :  *---------------------------------
      34             :  */
      35             : 
      36             : /*
      37             :  * (de)compression state used by both the Compressor and Stream APIs.
      38             :  */
      39             : typedef struct LZ4State
      40             : {
      41             :     /*
      42             :      * Used by the Stream API to keep track of the file stream.
      43             :      */
      44             :     FILE       *fp;
      45             : 
      46             :     LZ4F_preferences_t prefs;
      47             : 
      48             :     LZ4F_compressionContext_t ctx;
      49             :     LZ4F_decompressionContext_t dtx;
      50             : 
      51             :     /*
      52             :      * Used by the Stream API's lazy initialization.
      53             :      */
      54             :     bool        inited;
      55             : 
      56             :     /*
      57             :      * Used by the Stream API to distinguish between compression and
      58             :      * decompression operations.
      59             :      */
      60             :     bool        compressing;
      61             : 
      62             :     /*
      63             :      * Used by the Compressor API to mark if the compression headers have been
      64             :      * written after initialization.
      65             :      */
      66             :     bool        needs_header_flush;
      67             : 
      68             :     size_t      buflen;
      69             :     char       *buffer;
      70             : 
      71             :     /*
      72             :      * Used by the Stream API to store already uncompressed data that the
      73             :      * caller has not consumed.
      74             :      */
      75             :     size_t      overflowalloclen;
      76             :     size_t      overflowlen;
      77             :     char       *overflowbuf;
      78             : 
      79             :     /*
      80             :      * Used by both APIs to keep track of the compressed data length stored in
      81             :      * the buffer.
      82             :      */
      83             :     size_t      compressedlen;
      84             : 
      85             :     /*
      86             :      * Used by both APIs to keep track of error codes.
      87             :      */
      88             :     size_t      errcode;
      89             : } LZ4State;
      90             : 
      91             : /*
      92             :  * LZ4State_compression_init
      93             :  *      Initialize the required LZ4State members for compression.
      94             :  *
      95             :  * Write the LZ4 frame header in a buffer keeping track of its length. Users of
      96             :  * this function can choose when and how to write the header to a file stream.
      97             :  *
      98             :  * Returns true on success. In case of a failure returns false, and stores the
      99             :  * error code in state->errcode.
     100             :  */
     101             : static bool
     102         160 : LZ4State_compression_init(LZ4State *state)
     103             : {
     104             :     size_t      status;
     105             : 
     106         160 :     state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
     107             : 
     108             :     /*
     109             :      * LZ4F_compressBegin requires a buffer that is greater or equal to
     110             :      * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
     111             :      */
     112         160 :     if (state->buflen < LZ4F_HEADER_SIZE_MAX)
     113           0 :         state->buflen = LZ4F_HEADER_SIZE_MAX;
     114             : 
     115         160 :     status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
     116         160 :     if (LZ4F_isError(status))
     117             :     {
     118           0 :         state->errcode = status;
     119           0 :         return false;
     120             :     }
     121             : 
     122         160 :     state->buffer = pg_malloc(state->buflen);
     123         160 :     status = LZ4F_compressBegin(state->ctx,
     124         160 :                                 state->buffer, state->buflen,
     125         160 :                                 &state->prefs);
     126         160 :     if (LZ4F_isError(status))
     127             :     {
     128           0 :         state->errcode = status;
     129           0 :         return false;
     130             :     }
     131             : 
     132         160 :     state->compressedlen = status;
     133             : 
     134         160 :     return true;
     135             : }
     136             : 
     137             : /*----------------------
     138             :  * Compressor API
     139             :  *----------------------
     140             :  */
     141             : 
     142             : /* Private routines that support LZ4 compressed data I/O */
     143             : 
     144             : static void
     145          80 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
     146             : {
     147             :     size_t      r;
     148             :     size_t      readbuflen;
     149             :     char       *outbuf;
     150             :     char       *readbuf;
     151          80 :     LZ4F_decompressionContext_t ctx = NULL;
     152             :     LZ4F_decompressOptions_t dec_opt;
     153             :     LZ4F_errorCode_t status;
     154             : 
     155          80 :     memset(&dec_opt, 0, sizeof(dec_opt));
     156          80 :     status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
     157          80 :     if (LZ4F_isError(status))
     158           0 :         pg_fatal("could not create LZ4 decompression context: %s",
     159             :                  LZ4F_getErrorName(status));
     160             : 
     161          80 :     outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
     162          80 :     readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
     163          80 :     readbuflen = DEFAULT_IO_BUFFER_SIZE;
     164         240 :     while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
     165             :     {
     166             :         char       *readp;
     167             :         char       *readend;
     168             : 
     169             :         /* Process one chunk */
     170         160 :         readp = readbuf;
     171         160 :         readend = readbuf + r;
     172         326 :         while (readp < readend)
     173             :         {
     174         166 :             size_t      out_size = DEFAULT_IO_BUFFER_SIZE;
     175         166 :             size_t      read_size = readend - readp;
     176             : 
     177         166 :             memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
     178         166 :             status = LZ4F_decompress(ctx, outbuf, &out_size,
     179             :                                      readp, &read_size, &dec_opt);
     180         166 :             if (LZ4F_isError(status))
     181           0 :                 pg_fatal("could not decompress: %s",
     182             :                          LZ4F_getErrorName(status));
     183             : 
     184         166 :             ahwrite(outbuf, 1, out_size, AH);
     185         166 :             readp += read_size;
     186             :         }
     187             :     }
     188             : 
     189          80 :     pg_free(outbuf);
     190          80 :     pg_free(readbuf);
     191             : 
     192          80 :     status = LZ4F_freeDecompressionContext(ctx);
     193          80 :     if (LZ4F_isError(status))
     194           0 :         pg_fatal("could not free LZ4 decompression context: %s",
     195             :                  LZ4F_getErrorName(status));
     196          80 : }
     197             : 
     198             : static void
     199         338 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
     200             :                       const void *data, size_t dLen)
     201             : {
     202         338 :     LZ4State   *state = (LZ4State *) cs->private_data;
     203         338 :     size_t      remaining = dLen;
     204             :     size_t      status;
     205             :     size_t      chunk;
     206             : 
     207             :     /* Write the header if not yet written. */
     208         338 :     if (state->needs_header_flush)
     209             :     {
     210          78 :         cs->writeF(AH, state->buffer, state->compressedlen);
     211          78 :         state->needs_header_flush = false;
     212             :     }
     213             : 
     214         682 :     while (remaining > 0)
     215             :     {
     216             : 
     217         344 :         if (remaining > DEFAULT_IO_BUFFER_SIZE)
     218           6 :             chunk = DEFAULT_IO_BUFFER_SIZE;
     219             :         else
     220         338 :             chunk = remaining;
     221             : 
     222         344 :         remaining -= chunk;
     223         344 :         status = LZ4F_compressUpdate(state->ctx,
     224         344 :                                      state->buffer, state->buflen,
     225             :                                      data, chunk, NULL);
     226             : 
     227         344 :         if (LZ4F_isError(status))
     228           0 :             pg_fatal("could not compress data: %s",
     229             :                      LZ4F_getErrorName(status));
     230             : 
     231         344 :         cs->writeF(AH, state->buffer, status);
     232             : 
     233         344 :         data = ((char *) data) + chunk;
     234             :     }
     235         338 : }
     236             : 
     237             : static void
     238         160 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
     239             : {
     240         160 :     LZ4State   *state = (LZ4State *) cs->private_data;
     241             :     size_t      status;
     242             : 
     243             :     /* Nothing needs to be done */
     244         160 :     if (!state)
     245          80 :         return;
     246             : 
     247             :     /*
     248             :      * Write the header if not yet written. The caller is not required to call
     249             :      * writeData if the relation does not contain any data. Thus it is
     250             :      * possible to reach here without having flushed the header. Do it before
     251             :      * ending the compression.
     252             :      */
     253          80 :     if (state->needs_header_flush)
     254           2 :         cs->writeF(AH, state->buffer, state->compressedlen);
     255             : 
     256          80 :     status = LZ4F_compressEnd(state->ctx,
     257          80 :                               state->buffer, state->buflen,
     258             :                               NULL);
     259          80 :     if (LZ4F_isError(status))
     260           0 :         pg_fatal("could not end compression: %s",
     261             :                  LZ4F_getErrorName(status));
     262             : 
     263          80 :     cs->writeF(AH, state->buffer, status);
     264             : 
     265          80 :     status = LZ4F_freeCompressionContext(state->ctx);
     266          80 :     if (LZ4F_isError(status))
     267           0 :         pg_fatal("could not end compression: %s",
     268             :                  LZ4F_getErrorName(status));
     269             : 
     270          80 :     pg_free(state->buffer);
     271          80 :     pg_free(state);
     272             : 
     273          80 :     cs->private_data = NULL;
     274             : }
     275             : 
     276             : /*
     277             :  * Public routines that support LZ4 compressed data I/O
     278             :  */
     279             : void
     280         160 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
     281             : {
     282             :     LZ4State   *state;
     283             : 
     284         160 :     cs->readData = ReadDataFromArchiveLZ4;
     285         160 :     cs->writeData = WriteDataToArchiveLZ4;
     286         160 :     cs->end = EndCompressorLZ4;
     287             : 
     288         160 :     cs->compression_spec = compression_spec;
     289             : 
     290             :     /*
     291             :      * Read operations have access to the whole input. No state needs to be
     292             :      * carried between calls.
     293             :      */
     294         160 :     if (cs->readF)
     295          80 :         return;
     296             : 
     297          80 :     state = pg_malloc0(sizeof(*state));
     298          80 :     if (cs->compression_spec.level >= 0)
     299          80 :         state->prefs.compressionLevel = cs->compression_spec.level;
     300             : 
     301          80 :     if (!LZ4State_compression_init(state))
     302           0 :         pg_fatal("could not initialize LZ4 compression: %s",
     303             :                  LZ4F_getErrorName(state->errcode));
     304             : 
     305             :     /* Remember that the header has not been written. */
     306          80 :     state->needs_header_flush = true;
     307          80 :     cs->private_data = state;
     308             : }
     309             : 
     310             : /*----------------------
     311             :  * Compress Stream API
     312             :  *----------------------
     313             :  */
     314             : 
     315             : 
     316             : /*
     317             :  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
     318             :  * decompressed output in the overflow buffer and the end of the backing file
     319             :  * is reached.
     320             :  */
     321             : static bool
     322           0 : LZ4Stream_eof(CompressFileHandle *CFH)
     323             : {
     324           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     325             : 
     326           0 :     return state->overflowlen == 0 && feof(state->fp);
     327             : }
     328             : 
     329             : static const char *
     330           0 : LZ4Stream_get_error(CompressFileHandle *CFH)
     331             : {
     332           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     333             :     const char *errmsg;
     334             : 
     335           0 :     if (LZ4F_isError(state->errcode))
     336           0 :         errmsg = LZ4F_getErrorName(state->errcode);
     337             :     else
     338           0 :         errmsg = strerror(errno);
     339             : 
     340           0 :     return errmsg;
     341             : }
     342             : 
     343             : /*
     344             :  * Initialize an already alloc'ed LZ4State struct for subsequent calls.
     345             :  *
     346             :  * Creates the necessary contexts for either compression or decompression. When
     347             :  * compressing data (indicated by compressing=true), it additionally writes the
     348             :  * LZ4 header in the output stream.
     349             :  *
     350             :  * Returns true on success. In case of a failure returns false, and stores the
     351             :  * error code in state->errcode.
     352             :  */
     353             : static bool
     354        4918 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
     355             : {
     356             :     size_t      status;
     357             : 
     358        4918 :     if (state->inited)
     359        4758 :         return true;
     360             : 
     361         160 :     state->compressing = compressing;
     362             : 
     363             :     /* When compressing, write LZ4 header to the output stream. */
     364         160 :     if (state->compressing)
     365             :     {
     366             : 
     367          80 :         if (!LZ4State_compression_init(state))
     368           0 :             return false;
     369             : 
     370          80 :         errno = 0;
     371          80 :         if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
     372             :         {
     373           0 :             errno = (errno) ? errno : ENOSPC;
     374           0 :             return false;
     375             :         }
     376             :     }
     377             :     else
     378             :     {
     379          80 :         status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
     380          80 :         if (LZ4F_isError(status))
     381             :         {
     382           0 :             state->errcode = status;
     383           0 :             return false;
     384             :         }
     385             : 
     386          80 :         state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
     387          80 :         state->buffer = pg_malloc(state->buflen);
     388             : 
     389          80 :         state->overflowalloclen = state->buflen;
     390          80 :         state->overflowbuf = pg_malloc(state->overflowalloclen);
     391          80 :         state->overflowlen = 0;
     392             :     }
     393             : 
     394         160 :     state->inited = true;
     395         160 :     return true;
     396             : }
     397             : 
     398             : /*
     399             :  * Read already decompressed content from the overflow buffer into 'ptr' up to
     400             :  * 'size' bytes, if available. If the eol_flag is set, then stop at the first
     401             :  * occurrence of the newline char prior to 'size' bytes.
     402             :  *
     403             :  * Any unread content in the overflow buffer is moved to the beginning.
     404             :  *
     405             :  * Returns the number of bytes read from the overflow buffer (and copied into
     406             :  * the 'ptr' buffer), or 0 if the overflow buffer is empty.
     407             :  */
     408             : static int
     409         164 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
     410             : {
     411             :     char       *p;
     412         164 :     int         readlen = 0;
     413             : 
     414         164 :     if (state->overflowlen == 0)
     415         158 :         return 0;
     416             : 
     417           6 :     if (state->overflowlen >= size)
     418           4 :         readlen = size;
     419             :     else
     420           2 :         readlen = state->overflowlen;
     421             : 
     422           6 :     if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
     423             :         /* Include the line terminating char */
     424           0 :         readlen = p - state->overflowbuf + 1;
     425             : 
     426           6 :     memcpy(ptr, state->overflowbuf, readlen);
     427           6 :     state->overflowlen -= readlen;
     428             : 
     429           6 :     if (state->overflowlen > 0)
     430           4 :         memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
     431             : 
     432           6 :     return readlen;
     433             : }
     434             : 
     435             : /*
     436             :  * The workhorse for reading decompressed content out of an LZ4 compressed
     437             :  * stream.
     438             :  *
     439             :  * It will read up to 'ptrsize' decompressed content, or up to the new line
     440             :  * char if found first when the eol_flag is set. It is possible that the
     441             :  * decompressed output generated by reading any compressed input via the
     442             :  * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
     443             :  * at an overflow buffer within LZ4State. Of course, when the function is
     444             :  * called, it will first try to consume any decompressed content already
     445             :  * present in the overflow buffer, before decompressing new content.
     446             :  *
     447             :  * Returns the number of bytes of decompressed data copied into the ptr
     448             :  * buffer, or -1 in case of error.
     449             :  */
     450             : static int
     451         164 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
     452             : {
     453         164 :     int         dsize = 0;
     454             :     int         rsize;
     455         164 :     int         size = ptrsize;
     456         164 :     bool        eol_found = false;
     457             : 
     458             :     void       *readbuf;
     459             : 
     460             :     /* Lazy init */
     461         164 :     if (!LZ4Stream_init(state, size, false /* decompressing */ ))
     462             :     {
     463           0 :         pg_log_error("unable to initialize LZ4 library: %s",
     464             :                      LZ4F_getErrorName(state->errcode));
     465           0 :         return -1;
     466             :     }
     467             : 
     468             :     /* No work needs to be done for a zero-sized output buffer */
     469         164 :     if (size <= 0)
     470           0 :         return 0;
     471             : 
     472             :     /* Verify that there is enough space in the outbuf */
     473         164 :     if (size > state->buflen)
     474             :     {
     475           0 :         state->buflen = size;
     476           0 :         state->buffer = pg_realloc(state->buffer, size);
     477             :     }
     478             : 
     479             :     /* use already decompressed content if available */
     480         164 :     dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
     481         164 :     if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
     482           4 :         return dsize;
     483             : 
     484         160 :     readbuf = pg_malloc(size);
     485             : 
     486             :     do
     487             :     {
     488             :         char       *rp;
     489             :         char       *rend;
     490             : 
     491         166 :         rsize = fread(readbuf, 1, size, state->fp);
     492         166 :         if (rsize < size && !feof(state->fp))
     493             :         {
     494           0 :             pg_log_error("could not read from input file: %m");
     495           0 :             return -1;
     496             :         }
     497             : 
     498         166 :         rp = (char *) readbuf;
     499         166 :         rend = (char *) readbuf + rsize;
     500             : 
     501         256 :         while (rp < rend)
     502             :         {
     503             :             size_t      status;
     504          90 :             size_t      outlen = state->buflen;
     505          90 :             size_t      read_remain = rend - rp;
     506             : 
     507          90 :             memset(state->buffer, 0, outlen);
     508          90 :             status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
     509             :                                      rp, &read_remain, NULL);
     510          90 :             if (LZ4F_isError(status))
     511             :             {
     512           0 :                 state->errcode = status;
     513           0 :                 pg_log_error("could not read from input file: %s",
     514             :                              LZ4F_getErrorName(state->errcode));
     515           0 :                 return -1;
     516             :             }
     517             : 
     518          90 :             rp += read_remain;
     519             : 
     520             :             /*
     521             :              * fill in what space is available in ptr if the eol flag is set,
     522             :              * either skip if one already found or fill up to EOL if present
     523             :              * in the outbuf
     524             :              */
     525          90 :             if (outlen > 0 && dsize < size && eol_found == false)
     526             :             {
     527             :                 char       *p;
     528          78 :                 size_t      lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
     529          78 :                 size_t      len = outlen < lib ? outlen : lib;
     530             : 
     531          78 :                 if (eol_flag &&
     532           0 :                     (p = memchr(state->buffer, '\n', outlen)) &&
     533           0 :                     (size_t) (p - state->buffer + 1) <= len)
     534             :                 {
     535           0 :                     len = p - state->buffer + 1;
     536           0 :                     eol_found = true;
     537             :                 }
     538             : 
     539          78 :                 memcpy((char *) ptr + dsize, state->buffer, len);
     540          78 :                 dsize += len;
     541             : 
     542             :                 /* move what did not fit, if any, at the beginning of the buf */
     543          78 :                 if (len < outlen)
     544           0 :                     memmove(state->buffer, state->buffer + len, outlen - len);
     545          78 :                 outlen -= len;
     546             :             }
     547             : 
     548             :             /* if there is available output, save it */
     549          90 :             if (outlen > 0)
     550             :             {
     551          10 :                 while (state->overflowlen + outlen > state->overflowalloclen)
     552             :                 {
     553           4 :                     state->overflowalloclen *= 2;
     554           4 :                     state->overflowbuf = pg_realloc(state->overflowbuf,
     555             :                                                     state->overflowalloclen);
     556             :                 }
     557             : 
     558           6 :                 memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
     559           6 :                 state->overflowlen += outlen;
     560             :             }
     561             :         }
     562         166 :     } while (rsize == size && dsize < size && eol_found == false);
     563             : 
     564         160 :     pg_free(readbuf);
     565             : 
     566         160 :     return dsize;
     567             : }
     568             : 
     569             : /*
     570             :  * Compress size bytes from ptr and write them to the stream.
     571             :  */
     572             : static void
     573        4754 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     574             : {
     575        4754 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     576             :     size_t      status;
     577        4754 :     int         remaining = size;
     578             : 
     579             :     /* Lazy init */
     580        4754 :     if (!LZ4Stream_init(state, size, true))
     581           0 :         pg_fatal("unable to initialize LZ4 library: %s",
     582             :                  LZ4F_getErrorName(state->errcode));
     583             : 
     584        9520 :     while (remaining > 0)
     585             :     {
     586        4766 :         int         chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
     587             : 
     588        4766 :         remaining -= chunk;
     589             : 
     590        4766 :         status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
     591             :                                      ptr, chunk, NULL);
     592        4766 :         if (LZ4F_isError(status))
     593           0 :             pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
     594             : 
     595        4766 :         errno = 0;
     596        4766 :         if (fwrite(state->buffer, 1, status, state->fp) != status)
     597             :         {
     598           0 :             errno = (errno) ? errno : ENOSPC;
     599           0 :             pg_fatal("error during writing: %m");
     600             :         }
     601             : 
     602        4766 :         ptr = ((const char *) ptr) + chunk;
     603             :     }
     604        4754 : }
     605             : 
     606             : /*
     607             :  * fread() equivalent implementation for LZ4 compressed files.
     608             :  */
     609             : static size_t
     610         164 : LZ4Stream_read(void *ptr, size_t size, CompressFileHandle *CFH)
     611             : {
     612         164 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     613             :     int         ret;
     614             : 
     615         164 :     if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
     616           0 :         pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
     617             : 
     618         164 :     return (size_t) ret;
     619             : }
     620             : 
     621             : /*
     622             :  * fgetc() equivalent implementation for LZ4 compressed files.
     623             :  */
     624             : static int
     625           0 : LZ4Stream_getc(CompressFileHandle *CFH)
     626             : {
     627           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     628             :     unsigned char c;
     629             : 
     630           0 :     if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
     631             :     {
     632           0 :         if (!LZ4Stream_eof(CFH))
     633           0 :             pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
     634             :         else
     635           0 :             pg_fatal("could not read from input file: end of file");
     636             :     }
     637             : 
     638           0 :     return c;
     639             : }
     640             : 
     641             : /*
     642             :  * fgets() equivalent implementation for LZ4 compressed files.
     643             :  */
     644             : static char *
     645           0 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
     646             : {
     647           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     648             :     int         ret;
     649             : 
     650           0 :     ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
     651             : 
     652             :     /*
     653             :      * LZ4Stream_read_internal returning 0 or -1 means that it was either an
     654             :      * EOF or an error, but gets_func is defined to return NULL in either case
     655             :      * so we can treat both the same here.
     656             :      */
     657           0 :     if (ret <= 0)
     658           0 :         return NULL;
     659             : 
     660             :     /*
     661             :      * Our caller expects the return string to be NULL terminated and we know
     662             :      * that ret is greater than zero.
     663             :      */
     664           0 :     ptr[ret - 1] = '\0';
     665             : 
     666           0 :     return ptr;
     667             : }
     668             : 
     669             : /*
     670             :  * Finalize (de)compression of a stream. When compressing it will write any
     671             :  * remaining content and/or generated footer from the LZ4 API.
     672             :  */
     673             : static bool
     674         162 : LZ4Stream_close(CompressFileHandle *CFH)
     675             : {
     676             :     FILE       *fp;
     677         162 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     678             :     size_t      status;
     679             :     int         ret;
     680             : 
     681         162 :     fp = state->fp;
     682         162 :     if (state->inited)
     683             :     {
     684         160 :         if (state->compressing)
     685             :         {
     686          80 :             status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
     687          80 :             if (LZ4F_isError(status))
     688             :             {
     689           0 :                 pg_log_error("could not end compression: %s",
     690             :                              LZ4F_getErrorName(status));
     691             :             }
     692             :             else
     693             :             {
     694          80 :                 errno = 0;
     695          80 :                 if (fwrite(state->buffer, 1, status, state->fp) != status)
     696             :                 {
     697           0 :                     errno = (errno) ? errno : ENOSPC;
     698           0 :                     pg_log_error("could not write to output file: %m");
     699             :                 }
     700             :             }
     701             : 
     702          80 :             status = LZ4F_freeCompressionContext(state->ctx);
     703          80 :             if (LZ4F_isError(status))
     704           0 :                 pg_log_error("could not end compression: %s",
     705             :                              LZ4F_getErrorName(status));
     706             :         }
     707             :         else
     708             :         {
     709          80 :             status = LZ4F_freeDecompressionContext(state->dtx);
     710          80 :             if (LZ4F_isError(status))
     711           0 :                 pg_log_error("could not end decompression: %s",
     712             :                              LZ4F_getErrorName(status));
     713          80 :             pg_free(state->overflowbuf);
     714             :         }
     715             : 
     716         160 :         pg_free(state->buffer);
     717             :     }
     718             : 
     719         162 :     pg_free(state);
     720         162 :     CFH->private_data = NULL;
     721             : 
     722         162 :     errno = 0;
     723         162 :     ret = fclose(fp);
     724         162 :     if (ret != 0)
     725             :     {
     726           0 :         pg_log_error("could not close file: %m");
     727           0 :         return false;
     728             :     }
     729             : 
     730         162 :     return true;
     731             : }
     732             : 
     733             : static bool
     734         162 : LZ4Stream_open(const char *path, int fd, const char *mode,
     735             :                CompressFileHandle *CFH)
     736             : {
     737         162 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     738             : 
     739         162 :     if (fd >= 0)
     740           0 :         state->fp = fdopen(dup(fd), mode);
     741             :     else
     742         162 :         state->fp = fopen(path, mode);
     743         162 :     if (state->fp == NULL)
     744             :     {
     745           0 :         state->errcode = errno;
     746           0 :         return false;
     747             :     }
     748             : 
     749         162 :     return true;
     750             : }
     751             : 
     752             : static bool
     753          80 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     754             : {
     755             :     char       *fname;
     756             :     int         save_errno;
     757             :     bool        ret;
     758             : 
     759          80 :     fname = psprintf("%s.lz4", path);
     760          80 :     ret = CFH->open_func(fname, -1, mode, CFH);
     761             : 
     762          80 :     save_errno = errno;
     763          80 :     pg_free(fname);
     764          80 :     errno = save_errno;
     765             : 
     766          80 :     return ret;
     767             : }
     768             : 
     769             : /*
     770             :  * Public routines
     771             :  */
     772             : void
     773         162 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
     774             :                           const pg_compress_specification compression_spec)
     775             : {
     776             :     LZ4State   *state;
     777             : 
     778         162 :     CFH->open_func = LZ4Stream_open;
     779         162 :     CFH->open_write_func = LZ4Stream_open_write;
     780         162 :     CFH->read_func = LZ4Stream_read;
     781         162 :     CFH->write_func = LZ4Stream_write;
     782         162 :     CFH->gets_func = LZ4Stream_gets;
     783         162 :     CFH->getc_func = LZ4Stream_getc;
     784         162 :     CFH->eof_func = LZ4Stream_eof;
     785         162 :     CFH->close_func = LZ4Stream_close;
     786         162 :     CFH->get_error_func = LZ4Stream_get_error;
     787             : 
     788         162 :     CFH->compression_spec = compression_spec;
     789         162 :     state = pg_malloc0(sizeof(*state));
     790         162 :     if (CFH->compression_spec.level >= 0)
     791         162 :         state->prefs.compressionLevel = CFH->compression_spec.level;
     792             : 
     793         162 :     CFH->private_data = state;
     794         162 : }
     795             : #else                           /* USE_LZ4 */
     796             : void
     797             : InitCompressorLZ4(CompressorState *cs,
     798             :                   const pg_compress_specification compression_spec)
     799             : {
     800             :     pg_fatal("this build does not support compression with %s", "LZ4");
     801             : }
     802             : 
     803             : void
     804             : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
     805             :                           const pg_compress_specification compression_spec)
     806             : {
     807             :     pg_fatal("this build does not support compression with %s", "LZ4");
     808             : }
     809             : #endif                          /* USE_LZ4 */

Generated by: LCOV version 1.16