Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xloginsert.c
4 : * Functions for constructing WAL records
5 : *
6 : * Constructing a WAL record begins with a call to XLogBeginInsert,
7 : * followed by a number of XLogRegister* calls. The registered data is
8 : * collected in private working memory, and finally assembled into a chain
9 : * of XLogRecData structs by a call to XLogRecordAssemble(). See
10 : * access/transam/README for details.
11 : *
12 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : * src/backend/access/transam/xloginsert.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
19 :
20 : #include "postgres.h"
21 :
22 : #ifdef USE_LZ4
23 : #include <lz4.h>
24 : #endif
25 :
26 : #ifdef USE_ZSTD
27 : #include <zstd.h>
28 : #endif
29 :
30 : #include "access/xact.h"
31 : #include "access/xlog.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xloginsert.h"
34 : #include "catalog/pg_control.h"
35 : #include "common/pg_lzcompress.h"
36 : #include "executor/instrument.h"
37 : #include "miscadmin.h"
38 : #include "pg_trace.h"
39 : #include "replication/origin.h"
40 : #include "storage/bufmgr.h"
41 : #include "storage/proc.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pgstat_internal.h"
44 :
45 : /*
46 : * Guess the maximum buffer size required to store a compressed version of
47 : * backup block image.
48 : */
49 : #ifdef USE_LZ4
50 : #define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ)
51 : #else
52 : #define LZ4_MAX_BLCKSZ 0
53 : #endif
54 :
55 : #ifdef USE_ZSTD
56 : #define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ)
57 : #else
58 : #define ZSTD_MAX_BLCKSZ 0
59 : #endif
60 :
61 : #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
62 :
63 : /* Buffer size required to store a compressed version of backup block image */
64 : #define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
65 :
66 : /*
67 : * For each block reference registered with XLogRegisterBuffer, we fill in
68 : * a registered_buffer struct.
69 : */
70 : typedef struct
71 : {
72 : bool in_use; /* is this slot in use? */
73 : uint8 flags; /* REGBUF_* flags */
74 : RelFileLocator rlocator; /* identifies the relation and block */
75 : ForkNumber forkno;
76 : BlockNumber block;
77 : const PageData *page; /* page content */
78 : uint32 rdata_len; /* total length of data in rdata chain */
79 : XLogRecData *rdata_head; /* head of the chain of data registered with
80 : * this block */
81 : XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
82 : * empty */
83 :
84 : XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
85 : * backup block data in XLogRecordAssemble() */
86 :
87 : /* buffer to store a compressed version of backup block image */
88 : char compressed_page[COMPRESS_BUFSIZE];
89 : } registered_buffer;
90 :
91 : static registered_buffer *registered_buffers;
92 : static int max_registered_buffers; /* allocated size */
93 : static int max_registered_block_id = 0; /* highest block_id + 1 currently
94 : * registered */
95 :
96 : /*
97 : * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
98 : * with XLogRegisterData(...).
99 : */
100 : static XLogRecData *mainrdata_head;
101 : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
102 : static uint64 mainrdata_len; /* total # of bytes in chain */
103 :
104 : /* flags for the in-progress insertion */
105 : static uint8 curinsert_flags = 0;
106 :
107 : /*
108 : * These are used to hold the record header while constructing a record.
109 : * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
110 : * because we want it to be MAXALIGNed and padding bytes zeroed.
111 : *
112 : * For simplicity, it's allocated large enough to hold the headers for any
113 : * WAL record.
114 : */
115 : static XLogRecData hdr_rdt;
116 : static char *hdr_scratch = NULL;
117 :
118 : #define SizeOfXlogOrigin (sizeof(ReplOriginId) + sizeof(char))
119 : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
120 :
121 : #define HEADER_SCRATCH_SIZE \
122 : (SizeOfXLogRecord + \
123 : MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
124 : SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
125 : SizeOfXLogTransactionId)
126 :
127 : /*
128 : * An array of XLogRecData structs, to hold registered data.
129 : */
130 : static XLogRecData *rdatas;
131 : static int num_rdatas; /* entries currently used */
132 : static int max_rdatas; /* allocated size */
133 :
134 : static bool begininsert_called = false;
135 :
136 : /* Memory context to hold the registered buffer and data references. */
137 : static MemoryContext xloginsert_cxt;
138 :
139 : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
140 : XLogRecPtr RedoRecPtr, bool doPageWrites,
141 : XLogRecPtr *fpw_lsn, int *num_fpi,
142 : uint64 *fpi_bytes,
143 : bool *topxid_included);
144 : static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
145 : uint16 hole_length, void *dest, uint16 *dlen);
146 :
147 : /*
148 : * Begin constructing a WAL record. This must be called before the
149 : * XLogRegister* functions and XLogInsert().
150 : */
151 : void
152 16070974 : XLogBeginInsert(void)
153 : {
154 : Assert(max_registered_block_id == 0);
155 : Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
156 : Assert(mainrdata_len == 0);
157 :
158 : /* cross-check on whether we should be here or not */
159 16070974 : if (!XLogInsertAllowed())
160 0 : elog(ERROR, "cannot make new WAL entries during recovery");
161 :
162 16070974 : if (begininsert_called)
163 0 : elog(ERROR, "XLogBeginInsert was already called");
164 :
165 16070974 : begininsert_called = true;
166 16070974 : }
167 :
168 : /*
169 : * Ensure that there are enough buffer and data slots in the working area,
170 : * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
171 : * calls.
172 : *
173 : * There is always space for a small number of buffers and data chunks, enough
174 : * for most record types. This function is for the exceptional cases that need
175 : * more.
176 : */
177 : void
178 64273 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
179 : {
180 : int nbuffers;
181 :
182 : /*
183 : * This must be called before entering a critical section, because
184 : * allocating memory inside a critical section can fail. repalloc() will
185 : * check the same, but better to check it here too so that we fail
186 : * consistently even if the arrays happen to be large enough already.
187 : */
188 : Assert(CritSectionCount == 0);
189 :
190 : /* the minimum values can't be decreased */
191 64273 : if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
192 2079 : max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
193 64273 : if (ndatas < XLR_NORMAL_RDATAS)
194 64249 : ndatas = XLR_NORMAL_RDATAS;
195 :
196 64273 : if (max_block_id > XLR_MAX_BLOCK_ID)
197 0 : elog(ERROR, "maximum number of WAL record block references exceeded");
198 64273 : nbuffers = max_block_id + 1;
199 :
200 64273 : if (nbuffers > max_registered_buffers)
201 : {
202 1772 : registered_buffers = (registered_buffer *)
203 1772 : repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
204 :
205 : /*
206 : * At least the padding bytes in the structs must be zeroed, because
207 : * they are included in WAL data, but initialize it all for tidiness.
208 : */
209 1772 : MemSet(®istered_buffers[max_registered_buffers], 0,
210 : (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
211 1772 : max_registered_buffers = nbuffers;
212 : }
213 :
214 64273 : if (ndatas > max_rdatas)
215 : {
216 15 : rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
217 15 : max_rdatas = ndatas;
218 : }
219 64273 : }
220 :
221 : /*
222 : * Reset WAL record construction buffers.
223 : */
224 : void
225 16102255 : XLogResetInsertion(void)
226 : {
227 : int i;
228 :
229 32082880 : for (i = 0; i < max_registered_block_id; i++)
230 15980625 : registered_buffers[i].in_use = false;
231 :
232 16102255 : num_rdatas = 0;
233 16102255 : max_registered_block_id = 0;
234 16102255 : mainrdata_len = 0;
235 16102255 : mainrdata_last = (XLogRecData *) &mainrdata_head;
236 16102255 : curinsert_flags = 0;
237 16102255 : begininsert_called = false;
238 16102255 : }
239 :
240 : /*
241 : * Register a reference to a buffer with the WAL record being constructed.
242 : * This must be called for every page that the WAL-logged operation modifies.
243 : */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	/*
	 * Ordinarily, buffer should be exclusive-locked and marked dirty before
	 * we get here, otherwise we could end up violating one of the rules in
	 * access/transam/README.
	 *
	 * Some callers intentionally register a clean page and never update that
	 * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
	 * bypass these checks.
	 */
#ifdef USE_ASSERT_CHECKING
	if (!(flags & REGBUF_NO_CHANGE))
		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) &&
			   BufferIsDirty(buffer));
