Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_lz4.c
4 : * Routines for archivers to write a LZ4 compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_lz4.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres_fe.h"
15 :
16 : #include "compress_lz4.h"
17 : #include "pg_backup_utils.h"
18 :
19 : #ifdef USE_LZ4
20 : #include <lz4frame.h>
21 :
22 : /*
23 : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
24 : * Redefine it for installations with a lesser version.
25 : */
26 : #ifndef LZ4F_HEADER_SIZE_MAX
27 : #define LZ4F_HEADER_SIZE_MAX 32
28 : #endif
29 :
30 : /*---------------------------------
31 : * Common to both compression APIs
32 : *---------------------------------
33 : */
34 :
35 : /*
36 : * (de)compression state used by both the Compressor and Stream APIs.
37 : */
38 : typedef struct LZ4State
39 : {
40 : /*
41 : * Used by the Stream API to keep track of the file stream.
42 : */
43 : FILE *fp;
44 :
45 : LZ4F_preferences_t prefs;
46 :
47 : LZ4F_compressionContext_t ctx;
48 : LZ4F_decompressionContext_t dtx;
49 :
50 : /*
51 : * Used by the Stream API's lazy initialization.
52 : */
53 : bool inited;
54 :
55 : /*
56 : * Used by the Stream API to distinguish between compression and
57 : * decompression operations.
58 : */
59 : bool compressing;
60 :
61 : /*
62 : * Used by the Compressor API to mark if the compression headers have been
63 : * written after initialization.
64 : */
65 : bool needs_header_flush;
66 :
67 : size_t buflen;
68 : char *buffer;
69 :
70 : /*
71 : * Used by the Stream API to store already uncompressed data that the
72 : * caller has not consumed.
73 : */
74 : size_t overflowalloclen;
75 : size_t overflowlen;
76 : char *overflowbuf;
77 :
78 : /*
79 : * Used by both APIs to keep track of the compressed data length stored in
80 : * the buffer.
81 : */
82 : size_t compressedlen;
83 :
84 : /*
85 : * Used by both APIs to keep track of error codes.
86 : */
87 : size_t errcode;
88 : } LZ4State;
89 :
90 : /*
91 : * LZ4State_compression_init
92 : * Initialize the required LZ4State members for compression.
93 : *
94 : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
95 : * this function can choose when and how to write the header to a file stream.
96 : *
97 : * Returns true on success. In case of a failure returns false, and stores the
98 : * error code in state->errcode.
99 : */
100 : static bool
101 128 : LZ4State_compression_init(LZ4State *state)
102 : {
103 : size_t status;
104 :
105 128 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
106 :
107 : /*
108 : * LZ4F_compressBegin requires a buffer that is greater or equal to
109 : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
110 : */
111 128 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
112 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
113 :
114 128 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
115 128 : if (LZ4F_isError(status))
116 : {
117 0 : state->errcode = status;
118 0 : return false;
119 : }
120 :
121 128 : state->buffer = pg_malloc(state->buflen);
122 128 : status = LZ4F_compressBegin(state->ctx,
123 128 : state->buffer, state->buflen,
124 128 : &state->prefs);
125 128 : if (LZ4F_isError(status))
126 : {
127 0 : state->errcode = status;
128 0 : return false;
129 : }
130 :
131 128 : state->compressedlen = status;
132 :
133 128 : return true;
134 : }
135 :
136 : /*----------------------
137 : * Compressor API
138 : *----------------------
139 : */
140 :
141 : /* Private routines that support LZ4 compressed data I/O */
142 :
143 : static void
144 64 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
145 : {
146 : size_t r;
147 : size_t readbuflen;
148 : char *outbuf;
149 : char *readbuf;
150 64 : LZ4F_decompressionContext_t ctx = NULL;
151 : LZ4F_decompressOptions_t dec_opt;
152 : LZ4F_errorCode_t status;
153 :
154 64 : memset(&dec_opt, 0, sizeof(dec_opt));
155 64 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
156 64 : if (LZ4F_isError(status))
157 0 : pg_fatal("could not create LZ4 decompression context: %s",
158 : LZ4F_getErrorName(status));
159 :
160 64 : outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
161 64 : readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
162 64 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
163 192 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
164 : {
165 : char *readp;
166 : char *readend;
167 :
168 : /* Process one chunk */
169 128 : readp = readbuf;
170 128 : readend = readbuf + r;
171 262 : while (readp < readend)
172 : {
173 134 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
174 134 : size_t read_size = readend - readp;
175 :
176 134 : memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
177 134 : status = LZ4F_decompress(ctx, outbuf, &out_size,
178 : readp, &read_size, &dec_opt);
179 134 : if (LZ4F_isError(status))
180 0 : pg_fatal("could not decompress: %s",
181 : LZ4F_getErrorName(status));
182 :
183 134 : ahwrite(outbuf, 1, out_size, AH);
184 134 : readp += read_size;
185 : }
186 : }
187 :
188 64 : pg_free(outbuf);
189 64 : pg_free(readbuf);
190 :
191 64 : status = LZ4F_freeDecompressionContext(ctx);
192 64 : if (LZ4F_isError(status))
193 0 : pg_fatal("could not free LZ4 decompression context: %s",
194 : LZ4F_getErrorName(status));
195 64 : }
196 :
197 : static void
198 122 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
199 : const void *data, size_t dLen)
200 : {
201 122 : LZ4State *state = (LZ4State *) cs->private_data;
202 122 : size_t remaining = dLen;
203 : size_t status;
204 : size_t chunk;
205 :
206 : /* Write the header if not yet written. */
207 122 : if (state->needs_header_flush)
208 : {
209 62 : cs->writeF(AH, state->buffer, state->compressedlen);
210 62 : state->needs_header_flush = false;
211 : }
212 :
213 250 : while (remaining > 0)
214 : {
215 :
216 128 : if (remaining > DEFAULT_IO_BUFFER_SIZE)
217 6 : chunk = DEFAULT_IO_BUFFER_SIZE;
218 : else
219 122 : chunk = remaining;
220 :
221 128 : remaining -= chunk;
222 128 : status = LZ4F_compressUpdate(state->ctx,
223 128 : state->buffer, state->buflen,
224 : data, chunk, NULL);
225 :
226 128 : if (LZ4F_isError(status))
227 0 : pg_fatal("could not compress data: %s",
228 : LZ4F_getErrorName(status));
229 :
230 128 : cs->writeF(AH, state->buffer, status);
231 :
232 128 : data = ((char *) data) + chunk;
233 : }
234 122 : }
235 :
236 : static void
237 128 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
238 : {
239 128 : LZ4State *state = (LZ4State *) cs->private_data;
240 : size_t status;
241 :
242 : /* Nothing needs to be done */
243 128 : if (!state)
244 64 : return;
245 :
246 : /*
247 : * Write the header if not yet written. The caller is not required to call
248 : * writeData if the relation does not contain any data. Thus it is
249 : * possible to reach here without having flushed the header. Do it before
250 : * ending the compression.
251 : */
252 64 : if (state->needs_header_flush)
253 2 : cs->writeF(AH, state->buffer, state->compressedlen);
254 :
255 64 : status = LZ4F_compressEnd(state->ctx,
256 64 : state->buffer, state->buflen,
257 : NULL);
258 64 : if (LZ4F_isError(status))
259 0 : pg_fatal("could not end compression: %s",
260 : LZ4F_getErrorName(status));
261 :
262 64 : cs->writeF(AH, state->buffer, status);
263 :
264 64 : status = LZ4F_freeCompressionContext(state->ctx);
265 64 : if (LZ4F_isError(status))
266 0 : pg_fatal("could not end compression: %s",
267 : LZ4F_getErrorName(status));
268 :
269 64 : pg_free(state->buffer);
270 64 : pg_free(state);
271 :
272 64 : cs->private_data = NULL;
273 : }
274 :
275 : /*
276 : * Public routines that support LZ4 compressed data I/O
277 : */
278 : void
279 128 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
280 : {
281 : LZ4State *state;
282 :
283 128 : cs->readData = ReadDataFromArchiveLZ4;
284 128 : cs->writeData = WriteDataToArchiveLZ4;
285 128 : cs->end = EndCompressorLZ4;
286 :
287 128 : cs->compression_spec = compression_spec;
288 :
289 : /*
290 : * Read operations have access to the whole input. No state needs to be
291 : * carried between calls.
292 : */
293 128 : if (cs->readF)
294 64 : return;
295 :
296 64 : state = pg_malloc0(sizeof(*state));
297 64 : if (cs->compression_spec.level >= 0)
298 64 : state->prefs.compressionLevel = cs->compression_spec.level;
299 :
300 64 : if (!LZ4State_compression_init(state))
301 0 : pg_fatal("could not initialize LZ4 compression: %s",
302 : LZ4F_getErrorName(state->errcode));
303 :
304 : /* Remember that the header has not been written. */
305 64 : state->needs_header_flush = true;
306 64 : cs->private_data = state;
307 : }
308 :
309 : /*----------------------
310 : * Compress Stream API
311 : *----------------------
312 : */
313 :
314 :
315 : /*
316 : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
317 : * decompressed output in the overflow buffer and the end of the backing file
318 : * is reached.
319 : */
320 : static bool
321 0 : LZ4Stream_eof(CompressFileHandle *CFH)
322 : {
323 0 : LZ4State *state = (LZ4State *) CFH->private_data;
324 :
325 0 : return state->overflowlen == 0 && feof(state->fp);
326 : }
327 :
328 : static const char *
329 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
330 : {
331 0 : LZ4State *state = (LZ4State *) CFH->private_data;
332 : const char *errmsg;
333 :
334 0 : if (LZ4F_isError(state->errcode))
335 0 : errmsg = LZ4F_getErrorName(state->errcode);
336 : else
337 0 : errmsg = strerror(errno);
338 :
339 0 : return errmsg;
340 : }
341 :
342 : /*
343 : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
344 : *
345 : * Creates the necessary contexts for either compression or decompression. When
346 : * compressing data (indicated by compressing=true), it additionally writes the
347 : * LZ4 header in the output stream.
348 : *
349 : * Returns true on success. In case of a failure returns false, and stores the
350 : * error code in state->errcode.
351 : */
352 : static bool
353 3434 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
354 : {
355 : size_t status;
356 :
357 3434 : if (state->inited)
358 3306 : return true;
359 :
360 128 : state->compressing = compressing;
361 128 : state->inited = true;
362 :
363 : /* When compressing, write LZ4 header to the output stream. */
364 128 : if (state->compressing)
365 : {
366 :
367 64 : if (!LZ4State_compression_init(state))
368 0 : return false;
369 :
370 64 : if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
371 : {
372 0 : errno = (errno) ? errno : ENOSPC;
373 0 : return false;
374 : }
375 : }
376 : else
377 : {
378 64 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
379 64 : if (LZ4F_isError(status))
380 : {
381 0 : state->errcode = status;
382 0 : return false;
383 : }
384 :
385 64 : state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
386 64 : state->buffer = pg_malloc(state->buflen);
387 :
388 64 : state->overflowalloclen = state->buflen;
389 64 : state->overflowbuf = pg_malloc(state->overflowalloclen);
390 64 : state->overflowlen = 0;
391 : }
392 :
393 128 : return true;
394 : }
395 :
396 : /*
397 : * Read already decompressed content from the overflow buffer into 'ptr' up to
398 : * 'size' bytes, if available. If the eol_flag is set, then stop at the first
399 : * occurrence of the newline char prior to 'size' bytes.
400 : *
401 : * Any unread content in the overflow buffer is moved to the beginning.
402 : *
403 : * Returns the number of bytes read from the overflow buffer (and copied into
404 : * the 'ptr' buffer), or 0 if the overflow buffer is empty.
405 : */
406 : static int
407 132 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
408 : {
409 : char *p;
410 132 : int readlen = 0;
411 :
412 132 : if (state->overflowlen == 0)
413 126 : return 0;
414 :
415 6 : if (state->overflowlen >= size)
416 4 : readlen = size;
417 : else
418 2 : readlen = state->overflowlen;
419 :
420 6 : if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
421 : /* Include the line terminating char */
422 0 : readlen = p - state->overflowbuf + 1;
423 :
424 6 : memcpy(ptr, state->overflowbuf, readlen);
425 6 : state->overflowlen -= readlen;
426 :
427 6 : if (state->overflowlen > 0)
428 4 : memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
429 :
430 6 : return readlen;
431 : }
432 :
433 : /*
434 : * The workhorse for reading decompressed content out of an LZ4 compressed
435 : * stream.
436 : *
437 : * It will read up to 'ptrsize' decompressed content, or up to the new line
438 : * char if found first when the eol_flag is set. It is possible that the
439 : * decompressed output generated by reading any compressed input via the
440 : * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
441 : * at an overflow buffer within LZ4State. Of course, when the function is
442 : * called, it will first try to consume any decompressed content already
443 : * present in the overflow buffer, before decompressing new content.
444 : *
445 : * Returns the number of bytes of decompressed data copied into the ptr
446 : * buffer, or -1 in case of error.
447 : */
448 : static int
449 132 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
450 : {
451 132 : int dsize = 0;
452 : int rsize;
453 132 : int size = ptrsize;
454 132 : bool eol_found = false;
455 :
456 : void *readbuf;
457 :
458 : /* Lazy init */
459 132 : if (!LZ4Stream_init(state, size, false /* decompressing */ ))
460 0 : return -1;
461 :
462 : /* No work needs to be done for a zero-sized output buffer */
463 132 : if (size <= 0)
464 0 : return 0;
465 :
466 : /* Verify that there is enough space in the outbuf */
467 132 : if (size > state->buflen)
468 : {
469 0 : state->buflen = size;
470 0 : state->buffer = pg_realloc(state->buffer, size);
471 : }
472 :
473 : /* use already decompressed content if available */
474 132 : dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
475 132 : if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
476 4 : return dsize;
477 :
478 128 : readbuf = pg_malloc(size);
479 :
480 : do
481 : {
482 : char *rp;
483 : char *rend;
484 :
485 134 : rsize = fread(readbuf, 1, size, state->fp);
486 134 : if (rsize < size && !feof(state->fp))
487 0 : return -1;
488 :
489 134 : rp = (char *) readbuf;
490 134 : rend = (char *) readbuf + rsize;
491 :
492 208 : while (rp < rend)
493 : {
494 : size_t status;
495 74 : size_t outlen = state->buflen;
496 74 : size_t read_remain = rend - rp;
497 :
498 74 : memset(state->buffer, 0, outlen);
499 74 : status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
500 : rp, &read_remain, NULL);
501 74 : if (LZ4F_isError(status))
502 : {
503 0 : state->errcode = status;
504 0 : return -1;
505 : }
506 :
507 74 : rp += read_remain;
508 :
509 : /*
510 : * fill in what space is available in ptr if the eol flag is set,
511 : * either skip if one already found or fill up to EOL if present
512 : * in the outbuf
513 : */
514 74 : if (outlen > 0 && dsize < size && eol_found == false)
515 : {
516 : char *p;
517 62 : size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
518 62 : size_t len = outlen < lib ? outlen : lib;
519 :
520 62 : if (eol_flag &&
521 0 : (p = memchr(state->buffer, '\n', outlen)) &&
522 0 : (size_t) (p - state->buffer + 1) <= len)
523 : {
524 0 : len = p - state->buffer + 1;
525 0 : eol_found = true;
526 : }
527 :
528 62 : memcpy((char *) ptr + dsize, state->buffer, len);
529 62 : dsize += len;
530 :
531 : /* move what did not fit, if any, at the beginning of the buf */
532 62 : if (len < outlen)
533 0 : memmove(state->buffer, state->buffer + len, outlen - len);
534 62 : outlen -= len;
535 : }
536 :
537 : /* if there is available output, save it */
538 74 : if (outlen > 0)
539 : {
540 10 : while (state->overflowlen + outlen > state->overflowalloclen)
541 : {
542 4 : state->overflowalloclen *= 2;
543 4 : state->overflowbuf = pg_realloc(state->overflowbuf,
544 : state->overflowalloclen);
545 : }
546 :
547 6 : memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
548 6 : state->overflowlen += outlen;
549 : }
550 : }
551 134 : } while (rsize == size && dsize < size && eol_found == false);
552 :
553 128 : pg_free(readbuf);
554 :
555 128 : return dsize;
556 : }
557 :
558 : /*
559 : * Compress size bytes from ptr and write them to the stream.
560 : */
561 : static bool
562 3302 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
563 : {
564 3302 : LZ4State *state = (LZ4State *) CFH->private_data;
565 : size_t status;
566 3302 : int remaining = size;
567 :
568 : /* Lazy init */
569 3302 : if (!LZ4Stream_init(state, size, true))
570 0 : return false;
571 :
572 6616 : while (remaining > 0)
573 : {
574 3314 : int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
575 :
576 3314 : remaining -= chunk;
577 :
578 3314 : status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
579 : ptr, chunk, NULL);
580 3314 : if (LZ4F_isError(status))
581 : {
582 0 : state->errcode = status;
583 0 : return false;
584 : }
585 :
586 3314 : if (fwrite(state->buffer, 1, status, state->fp) != status)
587 : {
588 0 : errno = (errno) ? errno : ENOSPC;
589 0 : return false;
590 : }
591 :
592 3314 : ptr = ((const char *) ptr) + chunk;
593 : }
594 :
595 3302 : return true;
596 : }
597 :
598 : /*
599 : * fread() equivalent implementation for LZ4 compressed files.
600 : */
601 : static bool
602 132 : LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
603 : {
604 132 : LZ4State *state = (LZ4State *) CFH->private_data;
605 : int ret;
606 :
607 132 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
608 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
609 :
610 132 : if (rsize)
611 132 : *rsize = (size_t) ret;
612 :
613 132 : return true;
614 : }
615 :
616 : /*
617 : * fgetc() equivalent implementation for LZ4 compressed files.
618 : */
619 : static int
620 0 : LZ4Stream_getc(CompressFileHandle *CFH)
621 : {
622 0 : LZ4State *state = (LZ4State *) CFH->private_data;
623 : unsigned char c;
624 :
625 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
626 : {
627 0 : if (!LZ4Stream_eof(CFH))
628 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
629 : else
630 0 : pg_fatal("could not read from input file: end of file");
631 : }
632 :
633 0 : return c;
634 : }
635 :
636 : /*
637 : * fgets() equivalent implementation for LZ4 compressed files.
638 : */
639 : static char *
640 0 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
641 : {
642 0 : LZ4State *state = (LZ4State *) CFH->private_data;
643 : int ret;
644 :
645 0 : ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
646 0 : if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
647 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
648 :
649 : /* Done reading */
650 0 : if (ret == 0)
651 0 : return NULL;
652 :
653 : /*
654 : * Our caller expects the return string to be NULL terminated and we know
655 : * that ret is greater than zero.
656 : */
657 0 : ptr[ret - 1] = '\0';
658 :
659 0 : return ptr;
660 : }
661 :
662 : /*
663 : * Finalize (de)compression of a stream. When compressing it will write any
664 : * remaining content and/or generated footer from the LZ4 API.
665 : */
666 : static bool
667 130 : LZ4Stream_close(CompressFileHandle *CFH)
668 : {
669 : FILE *fp;
670 130 : LZ4State *state = (LZ4State *) CFH->private_data;
671 : size_t status;
672 :
673 130 : fp = state->fp;
674 130 : if (state->inited)
675 : {
676 128 : if (state->compressing)
677 : {
678 64 : status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
679 64 : if (LZ4F_isError(status))
680 0 : pg_fatal("could not end compression: %s",
681 : LZ4F_getErrorName(status));
682 64 : else if (fwrite(state->buffer, 1, status, state->fp) != status)
683 : {
684 0 : errno = (errno) ? errno : ENOSPC;
685 0 : WRITE_ERROR_EXIT;
686 : }
687 :
688 64 : status = LZ4F_freeCompressionContext(state->ctx);
689 64 : if (LZ4F_isError(status))
690 0 : pg_fatal("could not end compression: %s",
691 : LZ4F_getErrorName(status));
692 : }
693 : else
694 : {
695 64 : status = LZ4F_freeDecompressionContext(state->dtx);
696 64 : if (LZ4F_isError(status))
697 0 : pg_fatal("could not end decompression: %s",
698 : LZ4F_getErrorName(status));
699 64 : pg_free(state->overflowbuf);
700 : }
701 :
702 128 : pg_free(state->buffer);
703 : }
704 :
705 130 : pg_free(state);
706 :
707 130 : return fclose(fp) == 0;
708 : }
709 :
710 : static bool
711 130 : LZ4Stream_open(const char *path, int fd, const char *mode,
712 : CompressFileHandle *CFH)
713 : {
714 : FILE *fp;
715 130 : LZ4State *state = (LZ4State *) CFH->private_data;
716 :
717 130 : if (fd >= 0)
718 0 : fp = fdopen(fd, mode);
719 : else
720 130 : fp = fopen(path, mode);
721 130 : if (fp == NULL)
722 : {
723 0 : state->errcode = errno;
724 0 : return false;
725 : }
726 :
727 130 : state->fp = fp;
728 :
729 130 : return true;
730 : }
731 :
732 : static bool
733 64 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
734 : {
735 : char *fname;
736 : int save_errno;
737 : bool ret;
738 :
739 64 : fname = psprintf("%s.lz4", path);
740 64 : ret = CFH->open_func(fname, -1, mode, CFH);
741 :
742 64 : save_errno = errno;
743 64 : pg_free(fname);
744 64 : errno = save_errno;
745 :
746 64 : return ret;
747 : }
748 :
749 : /*
750 : * Public routines
751 : */
752 : void
753 130 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
754 : const pg_compress_specification compression_spec)
755 : {
756 : LZ4State *state;
757 :
758 130 : CFH->open_func = LZ4Stream_open;
759 130 : CFH->open_write_func = LZ4Stream_open_write;
760 130 : CFH->read_func = LZ4Stream_read;
761 130 : CFH->write_func = LZ4Stream_write;
762 130 : CFH->gets_func = LZ4Stream_gets;
763 130 : CFH->getc_func = LZ4Stream_getc;
764 130 : CFH->eof_func = LZ4Stream_eof;
765 130 : CFH->close_func = LZ4Stream_close;
766 130 : CFH->get_error_func = LZ4Stream_get_error;
767 :
768 130 : CFH->compression_spec = compression_spec;
769 130 : state = pg_malloc0(sizeof(*state));
770 130 : if (CFH->compression_spec.level >= 0)
771 130 : state->prefs.compressionLevel = CFH->compression_spec.level;
772 :
773 130 : CFH->private_data = state;
774 130 : }
775 : #else /* USE_LZ4 */
776 : void
777 : InitCompressorLZ4(CompressorState *cs,
778 : const pg_compress_specification compression_spec)
779 : {
780 : pg_fatal("this build does not support compression with %s", "LZ4");
781 : }
782 :
783 : void
784 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
785 : const pg_compress_specification compression_spec)
786 : {
787 : pg_fatal("this build does not support compression with %s", "LZ4");
788 : }
789 : #endif /* USE_LZ4 */
|