Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * astreamer_lz4.c
4 : *
5 : * Archive streamers that deal with data compressed using lz4.
6 : * astreamer_lz4_compressor applies lz4 compression to the input stream,
7 : * and astreamer_lz4_decompressor does the reverse.
8 : *
9 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 : *
11 : * IDENTIFICATION
12 : * src/fe_utils/astreamer_lz4.c
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <unistd.h>
19 :
20 : #ifdef USE_LZ4
21 : #include <lz4frame.h>
22 : #endif
23 :
24 : #include "common/logging.h"
25 : #include "fe_utils/astreamer.h"
26 :
27 : #ifdef USE_LZ4
/*
 * State shared by the lz4 compressor and decompressor streamers.
 *
 * 'base' is the first member; the callbacks in this file cast the generic
 * astreamer pointer they receive to astreamer_lz4_frame and the constructors
 * hand out &streamer->base.
 */
typedef struct astreamer_lz4_frame
{
	astreamer	base;

	/* Created by the compressor constructor; used only when compressing. */
	LZ4F_compressionContext_t cctx;
	/* Created by the decompressor constructor; used only when decompressing. */
	LZ4F_decompressionContext_t dctx;
	/* Frame preferences (block size, level) filled in for compression. */
	LZ4F_preferences_t prefs;

	/* Number of bytes accumulated in base.bbs_buffer and not yet forwarded. */
	size_t		bytes_written;
	/* Set once the compressor has emitted the lz4 frame header. */
	bool		header_written;
} astreamer_lz4_frame;
39 :
/* Callbacks implementing the lz4 compressor streamer. */
static void astreamer_lz4_compressor_content(astreamer *streamer,
											 astreamer_member *member,
											 const char *data, int len,
											 astreamer_archive_context context);
static void astreamer_lz4_compressor_finalize(astreamer *streamer);
static void astreamer_lz4_compressor_free(astreamer *streamer);

/* Callback table installed by astreamer_lz4_compressor_new(). */
static const astreamer_ops astreamer_lz4_compressor_ops = {
	.content = astreamer_lz4_compressor_content,
	.finalize = astreamer_lz4_compressor_finalize,
	.free = astreamer_lz4_compressor_free
};
52 :
/* Callbacks implementing the lz4 decompressor streamer. */
static void astreamer_lz4_decompressor_content(astreamer *streamer,
											   astreamer_member *member,
											   const char *data, int len,
											   astreamer_archive_context context);
static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
static void astreamer_lz4_decompressor_free(astreamer *streamer);

