LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 384 441 87.1 %
Date: 2025-10-31 04:18:36 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgxlog.c
       4             :  *    WAL replay logic for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/spgist/spgxlog.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/bufmask.h"
      18             : #include "access/spgist_private.h"
      19             : #include "access/spgxlog.h"
      20             : #include "access/xlogutils.h"
      21             : #include "storage/standby.h"
      22             : #include "utils/memutils.h"
      23             : 
      24             : 
      25             : static MemoryContext opCtx;     /* working memory for operations; NOTE(review): none of the per-record redo helpers that follow (through spgRedoSplitTuple) reference it directly -- presumably the top-level redo entry point switches into it; confirm */
      26             : 
      27             : 
      28             : /*
      29             :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      30             :  *
      31             :  * At present, all we need is enough info to support spgFormDeadTuple(),
      32             :  * plus the isBuild flag.
                        *
                        * The whole struct is zeroed first, so every field other than the three
                        * assigned below reads as zero.  stateSrc is taken by value and only its
                        * redirectXid and isBuild members are consulted.
                        *
                        * deadTupleStorage is allocated here with palloc0() and is not freed by
                        * this function.  NOTE(review): presumably it is reclaimed when the
                        * caller's memory context is reset -- confirm.
      33             :  */
      34             : static void
      35        1090 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      36             : {
      37        1090 :     memset(state, 0, sizeof(*state));
      38             : 
      39        1090 :     state->redirectXid = stateSrc.redirectXid;
      40        1090 :     state->isBuild = stateSrc.isBuild;
      41        1090 :     state->deadTupleStorage = palloc0(SGDTSIZE);    /* SGDTSIZE zeroed bytes of scratch space */
      42        1090 : }
      43             : 
      44             : /*
      45             :  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
      46             :  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
      47             :  * existing tuple, it had better be a placeholder tuple.
                        *
                        * Despite the "leaf tuple" wording, callers in this file also pass inner
                        * tuples (see spgRedoAddNode and spgRedoSplitTuple).
                        *
                        * page:   page being modified in place
                        * tuple:  tuple image to store; it is handed to PageAddItem() unchanged
                        * size:   length of the tuple image in bytes
                        * offset: slot at which the tuple must end up
                        *
                        * Raises ERROR if the slot is occupied by a non-placeholder tuple, or if
                        * PageAddItem() does not place the tuple at exactly 'offset'.
                        *
                        * NOTE(review): 'size' is declared int but is printed with %u in the
                        * failure message below.
      48             :  */
      49             : static void
      50      145192 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
      51             : {
      52      145192 :     if (offset <= PageGetMaxOffsetNumber(page))     /* slot already exists on the page */
      53             :     {
      54       36620 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      55       36620 :                                                            PageGetItemId(page, offset));
      56             : 
      57       36620 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      58           0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      59             : 
                               /* one fewer placeholder on the page once this slot is reused */
      60             :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      61       36620 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      62             : 
      63       36620 :         PageIndexTupleDelete(page, offset);
      64             :     }
      65             : 
                           /* the target is either within the current slot range or the slot right after it */
      66             :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      67             : 
                           /* the tuple must land at exactly the requested offset */
      68      145192 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      69           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      70             :              size);
      71      145192 : }
      72             : 
                       /*
                        * Replay a WAL record whose main data is a spgxlogAddLeaf header followed
                        * directly by one (unaligned) leaf tuple.
                        *
                        * Block reference 0 is the leaf page that receives the tuple; it is
                        * initialized from scratch when xldata->newPage is set.  Block reference 1
                        * is the parent page and is only touched when xldata->offnumParent is valid,
                        * in which case node xldata->nodeI of the inner tuple at that offset is
                        * pointed at (leaf block, xldata->offnumLeaf).
                        *
                        * Each page that is modified is stamped with record->EndRecPtr and marked
                        * dirty.
                        */
      73             : static void
      74       78002 : spgRedoAddLeaf(XLogReaderState *record)
      75             : {
      76       78002 :     XLogRecPtr  lsn = record->EndRecPtr;    /* LSN stamped on every page modified here */
      77       78002 :     char       *ptr = XLogRecGetData(record);
      78       78002 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
      79             :     char       *leafTuple;
      80             :     SpGistLeafTupleData leafTupleHdr;
      81             :     Buffer      buffer;
      82             :     Page        page;
      83             :     XLogRedoAction action;
      84             : 
      85       78002 :     ptr += sizeof(spgxlogAddLeaf);
      86       78002 :     leafTuple = ptr;
      87             :     /* the leaf tuple is unaligned, so make a copy to access its header */
      88       78002 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
      89             : 
      90             :     /*
      91             :      * In normal operation we would have both current and parent pages locked
      92             :      * simultaneously; but in WAL replay it should be safe to update the leaf
      93             :      * page before updating the parent.
      94             :      */
      95       78002 :     if (xldata->newPage)
      96             :     {
      97           0 :         buffer = XLogInitBufferForRedo(record, 0);
      98           0 :         SpGistInitBuffer(buffer,
      99           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     100           0 :         action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     101             :     }
     102             :     else
     103       78002 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     104             : 
     105       78002 :     if (action == BLK_NEEDS_REDO)
     106             :     {
     107       77644 :         page = BufferGetPage(buffer);
     108             : 
     109             :         /* insert new tuple */
     110       77644 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     111             :         {
     112             :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     113       77644 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
     114             : 
     115             :             /* update head tuple's chain link if needed */
     116       77644 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     117             :             {
     118             :                 SpGistLeafTuple head;
     119             : 
     120       76362 :                 head = (SpGistLeafTuple) PageGetItem(page,
     121       76362 :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
                                       /* the logged tuple already carries the head's old next-link, so relinking the head splices it in */
     122             :                 Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
     123       76362 :                 SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
     124             :             }
     125             :         }
     126             :         else
     127             :         {
     128             :             /* replacing a DEAD tuple */
                                   /* offnumLeaf equals offnumHeadLeaf here: overwrite that slot in place */
     129           0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     130           0 :             if (PageAddItem(page,
     131             :                             leafTuple, leafTupleHdr.size,
     132           0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     133           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     134             :                      leafTupleHdr.size);
     135             :         }
     136             : 
     137       77644 :         PageSetLSN(page, lsn);
     138       77644 :         MarkBufferDirty(buffer);
     139             :     }
     140       78002 :     if (BufferIsValid(buffer))
     141       78002 :         UnlockReleaseBuffer(buffer);
     142             : 
     143             :     /* update parent downlink if necessary */
     144       78002 :     if (xldata->offnumParent != InvalidOffsetNumber)
     145             :     {
     146         240 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     147             :         {
     148             :             SpGistInnerTuple tuple;
     149             :             BlockNumber blknoLeaf;
     150             : 
                                   /* only the block number of block reference 0 (the leaf page) is needed */
     151         240 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     152             : 
     153         240 :             page = BufferGetPage(buffer);
     154             : 
     155         240 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     156         240 :                                                    PageGetItemId(page, xldata->offnumParent));
     157             : 
     158         240 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     159         240 :                               blknoLeaf, xldata->offnumLeaf);
     160             : 
     161         240 :             PageSetLSN(page, lsn);
     162         240 :             MarkBufferDirty(buffer);
     163             :         }
     164         240 :         if (BufferIsValid(buffer))
     165         240 :             UnlockReleaseBuffer(buffer);
     166             :     }
     167       78002 : }
     168             : 
                       /*
                        * Replay a WAL record whose main data starts with a spgxlogMoveLeafs header.
                        *
                        * Payload layout after the header (SizeOfSpgxlogMoveLeafs bytes):
                        *   - xldata->nMoves OffsetNumbers: offsets to delete from the source page
                        *   - nInsert OffsetNumbers: offsets to insert at on the destination page,
                        *     where nInsert is 1 if xldata->replaceDead, else nMoves + 1
                        *   - nInsert leaf tuples packed back to back; each tuple's length is taken
                        *     from the size field of its own header
                        *
                        * Block references: 0 = source page, 1 = destination page, 2 = parent page.
                        * The last entry of toInsert is used both as the target offset passed to
                        * spgPageIndexMultiDelete() for the source page and as the new downlink
                        * stored in node xldata->nodeI of the parent's inner tuple.
                        */
     169             : static void
     170         150 : spgRedoMoveLeafs(XLogReaderState *record)
     171             : {
     172         150 :     XLogRecPtr  lsn = record->EndRecPtr;    /* LSN stamped on every page modified here */
     173         150 :     char       *ptr = XLogRecGetData(record);
     174         150 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     175             :     SpGistState state;
     176             :     OffsetNumber *toDelete;
     177             :     OffsetNumber *toInsert;
     178             :     int         nInsert;
     179             :     Buffer      buffer;
     180             :     Page        page;
     181             :     XLogRedoAction action;
     182             :     BlockNumber blknoDst;
     183             : 
                           /* only the block number of block reference 1 (the destination page) is needed */
     184         150 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
     185             : 
     186         150 :     fillFakeState(&state, xldata->stateSrc);
     187             : 
     188         150 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     189             : 
     190         150 :     ptr += SizeOfSpgxlogMoveLeafs;
     191         150 :     toDelete = (OffsetNumber *) ptr;
     192         150 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     193         150 :     toInsert = (OffsetNumber *) ptr;
     194         150 :     ptr += sizeof(OffsetNumber) * nInsert;
     195             : 
     196             :     /* now ptr points to the list of leaf tuples */
     197             : 
     198             :     /*
     199             :      * In normal operation we would have all three pages (source, dest, and
     200             :      * parent) locked simultaneously; but in WAL replay it should be safe to
     201             :      * update them one at a time, as long as we do it in the right order.
     202             :      */
     203             : 
     204             :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     205         150 :     if (xldata->newPage)
     206             :     {
     207          64 :         buffer = XLogInitBufferForRedo(record, 1);
     208          64 :         SpGistInitBuffer(buffer,
     209          64 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     210          64 :         action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     211             :     }
     212             :     else
     213          86 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     214             : 
     215         150 :     if (action == BLK_NEEDS_REDO)
     216             :     {
     217             :         int         i;
     218             : 
     219         148 :         page = BufferGetPage(buffer);
     220             : 
     221        6530 :         for (i = 0; i < nInsert; i++)
     222             :         {
     223             :             char       *leafTuple;
     224             :             SpGistLeafTupleData leafTupleHdr;
     225             : 
     226             :             /*
     227             :              * the tuples are not aligned, so must copy to access the size
     228             :              * field.
     229             :              */
     230        6382 :             leafTuple = ptr;
     231        6382 :             memcpy(&leafTupleHdr, leafTuple,
     232             :                    sizeof(SpGistLeafTupleData));
     233             : 
     234        6382 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     235        6382 :             ptr += leafTupleHdr.size;   /* step to the next packed tuple; ptr is not used again after this loop */
     236             :         }
     237             : 
     238         148 :         PageSetLSN(page, lsn);
     239         148 :         MarkBufferDirty(buffer);
     240             :     }
     241         150 :     if (BufferIsValid(buffer))
     242         150 :         UnlockReleaseBuffer(buffer);
     243             : 
     244             :     /* Delete tuples from the source page, inserting a redirection pointer */
     245         150 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     246             :     {
     247         148 :         page = BufferGetPage(buffer);
     248             : 
                               /*
                                * NOTE(review): the two tuple-state arguments look like "state for the
                                * first deleted slot" and "state for the remaining slots" -- confirm
                                * against spgPageIndexMultiDelete().  When state.isBuild is set, the
                                * first one is a placeholder instead of a redirect.
                                */
     249         148 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     250         148 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     251             :                                 SPGIST_PLACEHOLDER,
     252             :                                 blknoDst,
     253         148 :                                 toInsert[nInsert - 1]);
     254             : 
     255         148 :         PageSetLSN(page, lsn);
     256         148 :         MarkBufferDirty(buffer);
     257             :     }
     258         150 :     if (BufferIsValid(buffer))
     259         150 :         UnlockReleaseBuffer(buffer);
     260             : 
     261             :     /* And update the parent downlink */
     262         150 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     263             :     {
     264             :         SpGistInnerTuple tuple;
     265             : 
     266         146 :         page = BufferGetPage(buffer);
     267             : 
     268         146 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     269         146 :                                                PageGetItemId(page, xldata->offnumParent));
     270             : 
     271         146 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     272         146 :                           blknoDst, toInsert[nInsert - 1]);
     273             : 
     274         146 :         PageSetLSN(page, lsn);
     275         146 :         MarkBufferDirty(buffer);
     276             :     }
     277         150 :     if (BufferIsValid(buffer))
     278         150 :         UnlockReleaseBuffer(buffer);
     279         150 : }
     280             : 
                       /*
                        * Replay a WAL record whose main data is a spgxlogAddNode header followed
                        * directly by the (unaligned) replacement inner tuple.
                        *
                        * Two shapes are handled:
                        *
                        * - No block reference 1: the inner tuple at xldata->offnum on block 0 is
                        *   replaced in place.
                        *
                        * - Block reference 1 present: the new inner tuple is placed at
                        *   xldata->offnumNew on block 1 (initialized from scratch when
                        *   xldata->newPage is set), and the old tuple at xldata->offnum on block 0
                        *   is replaced by a dead tuple -- a placeholder when state.isBuild, else a
                        *   redirect pointing at (block 1, offnumNew).  The parent's downlink is
                        *   then updated on whichever block xldata->parentBlk names: 1, 0, or 2.
                        *
                        * Each page that is modified is stamped with record->EndRecPtr and marked
                        * dirty.
                        */
     281             : static void
     282         202 : spgRedoAddNode(XLogReaderState *record)
     283             : {
     284         202 :     XLogRecPtr  lsn = record->EndRecPtr;    /* LSN stamped on every page modified here */
     285         202 :     char       *ptr = XLogRecGetData(record);
     286         202 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     287             :     char       *innerTuple;
     288             :     SpGistInnerTupleData innerTupleHdr;
     289             :     SpGistState state;
     290             :     Buffer      buffer;
     291             :     Page        page;
     292             :     XLogRedoAction action;
     293             : 
     294         202 :     ptr += sizeof(spgxlogAddNode);
     295         202 :     innerTuple = ptr;
     296             :     /* the tuple is unaligned, so make a copy to access its header */
     297         202 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     298             : 
     299         202 :     fillFakeState(&state, xldata->stateSrc);
     300             : 
     301         202 :     if (!XLogRecHasBlockRef(record, 1))
     302             :     {
     303             :         /* update in place */
     304             :         Assert(xldata->parentBlk == -1);
     305         200 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     306             :         {
     307         200 :             page = BufferGetPage(buffer);
     308             : 
                                   /* swap the old tuple for the new image at the same offset */
     309         200 :             PageIndexTupleDelete(page, xldata->offnum);
     310         200 :             if (PageAddItem(page, innerTuple, innerTupleHdr.size,
     311             :                             xldata->offnum,
     312         200 :                             false, false) != xldata->offnum)
     313           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     314             :                      innerTupleHdr.size);
     315             : 
     316         200 :             PageSetLSN(page, lsn);
     317         200 :             MarkBufferDirty(buffer);
     318             :         }
     319         200 :         if (BufferIsValid(buffer))
     320         200 :             UnlockReleaseBuffer(buffer);
     321             :     }
     322             :     else
     323             :     {
     324             :         BlockNumber blkno;
     325             :         BlockNumber blknoNew;
     326             : 
                               /* block 0 is the old location, block 1 the new one; blkno itself is not used further below */
     327           2 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     328           2 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     329             : 
     330             :         /*
     331             :          * In normal operation we would have all three pages (source, dest,
     332             :          * and parent) locked simultaneously; but in WAL replay it should be
     333             :          * safe to update them one at a time, as long as we do it in the right
     334             :          * order. We must insert the new tuple before replacing the old tuple
     335             :          * with the redirect tuple.
     336             :          */
     337             : 
     338             :         /* Install new tuple first so redirect is valid */
     339           2 :         if (xldata->newPage)
     340             :         {
     341             :             /* AddNode is not used for nulls pages */
     342           2 :             buffer = XLogInitBufferForRedo(record, 1);
     343           2 :             SpGistInitBuffer(buffer, 0);
     344           2 :             action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     345             :         }
     346             :         else
     347           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     348           2 :         if (action == BLK_NEEDS_REDO)
     349             :         {
     350           2 :             page = BufferGetPage(buffer);
     351             : 
     352           2 :             addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
     353             : 
     354             :             /*
     355             :              * If parent is in this same page, update it now.
     356             :              */
     357           2 :             if (xldata->parentBlk == 1)
     358             :             {
     359             :                 SpGistInnerTuple parentTuple;
     360             : 
     361           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     362           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     363             : 
     364           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     365           0 :                                   blknoNew, xldata->offnumNew);
     366             :             }
     367           2 :             PageSetLSN(page, lsn);
     368           2 :             MarkBufferDirty(buffer);
     369             :         }
     370           2 :         if (BufferIsValid(buffer))
     371           2 :             UnlockReleaseBuffer(buffer);
     372             : 
     373             :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     374           2 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     375             :         {
     376             :             SpGistDeadTuple dt;
     377             : 
     378           2 :             page = BufferGetPage(buffer);
     379             : 
     380           2 :             if (state.isBuild)
     381           0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     382             :                                       InvalidBlockNumber,
     383             :                                       InvalidOffsetNumber);
     384             :             else
     385           2 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     386             :                                       blknoNew,
     387           2 :                                       xldata->offnumNew);
     388             : 
     389           2 :             PageIndexTupleDelete(page, xldata->offnum);
     390           2 :             if (PageAddItem(page, dt, dt->size,
     391             :                             xldata->offnum,
     392           2 :                             false, false) != xldata->offnum)
     393           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     394             :                      dt->size);
     395             : 
                                   /* keep the page's dead-tuple counters in step with the kind of tuple just stored */
     396           2 :             if (state.isBuild)
     397           0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     398             :             else
     399           2 :                 SpGistPageGetOpaque(page)->nRedirection++;
     400             : 
     401             :             /*
     402             :              * If parent is in this same page, update it now.
     403             :              */
     404           2 :             if (xldata->parentBlk == 0)
     405             :             {
     406             :                 SpGistInnerTuple parentTuple;
     407             : 
     408           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     409           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     410             : 
     411           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     412           0 :                                   blknoNew, xldata->offnumNew);
     413             :             }
     414           2 :             PageSetLSN(page, lsn);
     415           2 :             MarkBufferDirty(buffer);
     416             :         }
     417           2 :         if (BufferIsValid(buffer))
     418           2 :             UnlockReleaseBuffer(buffer);
     419             : 
     420             :         /*
     421             :          * Update parent downlink (if we didn't do it as part of the source or
     422             :          * destination page update already).
     423             :          */
     424           2 :         if (xldata->parentBlk == 2)
     425             :         {
     426           2 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     427             :             {
     428             :                 SpGistInnerTuple parentTuple;
     429             : 
     430           0 :                 page = BufferGetPage(buffer);
     431             : 
     432           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     433           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     434             : 
     435           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     436           0 :                                   blknoNew, xldata->offnumNew);
     437             : 
     438           0 :                 PageSetLSN(page, lsn);
     439           0 :                 MarkBufferDirty(buffer);
     440             :             }
     441           2 :             if (BufferIsValid(buffer))
     442           2 :                 UnlockReleaseBuffer(buffer);
     443             :         }
     444             :     }
     445         202 : }
     446             : 
                       /*
                        * Replay a WAL record whose main data is a spgxlogSplitTuple header followed
                        * by a prefix inner tuple and then a postfix inner tuple (both unaligned; the
                        * postfix starts prefixTupleHdr.size bytes after the start of the prefix).
                        *
                        * Block reference 0 is the original page: the tuple at xldata->offnumPrefix
                        * is replaced by the prefix tuple.  The postfix tuple goes to
                        * xldata->offnumPostfix, either on that same page (xldata->postfixBlkSame) or
                        * on block reference 1, which is initialized from scratch when
                        * xldata->newPage is set.
                        *
                        * Each page that is modified is stamped with record->EndRecPtr and marked
                        * dirty.
                        */
     447             : static void
     448         202 : spgRedoSplitTuple(XLogReaderState *record)
     449             : {
     450         202 :     XLogRecPtr  lsn = record->EndRecPtr;    /* LSN stamped on every page modified here */
     451         202 :     char       *ptr = XLogRecGetData(record);
     452         202 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     453             :     char       *prefixTuple;
     454             :     SpGistInnerTupleData prefixTupleHdr;
     455             :     char       *postfixTuple;
     456             :     SpGistInnerTupleData postfixTupleHdr;
     457             :     Buffer      buffer;
     458             :     Page        page;
     459             :     XLogRedoAction action;
     460             : 
     461         202 :     ptr += sizeof(spgxlogSplitTuple);
     462         202 :     prefixTuple = ptr;
     463             :     /* the prefix tuple is unaligned, so make a copy to access its header */
     464         202 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     465         202 :     ptr += prefixTupleHdr.size;     /* the postfix tuple is packed right behind the prefix tuple */
     466         202 :     postfixTuple = ptr;
     467             :     /* postfix tuple is also unaligned */
     468         202 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     469             : 
     470             :     /*
     471             :      * In normal operation we would have both pages locked simultaneously; but
     472             :      * in WAL replay it should be safe to update them one at a time, as long
     473             :      * as we do it in the right order.
     474             :      */
     475             : 
     476             :     /* insert postfix tuple first to avoid dangling link */
     477         202 :     if (!xldata->postfixBlkSame)
     478             :     {
     479          54 :         if (xldata->newPage)
     480             :         {
     481           2 :             buffer = XLogInitBufferForRedo(record, 1);
     482             :             /* SplitTuple is not used for nulls pages */
     483           2 :             SpGistInitBuffer(buffer, 0);
     484           2 :             action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     485             :         }
     486             :         else
     487          52 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     488          54 :         if (action == BLK_NEEDS_REDO)
     489             :         {
     490          54 :             page = BufferGetPage(buffer);
     491             : 
     492          54 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     493             : 
     494          54 :             PageSetLSN(page, lsn);
     495          54 :             MarkBufferDirty(buffer);
     496             :         }
     497          54 :         if (BufferIsValid(buffer))
     498          54 :             UnlockReleaseBuffer(buffer);
     499             :     }
     500             : 
     501             :     /* now handle the original page */
     502         202 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     503             :     {
     504         202 :         page = BufferGetPage(buffer);
     505             : 
                               /* swap the old tuple for the prefix tuple at the same offset */
     506         202 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     507         202 :         if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
     508         202 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     509           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     510             :                  prefixTupleHdr.size);
     511             : 
                               /* same-page case: the postfix tuple was not handled above, so add it here */
     512         202 :         if (xldata->postfixBlkSame)
     513         148 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     514             : 
     515         202 :         PageSetLSN(page, lsn);
     516         202 :         MarkBufferDirty(buffer);
     517             :     }
     518         202 :     if (BufferIsValid(buffer))
     519         202 :         UnlockReleaseBuffer(buffer);
     520         202 : }
     521             : 
     522             : static void
     523         434 : spgRedoPickSplit(XLogReaderState *record)
     524             : {
     525         434 :     XLogRecPtr  lsn = record->EndRecPtr;
     526         434 :     char       *ptr = XLogRecGetData(record);
     527         434 :     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
     528             :     char       *innerTuple;
     529             :     SpGistInnerTupleData innerTupleHdr;
     530             :     SpGistState state;
     531             :     OffsetNumber *toDelete;
     532             :     OffsetNumber *toInsert;
     533             :     uint8      *leafPageSelect;
     534             :     Buffer      srcBuffer;
     535             :     Buffer      destBuffer;
     536             :     Buffer      innerBuffer;
     537             :     Page        srcPage;
     538             :     Page        destPage;
     539             :     Page        page;
     540             :     int         i;
     541             :     BlockNumber blknoInner;
     542             :     XLogRedoAction action;
     543             : 
     544         434 :     XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
     545             : 
     546         434 :     fillFakeState(&state, xldata->stateSrc);
     547             : 
     548         434 :     ptr += SizeOfSpgxlogPickSplit;
     549         434 :     toDelete = (OffsetNumber *) ptr;
     550         434 :     ptr += sizeof(OffsetNumber) * xldata->nDelete;
     551         434 :     toInsert = (OffsetNumber *) ptr;
     552         434 :     ptr += sizeof(OffsetNumber) * xldata->nInsert;
     553         434 :     leafPageSelect = (uint8 *) ptr;
     554         434 :     ptr += sizeof(uint8) * xldata->nInsert;
     555             : 
     556         434 :     innerTuple = ptr;
     557             :     /* the inner tuple is unaligned, so make a copy to access its header */
     558         434 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     559         434 :     ptr += innerTupleHdr.size;
     560             : 
     561             :     /* now ptr points to the list of leaf tuples */
     562             : 
     563         434 :     if (xldata->isRootSplit)
     564             :     {
     565             :         /* when splitting root, we touch it only in the guise of new inner */
     566           6 :         srcBuffer = InvalidBuffer;
     567           6 :         srcPage = NULL;
     568             :     }
     569         428 :     else if (xldata->initSrc)
     570             :     {
     571             :         /* just re-init the source page */
     572           0 :         srcBuffer = XLogInitBufferForRedo(record, 0);
     573           0 :         srcPage = BufferGetPage(srcBuffer);
     574             : 
     575           0 :         SpGistInitBuffer(srcBuffer,
     576           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     577             :         /* don't update LSN etc till we're done with it */
     578             :     }
     579             :     else
     580             :     {
     581             :         /*
     582             :          * Delete the specified tuples from source page.  (In case we're in
     583             :          * Hot Standby, we need to hold lock on the page till we're done
     584             :          * inserting leaf tuples and the new inner tuple, else the added
     585             :          * redirect tuple will be a dangling link.)
     586             :          */
     587         428 :         srcPage = NULL;
     588         428 :         if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
     589             :         {
     590         424 :             srcPage = BufferGetPage(srcBuffer);
     591             : 
     592             :             /*
     593             :              * We have it a bit easier here than in doPickSplit(), because we
     594             :              * know the inner tuple's location already, so we can inject the
     595             :              * correct redirection tuple now.
     596             :              */
     597         424 :             if (!state.isBuild)
     598         424 :                 spgPageIndexMultiDelete(&state, srcPage,
     599         424 :                                         toDelete, xldata->nDelete,
     600             :                                         SPGIST_REDIRECT,
     601             :                                         SPGIST_PLACEHOLDER,
     602             :                                         blknoInner,
     603         424 :                                         xldata->offnumInner);
     604             :             else
     605           0 :                 spgPageIndexMultiDelete(&state, srcPage,
     606           0 :                                         toDelete, xldata->nDelete,
     607             :                                         SPGIST_PLACEHOLDER,
     608             :                                         SPGIST_PLACEHOLDER,
     609             :                                         InvalidBlockNumber,
     610             :                                         InvalidOffsetNumber);
     611             : 
     612             :             /* don't update LSN etc till we're done with it */
     613             :         }
     614             :     }
     615             : 
     616             :     /* try to access dest page if any */
     617         434 :     if (!XLogRecHasBlockRef(record, 1))
     618             :     {
     619           0 :         destBuffer = InvalidBuffer;
     620           0 :         destPage = NULL;
     621             :     }
     622         434 :     else if (xldata->initDest)
     623             :     {
     624             :         /* just re-init the dest page */
     625         404 :         destBuffer = XLogInitBufferForRedo(record, 1);
     626         404 :         destPage = BufferGetPage(destBuffer);
     627             : 
     628         404 :         SpGistInitBuffer(destBuffer,
     629         404 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     630             :         /* don't update LSN etc till we're done with it */
     631             :     }
     632             :     else
     633             :     {
     634             :         /*
     635             :          * We could probably release the page lock immediately in the
     636             :          * full-page-image case, but for safety let's hold it till later.
     637             :          */
     638          30 :         if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
     639          28 :             destPage = BufferGetPage(destBuffer);
     640             :         else
     641           2 :             destPage = NULL;    /* don't do any page updates */
     642             :     }
     643             : 
     644             :     /* restore leaf tuples to src and/or dest page */
     645       61202 :     for (i = 0; i < xldata->nInsert; i++)
     646             :     {
     647             :         char       *leafTuple;
     648             :         SpGistLeafTupleData leafTupleHdr;
     649             : 
     650             :         /* the tuples are not aligned, so must copy to access the size field. */
     651       60768 :         leafTuple = ptr;
     652       60768 :         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     653       60768 :         ptr += leafTupleHdr.size;
     654             : 
     655       60768 :         page = leafPageSelect[i] ? destPage : srcPage;
     656       60768 :         if (page == NULL)
     657         206 :             continue;           /* no need to touch this page */
     658             : 
     659       60562 :         addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     660             :     }
     661             : 
     662             :     /* Now update src and dest page LSNs if needed */
     663         434 :     if (srcPage != NULL)
     664             :     {
     665         424 :         PageSetLSN(srcPage, lsn);
     666         424 :         MarkBufferDirty(srcBuffer);
     667             :     }
     668         434 :     if (destPage != NULL)
     669             :     {
     670         432 :         PageSetLSN(destPage, lsn);
     671         432 :         MarkBufferDirty(destBuffer);
     672             :     }
     673             : 
     674             :     /* restore new inner tuple */
     675         434 :     if (xldata->initInner)
     676             :     {
     677          12 :         innerBuffer = XLogInitBufferForRedo(record, 2);
     678          12 :         SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
     679          12 :         action = BLK_NEEDS_REDO;
     680             :     }
     681             :     else
     682         422 :         action = XLogReadBufferForRedo(record, 2, &innerBuffer);
     683             : 
     684         434 :     if (action == BLK_NEEDS_REDO)
     685             :     {
     686         400 :         page = BufferGetPage(innerBuffer);
     687             : 
     688         400 :         addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
     689             : 
     690             :         /* if inner is also parent, update link while we're here */
     691         400 :         if (xldata->innerIsParent)
     692             :         {
     693             :             SpGistInnerTuple parent;
     694             : 
     695         364 :             parent = (SpGistInnerTuple) PageGetItem(page,
     696         364 :                                                     PageGetItemId(page, xldata->offnumParent));
     697         364 :             spgUpdateNodeLink(parent, xldata->nodeI,
     698         364 :                               blknoInner, xldata->offnumInner);
     699             :         }
     700             : 
     701         400 :         PageSetLSN(page, lsn);
     702         400 :         MarkBufferDirty(innerBuffer);
     703             :     }
     704         434 :     if (BufferIsValid(innerBuffer))
     705         434 :         UnlockReleaseBuffer(innerBuffer);
     706             : 
     707             :     /*
     708             :      * Now we can release the leaf-page locks.  It's okay to do this before
     709             :      * updating the parent downlink.
     710             :      */
     711         434 :     if (BufferIsValid(srcBuffer))
     712         428 :         UnlockReleaseBuffer(srcBuffer);
     713         434 :     if (BufferIsValid(destBuffer))
     714         434 :         UnlockReleaseBuffer(destBuffer);
     715             : 
     716             :     /* update parent downlink, unless we did it above */
     717         434 :     if (XLogRecHasBlockRef(record, 3))
     718             :     {
     719             :         Buffer      parentBuffer;
     720             : 
     721          32 :         if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
     722             :         {
     723             :             SpGistInnerTuple parent;
     724             : 
     725          24 :             page = BufferGetPage(parentBuffer);
     726             : 
     727          24 :             parent = (SpGistInnerTuple) PageGetItem(page,
     728          24 :                                                     PageGetItemId(page, xldata->offnumParent));
     729          24 :             spgUpdateNodeLink(parent, xldata->nodeI,
     730          24 :                               blknoInner, xldata->offnumInner);
     731             : 
     732          24 :             PageSetLSN(page, lsn);
     733          24 :             MarkBufferDirty(parentBuffer);
     734             :         }
     735          32 :         if (BufferIsValid(parentBuffer))
     736          32 :             UnlockReleaseBuffer(parentBuffer);
     737             :     }
     738             :     else
     739             :         Assert(xldata->innerIsParent || xldata->isRootSplit);
     740         434 : }
     741             : 
     742             : static void /* replay one XLOG_SPGIST_VACUUM_LEAF record (dispatched from spg_redo) */
     743         304 : spgRedoVacuumLeaf(XLogReaderState *record)
     744             : {
     745         304 :     XLogRecPtr  lsn = record->EndRecPtr; /* stamped on the page once the changes are applied */
     746         304 :     char       *ptr = XLogRecGetData(record); /* cursor used to carve up the record data */
     747         304 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     748             :     OffsetNumber *toDead; /* nDead offsets, replaced using SPGIST_DEAD */
     749             :     OffsetNumber *toPlaceholder; /* nPlaceholder offsets, replaced using SPGIST_PLACEHOLDER */
     750             :     OffsetNumber *moveSrc; /* nMove offsets: one side of each line-pointer swap */
     751             :     OffsetNumber *moveDest; /* nMove offsets: swap partner of the matching moveSrc[] entry */
     752             :     OffsetNumber *chainSrc; /* nChain offsets of tuples whose next-offset is rewritten */
     753             :     OffsetNumber *chainDest; /* new next-offset for the matching chainSrc[] entry */
     754             :     SpGistState state; /* dummy state, filled by fillFakeState() just below */
     755             :     Buffer      buffer;
     756             :     Page        page;
     757             :     int         i;
     758             : 
     759         304 :     fillFakeState(&state, xldata->stateSrc);
     760             : 
     761         304 :     ptr += SizeOfSpgxlogVacuumLeaf; /* skip the fixed header; the six offset arrays follow back to back */
     762         304 :     toDead = (OffsetNumber *) ptr;
     763         304 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     764         304 :     toPlaceholder = (OffsetNumber *) ptr;
     765         304 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     766         304 :     moveSrc = (OffsetNumber *) ptr;
     767         304 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     768         304 :     moveDest = (OffsetNumber *) ptr;
     769         304 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     770         304 :     chainSrc = (OffsetNumber *) ptr;
     771         304 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     772         304 :     chainDest = (OffsetNumber *) ptr;
     773             : 
     774         304 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) /* touch block 0 only when redo is required */
     775             :     {
     776           6 :         page = BufferGetPage(buffer);
     777             : 
     778           6 :         spgPageIndexMultiDelete(&state, page,
     779           6 :                                 toDead, xldata->nDead,
     780             :                                 SPGIST_DEAD, SPGIST_DEAD,
     781             :                                 InvalidBlockNumber,
     782             :                                 InvalidOffsetNumber);
     783             : 
     784           6 :         spgPageIndexMultiDelete(&state, page,
     785           6 :                                 toPlaceholder, xldata->nPlaceholder,
     786             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     787             :                                 InvalidBlockNumber,
     788             :                                 InvalidOffsetNumber);
     789             : 
     790             :         /* swap the line pointers of each (moveSrc, moveDest) pair; see comments in vacuumLeafPage() */
     791          10 :         for (i = 0; i < xldata->nMove; i++)
     792             :         {
     793           4 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     794           4 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     795             :             ItemIdData  tmp;
     796             : 
     797           4 :             tmp = *idSrc;
     798           4 :             *idSrc = *idDest;
     799           4 :             *idDest = tmp;
     800             :         }
     801             : 
     802           6 :         spgPageIndexMultiDelete(&state, page,
     803           6 :                                 moveSrc, xldata->nMove,
     804             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     805             :                                 InvalidBlockNumber,
     806             :                                 InvalidOffsetNumber); /* after the swap, the moveSrc slots are turned into placeholders */
     807             : 
     808          14 :         for (i = 0; i < xldata->nChain; i++) /* rewrite chain links: chainSrc[i] now points at chainDest[i] */
     809             :         {
     810             :             SpGistLeafTuple lt;
     811             : 
     812           8 :             lt = (SpGistLeafTuple) PageGetItem(page,
     813           8 :                                                PageGetItemId(page, chainSrc[i]));
     814             :             Assert(lt->tupstate == SPGIST_LIVE);
     815           8 :             SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
     816             :         }
     817             : 
     818           6 :         PageSetLSN(page, lsn);
     819           6 :         MarkBufferDirty(buffer);
     820             :     }
     821         304 :     if (BufferIsValid(buffer)) /* released whether or not the page was modified above */
     822         304 :         UnlockReleaseBuffer(buffer);
     823         304 : }
     824             : 
     825             : static void /* replay one XLOG_SPGIST_VACUUM_ROOT record: delete the listed tuples from block 0 */
     826           0 : spgRedoVacuumRoot(XLogReaderState *record) /* every executable row shows 0 hits: not exercised by this coverage run */
     827             : {
     828           0 :     XLogRecPtr  lsn = record->EndRecPtr; /* stamped on the page once the deletion is applied */
     829           0 :     char       *ptr = XLogRecGetData(record);
     830           0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     831             :     OffsetNumber *toDelete;
     832             :     Buffer      buffer;
     833             :     Page        page;
     834             : 
     835           0 :     toDelete = xldata->offsets; /* nDelete offsets, read from the record struct's offsets member */
     836             : 
     837           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) /* touch block 0 only when redo is required */
     838             :     {
     839           0 :         page = BufferGetPage(buffer);
     840             : 
     841             :         /* The tuple numbers are in order, so they can be handed to PageIndexMultiDelete as-is */
     842           0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     843             : 
     844           0 :         PageSetLSN(page, lsn);
     845           0 :         MarkBufferDirty(buffer);
     846             :     }
     847           0 :     if (BufferIsValid(buffer)) /* released whether or not the page was modified above */
     848           0 :         UnlockReleaseBuffer(buffer);
     849           0 : }
     850             : 
     851             : static void /* replay one XLOG_SPGIST_VACUUM_REDIRECT record against block 0 */
     852        1046 : spgRedoVacuumRedirect(XLogReaderState *record)
     853             : {
     854        1046 :     XLogRecPtr  lsn = record->EndRecPtr; /* stamped on the page once the changes are applied */
     855        1046 :     char       *ptr = XLogRecGetData(record);
     856        1046 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     857             :     OffsetNumber *itemToPlaceholder;
     858             :     Buffer      buffer;
     859             : 
     860        1046 :     itemToPlaceholder = xldata->offsets; /* nToPlaceholder offsets, read from the record struct's offsets member */
     861             : 
     862             :     /*
     863             :      * If any redirection tuples are being removed, make sure there are no
     864             :      * live Hot Standby transactions that might need to see them.
     865             :      */
     866        1046 :     if (InHotStandby)
     867             :     {
     868             :         RelFileLocator locator;
     869             : 
     870        1046 :         XLogRecGetBlockTag(record, 0, &locator, NULL, NULL); /* only block 0's relation locator is requested */
     871        1046 :         ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
     872        1046 :                                             xldata->isCatalogRel,
     873             :                                             locator);
     874             :     }
     875             : 
     876        1046 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) /* touch block 0 only when redo is required */
     877             :     {
     878         302 :         Page        page = BufferGetPage(buffer);
     879         302 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page); /* holds the nRedirection / nPlaceholder counters updated below */
     880             :         int         i;
     881             : 
     882             :         /* Convert redirect pointers to plain placeholders */
     883         308 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     884             :         {
     885             :             SpGistDeadTuple dt;
     886             : 
     887           6 :             dt = (SpGistDeadTuple) PageGetItem(page,
     888           6 :                                                PageGetItemId(page, itemToPlaceholder[i]));
     889             :             Assert(dt->tupstate == SPGIST_REDIRECT);
     890           6 :             dt->tupstate = SPGIST_PLACEHOLDER;
     891           6 :             ItemPointerSetInvalid(&dt->pointer);
     892             :         }
     893             : 
     894             :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     895         302 :         opaque->nRedirection -= xldata->nToPlaceholder; /* the converted tuples leave the redirection count ... */
     896         302 :         opaque->nPlaceholder += xldata->nToPlaceholder; /* ... and are added to the placeholder count */
     897             : 
     898             :         /* Remove placeholder tuples at end of page */
     899         302 :         if (xldata->firstPlaceholder != InvalidOffsetNumber) /* InvalidOffsetNumber here means nothing to remove */
     900             :         {
     901         302 :             int         max = PageGetMaxOffsetNumber(page);
     902             :             OffsetNumber *toDelete;
     903             : 
     904         302 :             toDelete = palloc(sizeof(OffsetNumber) * max); /* room for max entries; only max - firstPlaceholder + 1 are filled */
     905             : 
     906       22244 :             for (i = xldata->firstPlaceholder; i <= max; i++) /* list every offset from firstPlaceholder through max, ascending */
     907       21942 :                 toDelete[i - xldata->firstPlaceholder] = i;
     908             : 
     909         302 :             i = max - xldata->firstPlaceholder + 1; /* i is reused as the number of tuples being removed */
     910             :             Assert(opaque->nPlaceholder >= i);
     911         302 :             opaque->nPlaceholder -= i;
     912             : 
     913             :             /* The array is sorted, so can use PageIndexMultiDelete */
     914         302 :             PageIndexMultiDelete(page, toDelete, i);
     915             : 
     916         302 :             pfree(toDelete);
     917             :         }
     918             : 
     919         302 :         PageSetLSN(page, lsn);
     920         302 :         MarkBufferDirty(buffer);
     921             :     }
     922        1046 :     if (BufferIsValid(buffer)) /* released whether or not the page was modified above */
     923        1046 :         UnlockReleaseBuffer(buffer);
     924        1046 : }
     925             : 
     926             : void /* redo entry point: dispatch one SP-GiST WAL record to the matching spgRedo* routine */
     927       80340 : spg_redo(XLogReaderState *record)
     928             : {
     929       80340 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; /* clear the XLR_INFO_MASK bits; the remainder selects the case below */
     930             :     MemoryContext oldCxt;
     931             : 
     932       80340 :     oldCxt = MemoryContextSwitchTo(opCtx); /* handlers run in opCtx so their allocations can be discarded at the end */
     933       80340 :     switch (info)
     934             :     {
     935       78002 :         case XLOG_SPGIST_ADD_LEAF:
     936       78002 :             spgRedoAddLeaf(record);
     937       78002 :             break;
     938         150 :         case XLOG_SPGIST_MOVE_LEAFS:
     939         150 :             spgRedoMoveLeafs(record);
     940         150 :             break;
     941         202 :         case XLOG_SPGIST_ADD_NODE:
     942         202 :             spgRedoAddNode(record);
     943         202 :             break;
     944         202 :         case XLOG_SPGIST_SPLIT_TUPLE:
     945         202 :             spgRedoSplitTuple(record);
     946         202 :             break;
     947         434 :         case XLOG_SPGIST_PICKSPLIT:
     948         434 :             spgRedoPickSplit(record);
     949         434 :             break;
     950         304 :         case XLOG_SPGIST_VACUUM_LEAF:
     951         304 :             spgRedoVacuumLeaf(record);
     952         304 :             break;
     953           0 :         case XLOG_SPGIST_VACUUM_ROOT:
     954           0 :             spgRedoVacuumRoot(record);
     955           0 :             break;
     956        1046 :         case XLOG_SPGIST_VACUUM_REDIRECT:
     957        1046 :             spgRedoVacuumRedirect(record);
     958        1046 :             break;
     959           0 :         default: /* any other value is reported at PANIC level */
     960           0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
     961             :     }
     962             : 
     963       80340 :     MemoryContextSwitchTo(oldCxt); /* restore the caller's context ... */
     964       80340 :     MemoryContextReset(opCtx); /* ... then reset opCtx so nothing accumulates across records */
     965       80340 : }
     966             : 
     967             : void /* create opCtx, the working memory context that spg_redo switches into */
     968         410 : spg_xlog_startup(void)
     969             : {
     970         410 :     opCtx = AllocSetContextCreate(CurrentMemoryContext, /* created under whatever context is current at call time */
     971             :                                   "SP-GiST temporary context",
     972             :                                   ALLOCSET_DEFAULT_SIZES);
     973         410 : }
     974             : 
     975             : void /* counterpart of spg_xlog_startup: delete opCtx */
     976         296 : spg_xlog_cleanup(void)
     977             : {
     978         296 :     MemoryContextDelete(opCtx);
     979         296 :     opCtx = NULL; /* do not leave a pointer to the deleted context behind */
     980         296 : }
     981             : 
     982             : /*
     983             :  * Mask a SpGist page before performing consistency checks on it.  The page is modified in place through pagedata; blkno is not used by this routine.
     984             :  */
     985             : void
     986      160748 : spg_mask(char *pagedata, BlockNumber blkno)
     987             : {
     988      160748 :     Page        page = (Page) pagedata;
     989      160748 :     PageHeader  pagehdr = (PageHeader) page; /* same memory, viewed as the page header to read pd_lower */
     990             : 
     991      160748 :     mask_page_lsn_and_checksum(page);
     992             : 
     993      160748 :     mask_page_hint_bits(page);
     994             : 
     995             :     /*
     996             :      * Mask the unused space, but only if the page's pd_lower appears to have
     997             :      * been set correctly (i.e. it is at least SizeOfPageHeaderData).
     998             :      */
     999      160748 :     if (pagehdr->pd_lower >= SizeOfPageHeaderData)
    1000      160748 :         mask_unused_space(page);
    1001      160748 : }

Generated by: LCOV version 1.16