Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * astreamer_zstd.c
4 : *
5 : * Archive streamers that deal with data compressed using zstd.
6 : * astreamer_zstd_compressor applies lz4 compression to the input stream,
7 : * and astreamer_zstd_decompressor does the reverse.
8 : *
9 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 : *
11 : * IDENTIFICATION
12 : * src/fe_utils/astreamer_zstd.c
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <unistd.h>
19 :
20 : #ifdef USE_ZSTD
21 : #include <zstd.h>
22 : #endif
23 :
24 : #include "common/logging.h"
25 : #include "fe_utils/astreamer.h"
26 :
27 : #ifdef USE_ZSTD
28 :
29 : typedef struct astreamer_zstd_frame
30 : {
31 : astreamer base;
32 :
33 : ZSTD_CCtx *cctx;
34 : ZSTD_DCtx *dctx;
35 : ZSTD_outBuffer zstd_outBuf;
36 : } astreamer_zstd_frame;
37 :
38 : static void astreamer_zstd_compressor_content(astreamer *streamer,
39 : astreamer_member *member,
40 : const char *data, int len,
41 : astreamer_archive_context context);
42 : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
43 : static void astreamer_zstd_compressor_free(astreamer *streamer);
44 :
45 : static const astreamer_ops astreamer_zstd_compressor_ops = {
46 : .content = astreamer_zstd_compressor_content,
47 : .finalize = astreamer_zstd_compressor_finalize,
48 : .free = astreamer_zstd_compressor_free
49 : };
50 :
51 : static void astreamer_zstd_decompressor_content(astreamer *streamer,
52 : astreamer_member *member,
53 : const char *data, int len,
54 : astreamer_archive_context context);
55 : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
56 : static void astreamer_zstd_decompressor_free(astreamer *streamer);
57 :
58 : static const astreamer_ops astreamer_zstd_decompressor_ops = {
59 : .content = astreamer_zstd_decompressor_content,
60 : .finalize = astreamer_zstd_decompressor_finalize,
61 : .free = astreamer_zstd_decompressor_free
62 : };
63 : #endif
64 :
65 : /*
66 : * Create a new base backup streamer that performs zstd compression of tar
67 : * blocks.
68 : */
69 : astreamer *
70 0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
71 : {
72 : #ifdef USE_ZSTD
73 : astreamer_zstd_frame *streamer;
74 : size_t ret;
75 :
76 : Assert(next != NULL);
77 :
78 : streamer = palloc0(sizeof(astreamer_zstd_frame));
79 :
80 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
81 : &astreamer_zstd_compressor_ops;
82 :
83 : streamer->base.bbs_next = next;
84 : initStringInfo(&streamer->base.bbs_buffer);
85 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86 :
87 : streamer->cctx = ZSTD_createCCtx();
88 : if (!streamer->cctx)
89 : pg_fatal("could not create zstd compression context");
90 :
91 : /* Set compression level */
92 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93 : compress->level);
94 : if (ZSTD_isError(ret))
95 : pg_fatal("could not set zstd compression level to %d: %s",
96 : compress->level, ZSTD_getErrorName(ret));
97 :
98 : /* Set # of workers, if specified */
99 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 : {
101 : /*
102 : * On older versions of libzstd, this option does not exist, and
103 : * trying to set it will fail. Similarly for newer versions if they
104 : * are compiled without threading support.
105 : */
106 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 : compress->workers);
108 : if (ZSTD_isError(ret))
109 : pg_fatal("could not set compression worker count to %d: %s",
110 : compress->workers, ZSTD_getErrorName(ret));
111 : }
112 :
113 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 : {
115 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
116 : ZSTD_c_enableLongDistanceMatching,
117 : compress->long_distance);
118 : if (ZSTD_isError(ret))
119 : {
120 : pg_log_error("could not enable long-distance mode: %s",
121 : ZSTD_getErrorName(ret));
122 : exit(1);
123 : }
124 : }
125 :
126 : /* Initialize the ZSTD output buffer. */
127 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
128 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
129 : streamer->zstd_outBuf.pos = 0;
130 :
131 : return &streamer->base;
132 : #else
133 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
134 : return NULL; /* keep compiler quiet */
135 : #endif
136 : }
137 :
138 : #ifdef USE_ZSTD
139 : /*
140 : * Compress the input data to output buffer.
141 : *
142 : * Find out the compression bound based on input data length for each
143 : * invocation to make sure that output buffer has enough capacity to
144 : * accommodate the compressed data. In case if the output buffer
145 : * capacity falls short of compression bound then forward the content
146 : * of output buffer to next streamer and empty the buffer.
147 : */
148 : static void
149 : astreamer_zstd_compressor_content(astreamer *streamer,
150 : astreamer_member *member,
151 : const char *data, int len,
152 : astreamer_archive_context context)
153 : {
154 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
155 : ZSTD_inBuffer inBuf = {data, len, 0};
156 :
157 : while (inBuf.pos < inBuf.size)
158 : {
159 : size_t yet_to_flush;
160 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
161 :
162 : /*
163 : * If the output buffer is not left with enough space, send the
164 : * compressed bytes to the next streamer, and empty the buffer.
165 : */
166 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
167 : max_needed)
168 : {
169 : astreamer_content(mystreamer->base.bbs_next, member,
170 : mystreamer->zstd_outBuf.dst,
171 : mystreamer->zstd_outBuf.pos,
172 : context);
173 :
174 : /* Reset the ZSTD output buffer. */
175 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
176 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
177 : mystreamer->zstd_outBuf.pos = 0;
178 : }
179 :
180 : yet_to_flush =
181 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
182 : &inBuf, ZSTD_e_continue);
183 :
184 : if (ZSTD_isError(yet_to_flush))
185 : pg_log_error("could not compress data: %s",
186 : ZSTD_getErrorName(yet_to_flush));
187 : }
188 : }
189 :
190 : /*
191 : * End-of-stream processing.
192 : */
193 : static void
194 : astreamer_zstd_compressor_finalize(astreamer *streamer)
195 : {
196 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
197 : size_t yet_to_flush;
198 :
199 : do
200 : {
201 : ZSTD_inBuffer in = {NULL, 0, 0};
202 : size_t max_needed = ZSTD_compressBound(0);
203 :
204 : /*
205 : * If the output buffer is not left with enough space, send the
206 : * compressed bytes to the next streamer, and empty the buffer.
207 : */
208 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
209 : max_needed)
210 : {
211 : astreamer_content(mystreamer->base.bbs_next, NULL,
212 : mystreamer->zstd_outBuf.dst,
213 : mystreamer->zstd_outBuf.pos,
214 : ASTREAMER_UNKNOWN);
215 :
216 : /* Reset the ZSTD output buffer. */
217 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
218 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
219 : mystreamer->zstd_outBuf.pos = 0;
220 : }
221 :
222 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
223 : &mystreamer->zstd_outBuf,
224 : &in, ZSTD_e_end);
225 :
226 : if (ZSTD_isError(yet_to_flush))
227 : pg_log_error("could not compress data: %s",
228 : ZSTD_getErrorName(yet_to_flush));
229 :
230 : } while (yet_to_flush > 0);
231 :
232 : /* Make sure to pass any remaining bytes to the next streamer. */
233 : if (mystreamer->zstd_outBuf.pos > 0)
234 : astreamer_content(mystreamer->base.bbs_next, NULL,
235 : mystreamer->zstd_outBuf.dst,
236 : mystreamer->zstd_outBuf.pos,
237 : ASTREAMER_UNKNOWN);
238 :
239 : astreamer_finalize(mystreamer->base.bbs_next);
240 : }
241 :
242 : /*
243 : * Free memory.
244 : */
245 : static void
246 : astreamer_zstd_compressor_free(astreamer *streamer)
247 : {
248 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
249 :
250 : astreamer_free(streamer->bbs_next);
251 : ZSTD_freeCCtx(mystreamer->cctx);
252 : pfree(streamer->bbs_buffer.data);
253 : pfree(streamer);
254 : }
255 : #endif
256 :
257 : /*
258 : * Create a new base backup streamer that performs decompression of zstd
259 : * compressed blocks.
260 : */
261 : astreamer *
262 0 : astreamer_zstd_decompressor_new(astreamer *next)
263 : {
264 : #ifdef USE_ZSTD
265 : astreamer_zstd_frame *streamer;
266 :
267 : Assert(next != NULL);
268 :
269 : streamer = palloc0(sizeof(astreamer_zstd_frame));
270 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
271 : &astreamer_zstd_decompressor_ops;
272 :
273 : streamer->base.bbs_next = next;
274 : initStringInfo(&streamer->base.bbs_buffer);
275 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
276 :
277 : streamer->dctx = ZSTD_createDCtx();
278 : if (!streamer->dctx)
279 : pg_fatal("could not create zstd decompression context");
280 :
281 : /* Initialize the ZSTD output buffer. */
282 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
283 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
284 : streamer->zstd_outBuf.pos = 0;
285 :
286 : return &streamer->base;
287 : #else
288 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
289 : return NULL; /* keep compiler quiet */
290 : #endif
291 : }
292 :
293 : #ifdef USE_ZSTD
294 : /*
295 : * Decompress the input data to output buffer until we run out of input
296 : * data. Each time the output buffer is full, pass on the decompressed data
297 : * to the next streamer.
298 : */
299 : static void
300 : astreamer_zstd_decompressor_content(astreamer *streamer,
301 : astreamer_member *member,
302 : const char *data, int len,
303 : astreamer_archive_context context)
304 : {
305 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
306 : ZSTD_inBuffer inBuf = {data, len, 0};
307 :
308 : while (inBuf.pos < inBuf.size)
309 : {
310 : size_t ret;
311 :
312 : /*
313 : * If output buffer is full then forward the content to next streamer
314 : * and update the output buffer.
315 : */
316 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
317 : {
318 : astreamer_content(mystreamer->base.bbs_next, member,
319 : mystreamer->zstd_outBuf.dst,
320 : mystreamer->zstd_outBuf.pos,
321 : context);
322 :
323 : /* Reset the ZSTD output buffer. */
324 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
325 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
326 : mystreamer->zstd_outBuf.pos = 0;
327 : }
328 :
329 : ret = ZSTD_decompressStream(mystreamer->dctx,
330 : &mystreamer->zstd_outBuf, &inBuf);
331 :
332 : if (ZSTD_isError(ret))
333 : pg_log_error("could not decompress data: %s",
334 : ZSTD_getErrorName(ret));
335 : }
336 : }
337 :
338 : /*
339 : * End-of-stream processing.
340 : */
341 : static void
342 : astreamer_zstd_decompressor_finalize(astreamer *streamer)
343 : {
344 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
345 :
346 : /*
347 : * End of the stream, if there is some pending data in output buffers then
348 : * we must forward it to next streamer.
349 : */
350 : if (mystreamer->zstd_outBuf.pos > 0)
351 : astreamer_content(mystreamer->base.bbs_next, NULL,
352 : mystreamer->base.bbs_buffer.data,
353 : mystreamer->base.bbs_buffer.maxlen,
354 : ASTREAMER_UNKNOWN);
355 :
356 : astreamer_finalize(mystreamer->base.bbs_next);
357 : }
358 :
359 : /*
360 : * Free memory.
361 : */
362 : static void
363 : astreamer_zstd_decompressor_free(astreamer *streamer)
364 : {
365 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
366 :
367 : astreamer_free(streamer->bbs_next);
368 : ZSTD_freeDCtx(mystreamer->dctx);
369 : pfree(streamer->bbs_buffer.data);
370 : pfree(streamer);
371 : }
372 : #endif
|