Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xloginsert.c
4 : * Functions for constructing WAL records
5 : *
6 : * Constructing a WAL record begins with a call to XLogBeginInsert,
7 : * followed by a number of XLogRegister* calls. The registered data is
8 : * collected in private working memory, and finally assembled into a chain
9 : * of XLogRecData structs by a call to XLogRecordAssemble(). See
10 : * access/transam/README for details.
11 : *
12 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : * src/backend/access/transam/xloginsert.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
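/*
 * Editor's sketch (not part of the original file): the calling pattern
 * described above, using this file's public API. The rmgr id, the info
 * constant, and the xl_my_record struct are hypothetical placeholders.
 *
 *     xl_my_record xlrec;
 *     XLogRecPtr   recptr;
 *
 *     xlrec.some_field = ...;
 *
 *     XLogBeginInsert();
 *     XLogRegisterData(&xlrec, sizeof(xlrec));         (main data)
 *     XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);  (block reference 0)
 *     XLogRegisterBufData(0, payload, payload_len);    (data tied to block 0)
 *     recptr = XLogInsert(RM_MY_ID, XLOG_MY_INFO);
 *     PageSetLSN(BufferGetPage(buffer), recptr);
 */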
19 :
20 : #include "postgres.h"
21 :
22 : #ifdef USE_LZ4
23 : #include <lz4.h>
24 : #endif
25 :
26 : #ifdef USE_ZSTD
27 : #include <zstd.h>
28 : #endif
29 :
30 : #include "access/xact.h"
31 : #include "access/xlog.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xloginsert.h"
34 : #include "catalog/pg_control.h"
35 : #include "common/pg_lzcompress.h"
36 : #include "miscadmin.h"
37 : #include "pg_trace.h"
38 : #include "replication/origin.h"
39 : #include "storage/bufmgr.h"
40 : #include "storage/proc.h"
41 : #include "utils/memutils.h"
42 :
43 : /*
44 : * Guess the maximum buffer size required to store a compressed version of
45 : * a backup block image.
46 : */
47 : #ifdef USE_LZ4
48 : #define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ)
49 : #else
50 : #define LZ4_MAX_BLCKSZ 0
51 : #endif
52 :
53 : #ifdef USE_ZSTD
54 : #define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ)
55 : #else
56 : #define ZSTD_MAX_BLCKSZ 0
57 : #endif
58 :
59 : #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
60 :
61 : /* Buffer size required to store a compressed version of a backup block image */
62 : #define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
63 :
64 : /*
65 : * For each block reference registered with XLogRegisterBuffer, we fill in
66 : * a registered_buffer struct.
67 : */
68 : typedef struct
69 : {
70 : bool in_use; /* is this slot in use? */
71 : uint8 flags; /* REGBUF_* flags */
72 : RelFileLocator rlocator; /* identifies the relation and block */
73 : ForkNumber forkno;
74 : BlockNumber block;
75 : const PageData *page; /* page content */
76 : uint32 rdata_len; /* total length of data in rdata chain */
77 : XLogRecData *rdata_head; /* head of the chain of data registered with
78 : * this block */
79 : XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
80 : * empty */
81 :
82 : XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
83 : * backup block data in XLogRecordAssemble() */
84 :
85 : /* buffer to store a compressed version of a backup block image */
86 : char compressed_page[COMPRESS_BUFSIZE];
87 : } registered_buffer;
88 :
89 : static registered_buffer *registered_buffers;
90 : static int max_registered_buffers; /* allocated size */
91 : static int max_registered_block_id = 0; /* highest block_id + 1 currently
92 : * registered */
93 :
94 : /*
95 : * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
96 : * with XLogRegisterData(...).
97 : */
98 : static XLogRecData *mainrdata_head;
99 : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
100 : static uint64 mainrdata_len; /* total # of bytes in chain */
101 :
102 : /* flags for the in-progress insertion */
103 : static uint8 curinsert_flags = 0;
104 :
105 : /*
106 : * These are used to hold the record header while constructing a record.
107 : * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
108 : * because we want it to be MAXALIGNed and padding bytes zeroed.
109 : *
110 : * For simplicity, it's allocated large enough to hold the headers for any
111 : * WAL record.
112 : */
113 : static XLogRecData hdr_rdt;
114 : static char *hdr_scratch = NULL;
115 :
116 : #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
117 : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
118 :
119 : #define HEADER_SCRATCH_SIZE \
120 : (SizeOfXLogRecord + \
121 : MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
122 : SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
123 : SizeOfXLogTransactionId)
124 :
125 : /*
126 : * An array of XLogRecData structs, to hold registered data.
127 : */
128 : static XLogRecData *rdatas;
129 : static int num_rdatas; /* entries currently used */
130 : static int max_rdatas; /* allocated size */
131 :
132 : static bool begininsert_called = false;
133 :
134 : /* Memory context to hold the registered buffer and data references. */
135 : static MemoryContext xloginsert_cxt;
136 :
137 : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
138 : XLogRecPtr RedoRecPtr, bool doPageWrites,
139 : XLogRecPtr *fpw_lsn, int *num_fpi,
140 : bool *topxid_included);
141 : static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
142 : uint16 hole_length, void *dest, uint16 *dlen);
143 :
144 : /*
145 : * Begin constructing a WAL record. This must be called before the
146 : * XLogRegister* functions and XLogInsert().
147 : */
148 : void
149 30508264 : XLogBeginInsert(void)
150 : {
151 : Assert(max_registered_block_id == 0);
152 : Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
153 : Assert(mainrdata_len == 0);
154 :
155 : /* cross-check on whether we should be here or not */
156 30508264 : if (!XLogInsertAllowed())
157 0 : elog(ERROR, "cannot make new WAL entries during recovery");
158 :
159 30508264 : if (begininsert_called)
160 0 : elog(ERROR, "XLogBeginInsert was already called");
161 :
162 30508264 : begininsert_called = true;
163 30508264 : }
164 :
165 : /*
166 : * Ensure that there are enough buffer and data slots in the working area,
167 : * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
168 : * calls.
169 : *
170 : * There is always space for a small number of buffers and data chunks, enough
171 : * for most record types. This function is for the exceptional cases that need
172 : * more.
173 : */
174 : void
175 140662 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
176 : {
177 : int nbuffers;
178 :
179 : /*
180 : * This must be called before entering a critical section, because
181 : * allocating memory inside a critical section can fail. repalloc() will
182 : * check the same, but better to check it here too so that we fail
183 : * consistently even if the arrays happen to be large enough already.
184 : */
185 : Assert(CritSectionCount == 0);
186 :
187 : /* the minimum values can't be decreased */
188 140662 : if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
189 4050 : max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
190 140662 : if (ndatas < XLR_NORMAL_RDATAS)
191 140612 : ndatas = XLR_NORMAL_RDATAS;
192 :
193 140662 : if (max_block_id > XLR_MAX_BLOCK_ID)
194 0 : elog(ERROR, "maximum number of WAL record block references exceeded");
195 140662 : nbuffers = max_block_id + 1;
196 :
197 140662 : if (nbuffers > max_registered_buffers)
198 : {
199 3390 : registered_buffers = (registered_buffer *)
200 3390 : repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
201 :
202 : /*
203 : * At least the padding bytes in the structs must be zeroed, because
204 : * they are included in WAL data, but initialize it all for tidiness.
205 : */
206 3390 : MemSet(&registered_buffers[max_registered_buffers], 0,
207 : (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
208 3390 : max_registered_buffers = nbuffers;
209 : }
210 :
211 140662 : if (ndatas > max_rdatas)
212 : {
213 30 : rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
214 30 : max_rdatas = ndatas;
215 : }
216 140662 : }
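/*
 * Editor's sketch (hypothetical caller): a record that touches many pages
 * reserves its slots up front, before entering the critical section, the
 * same way log_newpage_range() below does:
 *
 *     XLogEnsureRecordSpace(nblocks - 1, 0);
 *     START_CRIT_SECTION();
 *     XLogBeginInsert();
 *     for (i = 0; i < nblocks; i++)
 *         XLogRegisterBuffer(i, buffers[i], REGBUF_STANDARD);
 *     recptr = XLogInsert(RM_MY_ID, XLOG_MY_INFO);
 *     END_CRIT_SECTION();
 */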
217 :
218 : /*
219 : * Reset WAL record construction buffers.
220 : */
221 : void
222 30566744 : XLogResetInsertion(void)
223 : {
224 : int i;
225 :
226 60836624 : for (i = 0; i < max_registered_block_id; i++)
227 30269880 : registered_buffers[i].in_use = false;
228 :
229 30566744 : num_rdatas = 0;
230 30566744 : max_registered_block_id = 0;
231 30566744 : mainrdata_len = 0;
232 30566744 : mainrdata_last = (XLogRecData *) &mainrdata_head;
233 30566744 : curinsert_flags = 0;
234 30566744 : begininsert_called = false;
235 30566744 : }
236 :
237 : /*
238 : * Register a reference to a buffer with the WAL record being constructed.
239 : * This must be called for every page that the WAL-logged operation modifies.
240 : */
241 : void
242 29685992 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
243 : {
244 : registered_buffer *regbuf;
245 :
246 : /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
247 : Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
248 : Assert(begininsert_called);
249 :
250 : /*
251 : * Ordinarily, buffer should be exclusive-locked and marked dirty before
252 : * we get here, otherwise we could end up violating one of the rules in
253 : * access/transam/README.
254 : *
255 : * Some callers intentionally register a clean page and never update that
256 : * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
257 : * bypass these checks.
258 : */
259 : #ifdef USE_ASSERT_CHECKING
260 : if (!(flags & REGBUF_NO_CHANGE))
261 : Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
262 : #endif
263 :
264 29685992 : if (block_id >= max_registered_block_id)
265 : {
266 28960236 : if (block_id >= max_registered_buffers)
267 0 : elog(ERROR, "too many registered buffers");
268 28960236 : max_registered_block_id = block_id + 1;
269 : }
270 :
271 29685992 : regbuf = &registered_buffers[block_id];
272 :
273 29685992 : BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
274 29685992 : regbuf->page = BufferGetPage(buffer);
275 29685992 : regbuf->flags = flags;
276 29685992 : regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
277 29685992 : regbuf->rdata_len = 0;
278 :
279 : /*
280 : * Check that this page hasn't already been registered with some other
281 : * block_id.
282 : */
283 : #ifdef USE_ASSERT_CHECKING
284 : {
285 : int i;
286 :
287 : for (i = 0; i < max_registered_block_id; i++)
288 : {
289 : registered_buffer *regbuf_old = &registered_buffers[i];
290 :
291 : if (i == block_id || !regbuf_old->in_use)
292 : continue;
293 :
294 : Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
295 : regbuf_old->forkno != regbuf->forkno ||
296 : regbuf_old->block != regbuf->block);
297 : }
298 : }
299 : #endif
300 :
301 29685992 : regbuf->in_use = true;
302 29685992 : }
303 :
304 : /*
305 : * Like XLogRegisterBuffer, but for registering a block that's not in the
306 : * shared buffer pool (i.e. when you don't have a Buffer for it).
307 : */
308 : void
309 561750 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
310 : BlockNumber blknum, const PageData *page, uint8 flags)
311 : {
312 : registered_buffer *regbuf;
313 :
314 : Assert(begininsert_called);
315 :
316 561750 : if (block_id >= max_registered_block_id)
317 561750 : max_registered_block_id = block_id + 1;
318 :
319 561750 : if (block_id >= max_registered_buffers)
320 0 : elog(ERROR, "too many registered buffers");
321 :
322 561750 : regbuf = &registered_buffers[block_id];
323 :
324 561750 : regbuf->rlocator = *rlocator;
325 561750 : regbuf->forkno = forknum;
326 561750 : regbuf->block = blknum;
327 561750 : regbuf->page = page;
328 561750 : regbuf->flags = flags;
329 561750 : regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
330 561750 : regbuf->rdata_len = 0;
331 :
332 : /*
333 : * Check that this page hasn't already been registered with some other
334 : * block_id.
335 : */
336 : #ifdef USE_ASSERT_CHECKING
337 : {
338 : int i;
339 :
340 : for (i = 0; i < max_registered_block_id; i++)
341 : {
342 : registered_buffer *regbuf_old = &registered_buffers[i];
343 :
344 : if (i == block_id || !regbuf_old->in_use)
345 : continue;
346 :
347 : Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
348 : regbuf_old->forkno != regbuf->forkno ||
349 : regbuf_old->block != regbuf->block);
350 : }
351 : }
352 : #endif
353 :
354 561750 : regbuf->in_use = true;
355 561750 : }
356 :
357 : /*
358 : * Add data to the WAL record that's being constructed.
359 : *
360 : * The data is appended to the "main chunk", available at replay with
361 : * XLogRecGetData().
362 : */
363 : void
364 31484718 : XLogRegisterData(const void *data, uint32 len)
365 : {
366 : XLogRecData *rdata;
367 :
368 : Assert(begininsert_called);
369 :
370 31484718 : if (num_rdatas >= max_rdatas)
371 0 : ereport(ERROR,
372 : (errmsg_internal("too much WAL data"),
373 : errdetail_internal("%d out of %d data segments are already in use.",
374 : num_rdatas, max_rdatas)));
375 31484718 : rdata = &rdatas[num_rdatas++];
376 :
377 31484718 : rdata->data = data;
378 31484718 : rdata->len = len;
379 :
380 : /*
381 : * we use the mainrdata_last pointer to track the end of the chain, so no
382 : * need to clear 'next' here.
383 : */
384 :
385 31484718 : mainrdata_last->next = rdata;
386 31484718 : mainrdata_last = rdata;
387 :
388 31484718 : mainrdata_len += len;
389 31484718 : }
390 :
391 : /*
392 : * Add buffer-specific data to the WAL record that's being constructed.
393 : *
394 : * Block_id must reference a block previously registered with
395 : * XLogRegisterBuffer(). If this is called more than once for the same
396 : * block_id, the data is appended.
397 : *
398 : * The maximum amount of data that can be registered per block is 65535
399 : * bytes. That should be plenty; if you need more than BLCKSZ bytes to
400 : * reconstruct the changes to the page, you might as well just log a full
401 : * copy of it. (the "main data" that's not associated with a block is not
402 : * limited)
403 : */
404 : void
405 41399762 : XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
406 : {
407 : registered_buffer *regbuf;
408 : XLogRecData *rdata;
409 :
410 : Assert(begininsert_called);
411 :
412 : /* find the registered buffer struct */
413 41399762 : regbuf = &registered_buffers[block_id];
414 41399762 : if (!regbuf->in_use)
415 0 : elog(ERROR, "no block with id %d registered with WAL insertion",
416 : block_id);
417 :
418 : /*
419 : * Check against max_rdatas and ensure we do not register more data per
420 : * buffer than can be handled by the physical data format; i.e. that
421 : * regbuf->rdata_len does not grow beyond what
422 : * XLogRecordBlockHeader->data_length can hold.
423 : */
424 41399762 : if (num_rdatas >= max_rdatas)
425 0 : ereport(ERROR,
426 : (errmsg_internal("too much WAL data"),
427 : errdetail_internal("%d out of %d data segments are already in use.",
428 : num_rdatas, max_rdatas)));
429 41399762 : if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
430 0 : ereport(ERROR,
431 : (errmsg_internal("too much WAL data"),
432 : errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
433 : UINT16_MAX, block_id, regbuf->rdata_len, len)));
434 :
435 41399762 : rdata = &rdatas[num_rdatas++];
436 :
437 41399762 : rdata->data = data;
438 41399762 : rdata->len = len;
439 :
440 41399762 : regbuf->rdata_tail->next = rdata;
441 41399762 : regbuf->rdata_tail = rdata;
442 41399762 : regbuf->rdata_len += len;
443 41399762 : }
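/*
 * Editor's sketch: successive calls with the same block_id append, so a
 * caller can register a header and a body as two chunks of block 0 (the
 * names below are hypothetical):
 *
 *     XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *     XLogRegisterBufData(0, &myhdr, sizeof(myhdr));
 *     XLogRegisterBufData(0, tupledata, datalen);
 */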
444 :
445 : /*
446 : * Set insert status flags for the upcoming WAL record.
447 : *
448 : * The flags that can be used here are:
449 : * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
450 : * included in the record.
451 : * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
452 : * durability, which makes it possible to avoid triggering WAL archiving
453 : * and other background activity.
454 : */
455 : void
456 18801224 : XLogSetRecordFlags(uint8 flags)
457 : {
458 : Assert(begininsert_called);
459 18801224 : curinsert_flags |= flags;
460 18801224 : }
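/*
 * Editor's note: flags are set between XLogBeginInsert() and XLogInsert(),
 * e.g. for a record that should not force WAL activity on its own
 * (hypothetical rmgr id and info constant):
 *
 *     XLogBeginInsert();
 *     XLogRegisterData(&xlrec, sizeof(xlrec));
 *     XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
 *     recptr = XLogInsert(RM_MY_ID, XLOG_MY_INFO);
 */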
461 :
462 : /*
463 : * Insert an XLOG record having the specified RMID and info bytes, with the
464 : * body of the record being the data and buffer references registered earlier
465 : * with XLogRegister* calls.
466 : *
467 : * Returns XLOG pointer to end of record (beginning of next record).
468 : * This can be used as LSN for data pages affected by the logged action.
469 : * (LSN is the XLOG point up to which the XLOG must be flushed to disk
470 : * before the data page can be written out. This implements the basic
471 : * WAL rule "write the log before the data".)
472 : */
473 : XLogRecPtr
474 30508264 : XLogInsert(RmgrId rmid, uint8 info)
475 : {
476 : XLogRecPtr EndPos;
477 :
478 : /* XLogBeginInsert() must have been called. */
479 30508264 : if (!begininsert_called)
480 0 : elog(ERROR, "XLogBeginInsert was not called");
481 :
482 : /*
483 : * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
484 : * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
485 : */
486 30508264 : if ((info & ~(XLR_RMGR_INFO_MASK |
487 : XLR_SPECIAL_REL_UPDATE |
488 : XLR_CHECK_CONSISTENCY)) != 0)
489 0 : elog(PANIC, "invalid xlog info mask %02X", info);
490 :
491 : TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
492 :
493 : /*
494 : * In bootstrap mode, we don't actually log anything but XLOG resources;
495 : * return a phony record pointer.
496 : */
497 30508264 : if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
498 : {
499 1277652 : XLogResetInsertion();
500 1277652 : EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
501 1277652 : return EndPos;
502 : }
503 :
504 : do
505 : {
506 : XLogRecPtr RedoRecPtr;
507 : bool doPageWrites;
508 29245424 : bool topxid_included = false;
509 : XLogRecPtr fpw_lsn;
510 : XLogRecData *rdt;
511 29245424 : int num_fpi = 0;
512 :
513 : /*
514 : * Get values needed to decide whether to do full-page writes. Since
515 : * we don't yet have an insertion lock, these could change under us,
516 : * but XLogInsertRecord will recheck them once it has a lock.
517 : */
518 29245424 : GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
519 :
520 29245424 : rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
521 : &fpw_lsn, &num_fpi, &topxid_included);
522 :
523 29245424 : EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
524 : topxid_included);
525 29245424 : } while (EndPos == InvalidXLogRecPtr);
526 :
527 29230612 : XLogResetInsertion();
528 :
529 29230612 : return EndPos;
530 : }
531 :
532 : /*
533 : * Simple wrapper to XLogInsert to insert a WAL record with elementary
534 : * contents (only an int64 is supported as value currently).
535 : */
536 : XLogRecPtr
537 863452 : XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
538 : {
539 863452 : XLogBeginInsert();
540 863452 : XLogRegisterData(&value, sizeof(value));
541 863452 : return XLogInsert(rmid, info);
542 : }
543 :
544 : /*
545 : * Assemble a WAL record from the registered data and buffers into an
546 : * XLogRecData chain, ready for insertion with XLogInsertRecord().
547 : *
548 : * The record header fields are filled in, except for the xl_prev field. The
549 : * calculated CRC does not include the record header yet.
550 : *
551 : * If there are any registered buffers, and a full-page image was not taken
552 : * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
553 : * signals that the assembled record is only good for insertion on the
554 : * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
555 : *
556 : * *topxid_included is set if the topmost transaction ID is logged with the
557 : * current subtransaction.
558 : */
559 : static XLogRecData *
560 29245424 : XLogRecordAssemble(RmgrId rmid, uint8 info,
561 : XLogRecPtr RedoRecPtr, bool doPageWrites,
562 : XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
563 : {
564 : XLogRecData *rdt;
565 29245424 : uint64 total_len = 0;
566 : int block_id;
567 : pg_crc32c rdata_crc;
568 29245424 : registered_buffer *prev_regbuf = NULL;
569 : XLogRecData *rdt_datas_last;
570 : XLogRecord *rechdr;
571 29245424 : char *scratch = hdr_scratch;
572 :
573 : /*
574 : * Note: this function can be called multiple times for the same record.
575 : * All the modifications we do to the rdata chains below must handle that.
576 : */
577 :
578 : /* The record begins with the fixed-size header */
579 29245424 : rechdr = (XLogRecord *) scratch;
580 29245424 : scratch += SizeOfXLogRecord;
581 :
582 29245424 : hdr_rdt.next = NULL;
583 29245424 : rdt_datas_last = &hdr_rdt;
584 29245424 : hdr_rdt.data = hdr_scratch;
585 :
586 : /*
587 : * Enforce consistency checks for this record if user is looking for it.
588 : * Do this before at the beginning of this routine to give the possibility
589 : * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
590 : * a record.
591 : */
592 29245424 : if (wal_consistency_checking[rmid])
593 4326412 : info |= XLR_CHECK_CONSISTENCY;
594 :
595 : /*
596 : * Make an rdata chain containing all the data portions of all block
597 : * references. This includes the data for full-page images. Also append
598 : * the headers for the block references in the scratch buffer.
599 : */
600 29245424 : *fpw_lsn = InvalidXLogRecPtr;
601 58298670 : for (block_id = 0; block_id < max_registered_block_id; block_id++)
602 : {
603 29053246 : registered_buffer *regbuf = &registered_buffers[block_id];
604 : bool needs_backup;
605 : bool needs_data;
606 : XLogRecordBlockHeader bkpb;
607 : XLogRecordBlockImageHeader bimg;
608 29053246 : XLogRecordBlockCompressHeader cbimg = {0};
609 : bool samerel;
610 29053246 : bool is_compressed = false;
611 : bool include_image;
612 :
613 29053246 : if (!regbuf->in_use)
614 22138 : continue;
615 :
616 : /* Determine if this block needs to be backed up */
617 29031108 : if (regbuf->flags & REGBUF_FORCE_IMAGE)
618 607204 : needs_backup = true;
619 28423904 : else if (regbuf->flags & REGBUF_NO_IMAGE)
620 427542 : needs_backup = false;
621 27996362 : else if (!doPageWrites)
622 537714 : needs_backup = false;
623 : else
624 : {
625 : /*
626 : * We assume page LSN is first data on *every* page that can be
627 : * passed to XLogInsert, whether it has the standard page layout
628 : * or not.
629 : */
630 27458648 : XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
631 :
632 27458648 : needs_backup = (page_lsn <= RedoRecPtr);
633 27458648 : if (!needs_backup)
634 : {
635 27267086 : if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
636 26425056 : *fpw_lsn = page_lsn;
637 : }
638 : }
639 :
640 : /* Determine if the buffer data needs to be included */
641 29031108 : if (regbuf->rdata_len == 0)
642 5488848 : needs_data = false;
643 23542260 : else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
644 594628 : needs_data = true;
645 : else
646 22947632 : needs_data = !needs_backup;
647 :
648 29031108 : bkpb.id = block_id;
649 29031108 : bkpb.fork_flags = regbuf->forkno;
650 29031108 : bkpb.data_length = 0;
651 :
652 29031108 : if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
653 420518 : bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
654 :
655 : /*
656 : * If needs_backup is true or WAL checking is enabled for the current
657 : * resource manager, log a full-page write for the current block.
658 : */
659 29031108 : include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
660 :
661 29031108 : if (include_image)
662 : {
663 5412002 : const PageData *page = regbuf->page;
664 5412002 : uint16 compressed_len = 0;
665 :
666 : /*
667 : * The page needs to be backed up, so calculate its hole length
668 : * and offset.
669 : */
670 5412002 : if (regbuf->flags & REGBUF_STANDARD)
671 : {
672 : /* Assume we can omit data between pd_lower and pd_upper */
673 5102848 : uint16 lower = ((PageHeader) page)->pd_lower;
674 5102848 : uint16 upper = ((PageHeader) page)->pd_upper;
675 :
676 5102848 : if (lower >= SizeOfPageHeaderData &&
677 5099442 : upper > lower &&
678 : upper <= BLCKSZ)
679 : {
680 5099442 : bimg.hole_offset = lower;
681 5099442 : cbimg.hole_length = upper - lower;
682 : }
683 : else
684 : {
685 : /* No "hole" to remove */
686 3406 : bimg.hole_offset = 0;
687 3406 : cbimg.hole_length = 0;
688 : }
689 : }
690 : else
691 : {
692 : /* Not a standard page header, don't try to eliminate "hole" */
693 309154 : bimg.hole_offset = 0;
694 309154 : cbimg.hole_length = 0;
695 : }
696 :
697 : /*
698 : * Try to compress a block image if wal_compression is enabled
699 : */
700 5412002 : if (wal_compression != WAL_COMPRESSION_NONE)
701 : {
702 : is_compressed =
703 0 : XLogCompressBackupBlock(page, bimg.hole_offset,
704 0 : cbimg.hole_length,
705 0 : regbuf->compressed_page,
706 : &compressed_len);
707 : }
708 :
709 : /*
710 : * Fill in the remaining fields in the XLogRecordBlockHeader
711 : * struct
712 : */
713 5412002 : bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
714 :
715 : /* Report a full page image constructed for the WAL record */
716 5412002 : *num_fpi += 1;
717 :
718 : /*
719 : * Construct XLogRecData entries for the page content.
720 : */
721 5412002 : rdt_datas_last->next = &regbuf->bkp_rdatas[0];
722 5412002 : rdt_datas_last = rdt_datas_last->next;
723 :
724 5412002 : bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
725 :
726 : /*
727 : * If WAL consistency checking is enabled for the resource manager
728 : * of this WAL record, a full-page image is included in the record
729 : * for the block modified. During redo, the full-page is replayed
730 : * only if BKPIMAGE_APPLY is set.
731 : */
732 5412002 : if (needs_backup)
733 798766 : bimg.bimg_info |= BKPIMAGE_APPLY;
734 :
735 5412002 : if (is_compressed)
736 : {
737 : /* The current compression is stored in the WAL record */
738 0 : bimg.length = compressed_len;
739 :
740 : /* Set the compression method used for this block */
741 0 : switch ((WalCompression) wal_compression)
742 : {
743 0 : case WAL_COMPRESSION_PGLZ:
744 0 : bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
745 0 : break;
746 :
747 0 : case WAL_COMPRESSION_LZ4:
748 : #ifdef USE_LZ4
749 0 : bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
750 : #else
751 : elog(ERROR, "LZ4 is not supported by this build");
752 : #endif
753 0 : break;
754 :
755 0 : case WAL_COMPRESSION_ZSTD:
756 : #ifdef USE_ZSTD
757 : bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
758 : #else
759 0 : elog(ERROR, "zstd is not supported by this build");
760 : #endif
761 : break;
762 :
763 0 : case WAL_COMPRESSION_NONE:
764 : Assert(false); /* cannot happen */
765 0 : break;
766 : /* no default case, so that compiler will warn */
767 : }
768 :
769 0 : rdt_datas_last->data = regbuf->compressed_page;
770 0 : rdt_datas_last->len = compressed_len;
771 : }
772 : else
773 : {
774 5412002 : bimg.length = BLCKSZ - cbimg.hole_length;
775 :
776 5412002 : if (cbimg.hole_length == 0)
777 : {
778 312560 : rdt_datas_last->data = page;
779 312560 : rdt_datas_last->len = BLCKSZ;
780 : }
781 : else
782 : {
783 : /* must skip the hole */
784 5099442 : rdt_datas_last->data = page;
785 5099442 : rdt_datas_last->len = bimg.hole_offset;
786 :
787 5099442 : rdt_datas_last->next = &regbuf->bkp_rdatas[1];
788 5099442 : rdt_datas_last = rdt_datas_last->next;
789 :
790 5099442 : rdt_datas_last->data =
791 5099442 : page + (bimg.hole_offset + cbimg.hole_length);
792 5099442 : rdt_datas_last->len =
793 5099442 : BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
794 : }
795 : }
796 :
797 5412002 : total_len += bimg.length;
798 : }
799 :
800 29031108 : if (needs_data)
801 : {
802 : /*
803 : * When copying to XLogRecordBlockHeader, the length is narrowed
804 : * to a uint16. Double-check that it is still correct.
805 : */
806 : Assert(regbuf->rdata_len <= UINT16_MAX);
807 :
808 : /*
809 : * Link the caller-supplied rdata chain for this buffer to the
810 : * overall list.
811 : */
812 23460672 : bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
813 23460672 : bkpb.data_length = (uint16) regbuf->rdata_len;
814 23460672 : total_len += regbuf->rdata_len;
815 :
816 23460672 : rdt_datas_last->next = regbuf->rdata_head;
817 23460672 : rdt_datas_last = regbuf->rdata_tail;
818 : }
819 :
820 29031108 : if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
821 : {
822 1426602 : samerel = true;
823 1426602 : bkpb.fork_flags |= BKPBLOCK_SAME_REL;
824 : }
825 : else
826 27604506 : samerel = false;
827 29031108 : prev_regbuf = regbuf;
828 :
829 : /* Ok, copy the header to the scratch buffer */
830 29031108 : memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
831 29031108 : scratch += SizeOfXLogRecordBlockHeader;
832 29031108 : if (include_image)
833 : {
834 5412002 : memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
835 5412002 : scratch += SizeOfXLogRecordBlockImageHeader;
836 5412002 : if (cbimg.hole_length != 0 && is_compressed)
837 : {
838 0 : memcpy(scratch, &cbimg,
839 : SizeOfXLogRecordBlockCompressHeader);
840 0 : scratch += SizeOfXLogRecordBlockCompressHeader;
841 : }
842 : }
843 29031108 : if (!samerel)
844 : {
845 27604506 : memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
846 27604506 : scratch += sizeof(RelFileLocator);
847 : }
848 29031108 : memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
849 29031108 : scratch += sizeof(BlockNumber);
850 : }
851 :
852 : /* followed by the record's origin, if any */
853 29245424 : if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
854 17406284 : replorigin_session_origin != InvalidRepOriginId)
855 : {
856 300856 : *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
857 300856 : memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
858 300856 : scratch += sizeof(replorigin_session_origin);
859 : }
860 :
861 : /* followed by toplevel XID, if not already included in previous record */
862 29245424 : if (IsSubxactTopXidLogPending())
863 : {
864 442 : TransactionId xid = GetTopTransactionIdIfAny();
865 :
866 : /* Set the flag that the top xid is included in the WAL */
867 442 : *topxid_included = true;
868 :
869 442 : *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
870 442 : memcpy(scratch, &xid, sizeof(TransactionId));
871 442 : scratch += sizeof(TransactionId);
872 : }
873 :
874 : /* followed by main data, if any */
875 29245424 : if (mainrdata_len > 0)
876 : {
877 28601188 : if (mainrdata_len > 255)
878 : {
879 : uint32 mainrdata_len_4b;
880 :
881 61622 : if (mainrdata_len > PG_UINT32_MAX)
882 0 : ereport(ERROR,
883 : (errmsg_internal("too much WAL data"),
884 : errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
885 : mainrdata_len,
886 : PG_UINT32_MAX)));
887 :
888 61622 : mainrdata_len_4b = (uint32) mainrdata_len;
889 61622 : *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
890 61622 : memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
891 61622 : scratch += sizeof(uint32);
892 : }
893 : else
894 : {
895 28539566 : *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
896 28539566 : *(scratch++) = (uint8) mainrdata_len;
897 : }
898 28601188 : rdt_datas_last->next = mainrdata_head;
899 28601188 : rdt_datas_last = mainrdata_last;
900 28601188 : total_len += mainrdata_len;
901 : }
902 29245424 : rdt_datas_last->next = NULL;
903 :
904 29245424 : hdr_rdt.len = (scratch - hdr_scratch);
905 29245424 : total_len += hdr_rdt.len;
906 :
907 : /*
908 : * Calculate CRC of the data
909 : *
910 : * Note that the record header isn't added into the CRC initially since we
911 : * don't know the prev-link yet. Thus, the CRC will represent the CRC of
912 : * the whole record in the order: rdata, then backup blocks, then record
913 : * header.
914 : */
915 29245424 : INIT_CRC32C(rdata_crc);
916 29245424 : COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
917 108870348 : for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
918 79624924 : COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
919 :
920 : /*
921 : * Ensure that the XLogRecord is not too large.
922 : *
923 : * XLogReader machinery is only able to handle records up to a certain
924 : * size (ignoring machine resource limitations), so make sure that we will
925 : * not emit records larger than the sizes advertised to be supported.
926 : */
927 29245424 : if (total_len > XLogRecordMaxSize)
928 0 : ereport(ERROR,
929 : (errmsg_internal("oversized WAL record"),
930 : errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
931 : total_len, XLogRecordMaxSize, rmid, info)));
932 :
933 : /*
934 : * Fill in the fields in the record header. Prev-link is filled in later,
935 : * once we know where in the WAL the record will be inserted. The CRC does
936 : * not include the record header yet.
937 : */
938 29245424 : rechdr->xl_xid = GetCurrentTransactionIdIfAny();
939 29245424 : rechdr->xl_tot_len = (uint32) total_len;
940 29245424 : rechdr->xl_info = info;
941 29245424 : rechdr->xl_rmid = rmid;
942 29245424 : rechdr->xl_prev = InvalidXLogRecPtr;
943 29245424 : rechdr->xl_crc = rdata_crc;
944 :
945 29245424 : return &hdr_rdt;
946 : }
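/*
 * Editor's note: as assembled above, the returned chain carries all the
 * headers first (in hdr_scratch), followed by the data payloads:
 *
 *     XLogRecord
 *     per block:  XLogRecordBlockHeader
 *                 [XLogRecordBlockImageHeader [+ BlockCompressHeader]]
 *                 [RelFileLocator]  (omitted when BKPBLOCK_SAME_REL is set)
 *                 BlockNumber
 *     [XLR_BLOCK_ID_ORIGIN + replication origin]
 *     [XLR_BLOCK_ID_TOPLEVEL_XID + TransactionId]
 *     [XLR_BLOCK_ID_DATA_SHORT/LONG + main data length]
 *     then the block images, per-block data, and finally the main data.
 */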
947 :
948 : /*
949 : * Create a compressed version of a backup block image.
950 : *
951 : * Returns false if compression fails (i.e., compressed result is actually
952 : * bigger than original). Otherwise, returns true and sets 'dlen' to
953 : * the length of compressed block image.
954 : */
955 : static bool
956 0 : XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
957 : void *dest, uint16 *dlen)
958 : {
959 0 : int32 orig_len = BLCKSZ - hole_length;
960 0 : int32 len = -1;
961 0 : int32 extra_bytes = 0;
962 : const void *source;
963 : PGAlignedBlock tmp;
964 :
965 0 : if (hole_length != 0)
966 : {
967 : /* must skip the hole */
968 0 : memcpy(tmp.data, page, hole_offset);
969 0 : memcpy(tmp.data + hole_offset,
970 0 : page + (hole_offset + hole_length),
971 0 : BLCKSZ - (hole_length + hole_offset));
972 0 : source = tmp.data;
973 :
974 : /*
975 : * Extra data needs to be stored in WAL record for the compressed
976 : * version of block image if the hole exists.
977 : */
978 0 : extra_bytes = SizeOfXLogRecordBlockCompressHeader;
979 : }
980 : else
981 0 : source = page;
982 :
983 0 : switch ((WalCompression) wal_compression)
984 : {
985 0 : case WAL_COMPRESSION_PGLZ:
986 0 : len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
987 0 : break;
988 :
989 0 : case WAL_COMPRESSION_LZ4:
990 : #ifdef USE_LZ4
991 0 : len = LZ4_compress_default(source, dest, orig_len,
992 : COMPRESS_BUFSIZE);
993 0 : if (len <= 0)
994 0 : len = -1; /* failure */
995 : #else
996 : elog(ERROR, "LZ4 is not supported by this build");
997 : #endif
998 0 : break;
999 :
1000 0 : case WAL_COMPRESSION_ZSTD:
1001 : #ifdef USE_ZSTD
1002 : len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
1003 : ZSTD_CLEVEL_DEFAULT);
1004 : if (ZSTD_isError(len))
1005 : len = -1; /* failure */
1006 : #else
1007 0 : elog(ERROR, "zstd is not supported by this build");
1008 : #endif
1009 : break;
1010 :
1011 0 : case WAL_COMPRESSION_NONE:
1012 : Assert(false); /* cannot happen */
1013 0 : break;
1014 : /* no default case, so that compiler will warn */
1015 : }
1016 :
1017 : /*
1018 : * Even if compression reports success, recheck the actual size to see
1019 : * whether the number of bytes saved by compression is larger than the
1020 : * length of the extra data needed for the compressed block image.
1021 : */
1022 0 : if (len >= 0 &&
1023 0 : len + extra_bytes < orig_len)
1024 : {
1025 0 : *dlen = (uint16) len; /* successful compression */
1026 0 : return true;
1027 : }
1028 0 : return false;
1029 : }
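/*
 * Editor's worked example (assumed numbers): with BLCKSZ = 8192 and a
 * standard page where pd_lower = 120 and pd_upper = 7800, the hole is
 * 7800 - 120 = 7680 bytes, so orig_len = 8192 - 7680 = 512 bytes are fed
 * to the compressor. Since a page with a hole also needs
 * SizeOfXLogRecordBlockCompressHeader extra bytes in the record, the
 * compressed image is kept only if len plus that overhead is below 512.
 */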
1030 :
1031 : /*
1032 : * Determine whether the buffer referenced has to be backed up.
1033 : *
1034 : * Since we don't yet have the insert lock, fullPageWrites and runningBackups
1035 : * (which forces full-page writes) could change later, so the result should
1036 : * be used for optimization purposes only.
1037 : */
1038 : bool
1039 289520 : XLogCheckBufferNeedsBackup(Buffer buffer)
1040 : {
1041 : XLogRecPtr RedoRecPtr;
1042 : bool doPageWrites;
1043 : Page page;
1044 :
1045 289520 : GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
1046 :
1047 289520 : page = BufferGetPage(buffer);
1048 :
1049 289520 : if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
1050 2484 : return true; /* buffer requires backup */
1051 :
1052 287036 : return false; /* buffer does not need to be backed up */
1053 : }
1054 :
1055 : /*
1056 : * Write a backup block if needed when we are setting a hint. Note that
1057 : * this may be called for a variety of page types, not just heaps.
1058 : *
1059 : * Callable while holding just share lock on the buffer content.
1060 : *
1061 : * We can't use the plain backup block mechanism since that relies on the
1062 : * Buffer being exclusively locked. Because some modifications (setting LSN,
1063 : * hint bits) are allowed in a share-locked buffer, that could lead to WAL
1064 : * checksum failures. So instead we copy the page and insert the copied data
1065 : * as normal record data.
1066 : *
1067 : * We only need to do something if the page has not yet been full-page
1068 : * written in this checkpoint round. The LSN of the inserted WAL record is
1069 : * returned if we had to write, InvalidXLogRecPtr otherwise.
1070 : *
1071 : * It is possible that multiple concurrent backends could attempt to write WAL
1072 : * records. In that case, multiple copies of the same block would be recorded
1073 : * in separate WAL records by different backends, though that is still OK from
1074 : * a correctness perspective.
1075 : */
1076 : XLogRecPtr
1077 119716 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
1078 : {
1079 119716 : XLogRecPtr recptr = InvalidXLogRecPtr;
1080 : XLogRecPtr lsn;
1081 : XLogRecPtr RedoRecPtr;
1082 :
1083 : /*
1084 : * Ensure no checkpoint can change our view of RedoRecPtr.
1085 : */
1086 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
1087 :
1088 : /*
1089 : * Update RedoRecPtr so that we can make the right decision
1090 : */
1091 119716 : RedoRecPtr = GetRedoRecPtr();
1092 :
1093 : /*
1094 : * We assume page LSN is first data on *every* page that can be passed to
1095 : * XLogInsert, whether it has the standard page layout or not. Since we're
1096 : * only holding a share-lock on the page, we must take the buffer header
1097 : * lock when we look at the LSN.
1098 : */
1099 119716 : lsn = BufferGetLSNAtomic(buffer);
1100 :
1101 119716 : if (lsn <= RedoRecPtr)
1102 : {
1103 64586 : int flags = 0;
1104 : PGAlignedBlock copied_buffer;
1105 64586 : char *origdata = (char *) BufferGetBlock(buffer);
1106 : RelFileLocator rlocator;
1107 : ForkNumber forkno;
1108 : BlockNumber blkno;
1109 :
1110 : /*
1111 : * Copy buffer so we don't have to worry about concurrent hint bit or
1112 : * lsn updates. We assume pd_lower/upper cannot be changed without an
1113 : * exclusive lock, so the backed-up contents are not racy.
1114 : */
1115 64586 : if (buffer_std)
1116 : {
1117 : /* Assume we can omit data between pd_lower and pd_upper */
1118 40952 : Page page = BufferGetPage(buffer);
1119 40952 : uint16 lower = ((PageHeader) page)->pd_lower;
1120 40952 : uint16 upper = ((PageHeader) page)->pd_upper;
1121 :
1122 40952 : memcpy(copied_buffer.data, origdata, lower);
1123 40952 : memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
1124 : }
1125 : else
1126 23634 : memcpy(copied_buffer.data, origdata, BLCKSZ);
1127 :
1128 64586 : XLogBeginInsert();
1129 :
1130 64586 : if (buffer_std)
1131 40952 : flags |= REGBUF_STANDARD;
1132 :
1133 64586 : BufferGetTag(buffer, &rlocator, &forkno, &blkno);
1134 64586 : XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
1135 :
1136 64586 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
1137 : }
1138 :
1139 119716 : return recptr;
1140 : }
1141 :
1142 : /*
1143 : * Write a WAL record containing a full image of a page. Caller is responsible
1144 : * for writing the page to disk after calling this routine.
1145 : *
1146 : * Note: If you're using this function, you should be building pages in private
1147 : * memory and writing them directly to smgr. If you're using buffers, call
1148 : * log_newpage_buffer instead.
1149 : *
1150 : * If the page follows the standard page layout, with a PageHeader and unused
1151 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1152 : * the unused space to be left out from the WAL record, making it smaller.
1153 : */
1154 : XLogRecPtr
1155 253850 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
1156 : Page page, bool page_std)
1157 : {
1158 : int flags;
1159 : XLogRecPtr recptr;
1160 :
1161 253850 : flags = REGBUF_FORCE_IMAGE;
1162 253850 : if (page_std)
1163 253492 : flags |= REGBUF_STANDARD;
1164 :
1165 253850 : XLogBeginInsert();
1166 253850 : XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
1167 253850 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1168 :
1169 : /*
1170 : * The page may be uninitialized. If so, we can't set the LSN because that
1171 : * would corrupt the page.
1172 : */
1173 253850 : if (!PageIsNew(page))
1174 : {
1175 253842 : PageSetLSN(page, recptr);
1176 : }
1177 :
1178 253850 : return recptr;
1179 : }
1180 :
1181 : /*
1182 : * Like log_newpage(), but allows logging multiple pages in one operation.
1183 : * It is more efficient than calling log_newpage() for each page separately,
1184 : * because we can write multiple pages in a single WAL record.
1185 : */
1186 : void
1187 38544 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
1188 : BlockNumber *blknos, Page *pages, bool page_std)
1189 : {
1190 : int flags;
1191 : XLogRecPtr recptr;
1192 : int i;
1193 : int j;
1194 :
1195 38544 : flags = REGBUF_FORCE_IMAGE;
1196 38544 : if (page_std)
1197 38456 : flags |= REGBUF_STANDARD;
1198 :
1199 : /*
1200 : * Iterate over all the pages. They are collected into batches of
1201 : * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
1202 : * batch.
1203 : */
1204 38544 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1205 :
1206 38544 : i = 0;
1207 77088 : while (i < num_pages)
1208 : {
1209 38544 : int batch_start = i;
1210 : int nbatch;
1211 :
1212 38544 : XLogBeginInsert();
1213 :
1214 38544 : nbatch = 0;
1215 114848 : while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
1216 : {
1217 76304 : XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
1218 76304 : i++;
1219 76304 : nbatch++;
1220 : }
1221 :
1222 38544 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1223 :
1224 114848 : for (j = batch_start; j < i; j++)
1225 : {
1226 : /*
1227 : * The page may be uninitialized. If so, we can't set the LSN
1228 : * because that would corrupt the page.
1229 : */
1230 76304 : if (!PageIsNew(pages[j]))
1231 : {
1232 76296 : PageSetLSN(pages[j], recptr);
1233 : }
1234 : }
1235 : }
1236 38544 : }
1237 :
1238 : /*
1239 : * Write a WAL record containing a full image of a page.
1240 : *
1241 : * Caller should initialize the buffer and mark it dirty before calling this
1242 : * function. This function will set the page LSN.
1243 : *
1244 : * If the page follows the standard page layout, with a PageHeader and unused
1245 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1246 : * the unused space to be left out from the WAL record, making it smaller.
1247 : */
1248 : XLogRecPtr
1249 248464 : log_newpage_buffer(Buffer buffer, bool page_std)
1250 : {
1251 248464 : Page page = BufferGetPage(buffer);
1252 : RelFileLocator rlocator;
1253 : ForkNumber forknum;
1254 : BlockNumber blkno;
1255 :
1256 : /* Shared buffers should be modified in a critical section. */
1257 : Assert(CritSectionCount > 0);
1258 :
1259 248464 : BufferGetTag(buffer, &rlocator, &forknum, &blkno);
1260 :
1261 248464 : return log_newpage(&rlocator, forknum, blkno, page, page_std);
1262 : }
1263 :
1264 : /*
1265 : * WAL-log a range of blocks in a relation.
1266 : *
1267 : * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
1268 : * written to the WAL. If the range is large, this is done in multiple WAL
1269 : * records.
1270 : *
1271 : * If all pages follow the standard page layout, with a PageHeader and unused
1272 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1273 : * the unused space to be left out from the WAL records, making them smaller.
1274 : *
1275 : * NOTE: This function acquires exclusive-locks on the pages. Typically, this
1276 : * is used on a newly-built relation, and the caller is holding a
1277 : * AccessExclusiveLock on it, so no other backend can be accessing it at the
1278 : * same time. If that's not the case, you must ensure that this does not
1279 : * cause a deadlock through some other means.
1280 : */
1281 : void
1282 97800 : log_newpage_range(Relation rel, ForkNumber forknum,
1283 : BlockNumber startblk, BlockNumber endblk,
1284 : bool page_std)
1285 : {
1286 : int flags;
1287 : BlockNumber blkno;
1288 :
1289 97800 : flags = REGBUF_FORCE_IMAGE;
1290 97800 : if (page_std)
1291 726 : flags |= REGBUF_STANDARD;
1292 :
1293 : /*
1294 : * Iterate over all the pages in the range. They are collected into
1295 : * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
1296 : * for each batch.
1297 : */
1298 97800 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1299 :
1300 97800 : blkno = startblk;
1301 170972 : while (blkno < endblk)
1302 : {
1303 : Buffer bufpack[XLR_MAX_BLOCK_ID];
1304 : XLogRecPtr recptr;
1305 : int nbufs;
1306 : int i;
1307 :
1308 73174 : CHECK_FOR_INTERRUPTS();
1309 :
1310 : /* Collect a batch of blocks. */
1311 73174 : nbufs = 0;
1312 347848 : while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
1313 : {
1314 274674 : Buffer buf = ReadBufferExtended(rel, forknum, blkno,
1315 : RBM_NORMAL, NULL);
1316 :
1317 274674 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1318 :
1319 : /*
1320 : * Completely empty pages are not WAL-logged. Writing a WAL record
1321 : * would change the LSN, and we don't want that. We want the page
1322 : * to stay empty.
1323 : */
1324 274674 : if (!PageIsNew(BufferGetPage(buf)))
1325 271278 : bufpack[nbufs++] = buf;
1326 : else
1327 3396 : UnlockReleaseBuffer(buf);
1328 274674 : blkno++;
1329 : }
1330 :
1331 : /* Nothing more to do if all remaining blocks were empty. */
1332 73174 : if (nbufs == 0)
1333 2 : break;
1334 :
1335 : /* Write WAL record for this batch. */
1336 73172 : XLogBeginInsert();
1337 :
1338 73172 : START_CRIT_SECTION();
1339 344450 : for (i = 0; i < nbufs; i++)
1340 : {
1341 271278 : MarkBufferDirty(bufpack[i]);
1342 271278 : XLogRegisterBuffer(i, bufpack[i], flags);
1343 : }
1344 :
1345 73172 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1346 :
1347 344450 : for (i = 0; i < nbufs; i++)
1348 : {
1349 271278 : PageSetLSN(BufferGetPage(bufpack[i]), recptr);
1350 271278 : UnlockReleaseBuffer(bufpack[i]);
1351 : }
1352 73172 : END_CRIT_SECTION();
1353 : }
1354 97800 : }
1355 :
1356 : /*
1357 : * Allocate working buffers needed for WAL record construction.
1358 : */
1359 : void
1360 42058 : InitXLogInsert(void)
1361 : {
1362 : #ifdef USE_ASSERT_CHECKING
1363 :
1364 : /*
1365 : * Check that any records assembled can be decoded. This is capped based
1366 : * on what XLogReader would require at its maximum bound. The XLOG_BLCKSZ
1367 : * addend covers the larger allocate_recordbuf() demand. This code path
1368 : * is called once per backend, more than enough for this check.
1369 : */
1370 : size_t max_required =
1371 : DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
1372 :
1373 : Assert(AllocSizeIsValid(max_required));
1374 : #endif
1375 :
1376 : /* Initialize the working areas */
1377 42058 : if (xloginsert_cxt == NULL)
1378 : {
1379 42058 : xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1380 : "WAL record construction",
1381 : ALLOCSET_DEFAULT_SIZES);
1382 : }
1383 :
1384 42058 : if (registered_buffers == NULL)
1385 : {
1386 42058 : registered_buffers = (registered_buffer *)
1387 42058 : MemoryContextAllocZero(xloginsert_cxt,
1388 : sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1389 42058 : max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1390 : }
1391 42058 : if (rdatas == NULL)
1392 : {
1393 42058 : rdatas = MemoryContextAlloc(xloginsert_cxt,
1394 : sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1395 42058 : max_rdatas = XLR_NORMAL_RDATAS;
1396 : }
1397 :
1398 : /*
1399 : * Allocate a buffer to hold the header information for a WAL record.
1400 : */
1401 42058 : if (hdr_scratch == NULL)
1402 42058 : hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1403 : HEADER_SCRATCH_SIZE);
1404 42058 : }