LCOV - code coverage report
Current view: top level - src/bin/pg_dump - compress_lz4.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 213 282 75.5 %
Date: 2024-10-10 04:14:55 Functions: 14 18 77.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * compress_lz4.c
       4             :  *   Routines for archivers to write a LZ4 compressed data stream.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *     src/bin/pg_dump/compress_lz4.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : #include "postgres_fe.h"
      15             : 
      16             : #include "compress_lz4.h"
      17             : #include "pg_backup_utils.h"
      18             : 
      19             : #ifdef USE_LZ4
      20             : #include <lz4frame.h>
      21             : 
      22             : /*
      23             :  * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
      24             :  * Redefine it for installations with a lesser version.
      25             :  */
      26             : #ifndef LZ4F_HEADER_SIZE_MAX
      27             : #define LZ4F_HEADER_SIZE_MAX    32
      28             : #endif
      29             : 
      30             : /*---------------------------------
      31             :  * Common to both compression APIs
      32             :  *---------------------------------
      33             :  */
      34             : 
      35             : /*
      36             :  * (de)compression state used by both the Compressor and Stream APIs.
      37             :  */
      38             : typedef struct LZ4State
      39             : {
      40             :     /*
      41             :      * Used by the Stream API to keep track of the file stream.
      42             :      */
      43             :     FILE       *fp;
      44             : 
      45             :     LZ4F_preferences_t prefs;
      46             : 
      47             :     LZ4F_compressionContext_t ctx;
      48             :     LZ4F_decompressionContext_t dtx;
      49             : 
      50             :     /*
      51             :      * Used by the Stream API's lazy initialization.
      52             :      */
      53             :     bool        inited;
      54             : 
      55             :     /*
      56             :      * Used by the Stream API to distinguish between compression and
      57             :      * decompression operations.
      58             :      */
      59             :     bool        compressing;
      60             : 
      61             :     /*
      62             :      * Used by the Compressor API to mark if the compression headers have been
      63             :      * written after initialization.
      64             :      */
      65             :     bool        needs_header_flush;
      66             : 
      67             :     size_t      buflen;
      68             :     char       *buffer;
      69             : 
      70             :     /*
      71             :      * Used by the Stream API to store already uncompressed data that the
      72             :      * caller has not consumed.
      73             :      */
      74             :     size_t      overflowalloclen;
      75             :     size_t      overflowlen;
      76             :     char       *overflowbuf;
      77             : 
      78             :     /*
      79             :      * Used by both APIs to keep track of the compressed data length stored in
      80             :      * the buffer.
      81             :      */
      82             :     size_t      compressedlen;
      83             : 
      84             :     /*
      85             :      * Used by both APIs to keep track of error codes.
      86             :      */
      87             :     size_t      errcode;
      88             : } LZ4State;
      89             : 
      90             : /*
      91             :  * LZ4State_compression_init
      92             :  *      Initialize the required LZ4State members for compression.
      93             :  *
      94             :  * Write the LZ4 frame header in a buffer keeping track of its length. Users of
      95             :  * this function can choose when and how to write the header to a file stream.
      96             :  *
      97             :  * Returns true on success. In case of a failure returns false, and stores the
      98             :  * error code in state->errcode.
      99             :  */
     100             : static bool
     101         128 : LZ4State_compression_init(LZ4State *state)
     102             : {
     103             :     size_t      status;
     104             : 
     105         128 :     state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
     106             : 
     107             :     /*
     108             :      * LZ4F_compressBegin requires a buffer that is greater or equal to
     109             :      * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
     110             :      */
     111         128 :     if (state->buflen < LZ4F_HEADER_SIZE_MAX)
     112           0 :         state->buflen = LZ4F_HEADER_SIZE_MAX;
     113             : 
     114         128 :     status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
     115         128 :     if (LZ4F_isError(status))
     116             :     {
     117           0 :         state->errcode = status;
     118           0 :         return false;
     119             :     }
     120             : 
     121         128 :     state->buffer = pg_malloc(state->buflen);
     122         128 :     status = LZ4F_compressBegin(state->ctx,
     123         128 :                                 state->buffer, state->buflen,
     124         128 :                                 &state->prefs);
     125         128 :     if (LZ4F_isError(status))
     126             :     {
     127           0 :         state->errcode = status;
     128           0 :         return false;
     129             :     }
     130             : 
     131         128 :     state->compressedlen = status;
     132             : 
     133         128 :     return true;
     134             : }
     135             : 
     136             : /*----------------------
     137             :  * Compressor API
     138             :  *----------------------
     139             :  */
     140             : 
     141             : /* Private routines that support LZ4 compressed data I/O */
     142             : 
     143             : static void
     144          64 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
     145             : {
     146             :     size_t      r;
     147             :     size_t      readbuflen;
     148             :     char       *outbuf;
     149             :     char       *readbuf;
     150          64 :     LZ4F_decompressionContext_t ctx = NULL;
     151             :     LZ4F_decompressOptions_t dec_opt;
     152             :     LZ4F_errorCode_t status;
     153             : 
     154          64 :     memset(&dec_opt, 0, sizeof(dec_opt));
     155          64 :     status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
     156          64 :     if (LZ4F_isError(status))
     157           0 :         pg_fatal("could not create LZ4 decompression context: %s",
     158             :                  LZ4F_getErrorName(status));
     159             : 
     160          64 :     outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
     161          64 :     readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
     162          64 :     readbuflen = DEFAULT_IO_BUFFER_SIZE;
     163         192 :     while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
     164             :     {
     165             :         char       *readp;
     166             :         char       *readend;
     167             : 
     168             :         /* Process one chunk */
     169         128 :         readp = readbuf;
     170         128 :         readend = readbuf + r;
     171         262 :         while (readp < readend)
     172             :         {
     173         134 :             size_t      out_size = DEFAULT_IO_BUFFER_SIZE;
     174         134 :             size_t      read_size = readend - readp;
     175             : 
     176         134 :             memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
     177         134 :             status = LZ4F_decompress(ctx, outbuf, &out_size,
     178             :                                      readp, &read_size, &dec_opt);
     179         134 :             if (LZ4F_isError(status))
     180           0 :                 pg_fatal("could not decompress: %s",
     181             :                          LZ4F_getErrorName(status));
     182             : 
     183         134 :             ahwrite(outbuf, 1, out_size, AH);
     184         134 :             readp += read_size;
     185             :         }
     186             :     }
     187             : 
     188          64 :     pg_free(outbuf);
     189          64 :     pg_free(readbuf);
     190             : 
     191          64 :     status = LZ4F_freeDecompressionContext(ctx);
     192          64 :     if (LZ4F_isError(status))
     193           0 :         pg_fatal("could not free LZ4 decompression context: %s",
     194             :                  LZ4F_getErrorName(status));
     195          64 : }
     196             : 
     197             : static void
     198         122 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
     199             :                       const void *data, size_t dLen)
     200             : {
     201         122 :     LZ4State   *state = (LZ4State *) cs->private_data;
     202         122 :     size_t      remaining = dLen;
     203             :     size_t      status;
     204             :     size_t      chunk;
     205             : 
     206             :     /* Write the header if not yet written. */
     207         122 :     if (state->needs_header_flush)
     208             :     {
     209          62 :         cs->writeF(AH, state->buffer, state->compressedlen);
     210          62 :         state->needs_header_flush = false;
     211             :     }
     212             : 
     213         250 :     while (remaining > 0)
     214             :     {
     215             : 
     216         128 :         if (remaining > DEFAULT_IO_BUFFER_SIZE)
     217           6 :             chunk = DEFAULT_IO_BUFFER_SIZE;
     218             :         else
     219         122 :             chunk = remaining;
     220             : 
     221         128 :         remaining -= chunk;
     222         128 :         status = LZ4F_compressUpdate(state->ctx,
     223         128 :                                      state->buffer, state->buflen,
     224             :                                      data, chunk, NULL);
     225             : 
     226         128 :         if (LZ4F_isError(status))
     227           0 :             pg_fatal("could not compress data: %s",
     228             :                      LZ4F_getErrorName(status));
     229             : 
     230         128 :         cs->writeF(AH, state->buffer, status);
     231             : 
     232         128 :         data = ((char *) data) + chunk;
     233             :     }
     234         122 : }
     235             : 
     236             : static void
     237         128 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
     238             : {
     239         128 :     LZ4State   *state = (LZ4State *) cs->private_data;
     240             :     size_t      status;
     241             : 
     242             :     /* Nothing needs to be done */
     243         128 :     if (!state)
     244          64 :         return;
     245             : 
     246             :     /*
     247             :      * Write the header if not yet written. The caller is not required to call
     248             :      * writeData if the relation does not contain any data. Thus it is
     249             :      * possible to reach here without having flushed the header. Do it before
     250             :      * ending the compression.
     251             :      */
     252          64 :     if (state->needs_header_flush)
     253           2 :         cs->writeF(AH, state->buffer, state->compressedlen);
     254             : 
     255          64 :     status = LZ4F_compressEnd(state->ctx,
     256          64 :                               state->buffer, state->buflen,
     257             :                               NULL);
     258          64 :     if (LZ4F_isError(status))
     259           0 :         pg_fatal("could not end compression: %s",
     260             :                  LZ4F_getErrorName(status));
     261             : 
     262          64 :     cs->writeF(AH, state->buffer, status);
     263             : 
     264          64 :     status = LZ4F_freeCompressionContext(state->ctx);
     265          64 :     if (LZ4F_isError(status))
     266           0 :         pg_fatal("could not end compression: %s",
     267             :                  LZ4F_getErrorName(status));
     268             : 
     269          64 :     pg_free(state->buffer);
     270          64 :     pg_free(state);
     271             : 
     272          64 :     cs->private_data = NULL;
     273             : }
     274             : 
     275             : /*
     276             :  * Public routines that support LZ4 compressed data I/O
     277             :  */
     278             : void
     279         128 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
     280             : {
     281             :     LZ4State   *state;
     282             : 
     283         128 :     cs->readData = ReadDataFromArchiveLZ4;
     284         128 :     cs->writeData = WriteDataToArchiveLZ4;
     285         128 :     cs->end = EndCompressorLZ4;
     286             : 
     287         128 :     cs->compression_spec = compression_spec;
     288             : 
     289             :     /*
     290             :      * Read operations have access to the whole input. No state needs to be
     291             :      * carried between calls.
     292             :      */
     293         128 :     if (cs->readF)
     294          64 :         return;
     295             : 
     296          64 :     state = pg_malloc0(sizeof(*state));
     297          64 :     if (cs->compression_spec.level >= 0)
     298          64 :         state->prefs.compressionLevel = cs->compression_spec.level;
     299             : 
     300          64 :     if (!LZ4State_compression_init(state))
     301           0 :         pg_fatal("could not initialize LZ4 compression: %s",
     302             :                  LZ4F_getErrorName(state->errcode));
     303             : 
     304             :     /* Remember that the header has not been written. */
     305          64 :     state->needs_header_flush = true;
     306          64 :     cs->private_data = state;
     307             : }
     308             : 
     309             : /*----------------------
     310             :  * Compress Stream API
     311             :  *----------------------
     312             :  */
     313             : 
     314             : 
     315             : /*
     316             :  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
     317             :  * decompressed output in the overflow buffer and the end of the backing file
     318             :  * is reached.
     319             :  */
     320             : static bool
     321           0 : LZ4Stream_eof(CompressFileHandle *CFH)
     322             : {
     323           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     324             : 
     325           0 :     return state->overflowlen == 0 && feof(state->fp);
     326             : }
     327             : 
     328             : static const char *
     329           0 : LZ4Stream_get_error(CompressFileHandle *CFH)
     330             : {
     331           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     332             :     const char *errmsg;
     333             : 
     334           0 :     if (LZ4F_isError(state->errcode))
     335           0 :         errmsg = LZ4F_getErrorName(state->errcode);
     336             :     else
     337           0 :         errmsg = strerror(errno);
     338             : 
     339           0 :     return errmsg;
     340             : }
     341             : 
     342             : /*
     343             :  * Initialize an already alloc'ed LZ4State struct for subsequent calls.
     344             :  *
     345             :  * Creates the necessary contexts for either compression or decompression. When
     346             :  * compressing data (indicated by compressing=true), it additionally writes the
     347             :  * LZ4 header in the output stream.
     348             :  *
     349             :  * Returns true on success. In case of a failure returns false, and stores the
     350             :  * error code in state->errcode.
     351             :  */
     352             : static bool
     353        3422 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
     354             : {
     355             :     size_t      status;
     356             : 
     357        3422 :     if (state->inited)
     358        3294 :         return true;
     359             : 
     360         128 :     state->compressing = compressing;
     361         128 :     state->inited = true;
     362             : 
     363             :     /* When compressing, write LZ4 header to the output stream. */
     364         128 :     if (state->compressing)
     365             :     {
     366             : 
     367          64 :         if (!LZ4State_compression_init(state))
     368           0 :             return false;
     369             : 
     370          64 :         if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
     371             :         {
     372           0 :             errno = (errno) ? errno : ENOSPC;
     373           0 :             return false;
     374             :         }
     375             :     }
     376             :     else
     377             :     {
     378          64 :         status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
     379          64 :         if (LZ4F_isError(status))
     380             :         {
     381           0 :             state->errcode = status;
     382           0 :             return false;
     383             :         }
     384             : 
     385          64 :         state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
     386          64 :         state->buffer = pg_malloc(state->buflen);
     387             : 
     388          64 :         state->overflowalloclen = state->buflen;
     389          64 :         state->overflowbuf = pg_malloc(state->overflowalloclen);
     390          64 :         state->overflowlen = 0;
     391             :     }
     392             : 
     393         128 :     return true;
     394             : }
     395             : 
     396             : /*
     397             :  * Read already decompressed content from the overflow buffer into 'ptr' up to
     398             :  * 'size' bytes, if available. If the eol_flag is set, then stop at the first
     399             :  * occurrence of the newline char prior to 'size' bytes.
     400             :  *
     401             :  * Any unread content in the overflow buffer is moved to the beginning.
     402             :  *
     403             :  * Returns the number of bytes read from the overflow buffer (and copied into
     404             :  * the 'ptr' buffer), or 0 if the overflow buffer is empty.
     405             :  */
     406             : static int
     407         132 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
     408             : {
     409             :     char       *p;
     410         132 :     int         readlen = 0;
     411             : 
     412         132 :     if (state->overflowlen == 0)
     413         126 :         return 0;
     414             : 
     415           6 :     if (state->overflowlen >= size)
     416           4 :         readlen = size;
     417             :     else
     418           2 :         readlen = state->overflowlen;
     419             : 
     420           6 :     if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
     421             :         /* Include the line terminating char */
     422           0 :         readlen = p - state->overflowbuf + 1;
     423             : 
     424           6 :     memcpy(ptr, state->overflowbuf, readlen);
     425           6 :     state->overflowlen -= readlen;
     426             : 
     427           6 :     if (state->overflowlen > 0)
     428           4 :         memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
     429             : 
     430           6 :     return readlen;
     431             : }
     432             : 
     433             : /*
     434             :  * The workhorse for reading decompressed content out of an LZ4 compressed
     435             :  * stream.
     436             :  *
     437             :  * It will read up to 'ptrsize' decompressed content, or up to the new line
     438             :  * char if found first when the eol_flag is set. It is possible that the
     439             :  * decompressed output generated by reading any compressed input via the
     440             :  * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
     441             :  * at an overflow buffer within LZ4State. Of course, when the function is
     442             :  * called, it will first try to consume any decompressed content already
     443             :  * present in the overflow buffer, before decompressing new content.
     444             :  *
     445             :  * Returns the number of bytes of decompressed data copied into the ptr
     446             :  * buffer, or -1 in case of error.
     447             :  */
     448             : static int
     449         132 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
     450             : {
     451         132 :     int         dsize = 0;
     452             :     int         rsize;
     453         132 :     int         size = ptrsize;
     454         132 :     bool        eol_found = false;
     455             : 
     456             :     void       *readbuf;
     457             : 
     458             :     /* Lazy init */
     459         132 :     if (!LZ4Stream_init(state, size, false /* decompressing */ ))
     460           0 :         return -1;
     461             : 
     462             :     /* No work needs to be done for a zero-sized output buffer */
     463         132 :     if (size <= 0)
     464           0 :         return 0;
     465             : 
     466             :     /* Verify that there is enough space in the outbuf */
     467         132 :     if (size > state->buflen)
     468             :     {
     469           0 :         state->buflen = size;
     470           0 :         state->buffer = pg_realloc(state->buffer, size);
     471             :     }
     472             : 
     473             :     /* use already decompressed content if available */
     474         132 :     dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
     475         132 :     if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
     476           4 :         return dsize;
     477             : 
     478         128 :     readbuf = pg_malloc(size);
     479             : 
     480             :     do
     481             :     {
     482             :         char       *rp;
     483             :         char       *rend;
     484             : 
     485         134 :         rsize = fread(readbuf, 1, size, state->fp);
     486         134 :         if (rsize < size && !feof(state->fp))
     487           0 :             return -1;
     488             : 
     489         134 :         rp = (char *) readbuf;
     490         134 :         rend = (char *) readbuf + rsize;
     491             : 
     492         208 :         while (rp < rend)
     493             :         {
     494             :             size_t      status;
     495          74 :             size_t      outlen = state->buflen;
     496          74 :             size_t      read_remain = rend - rp;
     497             : 
     498          74 :             memset(state->buffer, 0, outlen);
     499          74 :             status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
     500             :                                      rp, &read_remain, NULL);
     501          74 :             if (LZ4F_isError(status))
     502             :             {
     503           0 :                 state->errcode = status;
     504           0 :                 return -1;
     505             :             }
     506             : 
     507          74 :             rp += read_remain;
     508             : 
     509             :             /*
     510             :              * fill in what space is available in ptr if the eol flag is set,
     511             :              * either skip if one already found or fill up to EOL if present
     512             :              * in the outbuf
     513             :              */
     514          74 :             if (outlen > 0 && dsize < size && eol_found == false)
     515             :             {
     516             :                 char       *p;
     517          62 :                 size_t      lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
     518          62 :                 size_t      len = outlen < lib ? outlen : lib;
     519             : 
     520          62 :                 if (eol_flag &&
     521           0 :                     (p = memchr(state->buffer, '\n', outlen)) &&
     522           0 :                     (size_t) (p - state->buffer + 1) <= len)
     523             :                 {
     524           0 :                     len = p - state->buffer + 1;
     525           0 :                     eol_found = true;
     526             :                 }
     527             : 
     528          62 :                 memcpy((char *) ptr + dsize, state->buffer, len);
     529          62 :                 dsize += len;
     530             : 
     531             :                 /* move what did not fit, if any, at the beginning of the buf */
     532          62 :                 if (len < outlen)
     533           0 :                     memmove(state->buffer, state->buffer + len, outlen - len);
     534          62 :                 outlen -= len;
     535             :             }
     536             : 
     537             :             /* if there is available output, save it */
     538          74 :             if (outlen > 0)
     539             :             {
     540          10 :                 while (state->overflowlen + outlen > state->overflowalloclen)
     541             :                 {
     542           4 :                     state->overflowalloclen *= 2;
     543           4 :                     state->overflowbuf = pg_realloc(state->overflowbuf,
     544             :                                                     state->overflowalloclen);
     545             :                 }
     546             : 
     547           6 :                 memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
     548           6 :                 state->overflowlen += outlen;
     549             :             }
     550             :         }
     551         134 :     } while (rsize == size && dsize < size && eol_found == false);
     552             : 
     553         128 :     pg_free(readbuf);
     554             : 
     555         128 :     return dsize;
     556             : }
     557             : 
     558             : /*
     559             :  * Compress size bytes from ptr and write them to the stream.
     560             :  */
     561             : static bool
     562        3290 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     563             : {
     564        3290 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     565             :     size_t      status;
     566        3290 :     int         remaining = size;
     567             : 
     568             :     /* Lazy init */
     569        3290 :     if (!LZ4Stream_init(state, size, true))
     570           0 :         return false;
     571             : 
     572        6592 :     while (remaining > 0)
     573             :     {
     574        3302 :         int         chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
     575             : 
     576        3302 :         remaining -= chunk;
     577             : 
     578        3302 :         status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
     579             :                                      ptr, chunk, NULL);
     580        3302 :         if (LZ4F_isError(status))
     581             :         {
     582           0 :             state->errcode = status;
     583           0 :             return false;
     584             :         }
     585             : 
     586        3302 :         if (fwrite(state->buffer, 1, status, state->fp) != status)
     587             :         {
     588           0 :             errno = (errno) ? errno : ENOSPC;
     589           0 :             return false;
     590             :         }
     591             : 
     592        3302 :         ptr = ((const char *) ptr) + chunk;
     593             :     }
     594             : 
     595        3290 :     return true;
     596             : }
     597             : 
     598             : /*
     599             :  * fread() equivalent implementation for LZ4 compressed files.
     600             :  */
     601             : static bool
     602         132 : LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
     603             : {
     604         132 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     605             :     int         ret;
     606             : 
     607         132 :     if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
     608           0 :         pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
     609             : 
     610         132 :     if (rsize)
     611         132 :         *rsize = (size_t) ret;
     612             : 
     613         132 :     return true;
     614             : }
     615             : 
     616             : /*
     617             :  * fgetc() equivalent implementation for LZ4 compressed files.
     618             :  */
     619             : static int
     620           0 : LZ4Stream_getc(CompressFileHandle *CFH)
     621             : {
     622           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     623             :     unsigned char c;
     624             : 
     625           0 :     if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
     626             :     {
     627           0 :         if (!LZ4Stream_eof(CFH))
     628           0 :             pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
     629             :         else
     630           0 :             pg_fatal("could not read from input file: end of file");
     631             :     }
     632             : 
     633           0 :     return c;
     634             : }
     635             : 
     636             : /*
     637             :  * fgets() equivalent implementation for LZ4 compressed files.
     638             :  */
     639             : static char *
     640           0 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
     641             : {
     642           0 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     643             :     int         ret;
     644             : 
     645           0 :     ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
     646           0 :     if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
     647           0 :         pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
     648             : 
     649             :     /* Done reading */
     650           0 :     if (ret == 0)
     651           0 :         return NULL;
     652             : 
     653             :     /*
     654             :      * Our caller expects the return string to be NULL terminated and we know
     655             :      * that ret is greater than zero.
     656             :      */
     657           0 :     ptr[ret - 1] = '\0';
     658             : 
     659           0 :     return ptr;
     660             : }
     661             : 
     662             : /*
     663             :  * Finalize (de)compression of a stream. When compressing it will write any
     664             :  * remaining content and/or generated footer from the LZ4 API.
     665             :  */
     666             : static bool
     667         130 : LZ4Stream_close(CompressFileHandle *CFH)
     668             : {
     669             :     FILE       *fp;
     670         130 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     671             :     size_t      status;
     672             : 
     673         130 :     fp = state->fp;
     674         130 :     if (state->inited)
     675             :     {
     676         128 :         if (state->compressing)
     677             :         {
     678          64 :             status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
     679          64 :             if (LZ4F_isError(status))
     680           0 :                 pg_fatal("could not end compression: %s",
     681             :                          LZ4F_getErrorName(status));
     682          64 :             else if (fwrite(state->buffer, 1, status, state->fp) != status)
     683             :             {
     684           0 :                 errno = (errno) ? errno : ENOSPC;
     685           0 :                 WRITE_ERROR_EXIT;
     686             :             }
     687             : 
     688          64 :             status = LZ4F_freeCompressionContext(state->ctx);
     689          64 :             if (LZ4F_isError(status))
     690           0 :                 pg_fatal("could not end compression: %s",
     691             :                          LZ4F_getErrorName(status));
     692             :         }
     693             :         else
     694             :         {
     695          64 :             status = LZ4F_freeDecompressionContext(state->dtx);
     696          64 :             if (LZ4F_isError(status))
     697           0 :                 pg_fatal("could not end decompression: %s",
     698             :                          LZ4F_getErrorName(status));
     699          64 :             pg_free(state->overflowbuf);
     700             :         }
     701             : 
     702         128 :         pg_free(state->buffer);
     703             :     }
     704             : 
     705         130 :     pg_free(state);
     706             : 
     707         130 :     return fclose(fp) == 0;
     708             : }
     709             : 
     710             : static bool
     711         130 : LZ4Stream_open(const char *path, int fd, const char *mode,
     712             :                CompressFileHandle *CFH)
     713             : {
     714             :     FILE       *fp;
     715         130 :     LZ4State   *state = (LZ4State *) CFH->private_data;
     716             : 
     717         130 :     if (fd >= 0)
     718           0 :         fp = fdopen(fd, mode);
     719             :     else
     720         130 :         fp = fopen(path, mode);
     721         130 :     if (fp == NULL)
     722             :     {
     723           0 :         state->errcode = errno;
     724           0 :         return false;
     725             :     }
     726             : 
     727         130 :     state->fp = fp;
     728             : 
     729         130 :     return true;
     730             : }
     731             : 
     732             : static bool
     733          64 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     734             : {
     735             :     char       *fname;
     736             :     int         save_errno;
     737             :     bool        ret;
     738             : 
     739          64 :     fname = psprintf("%s.lz4", path);
     740          64 :     ret = CFH->open_func(fname, -1, mode, CFH);
     741             : 
     742          64 :     save_errno = errno;
     743          64 :     pg_free(fname);
     744          64 :     errno = save_errno;
     745             : 
     746          64 :     return ret;
     747             : }
     748             : 
     749             : /*
     750             :  * Public routines
     751             :  */
     752             : void
     753         130 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
     754             :                           const pg_compress_specification compression_spec)
     755             : {
     756             :     LZ4State   *state;
     757             : 
     758         130 :     CFH->open_func = LZ4Stream_open;
     759         130 :     CFH->open_write_func = LZ4Stream_open_write;
     760         130 :     CFH->read_func = LZ4Stream_read;
     761         130 :     CFH->write_func = LZ4Stream_write;
     762         130 :     CFH->gets_func = LZ4Stream_gets;
     763         130 :     CFH->getc_func = LZ4Stream_getc;
     764         130 :     CFH->eof_func = LZ4Stream_eof;
     765         130 :     CFH->close_func = LZ4Stream_close;
     766         130 :     CFH->get_error_func = LZ4Stream_get_error;
     767             : 
     768         130 :     CFH->compression_spec = compression_spec;
     769         130 :     state = pg_malloc0(sizeof(*state));
     770         130 :     if (CFH->compression_spec.level >= 0)
     771         130 :         state->prefs.compressionLevel = CFH->compression_spec.level;
     772             : 
     773         130 :     CFH->private_data = state;
     774         130 : }
     775             : #else                           /* USE_LZ4 */
     776             : void
     777             : InitCompressorLZ4(CompressorState *cs,
     778             :                   const pg_compress_specification compression_spec)
     779             : {
     780             :     pg_fatal("this build does not support compression with %s", "LZ4");
     781             : }
     782             : 
     783             : void
     784             : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
     785             :                           const pg_compress_specification compression_spec)
     786             : {
     787             :     pg_fatal("this build does not support compression with %s", "LZ4");
     788             : }
     789             : #endif                          /* USE_LZ4 */

Generated by: LCOV version 1.14