Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgxlog.c
4 : * WAL replay logic for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgxlog.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/bufmask.h"
18 : #include "access/spgist_private.h"
19 : #include "access/spgxlog.h"
20 : #include "access/xlogutils.h"
21 : #include "storage/standby.h"
22 : #include "utils/memutils.h"
23 :
24 :
25 : static MemoryContext opCtx; /* working memory for operations */
26 :
27 :
28 : /*
29 : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30 : *
31 : * At present, all we need is enough info to support spgFormDeadTuple(),
32 : * plus the isBuild flag.
33 : */
34 : static void
35 814 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
36 : {
37 814 : memset(state, 0, sizeof(*state));
38 :
39 814 : state->redirectXid = stateSrc.redirectXid;
40 814 : state->isBuild = stateSrc.isBuild;
41 814 : state->deadTupleStorage = palloc0(SGDTSIZE);
42 814 : }
43 :
44 : /*
45 : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 : * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 : * existing tuple, it had better be a placeholder tuple.
48 : */
49 : static void
50 141466 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
51 : {
52 141466 : if (offset <= PageGetMaxOffsetNumber(page))
53 : {
54 38230 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55 38230 : PageGetItemId(page, offset));
56 :
57 38230 : if (dt->tupstate != SPGIST_PLACEHOLDER)
58 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59 :
60 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 38230 : SpGistPageGetOpaque(page)->nPlaceholder--;
62 :
63 38230 : PageIndexTupleDelete(page, offset);
64 : }
65 :
66 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67 :
68 141466 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
70 : size);
71 141466 : }
72 :
73 : static void
74 77936 : spgRedoAddLeaf(XLogReaderState *record)
75 : {
76 77936 : XLogRecPtr lsn = record->EndRecPtr;
77 77936 : char *ptr = XLogRecGetData(record);
78 77936 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79 : char *leafTuple;
80 : SpGistLeafTupleData leafTupleHdr;
81 : Buffer buffer;
82 : Page page;
83 : XLogRedoAction action;
84 :
85 77936 : ptr += sizeof(spgxlogAddLeaf);
86 77936 : leafTuple = ptr;
87 : /* the leaf tuple is unaligned, so make a copy to access its header */
88 77936 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89 :
90 : /*
91 : * In normal operation we would have both current and parent pages locked
92 : * simultaneously; but in WAL replay it should be safe to update the leaf
93 : * page before updating the parent.
94 : */
95 77936 : if (xldata->newPage)
96 : {
97 0 : buffer = XLogInitBufferForRedo(record, 0);
98 0 : SpGistInitBuffer(buffer,
99 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
100 0 : action = BLK_NEEDS_REDO;
101 : }
102 : else
103 77936 : action = XLogReadBufferForRedo(record, 0, &buffer);
104 :
105 77936 : if (action == BLK_NEEDS_REDO)
106 : {
107 77562 : page = BufferGetPage(buffer);
108 :
109 : /* insert new tuple */
110 77562 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111 : {
112 : /* normal cases, tuple was added by SpGistPageAddNewItem */
113 77562 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
114 :
115 : /* update head tuple's chain link if needed */
116 77562 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
117 : {
118 : SpGistLeafTuple head;
119 :
120 76282 : head = (SpGistLeafTuple) PageGetItem(page,
121 76282 : PageGetItemId(page, xldata->offnumHeadLeaf));
122 : Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
123 76282 : SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
124 : }
125 : }
126 : else
127 : {
128 : /* replacing a DEAD tuple */
129 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
130 0 : if (PageAddItem(page,
131 : leafTuple, leafTupleHdr.size,
132 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
133 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
134 : leafTupleHdr.size);
135 : }
136 :
137 77562 : PageSetLSN(page, lsn);
138 77562 : MarkBufferDirty(buffer);
139 : }
140 77936 : if (BufferIsValid(buffer))
141 77936 : UnlockReleaseBuffer(buffer);
142 :
143 : /* update parent downlink if necessary */
144 77936 : if (xldata->offnumParent != InvalidOffsetNumber)
145 : {
146 240 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
147 : {
148 : SpGistInnerTuple tuple;
149 : BlockNumber blknoLeaf;
150 :
151 240 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
152 :
153 240 : page = BufferGetPage(buffer);
154 :
155 240 : tuple = (SpGistInnerTuple) PageGetItem(page,
156 240 : PageGetItemId(page, xldata->offnumParent));
157 :
158 240 : spgUpdateNodeLink(tuple, xldata->nodeI,
159 240 : blknoLeaf, xldata->offnumLeaf);
160 :
161 240 : PageSetLSN(page, lsn);
162 240 : MarkBufferDirty(buffer);
163 : }
164 240 : if (BufferIsValid(buffer))
165 240 : UnlockReleaseBuffer(buffer);
166 : }
167 77936 : }
168 :
169 : static void
170 152 : spgRedoMoveLeafs(XLogReaderState *record)
171 : {
172 152 : XLogRecPtr lsn = record->EndRecPtr;
173 152 : char *ptr = XLogRecGetData(record);
174 152 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
175 : SpGistState state;
176 : OffsetNumber *toDelete;
177 : OffsetNumber *toInsert;
178 : int nInsert;
179 : Buffer buffer;
180 : Page page;
181 : XLogRedoAction action;
182 : BlockNumber blknoDst;
183 :
184 152 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
185 :
186 152 : fillFakeState(&state, xldata->stateSrc);
187 :
188 152 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
189 :
190 152 : ptr += SizeOfSpgxlogMoveLeafs;
191 152 : toDelete = (OffsetNumber *) ptr;
192 152 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
193 152 : toInsert = (OffsetNumber *) ptr;
194 152 : ptr += sizeof(OffsetNumber) * nInsert;
195 :
196 : /* now ptr points to the list of leaf tuples */
197 :
198 : /*
199 : * In normal operation we would have all three pages (source, dest, and
200 : * parent) locked simultaneously; but in WAL replay it should be safe to
201 : * update them one at a time, as long as we do it in the right order.
202 : */
203 :
204 : /* Insert tuples on the dest page (do first, so redirect is valid) */
205 152 : if (xldata->newPage)
206 : {
207 64 : buffer = XLogInitBufferForRedo(record, 1);
208 64 : SpGistInitBuffer(buffer,
209 64 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
210 64 : action = BLK_NEEDS_REDO;
211 : }
212 : else
213 88 : action = XLogReadBufferForRedo(record, 1, &buffer);
214 :
215 152 : if (action == BLK_NEEDS_REDO)
216 : {
217 : int i;
218 :
219 152 : page = BufferGetPage(buffer);
220 :
221 6788 : for (i = 0; i < nInsert; i++)
222 : {
223 : char *leafTuple;
224 : SpGistLeafTupleData leafTupleHdr;
225 :
226 : /*
227 : * the tuples are not aligned, so must copy to access the size
228 : * field.
229 : */
230 6636 : leafTuple = ptr;
231 6636 : memcpy(&leafTupleHdr, leafTuple,
232 : sizeof(SpGistLeafTupleData));
233 :
234 6636 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
235 6636 : ptr += leafTupleHdr.size;
236 : }
237 :
238 152 : PageSetLSN(page, lsn);
239 152 : MarkBufferDirty(buffer);
240 : }
241 152 : if (BufferIsValid(buffer))
242 152 : UnlockReleaseBuffer(buffer);
243 :
244 : /* Delete tuples from the source page, inserting a redirection pointer */
245 152 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
246 : {
247 152 : page = BufferGetPage(buffer);
248 :
249 152 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
250 152 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
251 : SPGIST_PLACEHOLDER,
252 : blknoDst,
253 152 : toInsert[nInsert - 1]);
254 :
255 152 : PageSetLSN(page, lsn);
256 152 : MarkBufferDirty(buffer);
257 : }
258 152 : if (BufferIsValid(buffer))
259 152 : UnlockReleaseBuffer(buffer);
260 :
261 : /* And update the parent downlink */
262 152 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
263 : {
264 : SpGistInnerTuple tuple;
265 :
266 152 : page = BufferGetPage(buffer);
267 :
268 152 : tuple = (SpGistInnerTuple) PageGetItem(page,
269 152 : PageGetItemId(page, xldata->offnumParent));
270 :
271 152 : spgUpdateNodeLink(tuple, xldata->nodeI,
272 152 : blknoDst, toInsert[nInsert - 1]);
273 :
274 152 : PageSetLSN(page, lsn);
275 152 : MarkBufferDirty(buffer);
276 : }
277 152 : if (BufferIsValid(buffer))
278 152 : UnlockReleaseBuffer(buffer);
279 152 : }
280 :
281 : static void
282 202 : spgRedoAddNode(XLogReaderState *record)
283 : {
284 202 : XLogRecPtr lsn = record->EndRecPtr;
285 202 : char *ptr = XLogRecGetData(record);
286 202 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
287 : char *innerTuple;
288 : SpGistInnerTupleData innerTupleHdr;
289 : SpGistState state;
290 : Buffer buffer;
291 : Page page;
292 : XLogRedoAction action;
293 :
294 202 : ptr += sizeof(spgxlogAddNode);
295 202 : innerTuple = ptr;
296 : /* the tuple is unaligned, so make a copy to access its header */
297 202 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
298 :
299 202 : fillFakeState(&state, xldata->stateSrc);
300 :
301 202 : if (!XLogRecHasBlockRef(record, 1))
302 : {
303 : /* update in place */
304 : Assert(xldata->parentBlk == -1);
305 200 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
306 : {
307 200 : page = BufferGetPage(buffer);
308 :
309 200 : PageIndexTupleDelete(page, xldata->offnum);
310 200 : if (PageAddItem(page, innerTuple, innerTupleHdr.size,
311 : xldata->offnum,
312 200 : false, false) != xldata->offnum)
313 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
314 : innerTupleHdr.size);
315 :
316 200 : PageSetLSN(page, lsn);
317 200 : MarkBufferDirty(buffer);
318 : }
319 200 : if (BufferIsValid(buffer))
320 200 : UnlockReleaseBuffer(buffer);
321 : }
322 : else
323 : {
324 : BlockNumber blkno;
325 : BlockNumber blknoNew;
326 :
327 2 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
328 2 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
329 :
330 : /*
331 : * In normal operation we would have all three pages (source, dest,
332 : * and parent) locked simultaneously; but in WAL replay it should be
333 : * safe to update them one at a time, as long as we do it in the right
334 : * order. We must insert the new tuple before replacing the old tuple
335 : * with the redirect tuple.
336 : */
337 :
338 : /* Install new tuple first so redirect is valid */
339 2 : if (xldata->newPage)
340 : {
341 : /* AddNode is not used for nulls pages */
342 2 : buffer = XLogInitBufferForRedo(record, 1);
343 2 : SpGistInitBuffer(buffer, 0);
344 2 : action = BLK_NEEDS_REDO;
345 : }
346 : else
347 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
348 2 : if (action == BLK_NEEDS_REDO)
349 : {
350 2 : page = BufferGetPage(buffer);
351 :
352 2 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
353 :
354 : /*
355 : * If parent is in this same page, update it now.
356 : */
357 2 : if (xldata->parentBlk == 1)
358 : {
359 : SpGistInnerTuple parentTuple;
360 :
361 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
362 0 : PageGetItemId(page, xldata->offnumParent));
363 :
364 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
365 0 : blknoNew, xldata->offnumNew);
366 : }
367 2 : PageSetLSN(page, lsn);
368 2 : MarkBufferDirty(buffer);
369 : }
370 2 : if (BufferIsValid(buffer))
371 2 : UnlockReleaseBuffer(buffer);
372 :
373 : /* Delete old tuple, replacing it with redirect or placeholder tuple */
374 2 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
375 : {
376 : SpGistDeadTuple dt;
377 :
378 2 : page = BufferGetPage(buffer);
379 :
380 2 : if (state.isBuild)
381 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
382 : InvalidBlockNumber,
383 : InvalidOffsetNumber);
384 : else
385 2 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
386 : blknoNew,
387 2 : xldata->offnumNew);
388 :
389 2 : PageIndexTupleDelete(page, xldata->offnum);
390 2 : if (PageAddItem(page, dt, dt->size,
391 : xldata->offnum,
392 2 : false, false) != xldata->offnum)
393 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
394 : dt->size);
395 :
396 2 : if (state.isBuild)
397 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
398 : else
399 2 : SpGistPageGetOpaque(page)->nRedirection++;
400 :
401 : /*
402 : * If parent is in this same page, update it now.
403 : */
404 2 : if (xldata->parentBlk == 0)
405 : {
406 : SpGistInnerTuple parentTuple;
407 :
408 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
409 0 : PageGetItemId(page, xldata->offnumParent));
410 :
411 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
412 0 : blknoNew, xldata->offnumNew);
413 : }
414 2 : PageSetLSN(page, lsn);
415 2 : MarkBufferDirty(buffer);
416 : }
417 2 : if (BufferIsValid(buffer))
418 2 : UnlockReleaseBuffer(buffer);
419 :
420 : /*
421 : * Update parent downlink (if we didn't do it as part of the source or
422 : * destination page update already).
423 : */
424 2 : if (xldata->parentBlk == 2)
425 : {
426 2 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
427 : {
428 : SpGistInnerTuple parentTuple;
429 :
430 0 : page = BufferGetPage(buffer);
431 :
432 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
433 0 : PageGetItemId(page, xldata->offnumParent));
434 :
435 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
436 0 : blknoNew, xldata->offnumNew);
437 :
438 0 : PageSetLSN(page, lsn);
439 0 : MarkBufferDirty(buffer);
440 : }
441 2 : if (BufferIsValid(buffer))
442 2 : UnlockReleaseBuffer(buffer);
443 : }
444 : }
445 202 : }
446 :
447 : static void
448 202 : spgRedoSplitTuple(XLogReaderState *record)
449 : {
450 202 : XLogRecPtr lsn = record->EndRecPtr;
451 202 : char *ptr = XLogRecGetData(record);
452 202 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
453 : char *prefixTuple;
454 : SpGistInnerTupleData prefixTupleHdr;
455 : char *postfixTuple;
456 : SpGistInnerTupleData postfixTupleHdr;
457 : Buffer buffer;
458 : Page page;
459 : XLogRedoAction action;
460 :
461 202 : ptr += sizeof(spgxlogSplitTuple);
462 202 : prefixTuple = ptr;
463 : /* the prefix tuple is unaligned, so make a copy to access its header */
464 202 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
465 202 : ptr += prefixTupleHdr.size;
466 202 : postfixTuple = ptr;
467 : /* postfix tuple is also unaligned */
468 202 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
469 :
470 : /*
471 : * In normal operation we would have both pages locked simultaneously; but
472 : * in WAL replay it should be safe to update them one at a time, as long
473 : * as we do it in the right order.
474 : */
475 :
476 : /* insert postfix tuple first to avoid dangling link */
477 202 : if (!xldata->postfixBlkSame)
478 : {
479 54 : if (xldata->newPage)
480 : {
481 2 : buffer = XLogInitBufferForRedo(record, 1);
482 : /* SplitTuple is not used for nulls pages */
483 2 : SpGistInitBuffer(buffer, 0);
484 2 : action = BLK_NEEDS_REDO;
485 : }
486 : else
487 52 : action = XLogReadBufferForRedo(record, 1, &buffer);
488 54 : if (action == BLK_NEEDS_REDO)
489 : {
490 54 : page = BufferGetPage(buffer);
491 :
492 54 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
493 :
494 54 : PageSetLSN(page, lsn);
495 54 : MarkBufferDirty(buffer);
496 : }
497 54 : if (BufferIsValid(buffer))
498 54 : UnlockReleaseBuffer(buffer);
499 : }
500 :
501 : /* now handle the original page */
502 202 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
503 : {
504 200 : page = BufferGetPage(buffer);
505 :
506 200 : PageIndexTupleDelete(page, xldata->offnumPrefix);
507 200 : if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
508 200 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
509 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
510 : prefixTupleHdr.size);
511 :
512 200 : if (xldata->postfixBlkSame)
513 146 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
514 :
515 200 : PageSetLSN(page, lsn);
516 200 : MarkBufferDirty(buffer);
517 : }
518 202 : if (BufferIsValid(buffer))
519 202 : UnlockReleaseBuffer(buffer);
520 202 : }
521 :
522 : static void
523 408 : spgRedoPickSplit(XLogReaderState *record)
524 : {
525 408 : XLogRecPtr lsn = record->EndRecPtr;
526 408 : char *ptr = XLogRecGetData(record);
527 408 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
528 : char *innerTuple;
529 : SpGistInnerTupleData innerTupleHdr;
530 : SpGistState state;
531 : OffsetNumber *toDelete;
532 : OffsetNumber *toInsert;
533 : uint8 *leafPageSelect;
534 : Buffer srcBuffer;
535 : Buffer destBuffer;
536 : Buffer innerBuffer;
537 : Page srcPage;
538 : Page destPage;
539 : Page page;
540 : int i;
541 : BlockNumber blknoInner;
542 : XLogRedoAction action;
543 :
544 408 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
545 :
546 408 : fillFakeState(&state, xldata->stateSrc);
547 :
548 408 : ptr += SizeOfSpgxlogPickSplit;
549 408 : toDelete = (OffsetNumber *) ptr;
550 408 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
551 408 : toInsert = (OffsetNumber *) ptr;
552 408 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
553 408 : leafPageSelect = (uint8 *) ptr;
554 408 : ptr += sizeof(uint8) * xldata->nInsert;
555 :
556 408 : innerTuple = ptr;
557 : /* the inner tuple is unaligned, so make a copy to access its header */
558 408 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
559 408 : ptr += innerTupleHdr.size;
560 :
561 : /* now ptr points to the list of leaf tuples */
562 :
563 408 : if (xldata->isRootSplit)
564 : {
565 : /* when splitting root, we touch it only in the guise of new inner */
566 6 : srcBuffer = InvalidBuffer;
567 6 : srcPage = NULL;
568 : }
569 402 : else if (xldata->initSrc)
570 : {
571 : /* just re-init the source page */
572 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
573 0 : srcPage = BufferGetPage(srcBuffer);
574 :
575 0 : SpGistInitBuffer(srcBuffer,
576 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
577 : /* don't update LSN etc till we're done with it */
578 : }
579 : else
580 : {
581 : /*
582 : * Delete the specified tuples from source page. (In case we're in
583 : * Hot Standby, we need to hold lock on the page till we're done
584 : * inserting leaf tuples and the new inner tuple, else the added
585 : * redirect tuple will be a dangling link.)
586 : */
587 402 : srcPage = NULL;
588 402 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
589 : {
590 402 : srcPage = BufferGetPage(srcBuffer);
591 :
592 : /*
593 : * We have it a bit easier here than in doPickSplit(), because we
594 : * know the inner tuple's location already, so we can inject the
595 : * correct redirection tuple now.
596 : */
597 402 : if (!state.isBuild)
598 402 : spgPageIndexMultiDelete(&state, srcPage,
599 402 : toDelete, xldata->nDelete,
600 : SPGIST_REDIRECT,
601 : SPGIST_PLACEHOLDER,
602 : blknoInner,
603 402 : xldata->offnumInner);
604 : else
605 0 : spgPageIndexMultiDelete(&state, srcPage,
606 0 : toDelete, xldata->nDelete,
607 : SPGIST_PLACEHOLDER,
608 : SPGIST_PLACEHOLDER,
609 : InvalidBlockNumber,
610 : InvalidOffsetNumber);
611 :
612 : /* don't update LSN etc till we're done with it */
613 : }
614 : }
615 :
616 : /* try to access dest page if any */
617 408 : if (!XLogRecHasBlockRef(record, 1))
618 : {
619 0 : destBuffer = InvalidBuffer;
620 0 : destPage = NULL;
621 : }
622 408 : else if (xldata->initDest)
623 : {
624 : /* just re-init the dest page */
625 380 : destBuffer = XLogInitBufferForRedo(record, 1);
626 380 : destPage = BufferGetPage(destBuffer);
627 :
628 380 : SpGistInitBuffer(destBuffer,
629 380 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
630 : /* don't update LSN etc till we're done with it */
631 : }
632 : else
633 : {
634 : /*
635 : * We could probably release the page lock immediately in the
636 : * full-page-image case, but for safety let's hold it till later.
637 : */
638 28 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
639 26 : destPage = BufferGetPage(destBuffer);
640 : else
641 2 : destPage = NULL; /* don't do any page updates */
642 : }
643 :
644 : /* restore leaf tuples to src and/or dest page */
645 57200 : for (i = 0; i < xldata->nInsert; i++)
646 : {
647 : char *leafTuple;
648 : SpGistLeafTupleData leafTupleHdr;
649 :
650 : /* the tuples are not aligned, so must copy to access the size field. */
651 56792 : leafTuple = ptr;
652 56792 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
653 56792 : ptr += leafTupleHdr.size;
654 :
655 56792 : page = leafPageSelect[i] ? destPage : srcPage;
656 56792 : if (page == NULL)
657 96 : continue; /* no need to touch this page */
658 :
659 56696 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
660 : }
661 :
662 : /* Now update src and dest page LSNs if needed */
663 408 : if (srcPage != NULL)
664 : {
665 402 : PageSetLSN(srcPage, lsn);
666 402 : MarkBufferDirty(srcBuffer);
667 : }
668 408 : if (destPage != NULL)
669 : {
670 406 : PageSetLSN(destPage, lsn);
671 406 : MarkBufferDirty(destBuffer);
672 : }
673 :
674 : /* restore new inner tuple */
675 408 : if (xldata->initInner)
676 : {
677 12 : innerBuffer = XLogInitBufferForRedo(record, 2);
678 12 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
679 12 : action = BLK_NEEDS_REDO;
680 : }
681 : else
682 396 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
683 :
684 408 : if (action == BLK_NEEDS_REDO)
685 : {
686 370 : page = BufferGetPage(innerBuffer);
687 :
688 370 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
689 :
690 : /* if inner is also parent, update link while we're here */
691 370 : if (xldata->innerIsParent)
692 : {
693 : SpGistInnerTuple parent;
694 :
695 338 : parent = (SpGistInnerTuple) PageGetItem(page,
696 338 : PageGetItemId(page, xldata->offnumParent));
697 338 : spgUpdateNodeLink(parent, xldata->nodeI,
698 338 : blknoInner, xldata->offnumInner);
699 : }
700 :
701 370 : PageSetLSN(page, lsn);
702 370 : MarkBufferDirty(innerBuffer);
703 : }
704 408 : if (BufferIsValid(innerBuffer))
705 408 : UnlockReleaseBuffer(innerBuffer);
706 :
707 : /*
708 : * Now we can release the leaf-page locks. It's okay to do this before
709 : * updating the parent downlink.
710 : */
711 408 : if (BufferIsValid(srcBuffer))
712 402 : UnlockReleaseBuffer(srcBuffer);
713 408 : if (BufferIsValid(destBuffer))
714 408 : UnlockReleaseBuffer(destBuffer);
715 :
716 : /* update parent downlink, unless we did it above */
717 408 : if (XLogRecHasBlockRef(record, 3))
718 : {
719 : Buffer parentBuffer;
720 :
721 30 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
722 : {
723 : SpGistInnerTuple parent;
724 :
725 24 : page = BufferGetPage(parentBuffer);
726 :
727 24 : parent = (SpGistInnerTuple) PageGetItem(page,
728 24 : PageGetItemId(page, xldata->offnumParent));
729 24 : spgUpdateNodeLink(parent, xldata->nodeI,
730 24 : blknoInner, xldata->offnumInner);
731 :
732 24 : PageSetLSN(page, lsn);
733 24 : MarkBufferDirty(parentBuffer);
734 : }
735 30 : if (BufferIsValid(parentBuffer))
736 30 : UnlockReleaseBuffer(parentBuffer);
737 : }
738 : else
739 : Assert(xldata->innerIsParent || xldata->isRootSplit);
740 408 : }
741 :
742 : static void
743 52 : spgRedoVacuumLeaf(XLogReaderState *record)
744 : {
745 52 : XLogRecPtr lsn = record->EndRecPtr;
746 52 : char *ptr = XLogRecGetData(record);
747 52 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
748 : OffsetNumber *toDead;
749 : OffsetNumber *toPlaceholder;
750 : OffsetNumber *moveSrc;
751 : OffsetNumber *moveDest;
752 : OffsetNumber *chainSrc;
753 : OffsetNumber *chainDest;
754 : SpGistState state;
755 : Buffer buffer;
756 : Page page;
757 : int i;
758 :
759 52 : fillFakeState(&state, xldata->stateSrc);
760 :
761 52 : ptr += SizeOfSpgxlogVacuumLeaf;
762 52 : toDead = (OffsetNumber *) ptr;
763 52 : ptr += sizeof(OffsetNumber) * xldata->nDead;
764 52 : toPlaceholder = (OffsetNumber *) ptr;
765 52 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
766 52 : moveSrc = (OffsetNumber *) ptr;
767 52 : ptr += sizeof(OffsetNumber) * xldata->nMove;
768 52 : moveDest = (OffsetNumber *) ptr;
769 52 : ptr += sizeof(OffsetNumber) * xldata->nMove;
770 52 : chainSrc = (OffsetNumber *) ptr;
771 52 : ptr += sizeof(OffsetNumber) * xldata->nChain;
772 52 : chainDest = (OffsetNumber *) ptr;
773 :
774 52 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
775 : {
776 36 : page = BufferGetPage(buffer);
777 :
778 36 : spgPageIndexMultiDelete(&state, page,
779 36 : toDead, xldata->nDead,
780 : SPGIST_DEAD, SPGIST_DEAD,
781 : InvalidBlockNumber,
782 : InvalidOffsetNumber);
783 :
784 36 : spgPageIndexMultiDelete(&state, page,
785 36 : toPlaceholder, xldata->nPlaceholder,
786 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
787 : InvalidBlockNumber,
788 : InvalidOffsetNumber);
789 :
790 : /* see comments in vacuumLeafPage() */
791 72 : for (i = 0; i < xldata->nMove; i++)
792 : {
793 36 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
794 36 : ItemId idDest = PageGetItemId(page, moveDest[i]);
795 : ItemIdData tmp;
796 :
797 36 : tmp = *idSrc;
798 36 : *idSrc = *idDest;
799 36 : *idDest = tmp;
800 : }
801 :
802 36 : spgPageIndexMultiDelete(&state, page,
803 36 : moveSrc, xldata->nMove,
804 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
805 : InvalidBlockNumber,
806 : InvalidOffsetNumber);
807 :
808 82 : for (i = 0; i < xldata->nChain; i++)
809 : {
810 : SpGistLeafTuple lt;
811 :
812 46 : lt = (SpGistLeafTuple) PageGetItem(page,
813 46 : PageGetItemId(page, chainSrc[i]));
814 : Assert(lt->tupstate == SPGIST_LIVE);
815 46 : SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
816 : }
817 :
818 36 : PageSetLSN(page, lsn);
819 36 : MarkBufferDirty(buffer);
820 : }
821 52 : if (BufferIsValid(buffer))
822 52 : UnlockReleaseBuffer(buffer);
823 52 : }
824 :
825 : static void
826 0 : spgRedoVacuumRoot(XLogReaderState *record)
827 : {
828 0 : XLogRecPtr lsn = record->EndRecPtr;
829 0 : char *ptr = XLogRecGetData(record);
830 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
831 : OffsetNumber *toDelete;
832 : Buffer buffer;
833 : Page page;
834 :
835 0 : toDelete = xldata->offsets;
836 :
837 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
838 : {
839 0 : page = BufferGetPage(buffer);
840 :
841 : /* The tuple numbers are in order */
842 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
843 :
844 0 : PageSetLSN(page, lsn);
845 0 : MarkBufferDirty(buffer);
846 : }
847 0 : if (BufferIsValid(buffer))
848 0 : UnlockReleaseBuffer(buffer);
849 0 : }
850 :
851 : static void
852 798 : spgRedoVacuumRedirect(XLogReaderState *record)
853 : {
854 798 : XLogRecPtr lsn = record->EndRecPtr;
855 798 : char *ptr = XLogRecGetData(record);
856 798 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
857 : OffsetNumber *itemToPlaceholder;
858 : Buffer buffer;
859 :
860 798 : itemToPlaceholder = xldata->offsets;
861 :
862 : /*
863 : * If any redirection tuples are being removed, make sure there are no
864 : * live Hot Standby transactions that might need to see them.
865 : */
866 798 : if (InHotStandby)
867 : {
868 : RelFileLocator locator;
869 :
870 798 : XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
871 798 : ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
872 798 : xldata->isCatalogRel,
873 : locator);
874 : }
875 :
876 798 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
877 : {
878 62 : Page page = BufferGetPage(buffer);
879 62 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
880 : int i;
881 :
882 : /* Convert redirect pointers to plain placeholders */
883 68 : for (i = 0; i < xldata->nToPlaceholder; i++)
884 : {
885 : SpGistDeadTuple dt;
886 :
887 6 : dt = (SpGistDeadTuple) PageGetItem(page,
888 6 : PageGetItemId(page, itemToPlaceholder[i]));
889 : Assert(dt->tupstate == SPGIST_REDIRECT);
890 6 : dt->tupstate = SPGIST_PLACEHOLDER;
891 6 : ItemPointerSetInvalid(&dt->pointer);
892 : }
893 :
894 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
895 62 : opaque->nRedirection -= xldata->nToPlaceholder;
896 62 : opaque->nPlaceholder += xldata->nToPlaceholder;
897 :
898 : /* Remove placeholder tuples at end of page */
899 62 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
900 : {
901 62 : int max = PageGetMaxOffsetNumber(page);
902 : OffsetNumber *toDelete;
903 :
904 62 : toDelete = palloc(sizeof(OffsetNumber) * max);
905 :
906 2622 : for (i = xldata->firstPlaceholder; i <= max; i++)
907 2560 : toDelete[i - xldata->firstPlaceholder] = i;
908 :
909 62 : i = max - xldata->firstPlaceholder + 1;
910 : Assert(opaque->nPlaceholder >= i);
911 62 : opaque->nPlaceholder -= i;
912 :
913 : /* The array is sorted, so can use PageIndexMultiDelete */
914 62 : PageIndexMultiDelete(page, toDelete, i);
915 :
916 62 : pfree(toDelete);
917 : }
918 :
919 62 : PageSetLSN(page, lsn);
920 62 : MarkBufferDirty(buffer);
921 : }
922 798 : if (BufferIsValid(buffer))
923 798 : UnlockReleaseBuffer(buffer);
924 798 : }
925 :
926 : void
927 79750 : spg_redo(XLogReaderState *record)
928 : {
929 79750 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
930 : MemoryContext oldCxt;
931 :
932 79750 : oldCxt = MemoryContextSwitchTo(opCtx);
933 79750 : switch (info)
934 : {
935 77936 : case XLOG_SPGIST_ADD_LEAF:
936 77936 : spgRedoAddLeaf(record);
937 77936 : break;
938 152 : case XLOG_SPGIST_MOVE_LEAFS:
939 152 : spgRedoMoveLeafs(record);
940 152 : break;
941 202 : case XLOG_SPGIST_ADD_NODE:
942 202 : spgRedoAddNode(record);
943 202 : break;
944 202 : case XLOG_SPGIST_SPLIT_TUPLE:
945 202 : spgRedoSplitTuple(record);
946 202 : break;
947 408 : case XLOG_SPGIST_PICKSPLIT:
948 408 : spgRedoPickSplit(record);
949 408 : break;
950 52 : case XLOG_SPGIST_VACUUM_LEAF:
951 52 : spgRedoVacuumLeaf(record);
952 52 : break;
953 0 : case XLOG_SPGIST_VACUUM_ROOT:
954 0 : spgRedoVacuumRoot(record);
955 0 : break;
956 798 : case XLOG_SPGIST_VACUUM_REDIRECT:
957 798 : spgRedoVacuumRedirect(record);
958 798 : break;
959 0 : default:
960 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
961 : }
962 :
963 79750 : MemoryContextSwitchTo(oldCxt);
964 79750 : MemoryContextReset(opCtx);
965 79750 : }
966 :
967 : void
968 412 : spg_xlog_startup(void)
969 : {
970 412 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
971 : "SP-GiST temporary context",
972 : ALLOCSET_DEFAULT_SIZES);
973 412 : }
974 :
975 : void
976 298 : spg_xlog_cleanup(void)
977 : {
978 298 : MemoryContextDelete(opCtx);
979 298 : opCtx = NULL;
980 298 : }
981 :
982 : /*
983 : * Mask a SpGist page before performing consistency checks on it.
984 : */
985 : void
986 160032 : spg_mask(char *pagedata, BlockNumber blkno)
987 : {
988 160032 : Page page = (Page) pagedata;
989 160032 : PageHeader pagehdr = (PageHeader) page;
990 :
991 160032 : mask_page_lsn_and_checksum(page);
992 :
993 160032 : mask_page_hint_bits(page);
994 :
995 : /*
996 : * Mask the unused space, but only if the page's pd_lower appears to have
997 : * been set correctly.
998 : */
999 160032 : if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1000 160032 : mask_unused_space(page);
1001 160032 : }
|