Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_lz4.c
4 : * Routines for archivers to write a LZ4 compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_lz4.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres_fe.h"
15 : #include <unistd.h>
16 :
17 : #include "compress_lz4.h"
18 : #include "pg_backup_utils.h"
19 :
20 : #ifdef USE_LZ4
21 : #include <lz4frame.h>
22 :
23 : /*
24 : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
25 : * Redefine it for installations with a lesser version.
26 : */
27 : #ifndef LZ4F_HEADER_SIZE_MAX
28 : #define LZ4F_HEADER_SIZE_MAX 32
29 : #endif
30 :
31 : /*---------------------------------
32 : * Common to both compression APIs
33 : *---------------------------------
34 : */
35 :
/*
 * (de)compression state used by both the Compressor and Stream APIs.
 *
 * The struct is allocated zero-filled by the Init functions; the Stream API
 * relies on that to detect a not-yet-initialized state (see "inited").
 */
typedef struct LZ4State
{
	/*
	 * Used by the Stream API to keep track of the file stream.  Set by
	 * LZ4Stream_open and closed by LZ4Stream_close.
	 */
	FILE	   *fp;

	/*
	 * LZ4 frame preferences.  Only compressionLevel is set explicitly (from
	 * the compression specification); everything else stays zeroed.
	 */
	LZ4F_preferences_t prefs;

	/* Compression context, created by LZ4State_compression_init */
	LZ4F_compressionContext_t ctx;
	/* Decompression context, created by LZ4Stream_init when reading */
	LZ4F_decompressionContext_t dtx;

	/*
	 * Used by the Stream API's lazy initialization: becomes true once
	 * LZ4Stream_init has set up the context and buffers.
	 */
	bool		inited;

	/*
	 * Used by the Stream API to distinguish between compression and
	 * decompression operations.
	 */
	bool		compressing;

	/*
	 * I/O buffer area.
	 */
	char	   *buffer;			/* buffer for compressed data */
	size_t		buflen;			/* allocated size of buffer */
	size_t		bufdata;		/* amount of valid data currently in buffer */
	/* These fields are used only while decompressing: */
	size_t		bufnext;		/* next buffer position to decompress */
	char	   *outbuf;			/* buffer for decompressed data */
	size_t		outbuflen;		/* allocated size of outbuf */
	size_t		outbufdata;		/* amount of valid data currently in outbuf */
	size_t		outbufnext;		/* next outbuf position to return */

	/*
	 * Used by both APIs to keep track of error codes.  Normally holds an
	 * LZ4F status code; LZ4Stream_open stores errno here on failure.
	 */
	size_t		errcode;
} LZ4State;
80 :
81 : /*
82 : * LZ4State_compression_init
83 : * Initialize the required LZ4State members for compression.
84 : *
85 : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
86 : * this function can choose when and how to write the header to a file stream.
87 : *
88 : * Returns true on success. In case of a failure returns false, and stores the
89 : * error code in state->errcode.
90 : */
91 : static bool
92 7 : LZ4State_compression_init(LZ4State *state)
93 : {
94 : size_t status;
95 :
96 : /*
97 : * Compute size needed for buffer, assuming we will present at most
98 : * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
99 : */
100 7 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
101 :
102 : /*
103 : * Add some slop to ensure we're not forced to flush every time.
104 : *
105 : * The present slop factor of 50% is chosen so that the typical output
106 : * block size is about 128K when DEFAULT_IO_BUFFER_SIZE = 128K. We might
107 : * need a different slop factor to maintain that equivalence if
108 : * DEFAULT_IO_BUFFER_SIZE is changed dramatically.
109 : */
110 7 : state->buflen += state->buflen / 2;
111 :
112 : /*
113 : * LZ4F_compressBegin requires a buffer that is greater or equal to
114 : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
115 : */
116 7 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
117 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
118 :
119 7 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
120 7 : if (LZ4F_isError(status))
121 : {
122 0 : state->errcode = status;
123 0 : return false;
124 : }
125 :
126 7 : state->buffer = pg_malloc(state->buflen);
127 :
128 : /*
129 : * Insert LZ4 header into buffer.
130 : */
131 7 : status = LZ4F_compressBegin(state->ctx,
132 7 : state->buffer, state->buflen,
133 7 : &state->prefs);
134 7 : if (LZ4F_isError(status))
135 : {
136 0 : state->errcode = status;
137 0 : return false;
138 : }
139 :
140 7 : state->bufdata = status;
141 :
142 7 : return true;
143 : }
144 :
145 : /*----------------------
146 : * Compressor API
147 : *----------------------
148 : */
149 :
150 : /* Private routines that support LZ4 compressed data I/O */
151 :
152 : static void
153 3 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
154 : {
155 : size_t r;
156 : size_t readbuflen;
157 : char *outbuf;
158 : char *readbuf;
159 3 : LZ4F_decompressionContext_t ctx = NULL;
160 : LZ4F_decompressOptions_t dec_opt;
161 : LZ4F_errorCode_t status;
162 :
163 3 : memset(&dec_opt, 0, sizeof(dec_opt));
164 3 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
165 3 : if (LZ4F_isError(status))
166 0 : pg_fatal("could not create LZ4 decompression context: %s",
167 : LZ4F_getErrorName(status));
168 :
169 3 : outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
170 3 : readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
171 3 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
172 7 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
173 : {
174 : char *readp;
175 : char *readend;
176 :
177 : /* Process one chunk */
178 4 : readp = readbuf;
179 4 : readend = readbuf + r;
180 9 : while (readp < readend)
181 : {
182 5 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
183 5 : size_t read_size = readend - readp;
184 :
185 5 : status = LZ4F_decompress(ctx, outbuf, &out_size,
186 : readp, &read_size, &dec_opt);
187 5 : if (LZ4F_isError(status))
188 0 : pg_fatal("could not decompress: %s",
189 : LZ4F_getErrorName(status));
190 :
191 5 : ahwrite(outbuf, 1, out_size, AH);
192 5 : readp += read_size;
193 : }
194 : }
195 :
196 3 : pg_free(outbuf);
197 3 : pg_free(readbuf);
198 :
199 3 : status = LZ4F_freeDecompressionContext(ctx);
200 3 : if (LZ4F_isError(status))
201 0 : pg_fatal("could not free LZ4 decompression context: %s",
202 : LZ4F_getErrorName(status));
203 3 : }
204 :
205 : static void
206 4 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
207 : const void *data, size_t dLen)
208 : {
209 4 : LZ4State *state = (LZ4State *) cs->private_data;
210 4 : size_t remaining = dLen;
211 :
212 10 : while (remaining > 0)
213 : {
214 : size_t chunk;
215 : size_t required;
216 : size_t status;
217 :
218 : /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
219 6 : chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
220 :
221 : /* If not enough space, must flush buffer */
222 6 : required = LZ4F_compressBound(chunk, &state->prefs);
223 6 : if (required > state->buflen - state->bufdata)
224 : {
225 1 : cs->writeF(AH, state->buffer, state->bufdata);
226 1 : state->bufdata = 0;
227 : }
228 :
229 6 : status = LZ4F_compressUpdate(state->ctx,
230 6 : state->buffer + state->bufdata,
231 6 : state->buflen - state->bufdata,
232 : data, chunk, NULL);
233 :
234 6 : if (LZ4F_isError(status))
235 0 : pg_fatal("could not compress data: %s",
236 : LZ4F_getErrorName(status));
237 :
238 6 : state->bufdata += status;
239 :
240 6 : data = ((const char *) data) + chunk;
241 6 : remaining -= chunk;
242 : }
243 4 : }
244 :
245 : static void
246 6 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
247 : {
248 6 : LZ4State *state = (LZ4State *) cs->private_data;
249 : size_t required;
250 : size_t status;
251 :
252 : /* Nothing needs to be done */
253 6 : if (!state)
254 3 : return;
255 :
256 : /* We might need to flush the buffer to make room for LZ4F_compressEnd */
257 3 : required = LZ4F_compressBound(0, &state->prefs);
258 3 : if (required > state->buflen - state->bufdata)
259 : {
260 0 : cs->writeF(AH, state->buffer, state->bufdata);
261 0 : state->bufdata = 0;
262 : }
263 :
264 3 : status = LZ4F_compressEnd(state->ctx,
265 3 : state->buffer + state->bufdata,
266 3 : state->buflen - state->bufdata,
267 : NULL);
268 3 : if (LZ4F_isError(status))
269 0 : pg_fatal("could not end compression: %s",
270 : LZ4F_getErrorName(status));
271 3 : state->bufdata += status;
272 :
273 : /* Write the final bufferload */
274 3 : cs->writeF(AH, state->buffer, state->bufdata);
275 :
276 3 : status = LZ4F_freeCompressionContext(state->ctx);
277 3 : if (LZ4F_isError(status))
278 0 : pg_fatal("could not end compression: %s",
279 : LZ4F_getErrorName(status));
280 :
281 3 : pg_free(state->buffer);
282 3 : pg_free(state);
283 :
284 3 : cs->private_data = NULL;
285 : }
286 :
287 : /*
288 : * Public routines that support LZ4 compressed data I/O
289 : */
290 : void
291 6 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
292 : {
293 : LZ4State *state;
294 :
295 6 : cs->readData = ReadDataFromArchiveLZ4;
296 6 : cs->writeData = WriteDataToArchiveLZ4;
297 6 : cs->end = EndCompressorLZ4;
298 :
299 6 : cs->compression_spec = compression_spec;
300 :
301 : /*
302 : * Read operations have access to the whole input. No state needs to be
303 : * carried between calls.
304 : */
305 6 : if (cs->readF)
306 3 : return;
307 :
308 3 : state = pg_malloc0_object(LZ4State);
309 3 : if (cs->compression_spec.level >= 0)
310 3 : state->prefs.compressionLevel = cs->compression_spec.level;
311 :
312 3 : if (!LZ4State_compression_init(state))
313 0 : pg_fatal("could not initialize LZ4 compression: %s",
314 : LZ4F_getErrorName(state->errcode));
315 :
316 3 : cs->private_data = state;
317 : }
318 :
319 : /*----------------------
320 : * Compress Stream API
321 : *----------------------
322 : */
323 :
324 :
325 : /*
326 : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
327 : * more buffered data and the end of the input file has been reached.
328 : */
329 : static bool
330 0 : LZ4Stream_eof(CompressFileHandle *CFH)
331 : {
332 0 : LZ4State *state = (LZ4State *) CFH->private_data;
333 :
334 0 : return state->outbufnext >= state->outbufdata &&
335 0 : state->bufnext >= state->bufdata &&
336 0 : feof(state->fp);
337 : }
338 :
339 : static const char *
340 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
341 : {
342 0 : LZ4State *state = (LZ4State *) CFH->private_data;
343 : const char *errmsg;
344 :
345 0 : if (LZ4F_isError(state->errcode))
346 0 : errmsg = LZ4F_getErrorName(state->errcode);
347 : else
348 0 : errmsg = strerror(errno);
349 :
350 0 : return errmsg;
351 : }
352 :
353 : /*
354 : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
355 : *
356 : * Creates the necessary contexts for either compression or decompression. When
357 : * compressing data (indicated by compressing=true), it additionally writes the
358 : * LZ4 header in the output buffer.
359 : *
360 : * It's expected that a not-yet-initialized LZ4State will be zero-filled.
361 : *
362 : * Returns true on success. In case of a failure returns false, and stores the
363 : * error code in state->errcode.
364 : */
365 : static bool
366 93 : LZ4Stream_init(LZ4State *state, bool compressing)
367 : {
368 : size_t status;
369 :
370 93 : if (state->inited)
371 89 : return true;
372 :
373 4 : state->compressing = compressing;
374 :
375 4 : if (state->compressing)
376 : {
377 4 : if (!LZ4State_compression_init(state))
378 0 : return false;
379 : }
380 : else
381 : {
382 0 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
383 0 : if (LZ4F_isError(status))
384 : {
385 0 : state->errcode = status;
386 0 : return false;
387 : }
388 :
389 0 : state->buflen = DEFAULT_IO_BUFFER_SIZE;
390 0 : state->buffer = pg_malloc(state->buflen);
391 0 : state->outbuflen = DEFAULT_IO_BUFFER_SIZE;
392 0 : state->outbuf = pg_malloc(state->outbuflen);
393 : }
394 :
395 4 : state->inited = true;
396 4 : return true;
397 : }
398 :
399 : /*
400 : * The workhorse for reading decompressed content out of an LZ4 compressed
401 : * stream.
402 : *
403 : * It will read up to 'ptrsize' decompressed content, or up to the new line
404 : * char if one is found first when the eol_flag is set.
405 : *
406 : * Returns the number of bytes of decompressed data copied into the ptr
407 : * buffer, or -1 in case of error.
408 : */
409 : static int
410 0 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
411 : {
412 0 : int dsize = 0;
413 0 : int remaining = ptrsize;
414 :
415 : /* Lazy init */
416 0 : if (!LZ4Stream_init(state, false /* decompressing */ ))
417 : {
418 0 : pg_log_error("unable to initialize LZ4 library: %s",
419 : LZ4F_getErrorName(state->errcode));
420 0 : return -1;
421 : }
422 :
423 : /* Loop until postcondition is satisfied */
424 0 : while (remaining > 0)
425 : {
426 : /*
427 : * If we already have some decompressed data, return that.
428 : */
429 0 : if (state->outbufnext < state->outbufdata)
430 : {
431 0 : char *outptr = state->outbuf + state->outbufnext;
432 0 : size_t readlen = state->outbufdata - state->outbufnext;
433 0 : bool eol_found = false;
434 :
435 0 : if (readlen > remaining)
436 0 : readlen = remaining;
437 : /* If eol_flag is set, don't read beyond a newline */
438 0 : if (eol_flag)
439 : {
440 0 : char *eolptr = memchr(outptr, '\n', readlen);
441 :
442 0 : if (eolptr)
443 : {
444 0 : readlen = eolptr - outptr + 1;
445 0 : eol_found = true;
446 : }
447 : }
448 0 : memcpy(ptr, outptr, readlen);
449 0 : ptr = ((char *) ptr) + readlen;
450 0 : state->outbufnext += readlen;
451 0 : dsize += readlen;
452 0 : remaining -= readlen;
453 0 : if (eol_found || remaining == 0)
454 : break;
455 : /* We must have emptied outbuf */
456 : Assert(state->outbufnext >= state->outbufdata);
457 : }
458 :
459 : /*
460 : * If we don't have any pending compressed data, load more into
461 : * state->buffer.
462 : */
463 0 : if (state->bufnext >= state->bufdata)
464 : {
465 : size_t rsize;
466 :
467 0 : rsize = fread(state->buffer, 1, state->buflen, state->fp);
468 0 : if (rsize < state->buflen && !feof(state->fp))
469 : {
470 0 : pg_log_error("could not read from input file: %m");
471 0 : return -1;
472 : }
473 0 : if (rsize == 0)
474 0 : break; /* must be EOF */
475 0 : state->bufdata = rsize;
476 0 : state->bufnext = 0;
477 : }
478 :
479 : /*
480 : * Decompress some data into state->outbuf.
481 : */
482 : {
483 : size_t status;
484 0 : size_t outlen = state->outbuflen;
485 0 : size_t inlen = state->bufdata - state->bufnext;
486 :
487 0 : status = LZ4F_decompress(state->dtx,
488 0 : state->outbuf, &outlen,
489 0 : state->buffer + state->bufnext,
490 : &inlen,
491 : NULL);
492 0 : if (LZ4F_isError(status))
493 : {
494 0 : state->errcode = status;
495 0 : pg_log_error("could not read from input file: %s",
496 : LZ4F_getErrorName(state->errcode));
497 0 : return -1;
498 : }
499 0 : state->bufnext += inlen;
500 0 : state->outbufdata = outlen;
501 0 : state->outbufnext = 0;
502 : }
503 : }
504 :
505 0 : return dsize;
506 : }
507 :
508 : /*
509 : * Compress size bytes from ptr and write them to the stream.
510 : */
511 : static void
512 93 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
513 : {
514 93 : LZ4State *state = (LZ4State *) CFH->private_data;
515 93 : size_t remaining = size;
516 :
517 : /* Lazy init */
518 93 : if (!LZ4Stream_init(state, true))
519 0 : pg_fatal("unable to initialize LZ4 library: %s",
520 : LZ4F_getErrorName(state->errcode));
521 :
522 190 : while (remaining > 0)
523 : {
524 : size_t chunk;
525 : size_t required;
526 : size_t status;
527 :
528 : /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
529 97 : chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
530 :
531 : /* If not enough space, must flush buffer */
532 97 : required = LZ4F_compressBound(chunk, &state->prefs);
533 97 : if (required > state->buflen - state->bufdata)
534 : {
535 2 : errno = 0;
536 2 : if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
537 : {
538 0 : errno = (errno) ? errno : ENOSPC;
539 0 : pg_fatal("error during writing: %m");
540 : }
541 2 : state->bufdata = 0;
542 : }
543 :
544 97 : status = LZ4F_compressUpdate(state->ctx,
545 97 : state->buffer + state->bufdata,
546 97 : state->buflen - state->bufdata,
547 : ptr, chunk, NULL);
548 97 : if (LZ4F_isError(status))
549 0 : pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
550 97 : state->bufdata += status;
551 :
552 97 : ptr = ((const char *) ptr) + chunk;
553 97 : remaining -= chunk;
554 : }
555 93 : }
556 :
557 : /*
558 : * fread() equivalent implementation for LZ4 compressed files.
559 : */
560 : static size_t
561 0 : LZ4Stream_read(void *ptr, size_t size, CompressFileHandle *CFH)
562 : {
563 0 : LZ4State *state = (LZ4State *) CFH->private_data;
564 : int ret;
565 :
566 0 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
567 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
568 :
569 0 : return (size_t) ret;
570 : }
571 :
572 : /*
573 : * fgetc() equivalent implementation for LZ4 compressed files.
574 : */
575 : static int
576 0 : LZ4Stream_getc(CompressFileHandle *CFH)
577 : {
578 0 : LZ4State *state = (LZ4State *) CFH->private_data;
579 : unsigned char c;
580 :
581 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
582 : {
583 0 : if (!LZ4Stream_eof(CFH))
584 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
585 : else
586 0 : pg_fatal("could not read from input file: end of file");
587 : }
588 :
589 0 : return c;
590 : }
591 :
592 : /*
593 : * fgets() equivalent implementation for LZ4 compressed files.
594 : */
595 : static char *
596 0 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
597 : {
598 0 : LZ4State *state = (LZ4State *) CFH->private_data;
599 : int ret;
600 :
601 0 : ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
602 :
603 : /*
604 : * LZ4Stream_read_internal returning 0 or -1 means that it was either an
605 : * EOF or an error, but gets_func is defined to return NULL in either case
606 : * so we can treat both the same here.
607 : */
608 0 : if (ret <= 0)
609 0 : return NULL;
610 :
611 : /*
612 : * Our caller expects the return string to be NULL terminated and we know
613 : * that ret is greater than zero.
614 : */
615 0 : ptr[ret - 1] = '\0';
616 :
617 0 : return ptr;
618 : }
619 :
620 : /*
621 : * Finalize (de)compression of a stream. When compressing it will write any
622 : * remaining content and/or generated footer from the LZ4 API.
623 : */
624 : static bool
625 4 : LZ4Stream_close(CompressFileHandle *CFH)
626 : {
627 : FILE *fp;
628 4 : LZ4State *state = (LZ4State *) CFH->private_data;
629 : size_t required;
630 : size_t status;
631 : int ret;
632 4 : bool success = true;
633 :
634 4 : fp = state->fp;
635 4 : if (state->inited)
636 : {
637 4 : if (state->compressing)
638 : {
639 : /* We might need to flush the buffer to make room */
640 4 : required = LZ4F_compressBound(0, &state->prefs);
641 4 : if (required > state->buflen - state->bufdata)
642 : {
643 0 : errno = 0;
644 0 : if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
645 : {
646 0 : errno = (errno) ? errno : ENOSPC;
647 0 : pg_log_error("could not write to output file: %m");
648 0 : success = false;
649 : }
650 0 : state->bufdata = 0;
651 : }
652 :
653 4 : status = LZ4F_compressEnd(state->ctx,
654 4 : state->buffer + state->bufdata,
655 4 : state->buflen - state->bufdata,
656 : NULL);
657 4 : if (LZ4F_isError(status))
658 : {
659 0 : pg_log_error("could not end compression: %s",
660 : LZ4F_getErrorName(status));
661 0 : success = false;
662 : }
663 : else
664 4 : state->bufdata += status;
665 :
666 4 : errno = 0;
667 4 : if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
668 : {
669 0 : errno = (errno) ? errno : ENOSPC;
670 0 : pg_log_error("could not write to output file: %m");
671 0 : success = false;
672 : }
673 :
674 4 : status = LZ4F_freeCompressionContext(state->ctx);
675 4 : if (LZ4F_isError(status))
676 : {
677 0 : pg_log_error("could not end compression: %s",
678 : LZ4F_getErrorName(status));
679 0 : success = false;
680 : }
681 : }
682 : else
683 : {
684 0 : status = LZ4F_freeDecompressionContext(state->dtx);
685 0 : if (LZ4F_isError(status))
686 : {
687 0 : pg_log_error("could not end decompression: %s",
688 : LZ4F_getErrorName(status));
689 0 : success = false;
690 : }
691 0 : pg_free(state->outbuf);
692 : }
693 :
694 4 : pg_free(state->buffer);
695 : }
696 :
697 4 : pg_free(state);
698 4 : CFH->private_data = NULL;
699 :
700 4 : errno = 0;
701 4 : ret = fclose(fp);
702 4 : if (ret != 0)
703 : {
704 0 : pg_log_error("could not close file: %m");
705 0 : success = false;
706 : }
707 :
708 4 : return success;
709 : }
710 :
711 : static bool
712 4 : LZ4Stream_open(const char *path, int fd, const char *mode,
713 : CompressFileHandle *CFH)
714 : {
715 4 : LZ4State *state = (LZ4State *) CFH->private_data;
716 :
717 4 : if (fd >= 0)
718 : {
719 0 : int dup_fd = dup(fd);
720 :
721 0 : if (dup_fd < 0)
722 : {
723 0 : state->errcode = errno;
724 0 : return false;
725 : }
726 0 : state->fp = fdopen(dup_fd, mode);
727 0 : if (state->fp == NULL)
728 : {
729 0 : state->errcode = errno;
730 0 : close(dup_fd);
731 0 : return false;
732 : }
733 : }
734 : else
735 : {
736 4 : state->fp = fopen(path, mode);
737 4 : if (state->fp == NULL)
738 : {
739 0 : state->errcode = errno;
740 0 : return false;
741 : }
742 : }
743 :
744 4 : return true;
745 : }
746 :
747 : static bool
748 3 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
749 : {
750 : char *fname;
751 : int save_errno;
752 : bool ret;
753 :
754 3 : fname = psprintf("%s.lz4", path);
755 3 : ret = CFH->open_func(fname, -1, mode, CFH);
756 :
757 3 : save_errno = errno;
758 3 : pg_free(fname);
759 3 : errno = save_errno;
760 :
761 3 : return ret;
762 : }
763 :
764 : /*
765 : * Public routines
766 : */
767 : void
768 4 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
769 : const pg_compress_specification compression_spec)
770 : {
771 : LZ4State *state;
772 :
773 4 : CFH->open_func = LZ4Stream_open;
774 4 : CFH->open_write_func = LZ4Stream_open_write;
775 4 : CFH->read_func = LZ4Stream_read;
776 4 : CFH->write_func = LZ4Stream_write;
777 4 : CFH->gets_func = LZ4Stream_gets;
778 4 : CFH->getc_func = LZ4Stream_getc;
779 4 : CFH->eof_func = LZ4Stream_eof;
780 4 : CFH->close_func = LZ4Stream_close;
781 4 : CFH->get_error_func = LZ4Stream_get_error;
782 :
783 4 : CFH->compression_spec = compression_spec;
784 4 : state = pg_malloc0_object(LZ4State);
785 4 : if (CFH->compression_spec.level >= 0)
786 4 : state->prefs.compressionLevel = CFH->compression_spec.level;
787 :
788 4 : CFH->private_data = state;
789 4 : }
790 : #else /* USE_LZ4 */
/*
 * Stub used when the build lacks LZ4 support (USE_LZ4 not defined): reports
 * a fatal error instead of setting up a compressor.
 */
void
InitCompressorLZ4(CompressorState *cs,
				  const pg_compress_specification compression_spec)
{
	pg_fatal("this build does not support compression with %s", "LZ4");
}
797 :
/*
 * Stub used when the build lacks LZ4 support (USE_LZ4 not defined): reports
 * a fatal error instead of setting up a file handle.
 */
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
						  const pg_compress_specification compression_spec)
{
	pg_fatal("this build does not support compression with %s", "LZ4");
}
804 : #endif /* USE_LZ4 */
|