Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgxlog.c
4 : * WAL replay logic for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgxlog.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/bufmask.h"
18 : #include "access/spgist_private.h"
19 : #include "access/spgxlog.h"
20 : #include "access/xlogutils.h"
21 : #include "storage/standby.h"
22 : #include "utils/memutils.h"
23 :
24 :
/*
 * Scratch memory context for replay: spg_xlog_startup() creates it,
 * spg_redo() switches into it for each record and resets it afterwards, and
 * spg_xlog_cleanup() deletes it and sets the pointer back to NULL.
 */
25 : static MemoryContext opCtx; /* working memory for operations */
26 :
27 :
28 : /*
29 : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30 : *
31 : * At present, all we need is enough info to support spgFormDeadTuple(),
32 : * plus the isBuild flag.
33 : */
34 : static void
35 525 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
36 : {
37 525 : memset(state, 0, sizeof(*state));
38 :
39 525 : state->redirectXid = stateSrc.redirectXid;
40 525 : state->isBuild = stateSrc.isBuild;
41 525 : state->deadTupleStorage = palloc0(SGDTSIZE);
42 525 : }
43 :
44 : /*
45 : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 : * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 : * existing tuple, it had better be a placeholder tuple.
48 : */
49 : static void
50 70570 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
51 : {
52 70570 : if (offset <= PageGetMaxOffsetNumber(page))
53 : {
54 19076 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55 19076 : PageGetItemId(page, offset));
56 :
57 19076 : if (dt->tupstate != SPGIST_PLACEHOLDER)
58 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59 :
60 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 19076 : SpGistPageGetOpaque(page)->nPlaceholder--;
62 :
63 19076 : PageIndexTupleDelete(page, offset);
64 : }
65 :
66 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67 :
68 70570 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
70 : size);
71 70570 : }
72 :
73 : static void
74 38969 : spgRedoAddLeaf(XLogReaderState *record)
75 : {
76 38969 : XLogRecPtr lsn = record->EndRecPtr;
77 38969 : char *ptr = XLogRecGetData(record);
78 38969 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79 : char *leafTuple;
80 : SpGistLeafTupleData leafTupleHdr;
81 : Buffer buffer;
82 : Page page;
83 : XLogRedoAction action;
84 :
85 38969 : ptr += sizeof(spgxlogAddLeaf);
86 38969 : leafTuple = ptr;
87 : /* the leaf tuple is unaligned, so make a copy to access its header */
88 38969 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89 :
90 : /*
91 : * In normal operation we would have both current and parent pages locked
92 : * simultaneously; but in WAL replay it should be safe to update the leaf
93 : * page before updating the parent.
94 : */
95 38969 : if (xldata->newPage)
96 : {
97 1 : buffer = XLogInitBufferForRedo(record, 0);
98 1 : SpGistInitBuffer(buffer,
99 1 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
100 1 : action = BLK_NEEDS_REDO;
101 : }
102 : else
103 38968 : action = XLogReadBufferForRedo(record, 0, &buffer);
104 :
105 38969 : if (action == BLK_NEEDS_REDO)
106 : {
107 38787 : page = BufferGetPage(buffer);
108 :
109 : /* insert new tuple */
110 38787 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111 : {
112 : /* normal cases, tuple was added by SpGistPageAddNewItem */
113 38787 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
114 :
115 : /* update head tuple's chain link if needed */
116 38787 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
117 : {
118 : SpGistLeafTuple head;
119 :
120 38146 : head = (SpGistLeafTuple) PageGetItem(page,
121 38146 : PageGetItemId(page, xldata->offnumHeadLeaf));
122 : Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
123 38146 : SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
124 : }
125 : }
126 : else
127 : {
128 : /* replacing a DEAD tuple */
129 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
130 0 : if (PageAddItem(page,
131 : leafTuple, leafTupleHdr.size,
132 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
133 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
134 : leafTupleHdr.size);
135 : }
136 :
137 38787 : PageSetLSN(page, lsn);
138 38787 : MarkBufferDirty(buffer);
139 : }
140 38969 : if (BufferIsValid(buffer))
141 38969 : UnlockReleaseBuffer(buffer);
142 :
143 : /* update parent downlink if necessary */
144 38969 : if (xldata->offnumParent != InvalidOffsetNumber)
145 : {
146 120 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
147 : {
148 : SpGistInnerTuple tuple;
149 : BlockNumber blknoLeaf;
150 :
151 120 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
152 :
153 120 : page = BufferGetPage(buffer);
154 :
155 120 : tuple = (SpGistInnerTuple) PageGetItem(page,
156 120 : PageGetItemId(page, xldata->offnumParent));
157 :
158 120 : spgUpdateNodeLink(tuple, xldata->nodeI,
159 120 : blknoLeaf, xldata->offnumLeaf);
160 :
161 120 : PageSetLSN(page, lsn);
162 120 : MarkBufferDirty(buffer);
163 : }
164 120 : if (BufferIsValid(buffer))
165 120 : UnlockReleaseBuffer(buffer);
166 : }
167 38969 : }
168 :
/*
 * spgRedoMoveLeafs -- replay an XLOG_SPGIST_MOVE_LEAFS record.
 *
 * The record's main data is the spgxlogMoveLeafs header followed by:
 *   - nMoves offsets of the tuples to delete from the source page,
 *   - nInsert offsets at which tuples are inserted on the destination page
 *     (nInsert is 1 when replaceDead is set, otherwise nMoves + 1),
 *   - the nInsert leaf tuples themselves, packed without alignment.
 *
 * Block reference 0 is the source page, 1 the destination page and 2 the
 * page holding the parent inner tuple.  They are replayed in the order
 * destination, source, parent, and each buffer is released before the next
 * one is read.
 */
169 : static void
170 76 : spgRedoMoveLeafs(XLogReaderState *record)
171 : {
172 76 : XLogRecPtr lsn = record->EndRecPtr;
173 76 : char *ptr = XLogRecGetData(record);
174 76 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
175 : SpGistState state;
176 : OffsetNumber *toDelete;
177 : OffsetNumber *toInsert;
178 : int nInsert;
179 : Buffer buffer;
180 : Page page;
181 : XLogRedoAction action;
182 : BlockNumber blknoDst;
183 :
/* destination block number is needed for the redirect and the downlink */
184 76 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
185 :
186 76 : fillFakeState(&state, xldata->stateSrc);
187 :
188 76 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
189 :
/* walk past the header and the two offset arrays */
190 76 : ptr += SizeOfSpgxlogMoveLeafs;
191 76 : toDelete = (OffsetNumber *) ptr;
192 76 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
193 76 : toInsert = (OffsetNumber *) ptr;
194 76 : ptr += sizeof(OffsetNumber) * nInsert;
195 :
196 : /* now ptr points to the list of leaf tuples */
197 :
198 : /*
199 : * In normal operation we would have all three pages (source, dest, and
200 : * parent) locked simultaneously; but in WAL replay it should be safe to
201 : * update them one at a time, as long as we do it in the right order.
202 : */
203 :
204 : /* Insert tuples on the dest page (do first, so redirect is valid) */
205 76 : if (xldata->newPage)
206 : {
207 33 : buffer = XLogInitBufferForRedo(record, 1);
208 33 : SpGistInitBuffer(buffer,
209 33 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
210 33 : action = BLK_NEEDS_REDO;
211 : }
212 : else
213 43 : action = XLogReadBufferForRedo(record, 1, &buffer);
214 :
215 76 : if (action == BLK_NEEDS_REDO)
216 : {
217 : int i;
218 :
219 74 : page = BufferGetPage(buffer);
220 :
221 3284 : for (i = 0; i < nInsert; i++)
222 : {
223 : char *leafTuple;
224 : SpGistLeafTupleData leafTupleHdr;
225 :
226 : /*
227 : * the tuples are not aligned, so must copy to access the size
228 : * field.
229 : */
230 3210 : leafTuple = ptr;
231 3210 : memcpy(&leafTupleHdr, leafTuple,
232 : sizeof(SpGistLeafTupleData));
233 :
234 3210 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
/* advance to the next packed tuple */
235 3210 : ptr += leafTupleHdr.size;
236 : }
237 :
238 74 : PageSetLSN(page, lsn);
239 74 : MarkBufferDirty(buffer);
240 : }
241 76 : if (BufferIsValid(buffer))
242 76 : UnlockReleaseBuffer(buffer);
243 :
244 : /* Delete tuples from the source page, inserting a redirection pointer */
245 76 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
246 : {
247 76 : page = BufferGetPage(buffer);
248 :
/*
 * The pointer handed to spgPageIndexMultiDelete is the destination block
 * plus the last entry of toInsert[]; the same location is used for the
 * parent downlink below.  The first tuple state requested is
 * SPGIST_PLACEHOLDER during an index build, SPGIST_REDIRECT otherwise.
 */
249 76 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
250 76 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
251 : SPGIST_PLACEHOLDER,
252 : blknoDst,
253 76 : toInsert[nInsert - 1]);
254 :
255 76 : PageSetLSN(page, lsn);
256 76 : MarkBufferDirty(buffer);
257 : }
258 76 : if (BufferIsValid(buffer))
259 76 : UnlockReleaseBuffer(buffer);
260 :
261 : /* And update the parent downlink */
262 76 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
263 : {
264 : SpGistInnerTuple tuple;
265 :
266 74 : page = BufferGetPage(buffer);
267 :
268 74 : tuple = (SpGistInnerTuple) PageGetItem(page,
269 74 : PageGetItemId(page, xldata->offnumParent));
270 :
271 74 : spgUpdateNodeLink(tuple, xldata->nodeI,
272 74 : blknoDst, toInsert[nInsert - 1]);
273 :
274 74 : PageSetLSN(page, lsn);
275 74 : MarkBufferDirty(buffer);
276 : }
277 76 : if (BufferIsValid(buffer))
278 76 : UnlockReleaseBuffer(buffer);
279 76 : }
280 :
/*
 * spgRedoAddNode -- replay an XLOG_SPGIST_ADD_NODE record.
 *
 * The record's main data is the spgxlogAddNode header followed by the new
 * (unaligned) inner tuple.  Two cases are handled:
 *
 * - No block reference 1: the tuple at xldata->offnum on block 0 is deleted
 *   and the new inner tuple is added at that same offset.
 *
 * - Block reference 1 present: the new tuple is added on block 1 at
 *   offnumNew; the old tuple on block 0 is replaced by a dead tuple (a
 *   placeholder if stateSrc.isBuild, otherwise a redirect to the new
 *   location); and the parent's node link is updated on whichever of
 *   blocks 0, 1 or 2 xldata->parentBlk names.
 */
281 : static void
282 101 : spgRedoAddNode(XLogReaderState *record)
283 : {
284 101 : XLogRecPtr lsn = record->EndRecPtr;
285 101 : char *ptr = XLogRecGetData(record);
286 101 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
287 : char *innerTuple;
288 : SpGistInnerTupleData innerTupleHdr;
289 : SpGistState state;
290 : Buffer buffer;
291 : Page page;
292 : XLogRedoAction action;
293 :
294 101 : ptr += sizeof(spgxlogAddNode);
295 101 : innerTuple = ptr;
296 : /* the tuple is unaligned, so make a copy to access its header */
297 101 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
298 :
299 101 : fillFakeState(&state, xldata->stateSrc);
300 :
301 101 : if (!XLogRecHasBlockRef(record, 1))
302 : {
303 : /* update in place */
304 : Assert(xldata->parentBlk == -1);
305 100 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
306 : {
307 100 : page = BufferGetPage(buffer);
308 :
/* swap the old tuple for the enlarged one at the same offset */
309 100 : PageIndexTupleDelete(page, xldata->offnum);
310 100 : if (PageAddItem(page, innerTuple, innerTupleHdr.size,
311 : xldata->offnum,
312 100 : false, false) != xldata->offnum)
313 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
314 : innerTupleHdr.size);
315 :
316 100 : PageSetLSN(page, lsn);
317 100 : MarkBufferDirty(buffer);
318 : }
319 200 : if (BufferIsValid(buffer))
320 100 : UnlockReleaseBuffer(buffer);
321 : }
322 : else
323 : {
324 : BlockNumber blkno;
325 : BlockNumber blknoNew;
326 :
327 1 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
328 1 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
329 :
330 : /*
331 : * In normal operation we would have all three pages (source, dest,
332 : * and parent) locked simultaneously; but in WAL replay it should be
333 : * safe to update them one at a time, as long as we do it in the right
334 : * order. We must insert the new tuple before replacing the old tuple
335 : * with the redirect tuple.
336 : */
337 :
338 : /* Install new tuple first so redirect is valid */
339 1 : if (xldata->newPage)
340 : {
341 : /* AddNode is not used for nulls pages */
342 1 : buffer = XLogInitBufferForRedo(record, 1);
343 1 : SpGistInitBuffer(buffer, 0);
344 1 : action = BLK_NEEDS_REDO;
345 : }
346 : else
347 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
348 1 : if (action == BLK_NEEDS_REDO)
349 : {
350 1 : page = BufferGetPage(buffer);
351 :
352 1 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
353 :
354 : /*
355 : * If parent is in this same page, update it now.
356 : */
357 1 : if (xldata->parentBlk == 1)
358 : {
359 : SpGistInnerTuple parentTuple;
360 :
361 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
362 0 : PageGetItemId(page, xldata->offnumParent));
363 :
364 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
365 0 : blknoNew, xldata->offnumNew);
366 : }
367 1 : PageSetLSN(page, lsn);
368 1 : MarkBufferDirty(buffer);
369 : }
370 1 : if (BufferIsValid(buffer))
371 1 : UnlockReleaseBuffer(buffer);
372 :
373 : /* Delete old tuple, replacing it with redirect or placeholder tuple */
374 1 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
375 : {
376 : SpGistDeadTuple dt;
377 :
378 1 : page = BufferGetPage(buffer);
379 :
/* placeholder during index build, otherwise redirect to the new tuple */
380 1 : if (state.isBuild)
381 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
382 : InvalidBlockNumber,
383 : InvalidOffsetNumber);
384 : else
385 1 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
386 : blknoNew,
387 1 : xldata->offnumNew);
388 :
389 1 : PageIndexTupleDelete(page, xldata->offnum);
390 1 : if (PageAddItem(page, dt, dt->size,
391 : xldata->offnum,
392 1 : false, false) != xldata->offnum)
393 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
394 : dt->size);
395 :
/* keep the page's dead-tuple counters in step with what was added */
396 1 : if (state.isBuild)
397 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
398 : else
399 1 : SpGistPageGetOpaque(page)->nRedirection++;
400 :
401 : /*
402 : * If parent is in this same page, update it now.
403 : */
404 1 : if (xldata->parentBlk == 0)
405 : {
406 : SpGistInnerTuple parentTuple;
407 :
408 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
409 0 : PageGetItemId(page, xldata->offnumParent));
410 :
411 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
412 0 : blknoNew, xldata->offnumNew);
413 : }
414 1 : PageSetLSN(page, lsn);
415 1 : MarkBufferDirty(buffer);
416 : }
417 1 : if (BufferIsValid(buffer))
418 1 : UnlockReleaseBuffer(buffer);
419 :
420 : /*
421 : * Update parent downlink (if we didn't do it as part of the source or
422 : * destination page update already).
423 : */
424 1 : if (xldata->parentBlk == 2)
425 : {
426 1 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
427 : {
428 : SpGistInnerTuple parentTuple;
429 :
430 1 : page = BufferGetPage(buffer);
431 :
432 1 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
433 1 : PageGetItemId(page, xldata->offnumParent));
434 :
435 1 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
436 1 : blknoNew, xldata->offnumNew);
437 :
438 1 : PageSetLSN(page, lsn);
439 1 : MarkBufferDirty(buffer);
440 : }
441 1 : if (BufferIsValid(buffer))
442 1 : UnlockReleaseBuffer(buffer);
443 : }
444 : }
445 101 : }
446 :
447 : static void
448 101 : spgRedoSplitTuple(XLogReaderState *record)
449 : {
450 101 : XLogRecPtr lsn = record->EndRecPtr;
451 101 : char *ptr = XLogRecGetData(record);
452 101 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
453 : char *prefixTuple;
454 : SpGistInnerTupleData prefixTupleHdr;
455 : char *postfixTuple;
456 : SpGistInnerTupleData postfixTupleHdr;
457 : Buffer buffer;
458 : Page page;
459 : XLogRedoAction action;
460 :
461 101 : ptr += sizeof(spgxlogSplitTuple);
462 101 : prefixTuple = ptr;
463 : /* the prefix tuple is unaligned, so make a copy to access its header */
464 101 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
465 101 : ptr += prefixTupleHdr.size;
466 101 : postfixTuple = ptr;
467 : /* postfix tuple is also unaligned */
468 101 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
469 :
470 : /*
471 : * In normal operation we would have both pages locked simultaneously; but
472 : * in WAL replay it should be safe to update them one at a time, as long
473 : * as we do it in the right order.
474 : */
475 :
476 : /* insert postfix tuple first to avoid dangling link */
477 101 : if (!xldata->postfixBlkSame)
478 : {
479 27 : if (xldata->newPage)
480 : {
481 1 : buffer = XLogInitBufferForRedo(record, 1);
482 : /* SplitTuple is not used for nulls pages */
483 1 : SpGistInitBuffer(buffer, 0);
484 1 : action = BLK_NEEDS_REDO;
485 : }
486 : else
487 26 : action = XLogReadBufferForRedo(record, 1, &buffer);
488 27 : if (action == BLK_NEEDS_REDO)
489 : {
490 27 : page = BufferGetPage(buffer);
491 :
492 27 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
493 :
494 27 : PageSetLSN(page, lsn);
495 27 : MarkBufferDirty(buffer);
496 : }
497 27 : if (BufferIsValid(buffer))
498 27 : UnlockReleaseBuffer(buffer);
499 : }
500 :
501 : /* now handle the original page */
502 101 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
503 : {
504 100 : page = BufferGetPage(buffer);
505 :
506 100 : PageIndexTupleDelete(page, xldata->offnumPrefix);
507 100 : if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
508 100 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
509 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
510 : prefixTupleHdr.size);
511 :
512 100 : if (xldata->postfixBlkSame)
513 74 : addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
514 :
515 100 : PageSetLSN(page, lsn);
516 100 : MarkBufferDirty(buffer);
517 : }
518 101 : if (BufferIsValid(buffer))
519 101 : UnlockReleaseBuffer(buffer);
520 101 : }
521 :
/*
 * spgRedoPickSplit -- replay an XLOG_SPGIST_PICKSPLIT record.
 *
 * Main-data layout after the spgxlogPickSplit header:
 *   - nDelete offsets to delete from the source page,
 *   - nInsert offsets at which leaf tuples are inserted,
 *   - nInsert bytes (leafPageSelect): nonzero sends the tuple to the
 *     destination page, zero to the source page,
 *   - the new inner tuple,
 *   - the nInsert leaf tuples, packed without alignment.
 *
 * Block references: 0 = source leaf page, 1 = destination leaf page (may be
 * absent), 2 = page receiving the new inner tuple, 3 = separate parent
 * page, if present.
 *
 * The source and destination buffers stay locked until the inner tuple has
 * been restored; a parent downlink on block 3 is updated after they are
 * released.
 */