/* Callback table installed by astreamer_lz4_decompressor_new(). */
static const astreamer_ops astreamer_lz4_decompressor_ops = {
	.content = astreamer_lz4_decompressor_content,
	.finalize = astreamer_lz4_decompressor_finalize,
	.free = astreamer_lz4_decompressor_free
};
65 : #endif
66 :
67 : /*
68 : * Create a new base backup streamer that performs lz4 compression of tar
69 : * blocks.
70 : */
71 : astreamer *
72 2 : astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
73 : {
74 : #ifdef USE_LZ4
75 : astreamer_lz4_frame *streamer;
76 : LZ4F_errorCode_t ctxError;
77 : LZ4F_preferences_t *prefs;
78 :
79 : Assert(next != NULL);
80 :
81 2 : streamer = palloc0(sizeof(astreamer_lz4_frame));
82 2 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
83 : &astreamer_lz4_compressor_ops;
84 :
85 2 : streamer->base.bbs_next = next;
86 2 : initStringInfo(&streamer->base.bbs_buffer);
87 2 : streamer->header_written = false;
88 :
89 : /* Initialize stream compression preferences */
90 2 : prefs = &streamer->prefs;
91 2 : memset(prefs, 0, sizeof(LZ4F_preferences_t));
92 2 : prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93 2 : prefs->compressionLevel = compress->level;
94 :
95 2 : ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96 2 : if (LZ4F_isError(ctxError))
97 0 : pg_log_error("could not create lz4 compression context: %s",
98 : LZ4F_getErrorName(ctxError));
99 :
100 2 : return &streamer->base;
101 : #else
102 : pg_fatal("this build does not support compression with %s", "LZ4");
103 : return NULL; /* keep compiler quiet */
104 : #endif
105 : }
106 :
107 : #ifdef USE_LZ4
108 : /*
109 : * Compress the input data to output buffer.
110 : *
111 : * Find out the compression bound based on input data length for each
112 : * invocation to make sure that output buffer has enough capacity to
113 : * accommodate the compressed data. In case if the output buffer
114 : * capacity falls short of compression bound then forward the content
115 : * of output buffer to next streamer and empty the buffer.
116 : */
117 : static void
118 5408 : astreamer_lz4_compressor_content(astreamer *streamer,
119 : astreamer_member *member,
120 : const char *data, int len,
121 : astreamer_archive_context context)
122 : {
123 : astreamer_lz4_frame *mystreamer;
124 : uint8 *next_in,
125 : *next_out;
126 : size_t out_bound,
127 : compressed_size,
128 : avail_out;
129 :
130 5408 : mystreamer = (astreamer_lz4_frame *) streamer;
131 5408 : next_in = (uint8 *) data;
132 :
133 : /* Write header before processing the first input chunk. */
134 5408 : if (!mystreamer->header_written)
135 : {
136 2 : compressed_size = LZ4F_compressBegin(mystreamer->cctx,
137 2 : (uint8 *) mystreamer->base.bbs_buffer.data,
138 2 : mystreamer->base.bbs_buffer.maxlen,
139 2 : &mystreamer->prefs);
140 :
141 2 : if (LZ4F_isError(compressed_size))
142 0 : pg_log_error("could not write lz4 header: %s",
143 : LZ4F_getErrorName(compressed_size));
144 :
145 2 : mystreamer->bytes_written += compressed_size;
146 2 : mystreamer->header_written = true;
147 : }
148 :
149 : /*
150 : * Update the offset and capacity of output buffer based on number of
151 : * bytes written to output buffer.
152 : */
153 5408 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
154 5408 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
155 :
156 : /*
157 : * Find out the compression bound and make sure that output buffer has the
158 : * required capacity for the success of LZ4F_compressUpdate. If needed
159 : * forward the content to next streamer and empty the buffer.
160 : */
161 5408 : out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
162 5408 : if (avail_out < out_bound)
163 : {
164 28 : astreamer_content(mystreamer->base.bbs_next, member,
165 28 : mystreamer->base.bbs_buffer.data,
166 28 : mystreamer->bytes_written,
167 : context);
168 :
169 : /* Enlarge buffer if it falls short of out bound. */
170 28 : if (mystreamer->base.bbs_buffer.maxlen < out_bound)
171 2 : enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
172 :
173 28 : avail_out = mystreamer->base.bbs_buffer.maxlen;
174 28 : mystreamer->bytes_written = 0;
175 28 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
176 : }
177 :
178 : /*
179 : * This call compresses the data starting at next_in and generates the
180 : * output starting at next_out. It expects the caller to provide the size
181 : * of input buffer and capacity of output buffer by providing parameters
182 : * len and avail_out.
183 : *
184 : * It returns the number of bytes compressed to output buffer.
185 : */
186 5408 : compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
187 : next_out, avail_out,
188 : next_in, len, NULL);
189 :
190 5408 : if (LZ4F_isError(compressed_size))
191 0 : pg_log_error("could not compress data: %s",
192 : LZ4F_getErrorName(compressed_size));
193 :
194 5408 : mystreamer->bytes_written += compressed_size;
195 5408 : }
196 :
197 : /*
198 : * End-of-stream processing.
199 : */
200 : static void
201 2 : astreamer_lz4_compressor_finalize(astreamer *streamer)
202 : {
203 : astreamer_lz4_frame *mystreamer;
204 : uint8 *next_out;
205 : size_t footer_bound,
206 : compressed_size,
207 : avail_out;
208 :
209 2 : mystreamer = (astreamer_lz4_frame *) streamer;
210 :
211 : /* Find out the footer bound and update the output buffer. */
212 2 : footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
213 2 : if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
214 : footer_bound)
215 : {
216 0 : astreamer_content(mystreamer->base.bbs_next, NULL,
217 0 : mystreamer->base.bbs_buffer.data,
218 0 : mystreamer->bytes_written,
219 : ASTREAMER_UNKNOWN);
220 :
221 : /* Enlarge buffer if it falls short of footer bound. */
222 0 : if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
223 0 : enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
224 :
225 0 : avail_out = mystreamer->base.bbs_buffer.maxlen;
226 0 : mystreamer->bytes_written = 0;
227 0 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
228 : }
229 : else
230 : {
231 2 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
232 2 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
233 : }
234 :
235 : /*
236 : * Finalize the frame and flush whatever data remaining in compression
237 : * context.
238 : */
239 2 : compressed_size = LZ4F_compressEnd(mystreamer->cctx,
240 : next_out, avail_out, NULL);
241 :
242 2 : if (LZ4F_isError(compressed_size))
243 0 : pg_log_error("could not end lz4 compression: %s",
244 : LZ4F_getErrorName(compressed_size));
245 :
246 2 : mystreamer->bytes_written += compressed_size;
247 :
248 2 : astreamer_content(mystreamer->base.bbs_next, NULL,
249 2 : mystreamer->base.bbs_buffer.data,
250 2 : mystreamer->bytes_written,
251 : ASTREAMER_UNKNOWN);
252 :
253 2 : astreamer_finalize(mystreamer->base.bbs_next);
254 2 : }
255 :
256 : /*
257 : * Free memory.
258 : */
259 : static void
260 2 : astreamer_lz4_compressor_free(astreamer *streamer)
261 : {
262 : astreamer_lz4_frame *mystreamer;
263 :
264 2 : mystreamer = (astreamer_lz4_frame *) streamer;
265 2 : astreamer_free(streamer->bbs_next);
266 2 : LZ4F_freeCompressionContext(mystreamer->cctx);
267 2 : pfree(streamer->bbs_buffer.data);
268 2 : pfree(streamer);
269 2 : }
270 : #endif
271 :
272 : /*
273 : * Create a new base backup streamer that performs decompression of lz4
274 : * compressed blocks.
275 : */
276 : astreamer *
277 8 : astreamer_lz4_decompressor_new(astreamer *next)
278 : {
279 : #ifdef USE_LZ4
280 : astreamer_lz4_frame *streamer;
281 : LZ4F_errorCode_t ctxError;
282 :
283 : Assert(next != NULL);
284 :
285 8 : streamer = palloc0(sizeof(astreamer_lz4_frame));
286 8 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
287 : &astreamer_lz4_decompressor_ops;
288 :
289 8 : streamer->base.bbs_next = next;
290 8 : initStringInfo(&streamer->base.bbs_buffer);
291 :
292 : /* Initialize internal stream state for decompression */
293 8 : ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294 8 : if (LZ4F_isError(ctxError))
295 0 : pg_fatal("could not initialize compression library: %s",
296 : LZ4F_getErrorName(ctxError));
297 :
298 8 : return &streamer->base;
299 : #else
300 : pg_fatal("this build does not support compression with %s", "LZ4");
301 : return NULL; /* keep compiler quiet */
302 : #endif
303 : }
304 :
305 : #ifdef USE_LZ4
306 : /*
307 : * Decompress the input data to output buffer until we run out of input
308 : * data. Each time the output buffer is full, pass on the decompressed data
309 : * to the next streamer.
310 : */
311 : static void
312 328 : astreamer_lz4_decompressor_content(astreamer *streamer,
313 : astreamer_member *member,
314 : const char *data, int len,
315 : astreamer_archive_context context)
316 : {
317 : astreamer_lz4_frame *mystreamer;
318 : uint8 *next_in,
319 : *next_out;
320 : size_t avail_in,
321 : avail_out;
322 :
323 328 : mystreamer = (astreamer_lz4_frame *) streamer;
324 328 : next_in = (uint8 *) data;
325 328 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
326 328 : avail_in = len;
327 328 : avail_out = mystreamer->base.bbs_buffer.maxlen;
328 :
329 239460 : while (avail_in > 0)
330 : {
331 : size_t ret,
332 : read_size,
333 : out_size;
334 :
335 239132 : read_size = avail_in;
336 239132 : out_size = avail_out;
337 :
338 : /*
339 : * This call decompresses the data starting at next_in and generates
340 : * the output data starting at next_out. It expects the caller to
341 : * provide size of the input buffer and total capacity of the output
342 : * buffer by providing the read_size and out_size parameters
343 : * respectively.
344 : *
345 : * Per the documentation of LZ4, parameters read_size and out_size
346 : * behaves as dual parameters. On return, the number of bytes consumed
347 : * from the input buffer will be written back to read_size and the
348 : * number of bytes decompressed to output buffer will be written back
349 : * to out_size respectively.
350 : */
351 239132 : ret = LZ4F_decompress(mystreamer->dctx,
352 : next_out, &out_size,
353 : next_in, &read_size, NULL);
354 :
355 239132 : if (LZ4F_isError(ret))
356 0 : pg_log_error("could not decompress data: %s",
357 : LZ4F_getErrorName(ret));
358 :
359 : /* Update input buffer based on number of bytes consumed */
360 239132 : avail_in -= read_size;
361 239132 : next_in += read_size;
362 :
363 239132 : mystreamer->bytes_written += out_size;
364 :
365 : /*
366 : * If output buffer is full then forward the content to next streamer
367 : * and update the output buffer.
368 : */
369 239132 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
370 : {
371 239002 : astreamer_content(mystreamer->base.bbs_next, member,
372 239002 : mystreamer->base.bbs_buffer.data,
373 : mystreamer->base.bbs_buffer.maxlen,
374 : context);
375 :
376 239002 : avail_out = mystreamer->base.bbs_buffer.maxlen;
377 239002 : mystreamer->bytes_written = 0;
378 239002 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
379 : }
380 : else
381 : {
382 130 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
383 130 : next_out += mystreamer->bytes_written;
384 : }
385 : }
386 328 : }
387 :
388 : /*
389 : * End-of-stream processing.
390 : */
391 : static void
392 8 : astreamer_lz4_decompressor_finalize(astreamer *streamer)
393 : {
394 : astreamer_lz4_frame *mystreamer;
395 :
396 8 : mystreamer = (astreamer_lz4_frame *) streamer;
397 :
398 : /*
399 : * End of the stream, if there is some pending data in output buffers then
400 : * we must forward it to next streamer.
401 : */
402 8 : astreamer_content(mystreamer->base.bbs_next, NULL,
403 8 : mystreamer->base.bbs_buffer.data,
404 : mystreamer->base.bbs_buffer.maxlen,
405 : ASTREAMER_UNKNOWN);
406 :
407 8 : astreamer_finalize(mystreamer->base.bbs_next);
408 8 : }
409 :
410 : /*
411 : * Free memory.
412 : */
413 : static void
414 8 : astreamer_lz4_decompressor_free(astreamer *streamer)
415 : {
416 : astreamer_lz4_frame *mystreamer;
417 :
418 8 : mystreamer = (astreamer_lz4_frame *) streamer;
419 8 : astreamer_free(streamer->bbs_next);
420 8 : LZ4F_freeDecompressionContext(mystreamer->dctx);
421 8 : pfree(streamer->bbs_buffer.data);
422 8 : pfree(streamer);
423 8 : }
424 : #endif
|