Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * astreamer_zstd.c
4 : *
5 : * Archive streamers that deal with data compressed using zstd.
6 : * astreamer_zstd_compressor applies zstd compression to the input stream,
7 : * and astreamer_zstd_decompressor does the reverse.
8 : *
9 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
10 : *
11 : * IDENTIFICATION
12 : * src/fe_utils/astreamer_zstd.c
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <unistd.h>
19 :
20 : #ifdef USE_ZSTD
21 : #include <zstd.h>
22 : #endif
23 :
24 : #include "common/logging.h"
25 : #include "fe_utils/astreamer.h"
26 :
27 : #ifdef USE_ZSTD
28 :
29 : typedef struct astreamer_zstd_frame
30 : {
31 : astreamer base;
32 :
33 : ZSTD_CCtx *cctx;
34 : ZSTD_DCtx *dctx;
35 : ZSTD_outBuffer zstd_outBuf;
36 : } astreamer_zstd_frame;
37 :
38 : static void astreamer_zstd_compressor_content(astreamer *streamer,
39 : astreamer_member *member,
40 : const char *data, int len,
41 : astreamer_archive_context context);
42 : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
43 : static void astreamer_zstd_compressor_free(astreamer *streamer);
44 :
45 : static const astreamer_ops astreamer_zstd_compressor_ops = {
46 : .content = astreamer_zstd_compressor_content,
47 : .finalize = astreamer_zstd_compressor_finalize,
48 : .free = astreamer_zstd_compressor_free
49 : };
50 :
51 : static void astreamer_zstd_decompressor_content(astreamer *streamer,
52 : astreamer_member *member,
53 : const char *data, int len,
54 : astreamer_archive_context context);
55 : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
56 : static void astreamer_zstd_decompressor_free(astreamer *streamer);
57 :
58 : static const astreamer_ops astreamer_zstd_decompressor_ops = {
59 : .content = astreamer_zstd_decompressor_content,
60 : .finalize = astreamer_zstd_decompressor_finalize,
61 : .free = astreamer_zstd_decompressor_free
62 : };
63 : #endif
64 :
65 : /*
66 : * Create a new base backup streamer that performs zstd compression of tar
67 : * blocks.
68 : */
69 : astreamer *
70 0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
71 : {
72 : #ifdef USE_ZSTD
73 : astreamer_zstd_frame *streamer;
74 : size_t ret;
75 :
76 : Assert(next != NULL);
77 :
78 : streamer = palloc0_object(astreamer_zstd_frame);
79 :
80 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
81 : &astreamer_zstd_compressor_ops;
82 :
83 : streamer->base.bbs_next = next;
84 : initStringInfo(&streamer->base.bbs_buffer);
85 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_CStreamOutSize());
86 :
87 : streamer->cctx = ZSTD_createCCtx();
88 : if (!streamer->cctx)
89 : pg_fatal("could not create zstd compression context");
90 :
91 : /* Set compression level */
92 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93 : compress->level);
94 : if (ZSTD_isError(ret))
95 : pg_fatal("could not set zstd compression level to %d: %s",
96 : compress->level, ZSTD_getErrorName(ret));
97 :
98 : /* Set # of workers, if specified */
99 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 : {
101 : /*
102 : * On older versions of libzstd, this option does not exist, and
103 : * trying to set it will fail. Similarly for newer versions if they
104 : * are compiled without threading support.
105 : */
106 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 : compress->workers);
108 : if (ZSTD_isError(ret))
109 : pg_fatal("could not set compression worker count to %d: %s",
110 : compress->workers, ZSTD_getErrorName(ret));
111 : }
112 :
113 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 : {
115 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
116 : ZSTD_c_enableLongDistanceMatching,
117 : compress->long_distance);
118 : if (ZSTD_isError(ret))
119 : pg_fatal("could not enable long-distance mode: %s",
120 : ZSTD_getErrorName(ret));
121 : }
122 :
123 : /* Initialize the ZSTD output buffer. */
124 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
125 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
126 : streamer->zstd_outBuf.pos = 0;
127 :
128 : return &streamer->base;
129 : #else
130 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
131 : return NULL; /* keep compiler quiet */
132 : #endif
133 : }
134 :
135 : #ifdef USE_ZSTD
136 : /*
137 : * Compress the input data to output buffer.
138 : *
139 : * Find out the compression bound based on input data length for each
140 : * invocation to make sure that output buffer has enough capacity to
141 : * accommodate the compressed data. In case if the output buffer
142 : * capacity falls short of compression bound then forward the content
143 : * of output buffer to next streamer and empty the buffer.
144 : */
145 : static void
146 : astreamer_zstd_compressor_content(astreamer *streamer,
147 : astreamer_member *member,
148 : const char *data, int len,
149 : astreamer_archive_context context)
150 : {
151 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
152 : ZSTD_inBuffer inBuf = {data, len, 0};
153 :
154 : while (inBuf.pos < inBuf.size)
155 : {
156 : size_t yet_to_flush;
157 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
158 :
159 : /*
160 : * If the output buffer is not left with enough space, send the
161 : * compressed bytes to the next streamer, and empty the buffer.
162 : */
163 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
164 : max_needed)
165 : {
166 : astreamer_content(mystreamer->base.bbs_next, member,
167 : mystreamer->zstd_outBuf.dst,
168 : mystreamer->zstd_outBuf.pos,
169 : context);
170 :
171 : /* Reset the ZSTD output buffer. */
172 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
173 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
174 : mystreamer->zstd_outBuf.pos = 0;
175 : }
176 :
177 : yet_to_flush =
178 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
179 : &inBuf, ZSTD_e_continue);
180 :
181 : if (ZSTD_isError(yet_to_flush))
182 : pg_fatal("could not compress data: %s",
183 : ZSTD_getErrorName(yet_to_flush));
184 : }
185 : }
186 :
187 : /*
188 : * End-of-stream processing.
189 : */
190 : static void
191 : astreamer_zstd_compressor_finalize(astreamer *streamer)
192 : {
193 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
194 : size_t yet_to_flush;
195 :
196 : do
197 : {
198 : ZSTD_inBuffer in = {NULL, 0, 0};
199 : size_t max_needed = ZSTD_compressBound(0);
200 :
201 : /*
202 : * If the output buffer is not left with enough space, send the
203 : * compressed bytes to the next streamer, and empty the buffer.
204 : */
205 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
206 : max_needed)
207 : {
208 : astreamer_content(mystreamer->base.bbs_next, NULL,
209 : mystreamer->zstd_outBuf.dst,
210 : mystreamer->zstd_outBuf.pos,
211 : ASTREAMER_UNKNOWN);
212 :
213 : /* Reset the ZSTD output buffer. */
214 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
215 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
216 : mystreamer->zstd_outBuf.pos = 0;
217 : }
218 :
219 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
220 : &mystreamer->zstd_outBuf,
221 : &in, ZSTD_e_end);
222 :
223 : if (ZSTD_isError(yet_to_flush))
224 : pg_fatal("could not compress data: %s",
225 : ZSTD_getErrorName(yet_to_flush));
226 :
227 : } while (yet_to_flush > 0);
228 :
229 : /* Make sure to pass any remaining bytes to the next streamer. */
230 : if (mystreamer->zstd_outBuf.pos > 0)
231 : astreamer_content(mystreamer->base.bbs_next, NULL,
232 : mystreamer->zstd_outBuf.dst,
233 : mystreamer->zstd_outBuf.pos,
234 : ASTREAMER_UNKNOWN);
235 :
236 : astreamer_finalize(mystreamer->base.bbs_next);
237 : }
238 :
239 : /*
240 : * Free memory.
241 : */
242 : static void
243 : astreamer_zstd_compressor_free(astreamer *streamer)
244 : {
245 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
246 :
247 : astreamer_free(streamer->bbs_next);
248 : ZSTD_freeCCtx(mystreamer->cctx);
249 : pfree(streamer->bbs_buffer.data);
250 : pfree(streamer);
251 : }
252 : #endif
253 :
254 : /*
255 : * Create a new base backup streamer that performs decompression of zstd
256 : * compressed blocks.
257 : */
258 : astreamer *
259 0 : astreamer_zstd_decompressor_new(astreamer *next)
260 : {
261 : #ifdef USE_ZSTD
262 : astreamer_zstd_frame *streamer;
263 :
264 : Assert(next != NULL);
265 :
266 : streamer = palloc0_object(astreamer_zstd_frame);
267 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
268 : &astreamer_zstd_decompressor_ops;
269 :
270 : streamer->base.bbs_next = next;
271 : initStringInfo(&streamer->base.bbs_buffer);
272 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
273 :
274 : streamer->dctx = ZSTD_createDCtx();
275 : if (!streamer->dctx)
276 : pg_fatal("could not create zstd decompression context");
277 :
278 : /* Initialize the ZSTD output buffer. */
279 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
280 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
281 : streamer->zstd_outBuf.pos = 0;
282 :
283 : return &streamer->base;
284 : #else
285 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
286 : return NULL; /* keep compiler quiet */
287 : #endif
288 : }
289 :
290 : #ifdef USE_ZSTD
291 : /*
292 : * Decompress the input data to output buffer until we run out of input
293 : * data. Each time the output buffer is full, pass on the decompressed data
294 : * to the next streamer.
295 : */
296 : static void
297 : astreamer_zstd_decompressor_content(astreamer *streamer,
298 : astreamer_member *member,
299 : const char *data, int len,
300 : astreamer_archive_context context)
301 : {
302 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
303 : ZSTD_inBuffer inBuf = {data, len, 0};
304 :
305 : while (inBuf.pos < inBuf.size)
306 : {
307 : size_t ret;
308 :
309 : /*
310 : * If output buffer is full then forward the content to next streamer
311 : * and update the output buffer.
312 : */
313 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
314 : {
315 : astreamer_content(mystreamer->base.bbs_next, member,
316 : mystreamer->zstd_outBuf.dst,
317 : mystreamer->zstd_outBuf.pos,
318 : context);
319 :
320 : /* Reset the ZSTD output buffer. */
321 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
322 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
323 : mystreamer->zstd_outBuf.pos = 0;
324 : }
325 :
326 : ret = ZSTD_decompressStream(mystreamer->dctx,
327 : &mystreamer->zstd_outBuf, &inBuf);
328 :
329 : if (ZSTD_isError(ret))
330 : pg_fatal("could not decompress data: %s",
331 : ZSTD_getErrorName(ret));
332 : }
333 : }
334 :
335 : /*
336 : * End-of-stream processing.
337 : */
338 : static void
339 : astreamer_zstd_decompressor_finalize(astreamer *streamer)
340 : {
341 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
342 :
343 : /*
344 : * End of the stream, if there is some pending data in output buffers then
345 : * we must forward it to next streamer.
346 : */
347 : if (mystreamer->zstd_outBuf.pos > 0)
348 : astreamer_content(mystreamer->base.bbs_next, NULL,
349 : mystreamer->base.bbs_buffer.data,
350 : mystreamer->zstd_outBuf.pos,
351 : ASTREAMER_UNKNOWN);
352 :
353 : astreamer_finalize(mystreamer->base.bbs_next);
354 : }
355 :
356 : /*
357 : * Free memory.
358 : */
359 : static void
360 : astreamer_zstd_decompressor_free(astreamer *streamer)
361 : {
362 : astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
363 :
364 : astreamer_free(streamer->bbs_next);
365 : ZSTD_freeDCtx(mystreamer->dctx);
366 : pfree(streamer->bbs_buffer.data);
367 : pfree(streamer);
368 : }
369 : #endif
|