#endif

	/* widen the range of block IDs in use, if needed */
	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	/*
	 * Capture the buffer's identity and content pointer, and reset the
	 * per-block data chain to empty (rdata_tail pointing at rdata_head is
	 * the empty-chain representation).
	 */
	BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			/* skip our own slot and slots not in use */
			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	/* only now mark the slot valid, after all its fields are filled in */
	regbuf->in_use = true;
}
307 :
308 : /*
309 : * Like XLogRegisterBuffer, but for registering a block that's not in the
310 : * shared buffer pool (i.e. when you don't have a Buffer for it).
311 : */
void
XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
				  BlockNumber blknum, const PageData *page, uint8 flags)
{
	registered_buffer *regbuf;

	Assert(begininsert_called);

	/* widen the range of block IDs in use, if needed */
	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	regbuf = &registered_buffers[block_id];

	/*
	 * Record the block's identity (supplied explicitly here, since there is
	 * no Buffer to interrogate) and reset the per-block data chain to empty.
	 */
	regbuf->rlocator = *rlocator;
	regbuf->forkno = forknum;
	regbuf->block = blknum;
	regbuf->page = page;
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			/* skip our own slot and slots not in use */
			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	/* only now mark the slot valid, after all its fields are filled in */
	regbuf->in_use = true;
}
360 :
361 : /*
362 : * Add data to the WAL record that's being constructed.
363 : *
364 : * The data is appended to the "main chunk", available at replay with
365 : * XLogRecGetData().
366 : */
367 : void
368 16643168 : XLogRegisterData(const void *data, uint32 len)
369 : {
370 : XLogRecData *rdata;
371 :
372 : Assert(begininsert_called);
373 :
374 16643168 : if (num_rdatas >= max_rdatas)
375 0 : ereport(ERROR,
376 : (errmsg_internal("too much WAL data"),
377 : errdetail_internal("%d out of %d data segments are already in use.",
378 : num_rdatas, max_rdatas)));
379 16643168 : rdata = &rdatas[num_rdatas++];
380 :
381 16643168 : rdata->data = data;
382 16643168 : rdata->len = len;
383 :
384 : /*
385 : * we use the mainrdata_last pointer to track the end of the chain, so no
386 : * need to clear 'next' here.
387 : */
388 :
389 16643168 : mainrdata_last->next = rdata;
390 16643168 : mainrdata_last = rdata;
391 :
392 16643168 : mainrdata_len += len;
393 16643168 : }
394 :
395 : /*
396 : * Add buffer-specific data to the WAL record that's being constructed.
397 : *
398 : * Block_id must reference a block previously registered with
399 : * XLogRegisterBuffer(). If this is called more than once for the same
400 : * block_id, the data is appended.
401 : *
402 : * The maximum amount of data that can be registered per block is 65535
403 : * bytes. That should be plenty; if you need more than BLCKSZ bytes to
404 : * reconstruct the changes to the page, you might as well just log a full
405 : * copy of it. (the "main data" that's not associated with a block is not
406 : * limited)
407 : */
408 : void
409 21776088 : XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
410 : {
411 : registered_buffer *regbuf;
412 : XLogRecData *rdata;
413 :
414 : Assert(begininsert_called);
415 :
416 : /* find the registered buffer struct */
417 21776088 : regbuf = ®istered_buffers[block_id];
418 21776088 : if (!regbuf->in_use)
419 0 : elog(ERROR, "no block with id %d registered with WAL insertion",
420 : block_id);
421 :
422 : /*
423 : * Check against max_rdatas and ensure we do not register more data per
424 : * buffer than can be handled by the physical data format; i.e. that
425 : * regbuf->rdata_len does not grow beyond what
426 : * XLogRecordBlockHeader->data_length can hold.
427 : */
428 21776088 : if (num_rdatas >= max_rdatas)
429 0 : ereport(ERROR,
430 : (errmsg_internal("too much WAL data"),
431 : errdetail_internal("%d out of %d data segments are already in use.",
432 : num_rdatas, max_rdatas)));
433 21776088 : if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
434 0 : ereport(ERROR,
435 : (errmsg_internal("too much WAL data"),
436 : errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
437 : UINT16_MAX, block_id, regbuf->rdata_len, len)));
438 :
439 21776088 : rdata = &rdatas[num_rdatas++];
440 :
441 21776088 : rdata->data = data;
442 21776088 : rdata->len = len;
443 :
444 21776088 : regbuf->rdata_tail->next = rdata;
445 21776088 : regbuf->rdata_tail = rdata;
446 21776088 : regbuf->rdata_len += len;
447 21776088 : }
448 :
449 : /*
450 : * Set insert status flags for the upcoming WAL record.
451 : *
452 : * The flags that can be used here are:
453 : * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
454 : * included in the record.
455 : * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
456 : * durability, which allows to avoid triggering WAL archiving and other
457 : * background activity.
458 : */
459 : void
460 9942002 : XLogSetRecordFlags(uint8 flags)
461 : {
462 : Assert(begininsert_called);
463 9942002 : curinsert_flags |= flags;
464 9942002 : }
465 :
466 : /*
467 : * Insert an XLOG record having the specified RMID and info bytes, with the
468 : * body of the record being the data and buffer references registered earlier
469 : * with XLogRegister* calls.
470 : *
471 : * Returns XLOG pointer to end of record (beginning of next record).
472 : * This can be used as LSN for data pages affected by the logged action.
473 : * (LSN is the XLOG point up to which the XLOG must be flushed to disk
474 : * before the data page can be written out. This implements the basic
475 : * WAL rule "write the log before the data".)
476 : */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK |
				  XLR_SPECIAL_REL_UPDATE |
				  XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
		return EndPos;
	}

	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		bool		topxid_included = false;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;
		int			num_fpi = 0;
		uint64		fpi_bytes = 0;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn, &num_fpi, &fpi_bytes,
								 &topxid_included);

		/*
		 * An invalid EndPos means the record could not be inserted as
		 * assembled (presumably because the values fetched above changed
		 * under us — NOTE(review): confirm against XLogInsertRecord), so we
		 * loop back and assemble it again.
		 */
		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
								  fpi_bytes, topxid_included);
	} while (!XLogRecPtrIsValid(EndPos));

	/* clear the working state for the next record */
	XLogResetInsertion();

	return EndPos;
}
537 :
538 : /*
539 : * Simple wrapper to XLogInsert to insert a WAL record with elementary
540 : * contents (only an int64 is supported as value currently).
541 : */
542 : XLogRecPtr
543 432071 : XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
544 : {
545 432071 : XLogBeginInsert();
546 432071 : XLogRegisterData(&value, sizeof(value));
547 432071 : return XLogInsert(rmid, info);
548 : }
549 :
550 : /*
551 : * Assemble a WAL record from the registered data and buffers into an
552 : * XLogRecData chain, ready for insertion with XLogInsertRecord().
553 : *
554 : * The record header fields are filled in, except for the xl_prev field. The
555 : * calculated CRC does not include the record header yet.
556 : *
557 : * If there are any registered buffers, and a full-page image was not taken
558 : * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
559 : * signals that the assembled record is only good for insertion on the
560 : * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
561 : *
562 : * *topxid_included is set if the topmost transaction ID is logged with the
563 : * current subtransaction.
564 : */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
				   bool *topxid_included)
{
	XLogRecData *rdt;
	uint64		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this at the beginning of this routine to give the possibility for
	 * callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for a
	 * record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				/* remember the lowest LSN among pages not backed up */
				if (!XLogRecPtrIsValid(*fpw_lsn) || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to be included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for current
		 * resource manager, log a full-page write for the current block.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			const PageData *page = regbuf->page;
			uint16		compressed_len = 0;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((const PageHeaderData *) page)->pd_lower;
				uint16		upper = ((const PageHeaderData *) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to remove */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression != WAL_COMPRESSION_NONE)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/* Report a full page image constructed for the WAL record */
			*num_fpi += 1;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				/* The current compression is stored in the WAL record */
				bimg.length = compressed_len;

				/* Set the compression method used for this block */
				switch ((WalCompression) wal_compression)
				{
					case WAL_COMPRESSION_PGLZ:
						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
						break;

					case WAL_COMPRESSION_LZ4:
#ifdef USE_LZ4
						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
#else
						elog(ERROR, "LZ4 is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_ZSTD:
#ifdef USE_ZSTD
						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
#else
						elog(ERROR, "zstd is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_NONE:
						Assert(false);	/* cannot happen */
						break;
						/* no default case, so that compiler will warn */
				}

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;

			/* Track the WAL full page images in bytes */
			*fpi_bytes += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * When copying to XLogRecordBlockHeader, the length is narrowed
			 * to an uint16. Double-check that it is still correct.
			 */
			Assert(regbuf->rdata_len <= UINT16_MAX);

			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = (uint16) regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		/* consecutive references to the same relation can omit the locator */
		if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
			scratch += sizeof(RelFileLocator);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_xact_state.origin != InvalidReplOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
		scratch += sizeof(replorigin_xact_state.origin);
	}

	/* followed by toplevel XID, if not already included in previous record */
	if (IsSubxactTopXidLogPending())
	{
		TransactionId xid = GetTopTransactionIdIfAny();

		/* Set the flag that the top xid is included in the WAL */
		*topxid_included = true;

		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
		memcpy(scratch, &xid, sizeof(TransactionId));
		scratch += sizeof(TransactionId);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			/* long form: a 4-byte length follows the block ID byte */
			uint32		mainrdata_len_4b;

			if (mainrdata_len > PG_UINT32_MAX)
				ereport(ERROR,
						(errmsg_internal("too much WAL data"),
						 errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
											mainrdata_len,
											PG_UINT32_MAX)));

			mainrdata_len_4b = (uint32) mainrdata_len;
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			/* short form: a single length byte suffices */
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet. Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Ensure that the XLogRecord is not too large.
	 *
	 * XLogReader machinery is only able to handle records up to a certain
	 * size (ignoring machine resource limitations), so make sure that we will
	 * not emit records larger than the sizes advertised to be supported.
	 */
	if (total_len > XLogRecordMaxSize)
		ereport(ERROR,
				(errmsg_internal("oversized WAL record"),
				 errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
									total_len, XLogRecordMaxSize, rmid, info)));

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = (uint32) total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}
957 :
/*
 * Create a compressed version of a backup block image.
 *
 * 'page' is the source page; if 'hole_length' is nonzero, the bytes in
 * [hole_offset, hole_offset + hole_length) are excluded from the data
 * handed to the compressor (they are the unused gap between pd_lower and
 * pd_upper of a standard page).  'dest' must be at least COMPRESS_BUFSIZE
 * bytes.
 *
 * Returns false if compression fails (i.e., compressed result is actually
 * bigger than original). Otherwise, returns true and sets 'dlen' to
 * the length of compressed block image.
 */
static bool
XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
						void *dest, uint16 *dlen)
{
	int32		orig_len = BLCKSZ - hole_length;
	int32		len = -1;		/* -1 means "compression failed" throughout */
	int32		extra_bytes = 0;
	const void *source;
	PGAlignedBlock tmp;

	if (hole_length != 0)
	{
		/* must skip the hole: squeeze the page into a local copy */
		memcpy(tmp.data, page, hole_offset);
		memcpy(tmp.data + hole_offset,
			   page + (hole_offset + hole_length),
			   BLCKSZ - (hole_length + hole_offset));
		source = tmp.data;

		/*
		 * Extra data needs to be stored in WAL record for the compressed
		 * version of block image if the hole exists.
		 */
		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
	}
	else
		source = page;

	switch ((WalCompression) wal_compression)
	{
		case WAL_COMPRESSION_PGLZ:
			/* pglz_compress itself returns -1 on failure */
			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
			break;

		case WAL_COMPRESSION_LZ4:
#ifdef USE_LZ4
			len = LZ4_compress_default(source, dest, orig_len,
									   COMPRESS_BUFSIZE);
			if (len <= 0)
				len = -1;		/* failure */
#else
			elog(ERROR, "LZ4 is not supported by this build");
#endif
			break;

		case WAL_COMPRESSION_ZSTD:
#ifdef USE_ZSTD
			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
								ZSTD_CLEVEL_DEFAULT);
			if (ZSTD_isError(len))
				len = -1;		/* failure */
#else
			elog(ERROR, "zstd is not supported by this build");
#endif
			break;

		case WAL_COMPRESSION_NONE:
			Assert(false);		/* cannot happen */
			break;
			/* no default case, so that compiler will warn */
	}

	/*
	 * We recheck the actual size even if compression reports success and see
	 * if the number of bytes saved by compression is larger than the length
	 * of extra data needed for the compressed version of block image.
	 */
	if (len >= 0 &&
		len + extra_bytes < orig_len)
	{
		*dlen = (uint16) len;	/* successful compression */
		return true;
	}
	return false;
}
1040 :
1041 : /*
1042 : * Determine whether the buffer referenced has to be backed up.
1043 : *
1044 : * Since we don't yet have the insert lock, fullPageWrites and runningBackups
1045 : * (which forces full-page writes) could change later, so the result should
1046 : * be used for optimization purposes only.
1047 : */
1048 : bool
1049 147110 : XLogCheckBufferNeedsBackup(Buffer buffer)
1050 : {
1051 : XLogRecPtr RedoRecPtr;
1052 : bool doPageWrites;
1053 : Page page;
1054 :
1055 147110 : GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
1056 :
1057 147110 : page = BufferGetPage(buffer);
1058 :
1059 147110 : if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
1060 1238 : return true; /* buffer requires backup */
1061 :
1062 145872 : return false; /* buffer does not need to be backed up */
1063 : }
1064 :
1065 : /*
1066 : * Write a backup block if needed when we are setting a hint. Note that
1067 : * this may be called for a variety of page types, not just heaps.
1068 : *
1069 : * Callable while holding just a share-exclusive lock on the buffer
1070 : * content. That suffices to prevent concurrent modifications of the
1071 : * buffer. The buffer already needs to have been marked dirty by
1072 : * MarkBufferDirtyHint().
1073 : *
1074 : * We can't use the plain backup block mechanism since that relies on the
1075 : * Buffer being exclusively locked. Since some modifications (setting LSN, hint
1076 : * bits) are allowed in a sharelocked buffer that can lead to wal checksum
1077 : * failures. So instead we copy the page and insert the copied data as normal
1078 : * record data.
1079 : *
1080 : * We only need to do something if page has not yet been full page written in
1081 : * this checkpoint round. The LSN of the inserted wal record is returned if we
1082 : * had to write, InvalidXLogRecPtr otherwise.
1083 : */
1084 : XLogRecPtr
1085 61626 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
1086 : {
1087 61626 : XLogRecPtr recptr = InvalidXLogRecPtr;
1088 : XLogRecPtr lsn;
1089 : XLogRecPtr RedoRecPtr;
1090 :
1091 : /* this also verifies that we hold an appropriate lock */
1092 : Assert(BufferIsDirty(buffer));
1093 :
1094 : /*
1095 : * Update RedoRecPtr so that we can make the right decision. It's possible
1096 : * that a new checkpoint will start just after GetRedoRecPtr(), but that
1097 : * is ok, as the buffer is already dirty, ensuring that any BufferSync()
1098 : * started after the buffer was marked dirty cannot complete without
1099 : * flushing this buffer. If a checkpoint started between marking the
1100 : * buffer dirty and this check, we will emit an unnecessary WAL record (as
1101 : * the buffer will be written out as part of the checkpoint), but the
1102 : * window for that is not big.
1103 : */
1104 61626 : RedoRecPtr = GetRedoRecPtr();
1105 :
1106 : /*
1107 : * We assume page LSN is first data on *every* page that can be passed to
1108 : * XLogInsert, whether it has the standard page layout or not.
1109 : */
1110 61626 : lsn = PageGetLSN(BufferGetPage(buffer));
1111 :
1112 61626 : if (lsn <= RedoRecPtr)
1113 : {
1114 31532 : int flags = 0;
1115 : PGAlignedBlock copied_buffer;
1116 31532 : char *origdata = (char *) BufferGetBlock(buffer);
1117 : RelFileLocator rlocator;
1118 : ForkNumber forkno;
1119 : BlockNumber blkno;
1120 :
1121 : /*
1122 : * Copy buffer so we don't have to worry about concurrent hint bit or
1123 : * lsn updates. We assume pd_lower/upper cannot be changed without an
1124 : * exclusive lock, so the contents bkp are not racy.
1125 : */
1126 31532 : if (buffer_std)
1127 : {
1128 : /* Assume we can omit data between pd_lower and pd_upper */
1129 19306 : Page page = BufferGetPage(buffer);
1130 19306 : uint16 lower = ((PageHeader) page)->pd_lower;
1131 19306 : uint16 upper = ((PageHeader) page)->pd_upper;
1132 :
1133 19306 : memcpy(copied_buffer.data, origdata, lower);
1134 19306 : memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
1135 : }
1136 : else
1137 12226 : memcpy(copied_buffer.data, origdata, BLCKSZ);
1138 :
1139 31532 : XLogBeginInsert();
1140 :
1141 31532 : if (buffer_std)
1142 19306 : flags |= REGBUF_STANDARD;
1143 :
1144 31532 : BufferGetTag(buffer, &rlocator, &forkno, &blkno);
1145 31532 : XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
1146 :
1147 31532 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
1148 : }
1149 :
1150 61626 : return recptr;
1151 : }
1152 :
1153 : /*
1154 : * Write a WAL record containing a full image of a page. Caller is responsible
1155 : * for writing the page to disk after calling this routine.
1156 : *
1157 : * Note: If you're using this function, you should be building pages in private
1158 : * memory and writing them directly to smgr. If you're using buffers, call
1159 : * log_newpage_buffer instead.
1160 : *
1161 : * If the page follows the standard page layout, with a PageHeader and unused
1162 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1163 : * the unused space to be left out from the WAL record, making it smaller.
1164 : */
1165 : XLogRecPtr
1166 141368 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
1167 : Page page, bool page_std)
1168 : {
1169 : int flags;
1170 : XLogRecPtr recptr;
1171 :
1172 141368 : flags = REGBUF_FORCE_IMAGE;
1173 141368 : if (page_std)
1174 141205 : flags |= REGBUF_STANDARD;
1175 :
1176 141368 : XLogBeginInsert();
1177 141368 : XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
1178 141368 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1179 :
1180 : /*
1181 : * The page may be uninitialized. If so, we can't set the LSN because that
1182 : * would corrupt the page.
1183 : */
1184 141368 : if (!PageIsNew(page))
1185 : {
1186 141364 : PageSetLSN(page, recptr);
1187 : }
1188 :
1189 141368 : return recptr;
1190 : }
1191 :
1192 : /*
1193 : * Like log_newpage(), but allows logging multiple pages in one operation.
1194 : * It is more efficient than calling log_newpage() for each page separately,
1195 : * because we can write multiple pages in a single WAL record.
1196 : */
1197 : void
1198 23320 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
1199 : BlockNumber *blknos, Page *pages, bool page_std)
1200 : {
1201 : int flags;
1202 : XLogRecPtr recptr;
1203 : int i;
1204 : int j;
1205 :
1206 23320 : flags = REGBUF_FORCE_IMAGE;
1207 23320 : if (page_std)
1208 23255 : flags |= REGBUF_STANDARD;
1209 :
1210 : /*
1211 : * Iterate over all the pages. They are collected into batches of
1212 : * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
1213 : * batch.
1214 : */
1215 23320 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1216 :
1217 23320 : i = 0;
1218 46640 : while (i < num_pages)
1219 : {
1220 23320 : int batch_start = i;
1221 : int nbatch;
1222 :
1223 23320 : XLogBeginInsert();
1224 :
1225 23320 : nbatch = 0;
1226 68559 : while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
1227 : {
1228 45239 : XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
1229 45239 : i++;
1230 45239 : nbatch++;
1231 : }
1232 :
1233 23320 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1234 :
1235 68559 : for (j = batch_start; j < i; j++)
1236 : {
1237 : /*
1238 : * The page may be uninitialized. If so, we can't set the LSN
1239 : * because that would corrupt the page.
1240 : */
1241 45239 : if (!PageIsNew(pages[j]))
1242 : {
1243 45233 : PageSetLSN(pages[j], recptr);
1244 : }
1245 : }
1246 : }
1247 23320 : }
1248 :
1249 : /*
1250 : * Write a WAL record containing a full image of a page.
1251 : *
1252 : * Caller should initialize the buffer and mark it dirty before calling this
1253 : * function. This function will set the page LSN.
1254 : *
1255 : * If the page follows the standard page layout, with a PageHeader and unused
1256 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1257 : * the unused space to be left out from the WAL record, making it smaller.
1258 : */
1259 : XLogRecPtr
1260 137421 : log_newpage_buffer(Buffer buffer, bool page_std)
1261 : {
1262 137421 : Page page = BufferGetPage(buffer);
1263 : RelFileLocator rlocator;
1264 : ForkNumber forknum;
1265 : BlockNumber blkno;
1266 :
1267 : /* Shared buffers should be modified in a critical section. */
1268 : Assert(CritSectionCount > 0);
1269 :
1270 137421 : BufferGetTag(buffer, &rlocator, &forknum, &blkno);
1271 :
1272 137421 : return log_newpage(&rlocator, forknum, blkno, page, page_std);
1273 : }
1274 :
1275 : /*
1276 : * WAL-log a range of blocks in a relation.
1277 : *
1278 : * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
1279 : * written to the WAL. If the range is large, this is done in multiple WAL
1280 : * records.
1281 : *
1282 : * If all page follows the standard page layout, with a PageHeader and unused
1283 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1284 : * the unused space to be left out from the WAL records, making them smaller.
1285 : *
1286 : * NOTE: This function acquires exclusive-locks on the pages. Typically, this
1287 : * is used on a newly-built relation, and the caller is holding a
1288 : * AccessExclusiveLock on it, so no other backend can be accessing it at the
1289 : * same time. If that's not the case, you must ensure that this does not
1290 : * cause a deadlock through some other means.
1291 : */
1292 : void
1293 38741 : log_newpage_range(Relation rel, ForkNumber forknum,
1294 : BlockNumber startblk, BlockNumber endblk,
1295 : bool page_std)
1296 : {
1297 : int flags;
1298 : BlockNumber blkno;
1299 :
1300 38741 : flags = REGBUF_FORCE_IMAGE;
1301 38741 : if (page_std)
1302 492 : flags |= REGBUF_STANDARD;
1303 :
1304 : /*
1305 : * Iterate over all the pages in the range. They are collected into
1306 : * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
1307 : * for each batch.
1308 : */
1309 38741 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1310 :
1311 38741 : blkno = startblk;
1312 70880 : while (blkno < endblk)
1313 : {
1314 : Buffer bufpack[XLR_MAX_BLOCK_ID];
1315 : XLogRecPtr recptr;
1316 : int nbufs;
1317 : int i;
1318 :
1319 32139 : CHECK_FOR_INTERRUPTS();
1320 :
1321 : /* Collect a batch of blocks. */
1322 32139 : nbufs = 0;
1323 156023 : while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
1324 : {
1325 123884 : Buffer buf = ReadBufferExtended(rel, forknum, blkno,
1326 : RBM_NORMAL, NULL);
1327 :
1328 123884 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1329 :
1330 : /*
1331 : * Completely empty pages are not WAL-logged. Writing a WAL record
1332 : * would change the LSN, and we don't want that. We want the page
1333 : * to stay empty.
1334 : */
1335 123884 : if (!PageIsNew(BufferGetPage(buf)))
1336 123794 : bufpack[nbufs++] = buf;
1337 : else
1338 90 : UnlockReleaseBuffer(buf);
1339 123884 : blkno++;
1340 : }
1341 :
1342 : /* Nothing more to do if all remaining blocks were empty. */
1343 32139 : if (nbufs == 0)
1344 0 : break;
1345 :
1346 : /* Write WAL record for this batch. */
1347 32139 : XLogBeginInsert();
1348 :
1349 32139 : START_CRIT_SECTION();
1350 155933 : for (i = 0; i < nbufs; i++)
1351 : {
1352 123794 : MarkBufferDirty(bufpack[i]);
1353 123794 : XLogRegisterBuffer(i, bufpack[i], flags);
1354 : }
1355 :
1356 32139 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1357 :
1358 155933 : for (i = 0; i < nbufs; i++)
1359 123794 : PageSetLSN(BufferGetPage(bufpack[i]), recptr);
1360 :
1361 32139 : END_CRIT_SECTION();
1362 :
1363 155933 : for (i = 0; i < nbufs; i++)
1364 123794 : UnlockReleaseBuffer(bufpack[i]);
1365 : }
1366 38741 : }
1367 :
1368 : /*
1369 : * Allocate working buffers needed for WAL record construction.
1370 : */
1371 : void
1372 23484 : InitXLogInsert(void)
1373 : {
1374 : #ifdef USE_ASSERT_CHECKING
1375 :
1376 : /*
1377 : * Check that any records assembled can be decoded. This is capped based
1378 : * on what XLogReader would require at its maximum bound. The XLOG_BLCKSZ
1379 : * addend covers the larger allocate_recordbuf() demand. This code path
1380 : * is called once per backend, more than enough for this check.
1381 : */
1382 : size_t max_required =
1383 : DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
1384 :
1385 : Assert(AllocSizeIsValid(max_required));
1386 : #endif
1387 :
1388 : /* Initialize the working areas */
1389 23484 : if (xloginsert_cxt == NULL)
1390 : {
1391 23484 : xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1392 : "WAL record construction",
1393 : ALLOCSET_DEFAULT_SIZES);
1394 : }
1395 :
1396 23484 : if (registered_buffers == NULL)
1397 : {
1398 23484 : registered_buffers = (registered_buffer *)
1399 23484 : MemoryContextAllocZero(xloginsert_cxt,
1400 : sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1401 23484 : max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1402 : }
1403 23484 : if (rdatas == NULL)
1404 : {
1405 23484 : rdatas = MemoryContextAlloc(xloginsert_cxt,
1406 : sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1407 23484 : max_rdatas = XLR_NORMAL_RDATAS;
1408 : }
1409 :
1410 : /*
1411 : * Allocate a buffer to hold the header information for a WAL record.
1412 : */
1413 23484 : if (hdr_scratch == NULL)
1414 23484 : hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1415 : HEADER_SCRATCH_SIZE);
1416 23484 : }
|