Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_zstd.c
4 : * Routines for archivers to write a Zstd compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_zstd.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include "compress_zstd.h"
18 : #include "pg_backup_utils.h"
19 :
20 : #ifndef USE_ZSTD
21 :
22 : void
23 0 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
24 : {
25 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
26 : }
27 :
28 : void
29 0 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
30 : {
31 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
32 : }
33 :
34 : #else
35 :
36 : #include <zstd.h>
37 :
38 : typedef struct ZstdCompressorState
39 : {
40 : /* This is a normal file to which we read/write compressed data */
41 : FILE *fp;
42 :
43 : ZSTD_CStream *cstream;
44 : ZSTD_DStream *dstream;
45 : ZSTD_outBuffer output;
46 : ZSTD_inBuffer input;
47 :
48 : /* pointer to a static string like from strerror(), for Zstd_write() */
49 : const char *zstderror;
50 : } ZstdCompressorState;
51 :
52 : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53 : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54 : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55 : const void *data, size_t dLen);
56 : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57 :
58 : static void
59 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60 : ZSTD_cParameter param, int value, char *paramname)
61 : {
62 : size_t res;
63 :
64 : res = ZSTD_CCtx_setParameter(cstream, param, value);
65 : if (ZSTD_isError(res))
66 : pg_fatal("could not set compression parameter \"%s\": %s",
67 : paramname, ZSTD_getErrorName(res));
68 : }
69 :
70 : /* Return a compression stream with parameters set per argument */
71 : static ZSTD_CStream *
72 : _ZstdCStreamParams(pg_compress_specification compress)
73 : {
74 : ZSTD_CStream *cstream;
75 :
76 : cstream = ZSTD_createCStream();
77 : if (cstream == NULL)
78 : pg_fatal("could not initialize compression library");
79 :
80 : _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81 : compress.level, "level");
82 :
83 : if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
84 : _Zstd_CCtx_setParam_or_die(cstream,
85 : ZSTD_c_enableLongDistanceMatching,
86 : compress.long_distance, "long");
87 :
88 : return cstream;
89 : }
90 :
91 : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92 : static void
93 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94 : {
95 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96 : ZSTD_inBuffer *input = &zstdcs->input;
97 : ZSTD_outBuffer *output = &zstdcs->output;
98 :
99 : /* Loop while there's any input or until flushed */
100 : while (input->pos != input->size || flush)
101 : {
102 : size_t res;
103 :
104 : output->pos = 0;
105 : res = ZSTD_compressStream2(zstdcs->cstream, output,
106 : input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 :
108 : if (ZSTD_isError(res))
109 : pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 :
111 : /*
112 : * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113 : * is the EOF marker in the custom format. This should never happen
114 : * but...
115 : */
116 : if (output->pos > 0)
117 : cs->writeF(AH, output->dst, output->pos);
118 :
119 : if (res == 0)
120 : break; /* End of frame or all input consumed */
121 : }
122 : }
123 :
124 : static void
125 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126 : {
127 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128 :
129 : if (cs->readF != NULL)
130 : {
131 : Assert(zstdcs->cstream == NULL);
132 : ZSTD_freeDStream(zstdcs->dstream);
133 : pg_free(unconstify(void *, zstdcs->input.src));
134 : }
135 : else if (cs->writeF != NULL)
136 : {
137 : Assert(zstdcs->dstream == NULL);
138 : _ZstdWriteCommon(AH, cs, true);
139 : ZSTD_freeCStream(zstdcs->cstream);
140 : pg_free(zstdcs->output.dst);
141 : }
142 :
143 : pg_free(zstdcs);
144 : }
145 :
146 : static void
147 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
148 : const void *data, size_t dLen)
149 : {
150 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
151 :
152 : zstdcs->input.src = data;
153 : zstdcs->input.size = dLen;
154 : zstdcs->input.pos = 0;
155 :
156 : _ZstdWriteCommon(AH, cs, false);
157 : }
158 :
159 : static void
160 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
161 : {
162 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
163 : ZSTD_outBuffer *output = &zstdcs->output;
164 : ZSTD_inBuffer *input = &zstdcs->input;
165 : size_t input_allocated_size = ZSTD_DStreamInSize();
166 : size_t res;
167 :
168 : for (;;)
169 : {
170 : size_t cnt;
171 :
172 : /*
173 : * Read compressed data. Note that readF can resize the buffer; the
174 : * new size is tracked and used for future loops.
175 : */
176 : input->size = input_allocated_size;
177 : cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
178 :
179 : /* ensure that readF didn't *shrink* the buffer */
180 : Assert(input->size >= input_allocated_size);
181 : input_allocated_size = input->size;
182 : input->size = cnt;
183 : input->pos = 0;
184 :
185 : if (cnt == 0)
186 : break;
187 :
188 : /* Now decompress */
189 : while (input->pos < input->size)
190 : {
191 : output->pos = 0;
192 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
193 : if (ZSTD_isError(res))
194 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
195 :
196 : /*
197 : * then write the decompressed data to the output handle
198 : */
199 : ((char *) output->dst)[output->pos] = '\0';
200 : ahwrite(output->dst, 1, output->pos, AH);
201 :
202 : if (res == 0)
203 : break; /* End of frame */
204 : }
205 : }
206 : }
207 :
208 : /* Public routine that supports Zstd compressed data I/O */
209 : void
210 : InitCompressorZstd(CompressorState *cs,
211 : const pg_compress_specification compression_spec)
212 : {
213 : ZstdCompressorState *zstdcs;
214 :
215 : cs->readData = ReadDataFromArchiveZstd;
216 : cs->writeData = WriteDataToArchiveZstd;
217 : cs->end = EndCompressorZstd;
218 :
219 : cs->compression_spec = compression_spec;
220 :
221 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
222 : cs->private_data = zstdcs;
223 :
224 : /* We expect that exactly one of readF/writeF is specified */
225 : Assert((cs->readF == NULL) != (cs->writeF == NULL));
226 :
227 : if (cs->readF != NULL)
228 : {
229 : zstdcs->dstream = ZSTD_createDStream();
230 : if (zstdcs->dstream == NULL)
231 : pg_fatal("could not initialize compression library");
232 :
233 : zstdcs->input.size = ZSTD_DStreamInSize();
234 : zstdcs->input.src = pg_malloc(zstdcs->input.size);
235 :
236 : /*
237 : * output.size is the buffer size we tell zstd it can output to.
238 : * Allocate an additional byte such that ReadDataFromArchiveZstd() can
239 : * call ahwrite() with a null-terminated string, which is an optimized
240 : * case in ExecuteSqlCommandBuf().
241 : */
242 : zstdcs->output.size = ZSTD_DStreamOutSize();
243 : zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
244 : }
245 : else if (cs->writeF != NULL)
246 : {
247 : zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
248 :
249 : zstdcs->output.size = ZSTD_CStreamOutSize();
250 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
251 : zstdcs->output.pos = 0;
252 : }
253 : }
254 :
255 : /*
256 : * Compressed stream API
257 : */
258 :
259 : static bool
260 : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
261 : {
262 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
263 : ZSTD_inBuffer *input = &zstdcs->input;
264 : ZSTD_outBuffer *output = &zstdcs->output;
265 : size_t input_allocated_size = ZSTD_DStreamInSize();
266 : size_t res,
267 : cnt;
268 :
269 : output->size = size;
270 : output->dst = ptr;
271 : output->pos = 0;
272 :
273 : for (;;)
274 : {
275 : Assert(input->pos <= input->size);
276 : Assert(input->size <= input_allocated_size);
277 :
278 : /*
279 : * If the input is completely consumed, start back at the beginning
280 : */
281 : if (input->pos == input->size)
282 : {
283 : /* input->size is size produced by "fread" */
284 : input->size = 0;
285 : /* input->pos is position consumed by decompress */
286 : input->pos = 0;
287 : }
288 :
289 : /* read compressed data if we must produce more input */
290 : if (input->pos == input->size)
291 : {
292 : cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
293 : input->size = cnt;
294 :
295 : Assert(cnt <= input_allocated_size);
296 :
297 : /* If we have no more input to consume, we're done */
298 : if (cnt == 0)
299 : break;
300 : }
301 :
302 : while (input->pos < input->size)
303 : {
304 : /* now decompress */
305 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
306 :
307 : if (ZSTD_isError(res))
308 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
309 :
310 : if (output->pos == output->size)
311 : break; /* No more room for output */
312 :
313 : if (res == 0)
314 : break; /* End of frame */
315 : }
316 :
317 : if (output->pos == output->size)
318 : break; /* We read all the data that fits */
319 : }
320 :
321 : if (rdsize != NULL)
322 : *rdsize = output->pos;
323 :
324 : return true;
325 : }
326 :
327 : static bool
328 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
329 : {
330 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
331 : ZSTD_inBuffer *input = &zstdcs->input;
332 : ZSTD_outBuffer *output = &zstdcs->output;
333 : size_t res,
334 : cnt;
335 :
336 : input->src = ptr;
337 : input->size = size;
338 : input->pos = 0;
339 :
340 : /* Consume all input, to be flushed later */
341 : while (input->pos != input->size)
342 : {
343 : output->pos = 0;
344 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
345 : if (ZSTD_isError(res))
346 : {
347 : zstdcs->zstderror = ZSTD_getErrorName(res);
348 : return false;
349 : }
350 :
351 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
352 : if (cnt != output->pos)
353 : {
354 : zstdcs->zstderror = strerror(errno);
355 : return false;
356 : }
357 : }
358 :
359 : return size;
360 : }
361 :
362 : static int
363 : Zstd_getc(CompressFileHandle *CFH)
364 : {
365 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
366 : int ret;
367 :
368 : if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
369 : {
370 : if (feof(zstdcs->fp))
371 : pg_fatal("could not read from input file: end of file");
372 : else
373 : pg_fatal("could not read from input file: %m");
374 : }
375 : return ret;
376 : }
377 :
378 : static char *
379 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
380 : {
381 : int i;
382 :
383 : Assert(len > 0);
384 :
385 : /*
386 : * Read one byte at a time until newline or EOF. This is only used to read
387 : * the list of LOs, and the I/O is buffered anyway.
388 : */
389 : for (i = 0; i < len - 1; ++i)
390 : {
391 : size_t readsz;
392 :
393 : if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
394 : break;
395 : if (readsz != 1)
396 : break;
397 : if (buf[i] == '\n')
398 : {
399 : ++i;
400 : break;
401 : }
402 : }
403 : buf[i] = '\0';
404 : return i > 0 ? buf : NULL;
405 : }
406 :
407 : static bool
408 : Zstd_close(CompressFileHandle *CFH)
409 : {
410 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
411 :
412 : if (zstdcs->cstream)
413 : {
414 : size_t res,
415 : cnt;
416 : ZSTD_inBuffer *input = &zstdcs->input;
417 : ZSTD_outBuffer *output = &zstdcs->output;
418 :
419 : /* Loop until the compression buffers are fully consumed */
420 : for (;;)
421 : {
422 : output->pos = 0;
423 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
424 : if (ZSTD_isError(res))
425 : {
426 : zstdcs->zstderror = ZSTD_getErrorName(res);
427 : return false;
428 : }
429 :
430 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
431 : if (cnt != output->pos)
432 : {
433 : zstdcs->zstderror = strerror(errno);
434 : return false;
435 : }
436 :
437 : if (res == 0)
438 : break; /* End of frame */
439 : }
440 :
441 : ZSTD_freeCStream(zstdcs->cstream);
442 : pg_free(zstdcs->output.dst);
443 : }
444 :
445 : if (zstdcs->dstream)
446 : {
447 : ZSTD_freeDStream(zstdcs->dstream);
448 : pg_free(unconstify(void *, zstdcs->input.src));
449 : }
450 :
451 : if (fclose(zstdcs->fp) != 0)
452 : return false;
453 :
454 : pg_free(zstdcs);
455 : return true;
456 : }
457 :
458 : static bool
459 : Zstd_eof(CompressFileHandle *CFH)
460 : {
461 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
462 :
463 : return feof(zstdcs->fp);
464 : }
465 :
466 : static bool
467 : Zstd_open(const char *path, int fd, const char *mode,
468 : CompressFileHandle *CFH)
469 : {
470 : FILE *fp;
471 : ZstdCompressorState *zstdcs;
472 :
473 : if (fd >= 0)
474 : fp = fdopen(fd, mode);
475 : else
476 : fp = fopen(path, mode);
477 :
478 : if (fp == NULL)
479 : return false;
480 :
481 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
482 : CFH->private_data = zstdcs;
483 : zstdcs->fp = fp;
484 :
485 : if (mode[0] == 'r')
486 : {
487 : zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
488 : zstdcs->dstream = ZSTD_createDStream();
489 : if (zstdcs->dstream == NULL)
490 : pg_fatal("could not initialize compression library");
491 : }
492 : else if (mode[0] == 'w' || mode[0] == 'a')
493 : {
494 : zstdcs->output.size = ZSTD_CStreamOutSize();
495 : zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
496 : zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
497 : if (zstdcs->cstream == NULL)
498 : pg_fatal("could not initialize compression library");
499 : }
500 : else
501 : pg_fatal("unhandled mode \"%s\"", mode);
502 :
503 : return true;
504 : }
505 :
506 : static bool
507 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
508 : {
509 : char fname[MAXPGPATH];
510 :
511 : sprintf(fname, "%s.zst", path);
512 : return CFH->open_func(fname, -1, mode, CFH);
513 : }
514 :
515 : static const char *
516 : Zstd_get_error(CompressFileHandle *CFH)
517 : {
518 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
519 :
520 : return zstdcs->zstderror;
521 : }
522 :
523 : void
524 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
525 : const pg_compress_specification compression_spec)
526 : {
527 : CFH->open_func = Zstd_open;
528 : CFH->open_write_func = Zstd_open_write;
529 : CFH->read_func = Zstd_read;
530 : CFH->write_func = Zstd_write;
531 : CFH->gets_func = Zstd_gets;
532 : CFH->getc_func = Zstd_getc;
533 : CFH->close_func = Zstd_close;
534 : CFH->eof_func = Zstd_eof;
535 : CFH->get_error_func = Zstd_get_error;
536 :
537 : CFH->compression_spec = compression_spec;
538 :
539 : CFH->private_data = NULL;
540 : }
541 :
542 : #endif /* USE_ZSTD */
|