Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_zstd.c
4 : * Routines for archivers to write a Zstd compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_zstd.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include "compress_zstd.h"
18 : #include "pg_backup_utils.h"
19 :
20 : #ifndef USE_ZSTD
21 :
22 : void
23 0 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
24 : {
25 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
26 : }
27 :
28 : void
29 0 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
30 : {
31 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
32 : }
33 :
34 : #else
35 :
36 : #include <zstd.h>
37 :
38 : typedef struct ZstdCompressorState
39 : {
40 : /* This is a normal file to which we read/write compressed data */
41 : FILE *fp;
42 :
43 : ZSTD_CStream *cstream;
44 : ZSTD_DStream *dstream;
45 : ZSTD_outBuffer output;
46 : ZSTD_inBuffer input;
47 :
48 : /* pointer to a static string like from strerror(), for Zstd_write() */
49 : const char *zstderror;
50 : } ZstdCompressorState;
51 :
52 : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53 : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54 : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55 : const void *data, size_t dLen);
56 : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57 :
58 : static void
59 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60 : ZSTD_cParameter param, int value, char *paramname)
61 : {
62 : size_t res;
63 :
64 : res = ZSTD_CCtx_setParameter(cstream, param, value);
65 : if (ZSTD_isError(res))
66 : pg_fatal("could not set compression parameter \"%s\": %s",
67 : paramname, ZSTD_getErrorName(res));
68 : }
69 :
70 : /* Return a compression stream with parameters set per argument */
71 : static ZSTD_CStream *
72 : _ZstdCStreamParams(pg_compress_specification compress)
73 : {
74 : ZSTD_CStream *cstream;
75 :
76 : cstream = ZSTD_createCStream();
77 : if (cstream == NULL)
78 : pg_fatal("could not initialize compression library");
79 :
80 : _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81 : compress.level, "level");
82 :
83 : if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
84 : _Zstd_CCtx_setParam_or_die(cstream,
85 : ZSTD_c_enableLongDistanceMatching,
86 : compress.long_distance, "long");
87 :
88 : return cstream;
89 : }
90 :
91 : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92 : static void
93 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94 : {
95 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96 : ZSTD_inBuffer *input = &zstdcs->input;
97 : ZSTD_outBuffer *output = &zstdcs->output;
98 :
99 : /* Loop while there's any input or until flushed */
100 : while (input->pos != input->size || flush)
101 : {
102 : size_t res;
103 :
104 : output->pos = 0;
105 : res = ZSTD_compressStream2(zstdcs->cstream, output,
106 : input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 :
108 : if (ZSTD_isError(res))
109 : pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 :
111 : /*
112 : * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113 : * is the EOF marker in the custom format. This should never happen
114 : * but...
115 : */
116 : if (output->pos > 0)
117 : cs->writeF(AH, output->dst, output->pos);
118 :
119 : if (res == 0)
120 : break; /* End of frame or all input consumed */
121 : }
122 : }
123 :
124 : static void
125 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126 : {
127 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128 :
129 : if (cs->readF != NULL)
130 : {
131 : Assert(zstdcs->cstream == NULL);
132 : ZSTD_freeDStream(zstdcs->dstream);
133 : pg_free(unconstify(void *, zstdcs->input.src));
134 : }
135 : else if (cs->writeF != NULL)
136 : {
137 : Assert(zstdcs->dstream == NULL);
138 : _ZstdWriteCommon(AH, cs, true);
139 : ZSTD_freeCStream(zstdcs->cstream);
140 : }
141 :
142 : /* output buffer may be allocated in either mode */
143 : pg_free(zstdcs->output.dst);
144 : pg_free(zstdcs);
145 : cs->private_data = NULL;
146 : }
147 :
148 : static void
149 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
150 : const void *data, size_t dLen)
151 : {
152 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
153 :
154 : zstdcs->input.src = data;
155 : zstdcs->input.size = dLen;
156 : zstdcs->input.pos = 0;
157 :
158 : _ZstdWriteCommon(AH, cs, false);
159 : }
160 :
161 : static void
162 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
163 : {
164 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
165 : ZSTD_outBuffer *output = &zstdcs->output;
166 : ZSTD_inBuffer *input = &zstdcs->input;
167 : size_t input_allocated_size = ZSTD_DStreamInSize();
168 : size_t res;
169 :
170 : for (;;)
171 : {
172 : size_t cnt;
173 :
174 : /*
175 : * Read compressed data. Note that readF can resize the buffer; the
176 : * new size is tracked and used for future loops.
177 : */
178 : input->size = input_allocated_size;
179 : cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
180 :
181 : /* ensure that readF didn't *shrink* the buffer */
182 : Assert(input->size >= input_allocated_size);
183 : input_allocated_size = input->size;
184 : input->size = cnt;
185 : input->pos = 0;
186 :
187 : if (cnt == 0)
188 : break;
189 :
190 : /* Now decompress */
191 : while (input->pos < input->size)
192 : {
193 : output->pos = 0;
194 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
195 : if (ZSTD_isError(res))
196 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
197 :
198 : /*
199 : * then write the decompressed data to the output handle
200 : */
201 : ((char *) output->dst)[output->pos] = '\0';
202 : ahwrite(output->dst, 1, output->pos, AH);
203 :
204 : if (res == 0)
205 : break; /* End of frame */
206 : }
207 : }
208 : }
209 :
210 : /* Public routine that supports Zstd compressed data I/O */
211 : void
212 : InitCompressorZstd(CompressorState *cs,
213 : const pg_compress_specification compression_spec)
214 : {
215 : ZstdCompressorState *zstdcs;
216 :
217 : cs->readData = ReadDataFromArchiveZstd;
218 : cs->writeData = WriteDataToArchiveZstd;
219 : cs->end = EndCompressorZstd;
220 :
221 : cs->compression_spec = compression_spec;
222 :
223 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
224 : cs->private_data = zstdcs;
225 :
226 : /* We expect that exactly one of readF/writeF is specified */
227 : Assert((cs->readF == NULL) != (cs->writeF == NULL));
228 :
229 : if (cs->readF != NULL)
230 : {
231 : zstdcs->dstream = ZSTD_createDStream();
232 : if (zstdcs->dstream == NULL)
233 : pg_fatal("could not initialize compression library");
234 :
235 : zstdcs->input.size = ZSTD_DStreamInSize();
236 : zstdcs->input.src = pg_malloc(zstdcs->input.size);
237 :
238 : /*
239 : * output.size is the buffer size we tell zstd it can output to.
240 : * Allocate an additional byte such that ReadDataFromArchiveZstd() can
241 : * call ahwrite() with a null-terminated string, which is an optimized
242 : * case in ExecuteSqlCommandBuf().
243 : */
244 : zstdcs->output.size = ZSTD_DStreamOutSize();
245 : zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
246 : }
247 : else if (cs->writeF != NULL)
248 : {
249 : zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
250 :
251 : zstdcs->output.size = ZSTD_CStreamOutSize();
252 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
253 : zstdcs->output.pos = 0;
254 : }
255 : }
256 :
257 : /*
258 : * Compressed stream API
259 : */
260 :
261 : static bool
262 : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
263 : {
264 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
265 : ZSTD_inBuffer *input = &zstdcs->input;
266 : ZSTD_outBuffer *output = &zstdcs->output;
267 : size_t input_allocated_size = ZSTD_DStreamInSize();
268 : size_t res,
269 : cnt;
270 :
271 : output->size = size;
272 : output->dst = ptr;
273 : output->pos = 0;
274 :
275 : for (;;)
276 : {
277 : Assert(input->pos <= input->size);
278 : Assert(input->size <= input_allocated_size);
279 :
280 : /*
281 : * If the input is completely consumed, start back at the beginning
282 : */
283 : if (input->pos == input->size)
284 : {
285 : /* input->size is size produced by "fread" */
286 : input->size = 0;
287 : /* input->pos is position consumed by decompress */
288 : input->pos = 0;
289 : }
290 :
291 : /* read compressed data if we must produce more input */
292 : if (input->pos == input->size)
293 : {
294 : cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
295 : input->size = cnt;
296 :
297 : Assert(cnt <= input_allocated_size);
298 :
299 : /* If we have no more input to consume, we're done */
300 : if (cnt == 0)
301 : break;
302 : }
303 :
304 : while (input->pos < input->size)
305 : {
306 : /* now decompress */
307 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
308 :
309 : if (ZSTD_isError(res))
310 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
311 :
312 : if (output->pos == output->size)
313 : break; /* No more room for output */
314 :
315 : if (res == 0)
316 : break; /* End of frame */
317 : }
318 :
319 : if (output->pos == output->size)
320 : break; /* We read all the data that fits */
321 : }
322 :
323 : if (rdsize != NULL)
324 : *rdsize = output->pos;
325 :
326 : return true;
327 : }
328 :
329 : static bool
330 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
331 : {
332 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
333 : ZSTD_inBuffer *input = &zstdcs->input;
334 : ZSTD_outBuffer *output = &zstdcs->output;
335 : size_t res,
336 : cnt;
337 :
338 : input->src = ptr;
339 : input->size = size;
340 : input->pos = 0;
341 :
342 : /* Consume all input, to be flushed later */
343 : while (input->pos != input->size)
344 : {
345 : output->pos = 0;
346 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
347 : if (ZSTD_isError(res))
348 : {
349 : zstdcs->zstderror = ZSTD_getErrorName(res);
350 : return false;
351 : }
352 :
353 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
354 : if (cnt != output->pos)
355 : {
356 : zstdcs->zstderror = strerror(errno);
357 : return false;
358 : }
359 : }
360 :
361 : return size;
362 : }
363 :
364 : static int
365 : Zstd_getc(CompressFileHandle *CFH)
366 : {
367 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
368 : int ret;
369 :
370 : if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
371 : {
372 : if (feof(zstdcs->fp))
373 : pg_fatal("could not read from input file: end of file");
374 : else
375 : pg_fatal("could not read from input file: %m");
376 : }
377 : return ret;
378 : }
379 :
380 : static char *
381 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
382 : {
383 : int i;
384 :
385 : Assert(len > 0);
386 :
387 : /*
388 : * Read one byte at a time until newline or EOF. This is only used to read
389 : * the list of LOs, and the I/O is buffered anyway.
390 : */
391 : for (i = 0; i < len - 1; ++i)
392 : {
393 : size_t readsz;
394 :
395 : if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
396 : break;
397 : if (readsz != 1)
398 : break;
399 : if (buf[i] == '\n')
400 : {
401 : ++i;
402 : break;
403 : }
404 : }
405 : buf[i] = '\0';
406 : return i > 0 ? buf : NULL;
407 : }
408 :
409 : static bool
410 : Zstd_close(CompressFileHandle *CFH)
411 : {
412 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
413 :
414 : if (zstdcs->cstream)
415 : {
416 : size_t res,
417 : cnt;
418 : ZSTD_inBuffer *input = &zstdcs->input;
419 : ZSTD_outBuffer *output = &zstdcs->output;
420 :
421 : /* Loop until the compression buffers are fully consumed */
422 : for (;;)
423 : {
424 : output->pos = 0;
425 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
426 : if (ZSTD_isError(res))
427 : {
428 : zstdcs->zstderror = ZSTD_getErrorName(res);
429 : return false;
430 : }
431 :
432 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
433 : if (cnt != output->pos)
434 : {
435 : zstdcs->zstderror = strerror(errno);
436 : return false;
437 : }
438 :
439 : if (res == 0)
440 : break; /* End of frame */
441 : }
442 :
443 : ZSTD_freeCStream(zstdcs->cstream);
444 : pg_free(zstdcs->output.dst);
445 : }
446 :
447 : if (zstdcs->dstream)
448 : {
449 : ZSTD_freeDStream(zstdcs->dstream);
450 : pg_free(unconstify(void *, zstdcs->input.src));
451 : }
452 :
453 : if (fclose(zstdcs->fp) != 0)
454 : return false;
455 :
456 : pg_free(zstdcs);
457 : return true;
458 : }
459 :
460 : static bool
461 : Zstd_eof(CompressFileHandle *CFH)
462 : {
463 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
464 :
465 : return feof(zstdcs->fp);
466 : }
467 :
468 : static bool
469 : Zstd_open(const char *path, int fd, const char *mode,
470 : CompressFileHandle *CFH)
471 : {
472 : FILE *fp;
473 : ZstdCompressorState *zstdcs;
474 :
475 : if (fd >= 0)
476 : fp = fdopen(fd, mode);
477 : else
478 : fp = fopen(path, mode);
479 :
480 : if (fp == NULL)
481 : return false;
482 :
483 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
484 : CFH->private_data = zstdcs;
485 : zstdcs->fp = fp;
486 :
487 : if (mode[0] == 'r')
488 : {
489 : zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
490 : zstdcs->dstream = ZSTD_createDStream();
491 : if (zstdcs->dstream == NULL)
492 : pg_fatal("could not initialize compression library");
493 : }
494 : else if (mode[0] == 'w' || mode[0] == 'a')
495 : {
496 : zstdcs->output.size = ZSTD_CStreamOutSize();
497 : zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
498 : zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
499 : if (zstdcs->cstream == NULL)
500 : pg_fatal("could not initialize compression library");
501 : }
502 : else
503 : pg_fatal("unhandled mode \"%s\"", mode);
504 :
505 : return true;
506 : }
507 :
508 : static bool
509 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
510 : {
511 : char fname[MAXPGPATH];
512 :
513 : sprintf(fname, "%s.zst", path);
514 : return CFH->open_func(fname, -1, mode, CFH);
515 : }
516 :
517 : static const char *
518 : Zstd_get_error(CompressFileHandle *CFH)
519 : {
520 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
521 :
522 : return zstdcs->zstderror;
523 : }
524 :
525 : void
526 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
527 : const pg_compress_specification compression_spec)
528 : {
529 : CFH->open_func = Zstd_open;
530 : CFH->open_write_func = Zstd_open_write;
531 : CFH->read_func = Zstd_read;
532 : CFH->write_func = Zstd_write;
533 : CFH->gets_func = Zstd_gets;
534 : CFH->getc_func = Zstd_getc;
535 : CFH->close_func = Zstd_close;
536 : CFH->eof_func = Zstd_eof;
537 : CFH->get_error_func = Zstd_get_error;
538 :
539 : CFH->compression_spec = compression_spec;
540 :
541 : CFH->private_data = NULL;
542 : }
543 :
544 : #endif /* USE_ZSTD */
|