Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xloginsert.c
4 : * Functions for constructing WAL records
5 : *
6 : * Constructing a WAL record begins with a call to XLogBeginInsert,
7 : * followed by a number of XLogRegister* calls. The registered data is
8 : * collected in private working memory, and finally assembled into a chain
9 : * of XLogRecData structs by a call to XLogRecordAssemble(). See
10 : * access/transam/README for details.
11 : *
12 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : * src/backend/access/transam/xloginsert.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
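/*
 * Illustrative sketch of the construction sequence described above. This is
 * an editorial example, not part of the original file: the rmgr id, info
 * value and record struct are fictional, in the style of the example in
 * access/transam/README.
 *
 *		xl_foobar	xlrec;		(hypothetical record struct)
 *		XLogRecPtr	recptr;
 *
 *		xlrec.field = value;
 *
 *		XLogBeginInsert();
 *		XLogRegisterData(&xlrec, sizeof(xlrec));
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOOBAR_DO_STUFF);
 *
 *		PageSetLSN(BufferGetPage(buffer), recptr);
 */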
19 :
20 : #include "postgres.h"
21 :
22 : #ifdef USE_LZ4
23 : #include <lz4.h>
24 : #endif
25 :
26 : #ifdef USE_ZSTD
27 : #include <zstd.h>
28 : #endif
29 :
30 : #include "access/xact.h"
31 : #include "access/xlog.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xloginsert.h"
34 : #include "catalog/pg_control.h"
35 : #include "common/pg_lzcompress.h"
36 : #include "executor/instrument.h"
37 : #include "miscadmin.h"
38 : #include "pg_trace.h"
39 : #include "replication/origin.h"
40 : #include "storage/bufmgr.h"
41 : #include "storage/proc.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pgstat_internal.h"
44 :
45 : /*
46 : * Guess the maximum buffer size required to store a compressed version of
47 : * a backup block image.
48 : */
49 : #ifdef USE_LZ4
50 : #define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ)
51 : #else
52 : #define LZ4_MAX_BLCKSZ 0
53 : #endif
54 :
55 : #ifdef USE_ZSTD
56 : #define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ)
57 : #else
58 : #define ZSTD_MAX_BLCKSZ 0
59 : #endif
60 :
61 : #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
62 :
63 : /* Buffer size required to store a compressed version of a backup block image */
64 : #define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
65 :
66 : /*
67 : * For each block reference registered with XLogRegisterBuffer, we fill in
68 : * a registered_buffer struct.
69 : */
70 : typedef struct
71 : {
72 : bool in_use; /* is this slot in use? */
73 : uint8 flags; /* REGBUF_* flags */
74 : RelFileLocator rlocator; /* identifies the relation and block */
75 : ForkNumber forkno;
76 : BlockNumber block;
77 : const PageData *page; /* page content */
78 : uint32 rdata_len; /* total length of data in rdata chain */
79 : XLogRecData *rdata_head; /* head of the chain of data registered with
80 : * this block */
81 : XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
82 : * empty */
83 :
84 : XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
85 : * backup block data in XLogRecordAssemble() */
86 :
87 : /* buffer to store a compressed version of a backup block image */
88 : char compressed_page[COMPRESS_BUFSIZE];
89 : } registered_buffer;
90 :
91 : static registered_buffer *registered_buffers;
92 : static int max_registered_buffers; /* allocated size */
93 : static int max_registered_block_id = 0; /* highest block_id + 1 currently
94 : * registered */
95 :
96 : /*
97 : * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
98 : * with XLogRegisterData(...).
99 : */
100 : static XLogRecData *mainrdata_head;
101 : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
102 : static uint64 mainrdata_len; /* total # of bytes in chain */
103 :
104 : /* flags for the in-progress insertion */
105 : static uint8 curinsert_flags = 0;
106 :
107 : /*
108 : * These are used to hold the record header while constructing a record.
109 : * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
110 : * because we want it to be MAXALIGNed and padding bytes zeroed.
111 : *
112 : * For simplicity, it's allocated large enough to hold the headers for any
113 : * WAL record.
114 : */
115 : static XLogRecData hdr_rdt;
116 : static char *hdr_scratch = NULL;
117 :
118 : #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
119 : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
120 :
121 : #define HEADER_SCRATCH_SIZE \
122 : (SizeOfXLogRecord + \
123 : MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
124 : SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
125 : SizeOfXLogTransactionId)
126 :
127 : /*
128 : * An array of XLogRecData structs, to hold registered data.
129 : */
130 : static XLogRecData *rdatas;
131 : static int num_rdatas; /* entries currently used */
132 : static int max_rdatas; /* allocated size */
133 :
134 : static bool begininsert_called = false;
135 :
136 : /* Memory context to hold the registered buffer and data references. */
137 : static MemoryContext xloginsert_cxt;
138 :
139 : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
140 : XLogRecPtr RedoRecPtr, bool doPageWrites,
141 : XLogRecPtr *fpw_lsn, int *num_fpi,
142 : uint64 *fpi_bytes,
143 : bool *topxid_included);
144 : static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
145 : uint16 hole_length, void *dest, uint16 *dlen);
146 :
147 : /*
148 : * Begin constructing a WAL record. This must be called before the
149 : * XLogRegister* functions and XLogInsert().
150 : */
151 : void
152 30492376 : XLogBeginInsert(void)
153 : {
154 : Assert(max_registered_block_id == 0);
155 : Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
156 : Assert(mainrdata_len == 0);
157 :
158 : /* cross-check on whether we should be here or not */
159 30492376 : if (!XLogInsertAllowed())
160 0 : elog(ERROR, "cannot make new WAL entries during recovery");
161 :
162 30492376 : if (begininsert_called)
163 0 : elog(ERROR, "XLogBeginInsert was already called");
164 :
165 30492376 : begininsert_called = true;
166 30492376 : }
167 :
168 : /*
169 : * Ensure that there are enough buffer and data slots in the working area,
170 : * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
171 : * calls.
172 : *
173 : * There is always space for a small number of buffers and data chunks, enough
174 : * for most record types. This function is for the exceptional cases that need
175 : * more.
176 : */
177 : void
178 135528 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
179 : {
180 : int nbuffers;
181 :
182 : /*
183 : * This must be called before entering a critical section, because
184 : * allocating memory inside a critical section can fail. repalloc() will
185 : * check the same, but better to check it here too so that we fail
186 : * consistently even if the arrays happen to be large enough already.
187 : */
188 : Assert(CritSectionCount == 0);
189 :
190 : /* the minimum values can't be decreased */
191 135528 : if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
192 4110 : max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
193 135528 : if (ndatas < XLR_NORMAL_RDATAS)
194 135476 : ndatas = XLR_NORMAL_RDATAS;
195 :
196 135528 : if (max_block_id > XLR_MAX_BLOCK_ID)
197 0 : elog(ERROR, "maximum number of WAL record block references exceeded");
198 135528 : nbuffers = max_block_id + 1;
199 :
200 135528 : if (nbuffers > max_registered_buffers)
201 : {
202 3460 : registered_buffers = (registered_buffer *)
203 3460 : repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
204 :
205 : /*
206 : * At least the padding bytes in the structs must be zeroed, because
207 : * they are included in WAL data, but initialize it all for tidiness.
208 : */
209 3460 : MemSet(&registered_buffers[max_registered_buffers], 0,
210 : (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
211 3460 : max_registered_buffers = nbuffers;
212 : }
213 :
214 135528 : if (ndatas > max_rdatas)
215 : {
216 32 : rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
217 32 : max_rdatas = ndatas;
218 : }
219 135528 : }
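/*
 * Illustrative use (a sketch): a caller about to register more blocks than
 * the XLR_NORMAL_MAX_BLOCK_ID default calls this first, outside any critical
 * section, as log_newpage_range() below does:
 *
 *		XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
 */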
220 :
221 : /*
222 : * Reset WAL record construction buffers.
223 : */
224 : void
225 30552726 : XLogResetInsertion(void)
226 : {
227 : int i;
228 :
229 60771566 : for (i = 0; i < max_registered_block_id; i++)
230 30218840 : registered_buffers[i].in_use = false;
231 :
232 30552726 : num_rdatas = 0;
233 30552726 : max_registered_block_id = 0;
234 30552726 : mainrdata_len = 0;
235 30552726 : mainrdata_last = (XLogRecData *) &mainrdata_head;
236 30552726 : curinsert_flags = 0;
237 30552726 : begininsert_called = false;
238 30552726 : }
239 :
240 : /*
241 : * Register a reference to a buffer with the WAL record being constructed.
242 : * This must be called for every page that the WAL-logged operation modifies.
243 : */
244 : void
245 29627892 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
246 : {
247 : registered_buffer *regbuf;
248 :
249 : /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
250 : Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
251 : Assert(begininsert_called);
252 :
253 : /*
254 : * Ordinarily, buffer should be exclusive-locked and marked dirty before
255 : * we get here, otherwise we could end up violating one of the rules in
256 : * access/transam/README.
257 : *
258 : * Some callers intentionally register a clean page and never update that
259 : * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
260 : * bypass these checks.
261 : */
262 : #ifdef USE_ASSERT_CHECKING
263 : if (!(flags & REGBUF_NO_CHANGE))
264 : Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) &&
265 : BufferIsDirty(buffer));
266 : #endif
267 :
268 29627892 : if (block_id >= max_registered_block_id)
269 : {
270 28913312 : if (block_id >= max_registered_buffers)
271 0 : elog(ERROR, "too many registered buffers");
272 28913312 : max_registered_block_id = block_id + 1;
273 : }
274 :
275 29627892 : regbuf = &registered_buffers[block_id];
276 :
277 29627892 : BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
278 29627892 : regbuf->page = BufferGetPage(buffer);
279 29627892 : regbuf->flags = flags;
280 29627892 : regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
281 29627892 : regbuf->rdata_len = 0;
282 :
283 : /*
284 : * Check that this page hasn't already been registered with some other
285 : * block_id.
286 : */
287 : #ifdef USE_ASSERT_CHECKING
288 : {
289 : int i;
290 :
291 : for (i = 0; i < max_registered_block_id; i++)
292 : {
293 : registered_buffer *regbuf_old = &registered_buffers[i];
294 :
295 : if (i == block_id || !regbuf_old->in_use)
296 : continue;
297 :
298 : Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
299 : regbuf_old->forkno != regbuf->forkno ||
300 : regbuf_old->block != regbuf->block);
301 : }
302 : }
303 : #endif
304 :
305 29627892 : regbuf->in_use = true;
306 29627892 : }
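/*
 * Illustrative flag choices (a sketch): REGBUF_STANDARD allows the "hole"
 * between pd_lower and pd_upper to be omitted from any full-page image,
 * while REGBUF_WILL_INIT tells redo that it may reinitialize the page
 * without reading it first. An rmgr initializing a brand-new page might do:
 *
 *		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT | REGBUF_STANDARD);
 */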
307 :
308 : /*
309 : * Like XLogRegisterBuffer, but for registering a block that's not in the
310 : * shared buffer pool (i.e. when you don't have a Buffer for it).
311 : */
312 : void
313 568860 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
314 : BlockNumber blknum, const PageData *page, uint8 flags)
315 : {
316 : registered_buffer *regbuf;
317 :
318 : Assert(begininsert_called);
319 :
320 568860 : if (block_id >= max_registered_block_id)
321 568860 : max_registered_block_id = block_id + 1;
322 :
323 568860 : if (block_id >= max_registered_buffers)
324 0 : elog(ERROR, "too many registered buffers");
325 :
326 568860 : regbuf = &registered_buffers[block_id];
327 :
328 568860 : regbuf->rlocator = *rlocator;
329 568860 : regbuf->forkno = forknum;
330 568860 : regbuf->block = blknum;
331 568860 : regbuf->page = page;
332 568860 : regbuf->flags = flags;
333 568860 : regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
334 568860 : regbuf->rdata_len = 0;
335 :
336 : /*
337 : * Check that this page hasn't already been registered with some other
338 : * block_id.
339 : */
340 : #ifdef USE_ASSERT_CHECKING
341 : {
342 : int i;
343 :
344 : for (i = 0; i < max_registered_block_id; i++)
345 : {
346 : registered_buffer *regbuf_old = &registered_buffers[i];
347 :
348 : if (i == block_id || !regbuf_old->in_use)
349 : continue;
350 :
351 : Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
352 : regbuf_old->forkno != regbuf->forkno ||
353 : regbuf_old->block != regbuf->block);
354 : }
355 : }
356 : #endif
357 :
358 568860 : regbuf->in_use = true;
359 568860 : }
360 :
361 : /*
362 : * Add data to the WAL record that's being constructed.
363 : *
364 : * The data is appended to the "main chunk", available at replay with
365 : * XLogRecGetData().
366 : */
367 : void
368 31467294 : XLogRegisterData(const void *data, uint32 len)
369 : {
370 : XLogRecData *rdata;
371 :
372 : Assert(begininsert_called);
373 :
374 31467294 : if (num_rdatas >= max_rdatas)
375 0 : ereport(ERROR,
376 : (errmsg_internal("too much WAL data"),
377 : errdetail_internal("%d out of %d data segments are already in use.",
378 : num_rdatas, max_rdatas)));
379 31467294 : rdata = &rdatas[num_rdatas++];
380 :
381 31467294 : rdata->data = data;
382 31467294 : rdata->len = len;
383 :
384 : /*
385 : * we use the mainrdata_last pointer to track the end of the chain, so no
386 : * need to clear 'next' here.
387 : */
388 :
389 31467294 : mainrdata_last->next = rdata;
390 31467294 : mainrdata_last = rdata;
391 :
392 31467294 : mainrdata_len += len;
393 31467294 : }
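/*
 * Illustrative redo-side counterpart (a sketch): data registered here is
 * retrieved during replay with XLogRecGetData(); the xl_foobar struct is
 * hypothetical.
 *
 *		xl_foobar  *xlrec = (xl_foobar *) XLogRecGetData(record);
 */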
394 :
395 : /*
396 : * Add buffer-specific data to the WAL record that's being constructed.
397 : *
398 : * Block_id must reference a block previously registered with
399 : * XLogRegisterBuffer(). If this is called more than once for the same
400 : * block_id, the data is appended.
401 : *
402 : * The maximum amount of data that can be registered per block is 65535
403 : * bytes. That should be plenty; if you need more than BLCKSZ bytes to
404 : * reconstruct the changes to the page, you might as well just log a full
405 : * copy of it. (The "main data" that's not associated with a block is not
406 : * limited.)
407 : */
408 : void
409 41360064 : XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
410 : {
411 : registered_buffer *regbuf;
412 : XLogRecData *rdata;
413 :
414 : Assert(begininsert_called);
415 :
416 : /* find the registered buffer struct */
417 41360064 : regbuf = &registered_buffers[block_id];
418 41360064 : if (!regbuf->in_use)
419 0 : elog(ERROR, "no block with id %d registered with WAL insertion",
420 : block_id);
421 :
422 : /*
423 : * Check against max_rdatas and ensure we do not register more data per
424 : * buffer than can be handled by the physical data format; i.e. that
425 : * regbuf->rdata_len does not grow beyond what
426 : * XLogRecordBlockHeader->data_length can hold.
427 : */
428 41360064 : if (num_rdatas >= max_rdatas)
429 0 : ereport(ERROR,
430 : (errmsg_internal("too much WAL data"),
431 : errdetail_internal("%d out of %d data segments are already in use.",
432 : num_rdatas, max_rdatas)));
433 41360064 : if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
434 0 : ereport(ERROR,
435 : (errmsg_internal("too much WAL data"),
436 : errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
437 : UINT16_MAX, block_id, regbuf->rdata_len, len)));
438 :
439 41360064 : rdata = &rdatas[num_rdatas++];
440 :
441 41360064 : rdata->data = data;
442 41360064 : rdata->len = len;
443 :
444 41360064 : regbuf->rdata_tail->next = rdata;
445 41360064 : regbuf->rdata_tail = rdata;
446 41360064 : regbuf->rdata_len += len;
447 41360064 : }
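/*
 * Illustrative redo-side counterpart (a sketch): block data registered here
 * is retrieved during replay with XLogRecGetBlockData(), which returns NULL
 * when no data accompanies the block (e.g. because a full-page image made it
 * unnecessary):
 *
 *		Size		datalen;
 *		char	   *ptr = XLogRecGetBlockData(record, 0, &datalen);
 */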
448 :
449 : /*
450 : * Set insert status flags for the upcoming WAL record.
451 : *
452 : * The flags that can be used here are:
453 : * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
454 : * included in the record.
455 : * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
456 : * durability, which allows us to avoid triggering WAL archiving and other
457 : * background activity.
458 : */
459 : void
460 18868204 : XLogSetRecordFlags(uint8 flags)
461 : {
462 : Assert(begininsert_called);
463 18868204 : curinsert_flags |= flags;
464 18868204 : }
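/*
 * Illustrative use (a sketch; the rmgr and info values are fictional): a
 * periodically-generated record whose loss would be harmless can be marked
 * unimportant so that it does not, by itself, trigger WAL archiving or other
 * background activity:
 *
 *		XLogBeginInsert();
 *		XLogRegisterData(&xlrec, sizeof(xlrec));
 *		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_PERIODIC);
 */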
465 :
466 : /*
467 : * Insert an XLOG record having the specified RMID and info bytes, with the
468 : * body of the record being the data and buffer references registered earlier
469 : * with XLogRegister* calls.
470 : *
471 : * Returns XLOG pointer to end of record (beginning of next record).
472 : * This can be used as LSN for data pages affected by the logged action.
473 : * (LSN is the XLOG point up to which the XLOG must be flushed to disk
474 : * before the data page can be written out. This implements the basic
475 : * WAL rule "write the log before the data".)
476 : */
477 : XLogRecPtr
478 30492376 : XLogInsert(RmgrId rmid, uint8 info)
479 : {
480 : XLogRecPtr EndPos;
481 :
482 : /* XLogBeginInsert() must have been called. */
483 30492376 : if (!begininsert_called)
484 0 : elog(ERROR, "XLogBeginInsert was not called");
485 :
486 : /*
487 : * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
488 : * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
489 : */
490 30492376 : if ((info & ~(XLR_RMGR_INFO_MASK |
491 : XLR_SPECIAL_REL_UPDATE |
492 : XLR_CHECK_CONSISTENCY)) != 0)
493 0 : elog(PANIC, "invalid xlog info mask %02X", info);
494 :
495 : TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
496 :
497 : /*
498 : * In bootstrap mode, we don't actually log anything but XLOG resources;
499 : * return a phony record pointer.
500 : */
501 30492376 : if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
502 : {
503 1254000 : XLogResetInsertion();
504 1254000 : EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
505 1254000 : return EndPos;
506 : }
507 :
508 : do
509 : {
510 : XLogRecPtr RedoRecPtr;
511 : bool doPageWrites;
512 29253976 : bool topxid_included = false;
513 : XLogRecPtr fpw_lsn;
514 : XLogRecData *rdt;
515 29253976 : int num_fpi = 0;
516 29253976 : uint64 fpi_bytes = 0;
517 :
518 : /*
519 : * Get values needed to decide whether to do full-page writes. Since
520 : * we don't yet have an insertion lock, these could change under us,
521 : * but XLogInsertRecord will recheck them once it has a lock.
522 : */
523 29253976 : GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
524 :
525 29253976 : rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
526 : &fpw_lsn, &num_fpi, &fpi_bytes,
527 : &topxid_included);
528 :
529 29253976 : EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
530 : fpi_bytes, topxid_included);
531 29253976 : } while (!XLogRecPtrIsValid(EndPos));
532 :
533 29238376 : XLogResetInsertion();
534 :
535 29238376 : return EndPos;
536 : }
537 :
538 : /*
539 : * Simple wrapper to XLogInsert to insert a WAL record with elementary
540 : * contents (currently, only an int64 value is supported).
541 : */
542 : XLogRecPtr
543 864042 : XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
544 : {
545 864042 : XLogBeginInsert();
546 864042 : XLogRegisterData(&value, sizeof(value));
547 864042 : return XLogInsert(rmid, info);
548 : }
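/*
 * Illustrative use (a sketch; the rmgr and info values are fictional):
 *
 *		recptr = XLogSimpleInsertInt64(RM_FOO_ID, XLOG_FOO_SET_VALUE, newval);
 */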
549 :
550 : /*
551 : * Assemble a WAL record from the registered data and buffers into an
552 : * XLogRecData chain, ready for insertion with XLogInsertRecord().
553 : *
554 : * The record header fields are filled in, except for the xl_prev field. The
555 : * calculated CRC does not include the record header yet.
556 : *
557 : * If there are any registered buffers, and a full-page image was not taken
558 : * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
559 : * signals that the assembled record is only good for insertion on the
560 : * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
561 : *
562 : * *topxid_included is set if the topmost transaction ID is logged with the
563 : * current subtransaction.
564 : */
565 : static XLogRecData *
566 29253976 : XLogRecordAssemble(RmgrId rmid, uint8 info,
567 : XLogRecPtr RedoRecPtr, bool doPageWrites,
568 : XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
569 : bool *topxid_included)
570 : {
571 : XLogRecData *rdt;
572 29253976 : uint64 total_len = 0;
573 : int block_id;
574 : pg_crc32c rdata_crc;
575 29253976 : registered_buffer *prev_regbuf = NULL;
576 : XLogRecData *rdt_datas_last;
577 : XLogRecord *rechdr;
578 29253976 : char *scratch = hdr_scratch;
579 :
580 : /*
581 : * Note: this function can be called multiple times for the same record.
582 : * All the modifications we do to the rdata chains below must handle that.
583 : */
584 :
585 : /* The record begins with the fixed-size header */
586 29253976 : rechdr = (XLogRecord *) scratch;
587 29253976 : scratch += SizeOfXLogRecord;
588 :
589 29253976 : hdr_rdt.next = NULL;
590 29253976 : rdt_datas_last = &hdr_rdt;
591 29253976 : hdr_rdt.data = hdr_scratch;
592 :
593 : /*
594 : * Enforce consistency checks for this record if the user is looking for
595 : * them. Do this at the beginning of this routine so that callers of
596 : * XLogInsert() can pass XLR_CHECK_CONSISTENCY directly for
597 : * a record.
598 : */
599 29253976 : if (wal_consistency_checking[rmid])
600 4339636 : info |= XLR_CHECK_CONSISTENCY;
601 :
602 : /*
603 : * Make an rdata chain containing all the data portions of all block
604 : * references. This includes the data for full-page images. Also append
605 : * the headers for the block references in the scratch buffer.
606 : */
607 29253976 : *fpw_lsn = InvalidXLogRecPtr;
608 58279198 : for (block_id = 0; block_id < max_registered_block_id; block_id++)
609 : {
610 29025222 : registered_buffer *regbuf = &registered_buffers[block_id];
611 : bool needs_backup;
612 : bool needs_data;
613 : XLogRecordBlockHeader bkpb;
614 : XLogRecordBlockImageHeader bimg;
615 29025222 : XLogRecordBlockCompressHeader cbimg = {0};
616 : bool samerel;
617 29025222 : bool is_compressed = false;
618 : bool include_image;
619 :
620 29025222 : if (!regbuf->in_use)
621 22088 : continue;
622 :
623 : /* Determine if this block needs to be backed up */
624 29003134 : if (regbuf->flags & REGBUF_FORCE_IMAGE)
625 607316 : needs_backup = true;
626 28395818 : else if (regbuf->flags & REGBUF_NO_IMAGE)
627 428154 : needs_backup = false;
628 27967664 : else if (!doPageWrites)
629 541244 : needs_backup = false;
630 : else
631 : {
632 : /*
633 : * We assume page LSN is first data on *every* page that can be
634 : * passed to XLogInsert, whether it has the standard page layout
635 : * or not.
636 : */
637 27426420 : XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
638 :
639 27426420 : needs_backup = (page_lsn <= RedoRecPtr);
640 27426420 : if (!needs_backup)
641 : {
642 27241070 : if (!XLogRecPtrIsValid(*fpw_lsn) || page_lsn < *fpw_lsn)
643 26418808 : *fpw_lsn = page_lsn;
644 : }
645 : }
646 :
647 : /* Determine if the buffer data needs to be included */
648 29003134 : if (regbuf->rdata_len == 0)
649 5472720 : needs_data = false;
650 23530414 : else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
651 596004 : needs_data = true;
652 : else
653 22934410 : needs_data = !needs_backup;
654 :
655 29003134 : bkpb.id = block_id;
656 29003134 : bkpb.fork_flags = regbuf->forkno;
657 29003134 : bkpb.data_length = 0;
658 :
659 29003134 : if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
660 421754 : bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
661 :
662 : /*
663 : * If needs_backup is true or WAL checking is enabled for the current
664 : * resource manager, log a full-page write for the current block.
665 : */
666 29003134 : include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
667 :
668 29003134 : if (include_image)
669 : {
670 5408512 : const PageData *page = regbuf->page;
671 5408512 : uint16 compressed_len = 0;
672 :
673 : /*
674 : * The page needs to be backed up, so calculate its hole length
675 : * and offset.
676 : */
677 5408512 : if (regbuf->flags & REGBUF_STANDARD)
678 : {
679 : /* Assume we can omit data between pd_lower and pd_upper */
680 5117032 : uint16 lower = ((PageHeader) page)->pd_lower;
681 5117032 : uint16 upper = ((PageHeader) page)->pd_upper;
682 :
683 5117032 : if (lower >= SizeOfPageHeaderData &&
684 5112060 : upper > lower &&
685 : upper <= BLCKSZ)
686 : {
687 5112060 : bimg.hole_offset = lower;
688 5112060 : cbimg.hole_length = upper - lower;
689 : }
690 : else
691 : {
692 : /* No "hole" to remove */
693 4972 : bimg.hole_offset = 0;
694 4972 : cbimg.hole_length = 0;
695 : }
696 : }
697 : else
698 : {
699 : /* Not a standard page header, don't try to eliminate "hole" */
700 291480 : bimg.hole_offset = 0;
701 291480 : cbimg.hole_length = 0;
702 : }
703 :
704 : /*
705 : * Try to compress a block image if wal_compression is enabled
706 : */
707 5408512 : if (wal_compression != WAL_COMPRESSION_NONE)
708 : {
709 : is_compressed =
710 0 : XLogCompressBackupBlock(page, bimg.hole_offset,
711 0 : cbimg.hole_length,
712 0 : regbuf->compressed_page,
713 : &compressed_len);
714 : }
715 :
716 : /*
717 : * Fill in the remaining fields in the XLogRecordBlockHeader
718 : * struct
719 : */
720 5408512 : bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
721 :
722 : /* Report a full page image constructed for the WAL record */
723 5408512 : *num_fpi += 1;
724 :
725 : /*
726 : * Construct XLogRecData entries for the page content.
727 : */
728 5408512 : rdt_datas_last->next = &regbuf->bkp_rdatas[0];
729 5408512 : rdt_datas_last = rdt_datas_last->next;
730 :
731 5408512 : bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
732 :
733 : /*
734 : * If WAL consistency checking is enabled for the resource manager
735 : * of this WAL record, a full-page image is included in the record
736 : * for the block modified. During redo, the full-page image is replayed
737 : * only if BKPIMAGE_APPLY is set.
738 : */
739 5408512 : if (needs_backup)
740 792666 : bimg.bimg_info |= BKPIMAGE_APPLY;
741 :
742 5408512 : if (is_compressed)
743 : {
744 : /* The current compression is stored in the WAL record */
745 0 : bimg.length = compressed_len;
746 :
747 : /* Set the compression method used for this block */
748 0 : switch ((WalCompression) wal_compression)
749 : {
750 0 : case WAL_COMPRESSION_PGLZ:
751 0 : bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
752 0 : break;
753 :
754 0 : case WAL_COMPRESSION_LZ4:
755 : #ifdef USE_LZ4
756 0 : bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
757 : #else
758 : elog(ERROR, "LZ4 is not supported by this build");
759 : #endif
760 0 : break;
761 :
762 0 : case WAL_COMPRESSION_ZSTD:
763 : #ifdef USE_ZSTD
764 : bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
765 : #else
766 0 : elog(ERROR, "zstd is not supported by this build");
767 : #endif
768 : break;
769 :
770 0 : case WAL_COMPRESSION_NONE:
771 : Assert(false); /* cannot happen */
772 0 : break;
773 : /* no default case, so that compiler will warn */
774 : }
775 :
776 0 : rdt_datas_last->data = regbuf->compressed_page;
777 0 : rdt_datas_last->len = compressed_len;
778 : }
779 : else
780 : {
781 5408512 : bimg.length = BLCKSZ - cbimg.hole_length;
782 :
783 5408512 : if (cbimg.hole_length == 0)
784 : {
785 296452 : rdt_datas_last->data = page;
786 296452 : rdt_datas_last->len = BLCKSZ;
787 : }
788 : else
789 : {
790 : /* must skip the hole */
791 5112060 : rdt_datas_last->data = page;
792 5112060 : rdt_datas_last->len = bimg.hole_offset;
793 :
794 5112060 : rdt_datas_last->next = &regbuf->bkp_rdatas[1];
795 5112060 : rdt_datas_last = rdt_datas_last->next;
796 :
797 5112060 : rdt_datas_last->data =
798 5112060 : page + (bimg.hole_offset + cbimg.hole_length);
799 5112060 : rdt_datas_last->len =
800 5112060 : BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
801 : }
802 : }
803 :
804 5408512 : total_len += bimg.length;
805 :
806 : /* Track the WAL full page images in bytes */
807 5408512 : *fpi_bytes += bimg.length;
808 : }
809 :
810 29003134 : if (needs_data)
811 : {
812 : /*
813 : * When copying to XLogRecordBlockHeader, the length is narrowed
814 : * to a uint16. Double-check that it is still correct.
815 : */
816 : Assert(regbuf->rdata_len <= UINT16_MAX);
817 :
818 : /*
819 : * Link the caller-supplied rdata chain for this buffer to the
820 : * overall list.
821 : */
822 23448344 : bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
823 23448344 : bkpb.data_length = (uint16) regbuf->rdata_len;
824 23448344 : total_len += regbuf->rdata_len;
825 :
826 23448344 : rdt_datas_last->next = regbuf->rdata_head;
827 23448344 : rdt_datas_last = regbuf->rdata_tail;
828 : }
829 :
830 29003134 : if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
831 : {
832 1394602 : samerel = true;
833 1394602 : bkpb.fork_flags |= BKPBLOCK_SAME_REL;
834 : }
835 : else
836 27608532 : samerel = false;
837 29003134 : prev_regbuf = regbuf;
838 :
839 : /* Ok, copy the header to the scratch buffer */
840 29003134 : memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
841 29003134 : scratch += SizeOfXLogRecordBlockHeader;
842 29003134 : if (include_image)
843 : {
844 5408512 : memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
845 5408512 : scratch += SizeOfXLogRecordBlockImageHeader;
846 5408512 : if (cbimg.hole_length != 0 && is_compressed)
847 : {
848 0 : memcpy(scratch, &cbimg,
849 : SizeOfXLogRecordBlockCompressHeader);
850 0 : scratch += SizeOfXLogRecordBlockCompressHeader;
851 : }
852 : }
853 29003134 : if (!samerel)
854 : {
855 27608532 : memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
856 27608532 : scratch += sizeof(RelFileLocator);
857 : }
858 29003134 : memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
859 29003134 : scratch += sizeof(BlockNumber);
860 : }
861 :
862 : /* followed by the record's origin, if any */
863 29253976 : if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
864 17492444 : replorigin_session_origin != InvalidRepOriginId)
865 : {
866 301170 : *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
867 301170 : memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
868 301170 : scratch += sizeof(replorigin_session_origin);
869 : }
870 :
871 : /* followed by toplevel XID, if not already included in previous record */
872 29253976 : if (IsSubxactTopXidLogPending())
873 : {
874 442 : TransactionId xid = GetTopTransactionIdIfAny();
875 :
876 : /* Set the flag that the top xid is included in the WAL */
877 442 : *topxid_included = true;
878 :
879 442 : *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
880 442 : memcpy(scratch, &xid, sizeof(TransactionId));
881 442 : scratch += sizeof(TransactionId);
882 : }
883 :
884 : /* followed by main data, if any */
885 29253976 : if (mainrdata_len > 0)
886 : {
887 28602272 : if (mainrdata_len > 255)
888 : {
889 : uint32 mainrdata_len_4b;
890 :
891 62240 : if (mainrdata_len > PG_UINT32_MAX)
892 0 : ereport(ERROR,
893 : (errmsg_internal("too much WAL data"),
894 : errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
895 : mainrdata_len,
896 : PG_UINT32_MAX)));
897 :
898 62240 : mainrdata_len_4b = (uint32) mainrdata_len;
899 62240 : *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
900 62240 : memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
901 62240 : scratch += sizeof(uint32);
902 : }
903 : else
904 : {
905 28540032 : *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
906 28540032 : *(scratch++) = (uint8) mainrdata_len;
907 : }
908 28602272 : rdt_datas_last->next = mainrdata_head;
909 28602272 : rdt_datas_last = mainrdata_last;
910 28602272 : total_len += mainrdata_len;
911 : }
912 29253976 : rdt_datas_last->next = NULL;
913 :
914 29253976 : hdr_rdt.len = (scratch - hdr_scratch);
915 29253976 : total_len += hdr_rdt.len;
916 :
917 : /*
918 : * Calculate CRC of the data
919 : *
920 : * Note that the record header isn't added into the CRC initially since we
921 : * don't know the prev-link yet. Thus, the CRC will represent the CRC of
922 : * the whole record in the order: rdata, then backup blocks, then record
923 : * header.
924 : */
925 29253976 : INIT_CRC32C(rdata_crc);
926 29253976 : COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
927 108898474 : for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
928 79644498 : COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
929 :
930 : /*
931 : * Ensure that the XLogRecord is not too large.
932 : *
933 : * XLogReader machinery is only able to handle records up to a certain
934 : * size (ignoring machine resource limitations), so make sure that we will
935 : * not emit records larger than the sizes advertised to be supported.
936 : */
937 29253976 : if (total_len > XLogRecordMaxSize)
938 0 : ereport(ERROR,
939 : (errmsg_internal("oversized WAL record"),
940 : errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
941 : total_len, XLogRecordMaxSize, rmid, info)));
942 :
943 : /*
944 : * Fill in the fields in the record header. Prev-link is filled in later,
945 : * once we know where in the WAL the record will be inserted. The CRC does
946 : * not include the record header yet.
947 : */
948 29253976 : rechdr->xl_xid = GetCurrentTransactionIdIfAny();
949 29253976 : rechdr->xl_tot_len = (uint32) total_len;
950 29253976 : rechdr->xl_info = info;
951 29253976 : rechdr->xl_rmid = rmid;
952 29253976 : rechdr->xl_prev = InvalidXLogRecPtr;
953 29253976 : rechdr->xl_crc = rdata_crc;
954 :
955 29253976 : return &hdr_rdt;
956 : }
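/*
 * Illustrative layout of a record assembled above (a sketch; see
 * access/xlogrecord.h for the authoritative format). The header portion
 * lives in hdr_scratch; the payload follows via the linked rdata chain:
 *
 *		XLogRecord                          fixed-size record header
 *		XLogRecordBlockHeader               one per registered block, each
 *		  [XLogRecordBlockImageHeader]        optionally followed by an image
 *		  [XLogRecordBlockCompressHeader]     header, a compression header,
 *		  [RelFileLocator]                    the locator (unless SAME_REL),
 *		  BlockNumber                         and the block number
 *		[XLR_BLOCK_ID_ORIGIN + origin]      if requested
 *		[XLR_BLOCK_ID_TOPLEVEL_XID + xid]   if needed
 *		[XLR_BLOCK_ID_DATA_SHORT/LONG + main data length]
 *		... block images, block data and main data follow via the chain
 */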
957 :
958 : /*
959 : * Create a compressed version of a backup block image.
960 : *
961 : * Returns false if compression fails (i.e., compressed result is actually
962 : * bigger than the original). Otherwise, returns true and sets 'dlen' to
963 : * the length of the compressed block image.
964 : */
965 : static bool
966 0 : XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
967 : void *dest, uint16 *dlen)
968 : {
969 0 : int32 orig_len = BLCKSZ - hole_length;
970 0 : int32 len = -1;
971 0 : int32 extra_bytes = 0;
972 : const void *source;
973 : PGAlignedBlock tmp;
974 :
975 0 : if (hole_length != 0)
976 : {
977 : /* must skip the hole */
978 0 : memcpy(tmp.data, page, hole_offset);
979 0 : memcpy(tmp.data + hole_offset,
980 0 : page + (hole_offset + hole_length),
981 0 : BLCKSZ - (hole_length + hole_offset));
982 0 : source = tmp.data;
983 :
984 : /*
985 : * Extra data needs to be stored in WAL record for the compressed
986 : * version of block image if the hole exists.
987 : */
988 0 : extra_bytes = SizeOfXLogRecordBlockCompressHeader;
989 : }
990 : else
991 0 : source = page;
992 :
993 0 : switch ((WalCompression) wal_compression)
994 : {
995 0 : case WAL_COMPRESSION_PGLZ:
996 0 : len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
997 0 : break;
998 :
999 0 : case WAL_COMPRESSION_LZ4:
1000 : #ifdef USE_LZ4
1001 0 : len = LZ4_compress_default(source, dest, orig_len,
1002 : COMPRESS_BUFSIZE);
1003 0 : if (len <= 0)
1004 0 : len = -1; /* failure */
1005 : #else
1006 : elog(ERROR, "LZ4 is not supported by this build");
1007 : #endif
1008 0 : break;
1009 :
1010 0 : case WAL_COMPRESSION_ZSTD:
1011 : #ifdef USE_ZSTD
1012 : len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
1013 : ZSTD_CLEVEL_DEFAULT);
1014 : if (ZSTD_isError(len))
1015 : len = -1; /* failure */
1016 : #else
1017 0 : elog(ERROR, "zstd is not supported by this build");
1018 : #endif
1019 : break;
1020 :
1021 0 : case WAL_COMPRESSION_NONE:
1022 : Assert(false); /* cannot happen */
1023 0 : break;
1024 : /* no default case, so that compiler will warn */
1025 : }
1026 :
1027 : /*
1028 : * We recheck the actual size even if compression reports success, and see
1029 : * whether the number of bytes saved by compression is larger than the
1030 : * length of extra data needed for the compressed version of the block image.
1031 : */
1032 0 : if (len >= 0 &&
1033 0 : len + extra_bytes < orig_len)
1034 : {
1035 0 : *dlen = (uint16) len; /* successful compression */
1036 0 : return true;
1037 : }
1038 0 : return false;
1039 : }
1040 :
1041 : /*
1042 : * Determine whether the buffer referenced has to be backed up.
1043 : *
1044 : * Since we don't yet have the insert lock, fullPageWrites and runningBackups
1045 : * (which forces full-page writes) could change later, so the result should
1046 : * be used for optimization purposes only.
1047 : */
1048 : bool
1049 283504 : XLogCheckBufferNeedsBackup(Buffer buffer)
1050 : {
1051 : XLogRecPtr RedoRecPtr;
1052 : bool doPageWrites;
1053 : Page page;
1054 :
1055 283504 : GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
1056 :
1057 283504 : page = BufferGetPage(buffer);
1058 :
1059 283504 : if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
1060 1916 : return true; /* buffer requires backup */
1061 :
1062 281588 : return false; /* buffer does not need to be backed up */
1063 : }
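/*
 * Illustrative use (a sketch): an rmgr can consult this to skip work whose
 * only purpose is shrinking per-block WAL data, since a full-page image
 * would supersede that data anyway:
 *
 *		if (!XLogCheckBufferNeedsBackup(buffer))
 *		{
 *			... compute a smaller delta representation of the change ...
 *		}
 */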
1064 :
1065 : /*
1066 : * Write a backup block if needed when we are setting a hint. Note that
1067 : * this may be called for a variety of page types, not just heaps.
1068 : *
1069 : * Callable while holding just share lock on the buffer content.
1070 : *
1071 : * We can't use the plain backup block mechanism since that relies on the
1072 : * Buffer being exclusively locked. Since some modifications (setting LSN,
1073 : * hint bits) are allowed in a share-locked buffer, that can lead to WAL
1074 : * checksum failures. So instead we copy the page and insert the copied
1075 : * data as normal record data.
1076 : *
1077 : * We only need to do something if page has not yet been full page written in
1078 : * this checkpoint round. The LSN of the inserted wal record is returned if we
1079 : * had to write, InvalidXLogRecPtr otherwise.
1080 : *
1081 : * It is possible that multiple concurrent backends could attempt to write WAL
1082 : * records. In that case, multiple copies of the same block would be recorded
1083 : * in separate WAL records by different backends, though that is still OK from
1084 : * a correctness perspective.
1085 : */
1086 : XLogRecPtr
1087 118538 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
1088 : {
1089 118538 : XLogRecPtr recptr = InvalidXLogRecPtr;
1090 : XLogRecPtr lsn;
1091 : XLogRecPtr RedoRecPtr;
1092 :
1093 : /*
1094 : * Ensure no checkpoint can change our view of RedoRecPtr.
1095 : */
1096 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
1097 :
1098 : /*
1099 : * Update RedoRecPtr so that we can make the right decision
1100 : */
1101 118538 : RedoRecPtr = GetRedoRecPtr();
1102 :
1103 : /*
1104 : * We assume page LSN is first data on *every* page that can be passed to
1105 : * XLogInsert, whether it has the standard page layout or not. Since we're
1106 : * only holding a share-lock on the page, we must take the buffer header
1107 : * lock when we look at the LSN.
1108 : */
1109 118538 : lsn = BufferGetLSNAtomic(buffer);
1110 :
1111 118538 : if (lsn <= RedoRecPtr)
1112 : {
1113 61676 : int flags = 0;
1114 : PGAlignedBlock copied_buffer;
1115 61676 : char *origdata = (char *) BufferGetBlock(buffer);
1116 : RelFileLocator rlocator;
1117 : ForkNumber forkno;
1118 : BlockNumber blkno;
1119 :
1120 : /*
1121 : * Copy buffer so we don't have to worry about concurrent hint bit or
1122 : * lsn updates. We assume pd_lower/upper cannot be changed without an
1123 : * exclusive lock, so the contents of the backup are not racy.
1124 : */
1125 61676 : if (buffer_std)
1126 : {
1127 : /* Assume we can omit data between pd_lower and pd_upper */
1128 38822 : Page page = BufferGetPage(buffer);
1129 38822 : uint16 lower = ((PageHeader) page)->pd_lower;
1130 38822 : uint16 upper = ((PageHeader) page)->pd_upper;
1131 :
1132 38822 : memcpy(copied_buffer.data, origdata, lower);
1133 38822 : memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
1134 : }
1135 : else
1136 22854 : memcpy(copied_buffer.data, origdata, BLCKSZ);
1137 :
1138 61676 : XLogBeginInsert();
1139 :
1140 61676 : if (buffer_std)
1141 38822 : flags |= REGBUF_STANDARD;
1142 :
1143 61676 : BufferGetTag(buffer, &rlocator, &forkno, &blkno);
1144 61676 : XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
1145 :
1146 61676 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
1147 : }
1148 :
1149 118538 : return recptr;
1150 : }
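/*
 * Illustrative call site (a sketch of the logic in bufmgr.c's
 * MarkBufferDirtyHint(), slightly simplified):
 *
 *		if (XLogHintBitIsNeeded() && (buf_state & BM_PERMANENT))
 *			lsn = XLogSaveBufferForHint(buffer, buffer_std);
 */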
1151 :
1152 : /*
1153 : * Write a WAL record containing a full image of a page. Caller is responsible
1154 : * for writing the page to disk after calling this routine.
1155 : *
1156 : * Note: If you're using this function, you should be building pages in private
1157 : * memory and writing them directly to smgr. If you're using buffers, call
1158 : * log_newpage_buffer instead.
1159 : *
1160 : * If the page follows the standard page layout, with a PageHeader and unused
1161 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1162 : * the unused space to be left out from the WAL record, making it smaller.
1163 : */
1164 : XLogRecPtr
1165 267354 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
1166 : Page page, bool page_std)
1167 : {
1168 : int flags;
1169 : XLogRecPtr recptr;
1170 :
1171 267354 : flags = REGBUF_FORCE_IMAGE;
1172 267354 : if (page_std)
1173 267036 : flags |= REGBUF_STANDARD;
1174 :
1175 267354 : XLogBeginInsert();
1176 267354 : XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
1177 267354 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1178 :
1179 : /*
1180 : * The page may be uninitialized. If so, we can't set the LSN because that
1181 : * would corrupt the page.
1182 : */
1183 267354 : if (!PageIsNew(page))
1184 : {
1185 267346 : PageSetLSN(page, recptr);
1186 : }
1187 :
1188 267354 : return recptr;
1189 : }
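/*
 * Illustrative caller pattern (a sketch; the surrounding code is
 * hypothetical): build a page in private memory, WAL-log it, then hand it
 * to the storage manager, per the note above:
 *
 *		PageInit(page, BLCKSZ, 0);
 *		... fill in page contents ...
 *		log_newpage(&rel->rd_locator, MAIN_FORKNUM, blkno, page, true);
 *		smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, blkno, page, false);
 */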
1190 :
1191 : /*
1192 : * Like log_newpage(), but allows logging multiple pages in one operation.
1193 : * It is more efficient than calling log_newpage() for each page separately,
1194 : * because we can write multiple pages in a single WAL record.
1195 : */
1196 : void
1197 38804 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
1198 : BlockNumber *blknos, Page *pages, bool page_std)
1199 : {
1200 : int flags;
1201 : XLogRecPtr recptr;
1202 : int i;
1203 : int j;
1204 :
1205 38804 : flags = REGBUF_FORCE_IMAGE;
1206 38804 : if (page_std)
1207 38716 : flags |= REGBUF_STANDARD;
1208 :
1209 : /*
1210 : * Iterate over all the pages. They are collected into batches of
1211 : * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
1212 : * batch.
1213 : */
1214 38804 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1215 :
1216 38804 : i = 0;
1217 77608 : while (i < num_pages)
1218 : {
1219 38804 : int batch_start = i;
1220 : int nbatch;
1221 :
1222 38804 : XLogBeginInsert();
1223 :
1224 38804 : nbatch = 0;
1225 115158 : while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
1226 : {
1227 76354 : XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
1228 76354 : i++;
1229 76354 : nbatch++;
1230 : }
1231 :
1232 38804 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1233 :
1234 115158 : for (j = batch_start; j < i; j++)
1235 : {
1236 : /*
1237 : * The page may be uninitialized. If so, we can't set the LSN
1238 : * because that would corrupt the page.
1239 : */
1240 76354 : if (!PageIsNew(pages[j]))
1241 : {
1242 76346 : PageSetLSN(pages[j], recptr);
1243 : }
1244 : }
1245 : }
1246 38804 : }
1247 :
1248 : /*
1249 : * Write a WAL record containing a full image of a page.
1250 : *
1251 : * Caller should initialize the buffer and mark it dirty before calling this
1252 : * function. This function will set the page LSN.
1253 : *
1254 : * If the page follows the standard page layout, with a PageHeader and unused
1255 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1256 : * the unused space to be left out from the WAL record, making it smaller.
1257 : */
1258 : XLogRecPtr
1259 261844 : log_newpage_buffer(Buffer buffer, bool page_std)
1260 : {
1261 261844 : Page page = BufferGetPage(buffer);
1262 : RelFileLocator rlocator;
1263 : ForkNumber forknum;
1264 : BlockNumber blkno;
1265 :
1266 : /* Shared buffers should be modified in a critical section. */
1267 : Assert(CritSectionCount > 0);
1268 :
1269 261844 : BufferGetTag(buffer, &rlocator, &forknum, &blkno);
1270 :
1271 261844 : return log_newpage(&rlocator, forknum, blkno, page, page_std);
1272 : }
1273 :
1274 : /*
1275 : * WAL-log a range of blocks in a relation.
1276 : *
1277 : * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
1278 : * written to the WAL. If the range is large, this is done in multiple WAL
1279 : * records.
1280 : *
1281 : * If all pages follow the standard page layout, with a PageHeader and unused
1282 : * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1283 : * the unused space to be left out from the WAL records, making them smaller.
1284 : *
1285 : * NOTE: This function acquires exclusive-locks on the pages. Typically, this
1286 : * is used on a newly-built relation, and the caller is holding an
1287 : * AccessExclusiveLock on it, so no other backend can be accessing it at the
1288 : * same time. If that's not the case, you must ensure that this does not
1289 : * cause a deadlock through some other means.
1290 : */
1291 : void
1292 92320 : log_newpage_range(Relation rel, ForkNumber forknum,
1293 : BlockNumber startblk, BlockNumber endblk,
1294 : bool page_std)
1295 : {
1296 : int flags;
1297 : BlockNumber blkno;
1298 :
1299 92320 : flags = REGBUF_FORCE_IMAGE;
1300 92320 : if (page_std)
1301 736 : flags |= REGBUF_STANDARD;
1302 :
1303 : /*
1304 : * Iterate over all the pages in the range. They are collected into
1305 : * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
1306 : * for each batch.
1307 : */
1308 92320 : XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
1309 :
1310 92320 : blkno = startblk;
1311 162402 : while (blkno < endblk)
1312 : {
1313 : Buffer bufpack[XLR_MAX_BLOCK_ID];
1314 : XLogRecPtr recptr;
1315 : int nbufs;
1316 : int i;
1317 :
1318 70082 : CHECK_FOR_INTERRUPTS();
1319 :
1320 : /* Collect a batch of blocks. */
1321 70082 : nbufs = 0;
1322 329128 : while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
1323 : {
1324 259046 : Buffer buf = ReadBufferExtended(rel, forknum, blkno,
1325 : RBM_NORMAL, NULL);
1326 :
1327 259046 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1328 :
1329 : /*
1330 : * Completely empty pages are not WAL-logged. Writing a WAL record
1331 : * would change the LSN, and we don't want that. We want the page
1332 : * to stay empty.
1333 : */
1334 259046 : if (!PageIsNew(BufferGetPage(buf)))
1335 258086 : bufpack[nbufs++] = buf;
1336 : else
1337 960 : UnlockReleaseBuffer(buf);
1338 259046 : blkno++;
1339 : }
1340 :
1341 : /* Nothing more to do if all remaining blocks were empty. */
1342 70082 : if (nbufs == 0)
1343 0 : break;
1344 :
1345 : /* Write WAL record for this batch. */
1346 70082 : XLogBeginInsert();
1347 :
1348 70082 : START_CRIT_SECTION();
1349 328168 : for (i = 0; i < nbufs; i++)
1350 : {
1351 258086 : MarkBufferDirty(bufpack[i]);
1352 258086 : XLogRegisterBuffer(i, bufpack[i], flags);
1353 : }
1354 :
1355 70082 : recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1356 :
1357 328168 : for (i = 0; i < nbufs; i++)
1358 : {
1359 258086 : PageSetLSN(BufferGetPage(bufpack[i]), recptr);
1360 258086 : UnlockReleaseBuffer(bufpack[i]);
1361 : }
1362 70082 : END_CRIT_SECTION();
1363 : }
1364 92320 : }
1365 :
1366 : /*
1367 : * Allocate working buffers needed for WAL record construction.
1368 : */
1369 : void
1370 44926 : InitXLogInsert(void)
1371 : {
1372 : #ifdef USE_ASSERT_CHECKING
1373 :
1374 : /*
1375 : * Check that any records assembled can be decoded. This is capped based
1376 : * on what XLogReader would require at its maximum bound. The XLOG_BLCKSZ
1377 : * addend covers the larger allocate_recordbuf() demand. This code path
1378 : * is called once per backend, more than enough for this check.
1379 : */
1380 : size_t max_required =
1381 : DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
1382 :
1383 : Assert(AllocSizeIsValid(max_required));
1384 : #endif
1385 :
1386 : /* Initialize the working areas */
1387 44926 : if (xloginsert_cxt == NULL)
1388 : {
1389 44926 : xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1390 : "WAL record construction",
1391 : ALLOCSET_DEFAULT_SIZES);
1392 : }
1393 :
1394 44926 : if (registered_buffers == NULL)
1395 : {
1396 44926 : registered_buffers = (registered_buffer *)
1397 44926 : MemoryContextAllocZero(xloginsert_cxt,
1398 : sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1399 44926 : max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1400 : }
1401 44926 : if (rdatas == NULL)
1402 : {
1403 44926 : rdatas = MemoryContextAlloc(xloginsert_cxt,
1404 : sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1405 44926 : max_rdatas = XLR_NORMAL_RDATAS;
1406 : }
1407 :
1408 : /*
1409 : * Allocate a buffer to hold the header information for a WAL record.
1410 : */
1411 44926 : if (hdr_scratch == NULL)
1412 44926 : hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1413 : HEADER_SCRATCH_SIZE);
1414 44926 : }
|