522 : static void
523 203 : spgRedoPickSplit(XLogReaderState *record)
524 : {
525 203 : XLogRecPtr lsn = record->EndRecPtr;
526 203 : char *ptr = XLogRecGetData(record);
527 203 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
528 : char *innerTuple;
529 : SpGistInnerTupleData innerTupleHdr;
530 : SpGistState state;
531 : OffsetNumber *toDelete;
532 : OffsetNumber *toInsert;
533 : uint8 *leafPageSelect;
534 : Buffer srcBuffer;
535 : Buffer destBuffer;
536 : Buffer innerBuffer;
537 : Page srcPage;
538 : Page destPage;
539 : Page page;
540 : int i;
541 : BlockNumber blknoInner;
542 : XLogRedoAction action;
543 :
/* block number of the new inner tuple, used for redirects and downlinks */
544 203 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
545 :
546 203 : fillFakeState(&state, xldata->stateSrc);
547 :
/* walk past the header and the three per-tuple arrays */
548 203 : ptr += SizeOfSpgxlogPickSplit;
549 203 : toDelete = (OffsetNumber *) ptr;
550 203 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
551 203 : toInsert = (OffsetNumber *) ptr;
552 203 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
553 203 : leafPageSelect = (uint8 *) ptr;
554 203 : ptr += sizeof(uint8) * xldata->nInsert;
555 :
556 203 : innerTuple = ptr;
557 : /* the inner tuple is unaligned, so make a copy to access its header */
558 203 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
559 203 : ptr += innerTupleHdr.size;
560 :
561 : /* now ptr points to the list of leaf tuples */
562 :
/*
 * From here on, srcPage/destPage being NULL means "make no changes to that
 * page"; srcBuffer/destBuffer may still be valid and are released later.
 */
563 203 : if (xldata->isRootSplit)
564 : {
565 : /* when splitting root, we touch it only in the guise of new inner */
566 3 : srcBuffer = InvalidBuffer;
567 3 : srcPage = NULL;
568 : }
569 200 : else if (xldata->initSrc)
570 : {
571 : /* just re-init the source page */
572 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
573 0 : srcPage = BufferGetPage(srcBuffer);
574 :
575 0 : SpGistInitBuffer(srcBuffer,
576 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
577 : /* don't update LSN etc till we're done with it */
578 : }
579 : else
580 : {
581 : /*
582 : * Delete the specified tuples from source page. (In case we're in
583 : * Hot Standby, we need to hold lock on the page till we're done
584 : * inserting leaf tuples and the new inner tuple, else the added
585 : * redirect tuple will be a dangling link.)
586 : */
587 200 : srcPage = NULL;
588 200 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
589 : {
590 200 : srcPage = BufferGetPage(srcBuffer);
591 :
592 : /*
593 : * We have it a bit easier here than in doPickSplit(), because we
594 : * know the inner tuple's location already, so we can inject the
595 : * correct redirection tuple now.
596 : */
597 200 : if (!state.isBuild)
598 200 : spgPageIndexMultiDelete(&state, srcPage,
599 200 : toDelete, xldata->nDelete,
600 : SPGIST_REDIRECT,
601 : SPGIST_PLACEHOLDER,
602 : blknoInner,
603 200 : xldata->offnumInner);
604 : else
605 0 : spgPageIndexMultiDelete(&state, srcPage,
606 0 : toDelete, xldata->nDelete,
607 : SPGIST_PLACEHOLDER,
608 : SPGIST_PLACEHOLDER,
609 : InvalidBlockNumber,
610 : InvalidOffsetNumber);
611 :
612 : /* don't update LSN etc till we're done with it */
613 : }
614 : }
615 :
616 : /* try to access dest page if any */
617 203 : if (!XLogRecHasBlockRef(record, 1))
618 : {
619 0 : destBuffer = InvalidBuffer;
620 0 : destPage = NULL;
621 : }
622 203 : else if (xldata->initDest)
623 : {
624 : /* just re-init the dest page */
625 188 : destBuffer = XLogInitBufferForRedo(record, 1);
626 188 : destPage = BufferGetPage(destBuffer);
627 :
628 188 : SpGistInitBuffer(destBuffer,
629 188 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
630 : /* don't update LSN etc till we're done with it */
631 : }
632 : else
633 : {
634 : /*
635 : * We could probably release the page lock immediately in the
636 : * full-page-image case, but for safety let's hold it till later.
637 : */
638 15 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
639 15 : destPage = BufferGetPage(destBuffer);
640 : else
641 0 : destPage = NULL; /* don't do any page updates */
642 : }
643 :
644 : /* restore leaf tuples to src and/or dest page */
645 28484 : for (i = 0; i < xldata->nInsert; i++)
646 : {
647 : char *leafTuple;
648 : SpGistLeafTupleData leafTupleHdr;
649 :
650 : /* the tuples are not aligned, so must copy to access the size field. */
651 28281 : leafTuple = ptr;
652 28281 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
/* advance before the NULL-page check so skipped tuples are stepped over too */
653 28281 : ptr += leafTupleHdr.size;
654 :
655 28281 : page = leafPageSelect[i] ? destPage : srcPage;
656 28281 : if (page == NULL)
657 0 : continue; /* no need to touch this page */
658 :
659 28281 : addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
660 : }
661 :
662 : /* Now update src and dest page LSNs if needed */
663 203 : if (srcPage != NULL)
664 : {
665 200 : PageSetLSN(srcPage, lsn);
666 200 : MarkBufferDirty(srcBuffer);
667 : }
668 203 : if (destPage != NULL)
669 : {
670 203 : PageSetLSN(destPage, lsn);
671 203 : MarkBufferDirty(destBuffer);
672 : }
673 :
674 : /* restore new inner tuple */
675 203 : if (xldata->initInner)
676 : {
677 6 : innerBuffer = XLogInitBufferForRedo(record, 2);
678 6 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
679 6 : action = BLK_NEEDS_REDO;
680 : }
681 : else
682 197 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
683 :
684 203 : if (action == BLK_NEEDS_REDO)
685 : {
686 190 : page = BufferGetPage(innerBuffer);
687 :
688 190 : addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
689 :
690 : /* if inner is also parent, update link while we're here */
691 190 : if (xldata->innerIsParent)
692 : {
693 : SpGistInnerTuple parent;
694 :
695 172 : parent = (SpGistInnerTuple) PageGetItem(page,
696 172 : PageGetItemId(page, xldata->offnumParent));
697 172 : spgUpdateNodeLink(parent, xldata->nodeI,
698 172 : blknoInner, xldata->offnumInner);
699 : }
700 :
701 190 : PageSetLSN(page, lsn);
702 190 : MarkBufferDirty(innerBuffer);
703 : }
704 203 : if (BufferIsValid(innerBuffer))
705 203 : UnlockReleaseBuffer(innerBuffer);
706 :
707 : /*
708 : * Now we can release the leaf-page locks. It's okay to do this before
709 : * updating the parent downlink.
710 : */
711 203 : if (BufferIsValid(srcBuffer))
712 200 : UnlockReleaseBuffer(srcBuffer);
713 203 : if (BufferIsValid(destBuffer))
714 203 : UnlockReleaseBuffer(destBuffer);
715 :
716 : /* update parent downlink, unless we did it above */
717 203 : if (XLogRecHasBlockRef(record, 3))
718 : {
719 : Buffer parentBuffer;
720 :
721 15 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
722 : {
723 : SpGistInnerTuple parent;
724 :
725 14 : page = BufferGetPage(parentBuffer);
726 :
727 14 : parent = (SpGistInnerTuple) PageGetItem(page,
728 14 : PageGetItemId(page, xldata->offnumParent));
729 14 : spgUpdateNodeLink(parent, xldata->nodeI,
730 14 : blknoInner, xldata->offnumInner);
731 :
732 14 : PageSetLSN(page, lsn);
733 14 : MarkBufferDirty(parentBuffer);
734 : }
735 15 : if (BufferIsValid(parentBuffer))
736 15 : UnlockReleaseBuffer(parentBuffer);
737 : }
738 : else
739 : Assert(xldata->innerIsParent || xldata->isRootSplit);
740 203 : }
741 :
742 : static void
743 145 : spgRedoVacuumLeaf(XLogReaderState *record)
744 : {
745 145 : XLogRecPtr lsn = record->EndRecPtr;
746 145 : char *ptr = XLogRecGetData(record);
747 145 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
748 : OffsetNumber *toDead;
749 : OffsetNumber *toPlaceholder;
750 : OffsetNumber *moveSrc;
751 : OffsetNumber *moveDest;
752 : OffsetNumber *chainSrc;
753 : OffsetNumber *chainDest;
754 : SpGistState state;
755 : Buffer buffer;
756 : Page page;
757 : int i;
758 :
759 145 : fillFakeState(&state, xldata->stateSrc);
760 :
761 145 : ptr += SizeOfSpgxlogVacuumLeaf;
762 145 : toDead = (OffsetNumber *) ptr;
763 145 : ptr += sizeof(OffsetNumber) * xldata->nDead;
764 145 : toPlaceholder = (OffsetNumber *) ptr;
765 145 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
766 145 : moveSrc = (OffsetNumber *) ptr;
767 145 : ptr += sizeof(OffsetNumber) * xldata->nMove;
768 145 : moveDest = (OffsetNumber *) ptr;
769 145 : ptr += sizeof(OffsetNumber) * xldata->nMove;
770 145 : chainSrc = (OffsetNumber *) ptr;
771 145 : ptr += sizeof(OffsetNumber) * xldata->nChain;
772 145 : chainDest = (OffsetNumber *) ptr;
773 :
774 145 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
775 : {
776 9 : page = BufferGetPage(buffer);
777 :
778 9 : spgPageIndexMultiDelete(&state, page,
779 9 : toDead, xldata->nDead,
780 : SPGIST_DEAD, SPGIST_DEAD,
781 : InvalidBlockNumber,
782 : InvalidOffsetNumber);
783 :
784 9 : spgPageIndexMultiDelete(&state, page,
785 9 : toPlaceholder, xldata->nPlaceholder,
786 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
787 : InvalidBlockNumber,
788 : InvalidOffsetNumber);
789 :
790 : /* see comments in vacuumLeafPage() */
791 17 : for (i = 0; i < xldata->nMove; i++)
792 : {
793 8 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
794 8 : ItemId idDest = PageGetItemId(page, moveDest[i]);
795 : ItemIdData tmp;
796 :
797 8 : tmp = *idSrc;
798 8 : *idSrc = *idDest;
799 8 : *idDest = tmp;
800 : }
801 :
802 9 : spgPageIndexMultiDelete(&state, page,
803 9 : moveSrc, xldata->nMove,
804 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
805 : InvalidBlockNumber,
806 : InvalidOffsetNumber);
807 :
808 21 : for (i = 0; i < xldata->nChain; i++)
809 : {
810 : SpGistLeafTuple lt;
811 :
812 12 : lt = (SpGistLeafTuple) PageGetItem(page,
813 12 : PageGetItemId(page, chainSrc[i]));
814 : Assert(lt->tupstate == SPGIST_LIVE);
815 12 : SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
816 : }
817 :
818 9 : PageSetLSN(page, lsn);
819 9 : MarkBufferDirty(buffer);
820 : }
821 145 : if (BufferIsValid(buffer))
822 145 : UnlockReleaseBuffer(buffer);
823 145 : }
824 :
825 : static void
826 0 : spgRedoVacuumRoot(XLogReaderState *record)
827 : {
828 0 : XLogRecPtr lsn = record->EndRecPtr;
829 0 : char *ptr = XLogRecGetData(record);
830 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
831 : OffsetNumber *toDelete;
832 : Buffer buffer;
833 : Page page;
834 :
835 0 : toDelete = xldata->offsets;
836 :
837 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
838 : {
839 0 : page = BufferGetPage(buffer);
840 :
841 : /* The tuple numbers are in order */
842 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
843 :
844 0 : PageSetLSN(page, lsn);
845 0 : MarkBufferDirty(buffer);
846 : }
847 0 : if (BufferIsValid(buffer))
848 0 : UnlockReleaseBuffer(buffer);
849 0 : }
850 :
851 : static void
852 585 : spgRedoVacuumRedirect(XLogReaderState *record)
853 : {
854 585 : XLogRecPtr lsn = record->EndRecPtr;
855 585 : char *ptr = XLogRecGetData(record);
856 585 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
857 : OffsetNumber *itemToPlaceholder;
858 : Buffer buffer;
859 :
860 585 : itemToPlaceholder = xldata->offsets;
861 :
862 : /*
863 : * If any redirection tuples are being removed, make sure there are no
864 : * live Hot Standby transactions that might need to see them.
865 : */
866 585 : if (InHotStandby)
867 : {
868 : RelFileLocator locator;
869 :
870 585 : XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
871 585 : ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
872 585 : xldata->isCatalogRel,
873 : locator);
874 : }
875 :
876 585 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
877 : {
878 89 : Page page = BufferGetPage(buffer);
879 89 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
880 : int i;
881 :
882 : /* Convert redirect pointers to plain placeholders */
883 92 : for (i = 0; i < xldata->nToPlaceholder; i++)
884 : {
885 : SpGistDeadTuple dt;
886 :
887 3 : dt = (SpGistDeadTuple) PageGetItem(page,
888 3 : PageGetItemId(page, itemToPlaceholder[i]));
889 : Assert(dt->tupstate == SPGIST_REDIRECT);
890 3 : dt->tupstate = SPGIST_PLACEHOLDER;
891 3 : ItemPointerSetInvalid(&dt->pointer);
892 : }
893 :
894 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
895 89 : opaque->nRedirection -= xldata->nToPlaceholder;
896 89 : opaque->nPlaceholder += xldata->nToPlaceholder;
897 :
898 : /* Remove placeholder tuples at end of page */
899 89 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
900 : {
901 89 : int max = PageGetMaxOffsetNumber(page);
902 : OffsetNumber *toDelete;
903 :
904 89 : toDelete = palloc_array(OffsetNumber, max);
905 :
906 1247 : for (i = xldata->firstPlaceholder; i <= max; i++)
907 1158 : toDelete[i - xldata->firstPlaceholder] = i;
908 :
909 89 : i = max - xldata->firstPlaceholder + 1;
910 : Assert(opaque->nPlaceholder >= i);
911 89 : opaque->nPlaceholder -= i;
912 :
913 : /* The array is sorted, so can use PageIndexMultiDelete */
914 89 : PageIndexMultiDelete(page, toDelete, i);
915 :
916 89 : pfree(toDelete);
917 : }
918 :
919 89 : PageSetLSN(page, lsn);
920 89 : MarkBufferDirty(buffer);
921 : }
922 585 : if (BufferIsValid(buffer))
923 585 : UnlockReleaseBuffer(buffer);
924 585 : }
925 :
926 : void
927 40180 : spg_redo(XLogReaderState *record)
928 : {
929 40180 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
930 : MemoryContext oldCxt;
931 :
932 40180 : oldCxt = MemoryContextSwitchTo(opCtx);
933 40180 : switch (info)
934 : {
935 38969 : case XLOG_SPGIST_ADD_LEAF:
936 38969 : spgRedoAddLeaf(record);
937 38969 : break;
938 76 : case XLOG_SPGIST_MOVE_LEAFS:
939 76 : spgRedoMoveLeafs(record);
940 76 : break;
941 101 : case XLOG_SPGIST_ADD_NODE:
942 101 : spgRedoAddNode(record);
943 101 : break;
944 101 : case XLOG_SPGIST_SPLIT_TUPLE:
945 101 : spgRedoSplitTuple(record);
946 101 : break;
947 203 : case XLOG_SPGIST_PICKSPLIT:
948 203 : spgRedoPickSplit(record);
949 203 : break;
950 145 : case XLOG_SPGIST_VACUUM_LEAF:
951 145 : spgRedoVacuumLeaf(record);
952 145 : break;
953 0 : case XLOG_SPGIST_VACUUM_ROOT:
954 0 : spgRedoVacuumRoot(record);
955 0 : break;
956 585 : case XLOG_SPGIST_VACUUM_REDIRECT:
957 585 : spgRedoVacuumRedirect(record);
958 585 : break;
959 0 : default:
960 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
961 : }
962 :
963 40180 : MemoryContextSwitchTo(oldCxt);
964 40180 : MemoryContextReset(opCtx);
965 40180 : }
966 :
967 : void
968 214 : spg_xlog_startup(void)
969 : {
970 214 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
971 : "SP-GiST temporary context",
972 : ALLOCSET_DEFAULT_SIZES);
973 214 : }
974 :
975 : void
976 154 : spg_xlog_cleanup(void)
977 : {
978 154 : MemoryContextDelete(opCtx);
979 154 : opCtx = NULL;
980 154 : }
981 :
982 : /*
983 : * Mask a SpGist page before performing consistency checks on it.
984 : */
985 : void
986 80132 : spg_mask(char *pagedata, BlockNumber blkno)
987 : {
988 80132 : Page page = (Page) pagedata;
989 80132 : PageHeader pagehdr = (PageHeader) page;
990 :
991 80132 : mask_page_lsn_and_checksum(page);
992 :
993 80132 : mask_page_hint_bits(page);
994 :
995 : /*
996 : * Mask the unused space, but only if the page's pd_lower appears to have
997 : * been set correctly.
998 : */
999 80132 : if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1000 80132 : mask_unused_space(page);
1001 80132 : }
|