Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgxlog.c
4 : * WAL replay logic for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgxlog.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/bufmask.h"
18 : #include "access/spgist_private.h"
19 : #include "access/spgxlog.h"
20 : #include "access/xlogutils.h"
21 : #include "storage/standby.h"
22 : #include "utils/memutils.h"
23 :
24 :
25 : static MemoryContext opCtx; /* scratch context: created in spg_xlog_startup(), made current and then reset by every spg_redo() call, deleted in spg_xlog_cleanup() */
26 :
27 :
28 : /*
29 : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30 : *
31 : * At present, all we need is enough info to support spgFormDeadTuple(),
32 : * plus the isBuild flag.
33 : */
34 : static void
35 828 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
36 : {
37 828 : memset(state, 0, sizeof(*state));
38 :
39 828 : state->redirectXid = stateSrc.redirectXid;
40 828 : state->isBuild = stateSrc.isBuild;
41 828 : state->deadTupleStorage = palloc0(SGDTSIZE);
42 828 : }
43 :
44 : /*
45 : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
46 : * to replay SpGistPageAddNewItem() operations. If the offset points at an
47 : * existing tuple, it had better be a placeholder tuple.
48 : */
49 : static void
50 145196 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
51 : {
52 145196 : if (offset <= PageGetMaxOffsetNumber(page))
53 : {
54 36648 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55 : PageGetItemId(page, offset));
56 :
57 36648 : if (dt->tupstate != SPGIST_PLACEHOLDER)
58 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59 :
60 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61 36648 : SpGistPageGetOpaque(page)->nPlaceholder--;
62 :
63 36648 : PageIndexTupleDelete(page, offset);
64 : }
65 :
66 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67 :
68 145196 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
70 : size);
71 145196 : }
72 :
73 : static void
74 78000 : spgRedoAddLeaf(XLogReaderState *record)
75 : {
76 78000 : XLogRecPtr lsn = record->EndRecPtr;
77 78000 : char *ptr = XLogRecGetData(record);
78 78000 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79 : char *leafTuple;
80 : SpGistLeafTupleData leafTupleHdr;
81 : Buffer buffer;
82 : Page page;
83 : XLogRedoAction action;
84 :
85 78000 : ptr += sizeof(spgxlogAddLeaf);
86 78000 : leafTuple = ptr;
87 : /* the leaf tuple is unaligned, so make a copy to access its header */
88 78000 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89 :
90 : /*
91 : * In normal operation we would have both current and parent pages locked
92 : * simultaneously; but in WAL replay it should be safe to update the leaf
93 : * page before updating the parent.
94 : */
95 78000 : if (xldata->newPage)
96 : {
97 0 : buffer = XLogInitBufferForRedo(record, 0);
98 0 : SpGistInitBuffer(buffer,
99 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
100 0 : action = BLK_NEEDS_REDO;
101 : }
102 : else
103 78000 : action = XLogReadBufferForRedo(record, 0, &buffer);
104 :
105 78000 : if (action == BLK_NEEDS_REDO)
106 : {
107 77642 : page = BufferGetPage(buffer);
108 :
109 : /* insert new tuple */
110 77642 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111 : {
112 : /* normal cases, tuple was added by SpGistPageAddNewItem */
113 77642 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
114 77642 : xldata->offnumLeaf);
115 :
116 : /* update head tuple's chain link if needed */
117 77642 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
118 : {
119 : SpGistLeafTuple head;
120 :
121 76362 : head = (SpGistLeafTuple) PageGetItem(page,
122 76362 : PageGetItemId(page, xldata->offnumHeadLeaf));
123 : Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
124 76362 : SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
125 : }
126 : }
127 : else
128 : {
129 : /* replacing a DEAD tuple */
130 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
131 0 : if (PageAddItem(page,
132 : (Item) leafTuple, leafTupleHdr.size,
133 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
134 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
135 : leafTupleHdr.size);
136 : }
137 :
138 77642 : PageSetLSN(page, lsn);
139 77642 : MarkBufferDirty(buffer);
140 : }
141 78000 : if (BufferIsValid(buffer))
142 78000 : UnlockReleaseBuffer(buffer);
143 :
144 : /* update parent downlink if necessary */
145 78000 : if (xldata->offnumParent != InvalidOffsetNumber)
146 : {
147 240 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
148 : {
149 : SpGistInnerTuple tuple;
150 : BlockNumber blknoLeaf;
151 :
152 240 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
153 :
154 240 : page = BufferGetPage(buffer);
155 :
156 240 : tuple = (SpGistInnerTuple) PageGetItem(page,
157 240 : PageGetItemId(page, xldata->offnumParent));
158 :
159 240 : spgUpdateNodeLink(tuple, xldata->nodeI,
160 240 : blknoLeaf, xldata->offnumLeaf);
161 :
162 240 : PageSetLSN(page, lsn);
163 240 : MarkBufferDirty(buffer);
164 : }
165 240 : if (BufferIsValid(buffer))
166 240 : UnlockReleaseBuffer(buffer);
167 : }
168 78000 : }
169 :
170 : static void
171 152 : spgRedoMoveLeafs(XLogReaderState *record)
172 : {
173 152 : XLogRecPtr lsn = record->EndRecPtr;
174 152 : char *ptr = XLogRecGetData(record);
175 152 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
176 : SpGistState state;
177 : OffsetNumber *toDelete;
178 : OffsetNumber *toInsert;
179 : int nInsert;
180 : Buffer buffer;
181 : Page page;
182 : XLogRedoAction action;
183 : BlockNumber blknoDst;
184 :
185 152 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
186 :
187 152 : fillFakeState(&state, xldata->stateSrc);
188 :
189 152 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
190 :
191 152 : ptr += SizeOfSpgxlogMoveLeafs;
192 152 : toDelete = (OffsetNumber *) ptr;
193 152 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
194 152 : toInsert = (OffsetNumber *) ptr;
195 152 : ptr += sizeof(OffsetNumber) * nInsert;
196 :
197 : /* now ptr points to the list of leaf tuples */
198 :
199 : /*
200 : * In normal operation we would have all three pages (source, dest, and
201 : * parent) locked simultaneously; but in WAL replay it should be safe to
202 : * update them one at a time, as long as we do it in the right order.
203 : */
204 :
205 : /* Insert tuples on the dest page (do first, so redirect is valid) */
206 152 : if (xldata->newPage)
207 : {
208 64 : buffer = XLogInitBufferForRedo(record, 1);
209 64 : SpGistInitBuffer(buffer,
210 64 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
211 64 : action = BLK_NEEDS_REDO;
212 : }
213 : else
214 88 : action = XLogReadBufferForRedo(record, 1, &buffer);
215 :
216 152 : if (action == BLK_NEEDS_REDO)
217 : {
218 : int i;
219 :
220 150 : page = BufferGetPage(buffer);
221 :
222 6638 : for (i = 0; i < nInsert; i++)
223 : {
224 : char *leafTuple;
225 : SpGistLeafTupleData leafTupleHdr;
226 :
227 : /*
228 : * the tuples are not aligned, so must copy to access the size
229 : * field.
230 : */
231 6488 : leafTuple = ptr;
232 6488 : memcpy(&leafTupleHdr, leafTuple,
233 : sizeof(SpGistLeafTupleData));
234 :
235 6488 : addOrReplaceTuple(page, (Item) leafTuple,
236 6488 : leafTupleHdr.size, toInsert[i]);
237 6488 : ptr += leafTupleHdr.size;
238 : }
239 :
240 150 : PageSetLSN(page, lsn);
241 150 : MarkBufferDirty(buffer);
242 : }
243 152 : if (BufferIsValid(buffer))
244 152 : UnlockReleaseBuffer(buffer);
245 :
246 : /* Delete tuples from the source page, inserting a redirection pointer */
247 152 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
248 : {
249 152 : page = BufferGetPage(buffer);
250 :
251 152 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
252 152 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
253 : SPGIST_PLACEHOLDER,
254 : blknoDst,
255 152 : toInsert[nInsert - 1]);
256 :
257 152 : PageSetLSN(page, lsn);
258 152 : MarkBufferDirty(buffer);
259 : }
260 152 : if (BufferIsValid(buffer))
261 152 : UnlockReleaseBuffer(buffer);
262 :
263 : /* And update the parent downlink */
264 152 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
265 : {
266 : SpGistInnerTuple tuple;
267 :
268 148 : page = BufferGetPage(buffer);
269 :
270 148 : tuple = (SpGistInnerTuple) PageGetItem(page,
271 148 : PageGetItemId(page, xldata->offnumParent));
272 :
273 148 : spgUpdateNodeLink(tuple, xldata->nodeI,
274 148 : blknoDst, toInsert[nInsert - 1]);
275 :
276 148 : PageSetLSN(page, lsn);
277 148 : MarkBufferDirty(buffer);
278 : }
279 152 : if (BufferIsValid(buffer))
280 152 : UnlockReleaseBuffer(buffer);
281 152 : }
282 :
283 : static void
284 202 : spgRedoAddNode(XLogReaderState *record)
285 : {
286 202 : XLogRecPtr lsn = record->EndRecPtr;
287 202 : char *ptr = XLogRecGetData(record);
288 202 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
289 : char *innerTuple;
290 : SpGistInnerTupleData innerTupleHdr;
291 : SpGistState state;
292 : Buffer buffer;
293 : Page page;
294 : XLogRedoAction action;
295 :
296 202 : ptr += sizeof(spgxlogAddNode);
297 202 : innerTuple = ptr;
298 : /* the tuple is unaligned, so make a copy to access its header */
299 202 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
300 :
301 202 : fillFakeState(&state, xldata->stateSrc);
302 :
303 202 : if (!XLogRecHasBlockRef(record, 1))
304 : {
305 : /* update in place */
306 : Assert(xldata->parentBlk == -1);
307 200 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
308 : {
309 200 : page = BufferGetPage(buffer);
310 :
311 200 : PageIndexTupleDelete(page, xldata->offnum);
312 200 : if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
313 : xldata->offnum,
314 200 : false, false) != xldata->offnum)
315 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
316 : innerTupleHdr.size);
317 :
318 200 : PageSetLSN(page, lsn);
319 200 : MarkBufferDirty(buffer);
320 : }
321 200 : if (BufferIsValid(buffer))
322 200 : UnlockReleaseBuffer(buffer);
323 : }
324 : else
325 : {
326 : BlockNumber blkno;
327 : BlockNumber blknoNew;
328 :
329 2 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
330 2 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
331 :
332 : /*
333 : * In normal operation we would have all three pages (source, dest,
334 : * and parent) locked simultaneously; but in WAL replay it should be
335 : * safe to update them one at a time, as long as we do it in the right
336 : * order. We must insert the new tuple before replacing the old tuple
337 : * with the redirect tuple.
338 : */
339 :
340 : /* Install new tuple first so redirect is valid */
341 2 : if (xldata->newPage)
342 : {
343 : /* AddNode is not used for nulls pages */
344 2 : buffer = XLogInitBufferForRedo(record, 1);
345 2 : SpGistInitBuffer(buffer, 0);
346 2 : action = BLK_NEEDS_REDO;
347 : }
348 : else
349 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
350 2 : if (action == BLK_NEEDS_REDO)
351 : {
352 2 : page = BufferGetPage(buffer);
353 :
354 2 : addOrReplaceTuple(page, (Item) innerTuple,
355 2 : innerTupleHdr.size, xldata->offnumNew);
356 :
357 : /*
358 : * If parent is in this same page, update it now.
359 : */
360 2 : if (xldata->parentBlk == 1)
361 : {
362 : SpGistInnerTuple parentTuple;
363 :
364 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
365 0 : PageGetItemId(page, xldata->offnumParent));
366 :
367 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
368 0 : blknoNew, xldata->offnumNew);
369 : }
370 2 : PageSetLSN(page, lsn);
371 2 : MarkBufferDirty(buffer);
372 : }
373 2 : if (BufferIsValid(buffer))
374 2 : UnlockReleaseBuffer(buffer);
375 :
376 : /* Delete old tuple, replacing it with redirect or placeholder tuple */
377 2 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
378 : {
379 : SpGistDeadTuple dt;
380 :
381 2 : page = BufferGetPage(buffer);
382 :
383 2 : if (state.isBuild)
384 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
385 : InvalidBlockNumber,
386 : InvalidOffsetNumber);
387 : else
388 2 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
389 : blknoNew,
390 2 : xldata->offnumNew);
391 :
392 2 : PageIndexTupleDelete(page, xldata->offnum);
393 2 : if (PageAddItem(page, (Item) dt, dt->size,
394 : xldata->offnum,
395 2 : false, false) != xldata->offnum)
396 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
397 : dt->size);
398 :
399 2 : if (state.isBuild)
400 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
401 : else
402 2 : SpGistPageGetOpaque(page)->nRedirection++;
403 :
404 : /*
405 : * If parent is in this same page, update it now.
406 : */
407 2 : if (xldata->parentBlk == 0)
408 : {
409 : SpGistInnerTuple parentTuple;
410 :
411 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
412 0 : PageGetItemId(page, xldata->offnumParent));
413 :
414 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
415 0 : blknoNew, xldata->offnumNew);
416 : }
417 2 : PageSetLSN(page, lsn);
418 2 : MarkBufferDirty(buffer);
419 : }
420 2 : if (BufferIsValid(buffer))
421 2 : UnlockReleaseBuffer(buffer);
422 :
423 : /*
424 : * Update parent downlink (if we didn't do it as part of the source or
425 : * destination page update already).
426 : */
427 2 : if (xldata->parentBlk == 2)
428 : {
429 2 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
430 : {
431 : SpGistInnerTuple parentTuple;
432 :
433 2 : page = BufferGetPage(buffer);
434 :
435 2 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
436 2 : PageGetItemId(page, xldata->offnumParent));
437 :
438 2 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
439 2 : blknoNew, xldata->offnumNew);
440 :
441 2 : PageSetLSN(page, lsn);
442 2 : MarkBufferDirty(buffer);
443 : }
444 2 : if (BufferIsValid(buffer))
445 2 : UnlockReleaseBuffer(buffer);
446 : }
447 : }
448 202 : }
449 :
450 : static void
451 202 : spgRedoSplitTuple(XLogReaderState *record)
452 : {
453 202 : XLogRecPtr lsn = record->EndRecPtr;
454 202 : char *ptr = XLogRecGetData(record);
455 202 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
456 : char *prefixTuple;
457 : SpGistInnerTupleData prefixTupleHdr;
458 : char *postfixTuple;
459 : SpGistInnerTupleData postfixTupleHdr;
460 : Buffer buffer;
461 : Page page;
462 : XLogRedoAction action;
463 :
464 202 : ptr += sizeof(spgxlogSplitTuple);
465 202 : prefixTuple = ptr;
466 : /* the prefix tuple is unaligned, so make a copy to access its header */
467 202 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
468 202 : ptr += prefixTupleHdr.size;
469 202 : postfixTuple = ptr;
470 : /* postfix tuple is also unaligned */
471 202 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
472 :
473 : /*
474 : * In normal operation we would have both pages locked simultaneously; but
475 : * in WAL replay it should be safe to update them one at a time, as long
476 : * as we do it in the right order.
477 : */
478 :
479 : /* insert postfix tuple first to avoid dangling link */
480 202 : if (!xldata->postfixBlkSame)
481 : {
482 54 : if (xldata->newPage)
483 : {
484 2 : buffer = XLogInitBufferForRedo(record, 1);
485 : /* SplitTuple is not used for nulls pages */
486 2 : SpGistInitBuffer(buffer, 0);
487 2 : action = BLK_NEEDS_REDO;
488 : }
489 : else
490 52 : action = XLogReadBufferForRedo(record, 1, &buffer);
491 54 : if (action == BLK_NEEDS_REDO)
492 : {
493 54 : page = BufferGetPage(buffer);
494 :
495 54 : addOrReplaceTuple(page, (Item) postfixTuple,
496 54 : postfixTupleHdr.size, xldata->offnumPostfix);
497 :
498 54 : PageSetLSN(page, lsn);
499 54 : MarkBufferDirty(buffer);
500 : }
501 54 : if (BufferIsValid(buffer))
502 54 : UnlockReleaseBuffer(buffer);
503 : }
504 :
505 : /* now handle the original page */
506 202 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
507 : {
508 202 : page = BufferGetPage(buffer);
509 :
510 202 : PageIndexTupleDelete(page, xldata->offnumPrefix);
511 202 : if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
512 202 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
513 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
514 : prefixTupleHdr.size);
515 :
516 202 : if (xldata->postfixBlkSame)
517 148 : addOrReplaceTuple(page, (Item) postfixTuple,
518 148 : postfixTupleHdr.size,
519 148 : xldata->offnumPostfix);
520 :
521 202 : PageSetLSN(page, lsn);
522 202 : MarkBufferDirty(buffer);
523 : }
524 202 : if (BufferIsValid(buffer))
525 202 : UnlockReleaseBuffer(buffer);
526 202 : }
527 :
528 : static void
529 434 : spgRedoPickSplit(XLogReaderState *record)
530 : {
531 434 : XLogRecPtr lsn = record->EndRecPtr;
532 434 : char *ptr = XLogRecGetData(record);
533 434 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
534 : char *innerTuple;
535 : SpGistInnerTupleData innerTupleHdr;
536 : SpGistState state;
537 : OffsetNumber *toDelete;
538 : OffsetNumber *toInsert;
539 : uint8 *leafPageSelect;
540 : Buffer srcBuffer;
541 : Buffer destBuffer;
542 : Buffer innerBuffer;
543 : Page srcPage;
544 : Page destPage;
545 : Page page;
546 : int i;
547 : BlockNumber blknoInner;
548 : XLogRedoAction action;
549 :
550 434 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
551 :
552 434 : fillFakeState(&state, xldata->stateSrc);
553 :
554 434 : ptr += SizeOfSpgxlogPickSplit;
555 434 : toDelete = (OffsetNumber *) ptr;
556 434 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
557 434 : toInsert = (OffsetNumber *) ptr;
558 434 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
559 434 : leafPageSelect = (uint8 *) ptr;
560 434 : ptr += sizeof(uint8) * xldata->nInsert;
561 :
562 434 : innerTuple = ptr;
563 : /* the inner tuple is unaligned, so make a copy to access its header */
564 434 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
565 434 : ptr += innerTupleHdr.size;
566 :
567 : /* now ptr points to the list of leaf tuples */
568 :
569 434 : if (xldata->isRootSplit)
570 : {
571 : /* when splitting root, we touch it only in the guise of new inner */
572 6 : srcBuffer = InvalidBuffer;
573 6 : srcPage = NULL;
574 : }
575 428 : else if (xldata->initSrc)
576 : {
577 : /* just re-init the source page */
578 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
579 0 : srcPage = (Page) BufferGetPage(srcBuffer);
580 :
581 0 : SpGistInitBuffer(srcBuffer,
582 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
583 : /* don't update LSN etc till we're done with it */
584 : }
585 : else
586 : {
587 : /*
588 : * Delete the specified tuples from source page. (In case we're in
589 : * Hot Standby, we need to hold lock on the page till we're done
590 : * inserting leaf tuples and the new inner tuple, else the added
591 : * redirect tuple will be a dangling link.)
592 : */
593 428 : srcPage = NULL;
594 428 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
595 : {
596 424 : srcPage = BufferGetPage(srcBuffer);
597 :
598 : /*
599 : * We have it a bit easier here than in doPickSplit(), because we
600 : * know the inner tuple's location already, so we can inject the
601 : * correct redirection tuple now.
602 : */
603 424 : if (!state.isBuild)
604 424 : spgPageIndexMultiDelete(&state, srcPage,
605 424 : toDelete, xldata->nDelete,
606 : SPGIST_REDIRECT,
607 : SPGIST_PLACEHOLDER,
608 : blknoInner,
609 424 : xldata->offnumInner);
610 : else
611 0 : spgPageIndexMultiDelete(&state, srcPage,
612 0 : toDelete, xldata->nDelete,
613 : SPGIST_PLACEHOLDER,
614 : SPGIST_PLACEHOLDER,
615 : InvalidBlockNumber,
616 : InvalidOffsetNumber);
617 :
618 : /* don't update LSN etc till we're done with it */
619 : }
620 : }
621 :
622 : /* try to access dest page if any */
623 434 : if (!XLogRecHasBlockRef(record, 1))
624 : {
625 0 : destBuffer = InvalidBuffer;
626 0 : destPage = NULL;
627 : }
628 434 : else if (xldata->initDest)
629 : {
630 : /* just re-init the dest page */
631 404 : destBuffer = XLogInitBufferForRedo(record, 1);
632 404 : destPage = (Page) BufferGetPage(destBuffer);
633 :
634 404 : SpGistInitBuffer(destBuffer,
635 404 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
636 : /* don't update LSN etc till we're done with it */
637 : }
638 : else
639 : {
640 : /*
641 : * We could probably release the page lock immediately in the
642 : * full-page-image case, but for safety let's hold it till later.
643 : */
644 30 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
645 26 : destPage = (Page) BufferGetPage(destBuffer);
646 : else
647 4 : destPage = NULL; /* don't do any page updates */
648 : }
649 :
650 : /* restore leaf tuples to src and/or dest page */
651 61208 : for (i = 0; i < xldata->nInsert; i++)
652 : {
653 : char *leafTuple;
654 : SpGistLeafTupleData leafTupleHdr;
655 :
656 : /* the tuples are not aligned, so must copy to access the size field. */
657 60774 : leafTuple = ptr;
658 60774 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
659 60774 : ptr += leafTupleHdr.size;
660 :
661 60774 : page = leafPageSelect[i] ? destPage : srcPage;
662 60774 : if (page == NULL)
663 314 : continue; /* no need to touch this page */
664 :
665 60460 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
666 60460 : toInsert[i]);
667 : }
668 :
669 : /* Now update src and dest page LSNs if needed */
670 434 : if (srcPage != NULL)
671 : {
672 424 : PageSetLSN(srcPage, lsn);
673 424 : MarkBufferDirty(srcBuffer);
674 : }
675 434 : if (destPage != NULL)
676 : {
677 430 : PageSetLSN(destPage, lsn);
678 430 : MarkBufferDirty(destBuffer);
679 : }
680 :
681 : /* restore new inner tuple */
682 434 : if (xldata->initInner)
683 : {
684 12 : innerBuffer = XLogInitBufferForRedo(record, 2);
685 12 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
686 12 : action = BLK_NEEDS_REDO;
687 : }
688 : else
689 422 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
690 :
691 434 : if (action == BLK_NEEDS_REDO)
692 : {
693 402 : page = BufferGetPage(innerBuffer);
694 :
695 402 : addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
696 402 : xldata->offnumInner);
697 :
698 : /* if inner is also parent, update link while we're here */
699 402 : if (xldata->innerIsParent)
700 : {
701 : SpGistInnerTuple parent;
702 :
703 364 : parent = (SpGistInnerTuple) PageGetItem(page,
704 364 : PageGetItemId(page, xldata->offnumParent));
705 364 : spgUpdateNodeLink(parent, xldata->nodeI,
706 364 : blknoInner, xldata->offnumInner);
707 : }
708 :
709 402 : PageSetLSN(page, lsn);
710 402 : MarkBufferDirty(innerBuffer);
711 : }
712 434 : if (BufferIsValid(innerBuffer))
713 434 : UnlockReleaseBuffer(innerBuffer);
714 :
715 : /*
716 : * Now we can release the leaf-page locks. It's okay to do this before
717 : * updating the parent downlink.
718 : */
719 434 : if (BufferIsValid(srcBuffer))
720 428 : UnlockReleaseBuffer(srcBuffer);
721 434 : if (BufferIsValid(destBuffer))
722 434 : UnlockReleaseBuffer(destBuffer);
723 :
724 : /* update parent downlink, unless we did it above */
725 434 : if (XLogRecHasBlockRef(record, 3))
726 : {
727 : Buffer parentBuffer;
728 :
729 32 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
730 : {
731 : SpGistInnerTuple parent;
732 :
733 26 : page = BufferGetPage(parentBuffer);
734 :
735 26 : parent = (SpGistInnerTuple) PageGetItem(page,
736 26 : PageGetItemId(page, xldata->offnumParent));
737 26 : spgUpdateNodeLink(parent, xldata->nodeI,
738 26 : blknoInner, xldata->offnumInner);
739 :
740 26 : PageSetLSN(page, lsn);
741 26 : MarkBufferDirty(parentBuffer);
742 : }
743 32 : if (BufferIsValid(parentBuffer))
744 32 : UnlockReleaseBuffer(parentBuffer);
745 : }
746 : else
747 : Assert(xldata->innerIsParent || xldata->isRootSplit);
748 434 : }
749 :
750 : static void
751 40 : spgRedoVacuumLeaf(XLogReaderState *record)
752 : {
753 40 : XLogRecPtr lsn = record->EndRecPtr;
754 40 : char *ptr = XLogRecGetData(record);
755 40 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
756 : OffsetNumber *toDead;
757 : OffsetNumber *toPlaceholder;
758 : OffsetNumber *moveSrc;
759 : OffsetNumber *moveDest;
760 : OffsetNumber *chainSrc;
761 : OffsetNumber *chainDest;
762 : SpGistState state;
763 : Buffer buffer;
764 : Page page;
765 : int i;
766 :
767 40 : fillFakeState(&state, xldata->stateSrc);
768 :
769 40 : ptr += SizeOfSpgxlogVacuumLeaf;
770 40 : toDead = (OffsetNumber *) ptr;
771 40 : ptr += sizeof(OffsetNumber) * xldata->nDead;
772 40 : toPlaceholder = (OffsetNumber *) ptr;
773 40 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
774 40 : moveSrc = (OffsetNumber *) ptr;
775 40 : ptr += sizeof(OffsetNumber) * xldata->nMove;
776 40 : moveDest = (OffsetNumber *) ptr;
777 40 : ptr += sizeof(OffsetNumber) * xldata->nMove;
778 40 : chainSrc = (OffsetNumber *) ptr;
779 40 : ptr += sizeof(OffsetNumber) * xldata->nChain;
780 40 : chainDest = (OffsetNumber *) ptr;
781 :
782 40 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
783 : {
784 14 : page = BufferGetPage(buffer);
785 :
786 14 : spgPageIndexMultiDelete(&state, page,
787 14 : toDead, xldata->nDead,
788 : SPGIST_DEAD, SPGIST_DEAD,
789 : InvalidBlockNumber,
790 : InvalidOffsetNumber);
791 :
792 14 : spgPageIndexMultiDelete(&state, page,
793 14 : toPlaceholder, xldata->nPlaceholder,
794 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
795 : InvalidBlockNumber,
796 : InvalidOffsetNumber);
797 :
798 : /* see comments in vacuumLeafPage() */
799 28 : for (i = 0; i < xldata->nMove; i++)
800 : {
801 14 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
802 14 : ItemId idDest = PageGetItemId(page, moveDest[i]);
803 : ItemIdData tmp;
804 :
805 14 : tmp = *idSrc;
806 14 : *idSrc = *idDest;
807 14 : *idDest = tmp;
808 : }
809 :
810 14 : spgPageIndexMultiDelete(&state, page,
811 14 : moveSrc, xldata->nMove,
812 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
813 : InvalidBlockNumber,
814 : InvalidOffsetNumber);
815 :
816 28 : for (i = 0; i < xldata->nChain; i++)
817 : {
818 : SpGistLeafTuple lt;
819 :
820 14 : lt = (SpGistLeafTuple) PageGetItem(page,
821 14 : PageGetItemId(page, chainSrc[i]));
822 : Assert(lt->tupstate == SPGIST_LIVE);
823 14 : SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
824 : }
825 :
826 14 : PageSetLSN(page, lsn);
827 14 : MarkBufferDirty(buffer);
828 : }
829 40 : if (BufferIsValid(buffer))
830 40 : UnlockReleaseBuffer(buffer);
831 40 : }
832 :
833 : static void
834 0 : spgRedoVacuumRoot(XLogReaderState *record)
835 : {
836 0 : XLogRecPtr lsn = record->EndRecPtr;
837 0 : char *ptr = XLogRecGetData(record);
838 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
839 : OffsetNumber *toDelete;
840 : Buffer buffer;
841 : Page page;
842 :
843 0 : toDelete = xldata->offsets;
844 :
845 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
846 : {
847 0 : page = BufferGetPage(buffer);
848 :
849 : /* The tuple numbers are in order */
850 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
851 :
852 0 : PageSetLSN(page, lsn);
853 0 : MarkBufferDirty(buffer);
854 : }
855 0 : if (BufferIsValid(buffer))
856 0 : UnlockReleaseBuffer(buffer);
857 0 : }
858 :
859 : static void
860 736 : spgRedoVacuumRedirect(XLogReaderState *record)
861 : {
862 736 : XLogRecPtr lsn = record->EndRecPtr;
863 736 : char *ptr = XLogRecGetData(record);
864 736 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
865 : OffsetNumber *itemToPlaceholder;
866 : Buffer buffer;
867 :
868 736 : itemToPlaceholder = xldata->offsets;
869 :
870 : /*
871 : * If any redirection tuples are being removed, make sure there are no
872 : * live Hot Standby transactions that might need to see them.
873 : */
874 736 : if (InHotStandby)
875 : {
876 : RelFileLocator locator;
877 :
878 736 : XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
879 736 : ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
880 736 : xldata->isCatalogRel,
881 : locator);
882 : }
883 :
884 736 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
885 : {
886 60 : Page page = BufferGetPage(buffer);
887 60 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
888 : int i;
889 :
890 : /* Convert redirect pointers to plain placeholders */
891 84 : for (i = 0; i < xldata->nToPlaceholder; i++)
892 : {
893 : SpGistDeadTuple dt;
894 :
895 24 : dt = (SpGistDeadTuple) PageGetItem(page,
896 24 : PageGetItemId(page, itemToPlaceholder[i]));
897 : Assert(dt->tupstate == SPGIST_REDIRECT);
898 24 : dt->tupstate = SPGIST_PLACEHOLDER;
899 24 : ItemPointerSetInvalid(&dt->pointer);
900 : }
901 :
902 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
903 60 : opaque->nRedirection -= xldata->nToPlaceholder;
904 60 : opaque->nPlaceholder += xldata->nToPlaceholder;
905 :
906 : /* Remove placeholder tuples at end of page */
907 60 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
908 : {
909 60 : int max = PageGetMaxOffsetNumber(page);
910 : OffsetNumber *toDelete;
911 :
912 60 : toDelete = palloc(sizeof(OffsetNumber) * max);
913 :
914 4056 : for (i = xldata->firstPlaceholder; i <= max; i++)
915 3996 : toDelete[i - xldata->firstPlaceholder] = i;
916 :
917 60 : i = max - xldata->firstPlaceholder + 1;
918 : Assert(opaque->nPlaceholder >= i);
919 60 : opaque->nPlaceholder -= i;
920 :
921 : /* The array is sorted, so can use PageIndexMultiDelete */
922 60 : PageIndexMultiDelete(page, toDelete, i);
923 :
924 60 : pfree(toDelete);
925 : }
926 :
927 60 : PageSetLSN(page, lsn);
928 60 : MarkBufferDirty(buffer);
929 : }
930 736 : if (BufferIsValid(buffer))
931 736 : UnlockReleaseBuffer(buffer);
932 736 : }
933 :
934 : void
935 79766 : spg_redo(XLogReaderState *record)
936 : {
937 79766 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
938 : MemoryContext oldCxt;
939 :
940 79766 : oldCxt = MemoryContextSwitchTo(opCtx);
941 79766 : switch (info)
942 : {
943 78000 : case XLOG_SPGIST_ADD_LEAF:
944 78000 : spgRedoAddLeaf(record);
945 78000 : break;
946 152 : case XLOG_SPGIST_MOVE_LEAFS:
947 152 : spgRedoMoveLeafs(record);
948 152 : break;
949 202 : case XLOG_SPGIST_ADD_NODE:
950 202 : spgRedoAddNode(record);
951 202 : break;
952 202 : case XLOG_SPGIST_SPLIT_TUPLE:
953 202 : spgRedoSplitTuple(record);
954 202 : break;
955 434 : case XLOG_SPGIST_PICKSPLIT:
956 434 : spgRedoPickSplit(record);
957 434 : break;
958 40 : case XLOG_SPGIST_VACUUM_LEAF:
959 40 : spgRedoVacuumLeaf(record);
960 40 : break;
961 0 : case XLOG_SPGIST_VACUUM_ROOT:
962 0 : spgRedoVacuumRoot(record);
963 0 : break;
964 736 : case XLOG_SPGIST_VACUUM_REDIRECT:
965 736 : spgRedoVacuumRedirect(record);
966 736 : break;
967 0 : default:
968 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
969 : }
970 :
971 79766 : MemoryContextSwitchTo(oldCxt);
972 79766 : MemoryContextReset(opCtx);
973 79766 : }
974 :
975 : void
976 392 : spg_xlog_startup(void)
977 : {
978 392 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
979 : "SP-GiST temporary context",
980 : ALLOCSET_DEFAULT_SIZES);
981 392 : }
982 :
983 : void
984 288 : spg_xlog_cleanup(void)
985 : {
986 288 : MemoryContextDelete(opCtx);
987 288 : opCtx = NULL;
988 288 : }
989 :
990 : /*
991 : * Mask a SpGist page before performing consistency checks on it.
992 : */
993 : void
994 160300 : spg_mask(char *pagedata, BlockNumber blkno)
995 : {
996 160300 : Page page = (Page) pagedata;
997 160300 : PageHeader pagehdr = (PageHeader) page;
998 :
999 160300 : mask_page_lsn_and_checksum(page);
1000 :
1001 160300 : mask_page_hint_bits(page);
1002 :
1003 : /*
1004 : * Mask the unused space, but only if the page's pd_lower appears to have
1005 : * been set correctly.
1006 : */
1007 160300 : if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1008 160300 : mask_unused_space(page);
1009 160300 : }
|