LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 384 441 87.1 %
Date: 2025-11-20 08:18:16 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgxlog.c
       4             :  *    WAL replay logic for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/spgist/spgxlog.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/bufmask.h"
      18             : #include "access/spgist_private.h"
      19             : #include "access/spgxlog.h"
      20             : #include "access/xlogutils.h"
      21             : #include "storage/standby.h"
      22             : #include "utils/memutils.h"
      23             : 
      24             : 
      25             : static MemoryContext opCtx;     /* working memory for operations; neither assigned nor used by the functions visible in this part of the file */
      26             : 
      27             : 
      28             : /*
      29             :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      30             :  * "state" is overwritten entirely; "stateSrc" is the state saved in the WAL record.
      31             :  * At present, all we need is enough info to support spgFormDeadTuple(),
      32             :  * plus the isBuild flag; every other field of *state is left zeroed.
      33             :  */
      34             : static void
      35         814 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      36             : {
      37         814 :     memset(state, 0, sizeof(*state));
      38             :     /* copy the fields of stateSrc, which callers take from the WAL record */
      39         814 :     state->redirectXid = stateSrc.redirectXid;
      40         814 :     state->isBuild = stateSrc.isBuild;
      41         814 :     state->deadTupleStorage = palloc0(SGDTSIZE);    /* zero-filled SGDTSIZE-byte area, presumably where spgFormDeadTuple() builds its result -- confirm; not freed here */
      42         814 : }
      43             : 
      44             : /*
      45             :  * Add a tuple of "size" bytes at "offset", or replace an existing placeholder
      46             :  * tuple there.  This is used to replay SpGistPageAddNewItem() operations, for
      47             :  * leaf and inner tuples alike.  An existing non-placeholder tuple is an ERROR.
      48             :  */
      49             : static void
      50      141466 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
      51             : {
      52      141466 :     if (offset <= PageGetMaxOffsetNumber(page))     /* offset points at an existing item? */
      53             :     {
      54       38230 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      55       38230 :                                                            PageGetItemId(page, offset));
      56             : 
      57       38230 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      58           0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      59             :         /* one placeholder fewer on this page once it is removed */
      60             :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      61       38230 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      62             :         /* drop the placeholder; the new tuple is added at the same offset below */
      63       38230 :         PageIndexTupleDelete(page, offset);
      64             :     }
      65             :     /* sanity check: offset may be at most one past the last existing item */
      66             :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      67             :     /* PageAddItem() must return the very offset we asked for */
      68      141466 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      69           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      70             :              size);
      71      141466 : }
      72             : /* Replay an "add leaf" record (spgxlogAddLeaf): put the leaf tuple on block 0, then fix the parent downlink on block 1 if offnumParent is set. */
      73             : static void
      74       77936 : spgRedoAddLeaf(XLogReaderState *record)
      75             : {
      76       77936 :     XLogRecPtr  lsn = record->EndRecPtr;    /* stamped on every page we modify */
      77       77936 :     char       *ptr = XLogRecGetData(record);
      78       77936 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
      79             :     char       *leafTuple;
      80             :     SpGistLeafTupleData leafTupleHdr;
      81             :     Buffer      buffer;
      82             :     Page        page;
      83             :     XLogRedoAction action;
      84             :     /* the leaf tuple image follows the fixed-size record header */
      85       77936 :     ptr += sizeof(spgxlogAddLeaf);
      86       77936 :     leafTuple = ptr;
      87             :     /* the leaf tuple is unaligned, so make a copy to access its header */
      88       77936 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
      89             : 
      90             :     /*
      91             :      * In normal operation we would have both current and parent pages locked
      92             :      * simultaneously; but in WAL replay it should be safe to update the leaf
      93             :      * page before updating the parent.
      94             :      */
      95       77936 :     if (xldata->newPage)
      96             :     {
      97           0 :         buffer = XLogInitBufferForRedo(record, 0);
      98           0 :         SpGistInitBuffer(buffer,
      99           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     100           0 :         action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     101             :     }
     102             :     else
     103       77936 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     104             : 
     105       77936 :     if (action == BLK_NEEDS_REDO)
     106             :     {
     107       77562 :         page = BufferGetPage(buffer);
     108             : 
     109             :         /* insert new tuple */
     110       77562 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     111             :         {
     112             :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     113       77562 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
     114             : 
     115             :             /* update head tuple's chain link if needed */
     116       77562 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     117             :             {
     118             :                 SpGistLeafTuple head;
     119             : 
     120       76282 :                 head = (SpGistLeafTuple) PageGetItem(page,
     121       76282 :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
     122             :                 Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));    /* new tuple already carries head's old next-link */
     123       76282 :                 SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
     124             :             }
     125             :         }
     126             :         else
     127             :         {
     128             :             /* replacing a DEAD tuple */
     129           0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     130           0 :             if (PageAddItem(page,
     131             :                             leafTuple, leafTupleHdr.size,
     132           0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     133           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     134             :                      leafTupleHdr.size);
     135             :         }
     136             : 
     137       77562 :         PageSetLSN(page, lsn);
     138       77562 :         MarkBufferDirty(buffer);
     139             :     }
     140       77936 :     if (BufferIsValid(buffer))      /* buffer may be invalid (presumably when the block was not found -- confirm); nothing to release then */
     141       77936 :         UnlockReleaseBuffer(buffer);
     142             : 
     143             :     /* update parent downlink if necessary */
     144       77936 :     if (xldata->offnumParent != InvalidOffsetNumber)
     145             :     {
     146         240 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     147             :         {
     148             :             SpGistInnerTuple tuple;
     149             :             BlockNumber blknoLeaf;
     150             :             /* the downlink must name the leaf page, i.e. block 0 of this record */
     151         240 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     152             : 
     153         240 :             page = BufferGetPage(buffer);
     154             : 
     155         240 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     156         240 :                                                    PageGetItemId(page, xldata->offnumParent));
     157             : 
     158         240 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     159         240 :                               blknoLeaf, xldata->offnumLeaf);
     160             : 
     161         240 :             PageSetLSN(page, lsn);
     162         240 :             MarkBufferDirty(buffer);
     163             :         }
     164         240 :         if (BufferIsValid(buffer))
     165         240 :             UnlockReleaseBuffer(buffer);
     166             :     }
     167       77936 : }
     168             : /* Replay a "move leafs" record (spgxlogMoveLeafs): add tuples to the dest page (block 1), delete the originals from the source page (block 0), repoint the parent downlink (block 2). */
     169             : static void
     170         152 : spgRedoMoveLeafs(XLogReaderState *record)
     171             : {
     172         152 :     XLogRecPtr  lsn = record->EndRecPtr;    /* stamped on every page we modify */
     173         152 :     char       *ptr = XLogRecGetData(record);
     174         152 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     175             :     SpGistState state;
     176             :     OffsetNumber *toDelete;
     177             :     OffsetNumber *toInsert;
     178             :     int         nInsert;
     179             :     Buffer      buffer;
     180             :     Page        page;
     181             :     XLogRedoAction action;
     182             :     BlockNumber blknoDst;
     183             : 
     184         152 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);   /* block 1 is the destination page */
     185             : 
     186         152 :     fillFakeState(&state, xldata->stateSrc);
     187             :     /* with replaceDead only a single tuple is inserted; otherwise one more than the number moved */
     188         152 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     189             :     /* record layout: header, toDelete[nMoves], toInsert[nInsert], then the leaf tuples */
     190         152 :     ptr += SizeOfSpgxlogMoveLeafs;
     191         152 :     toDelete = (OffsetNumber *) ptr;
     192         152 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     193         152 :     toInsert = (OffsetNumber *) ptr;
     194         152 :     ptr += sizeof(OffsetNumber) * nInsert;
     195             : 
     196             :     /* now ptr points to the list of leaf tuples */
     197             : 
     198             :     /*
     199             :      * In normal operation we would have all three pages (source, dest, and
     200             :      * parent) locked simultaneously; but in WAL replay it should be safe to
     201             :      * update them one at a time, as long as we do it in the right order.
     202             :      */
     203             : 
     204             :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     205         152 :     if (xldata->newPage)
     206             :     {
     207          64 :         buffer = XLogInitBufferForRedo(record, 1);
     208          64 :         SpGistInitBuffer(buffer,
     209          64 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     210          64 :         action = BLK_NEEDS_REDO;    /* a freshly initialized page always gets the change applied */
     211             :     }
     212             :     else
     213          88 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     214             : 
     215         152 :     if (action == BLK_NEEDS_REDO)
     216             :     {
     217             :         int         i;
     218             : 
     219         152 :         page = BufferGetPage(buffer);
     220             : 
     221        6788 :         for (i = 0; i < nInsert; i++)
     222             :         {
     223             :             char       *leafTuple;
     224             :             SpGistLeafTupleData leafTupleHdr;
     225             : 
     226             :             /*
     227             :              * the tuples are not aligned, so must copy to access the size
     228             :              * field.
     229             :              */
     230        6636 :             leafTuple = ptr;
     231        6636 :             memcpy(&leafTupleHdr, leafTuple,
     232             :                    sizeof(SpGistLeafTupleData));
     233             : 
     234        6636 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     235        6636 :             ptr += leafTupleHdr.size;   /* tuples are packed back to back */
     236             :         }
     237             : 
     238         152 :         PageSetLSN(page, lsn);
     239         152 :         MarkBufferDirty(buffer);
     240             :     }
     241         152 :     if (BufferIsValid(buffer))
     242         152 :         UnlockReleaseBuffer(buffer);
     243             : 
     244             :     /* Delete tuples from the source page, inserting a redirection pointer */
     245         152 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     246             :     {
     247         152 :         page = BufferGetPage(buffer);
     248             :         /* pass (blknoDst, last inserted offset) as the link target; REDIRECT is requested only when not in an index build */
     249         152 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     250         152 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     251             :                                 SPGIST_PLACEHOLDER,
     252             :                                 blknoDst,
     253         152 :                                 toInsert[nInsert - 1]);
     254             : 
     255         152 :         PageSetLSN(page, lsn);
     256         152 :         MarkBufferDirty(buffer);
     257             :     }
     258         152 :     if (BufferIsValid(buffer))
     259         152 :         UnlockReleaseBuffer(buffer);
     260             : 
     261             :     /* And update the parent downlink */
     262         152 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     263             :     {
     264             :         SpGistInnerTuple tuple;
     265             : 
     266         152 :         page = BufferGetPage(buffer);
     267             : 
     268         152 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     269         152 :                                                PageGetItemId(page, xldata->offnumParent));
     270             :         /* the downlink gets the same target as the source-page link above */
     271         152 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     272         152 :                           blknoDst, toInsert[nInsert - 1]);
     273             : 
     274         152 :         PageSetLSN(page, lsn);
     275         152 :         MarkBufferDirty(buffer);
     276             :     }
     277         152 :     if (BufferIsValid(buffer))
     278         152 :         UnlockReleaseBuffer(buffer);
     279         152 : }
     280             : /* Replay an "add node" record (spgxlogAddNode): replace the inner tuple in place on block 0, or move it to block 1, leave a dead tuple behind on block 0 and fix the parent downlink. */
     281             : static void
     282         202 : spgRedoAddNode(XLogReaderState *record)
     283             : {
     284         202 :     XLogRecPtr  lsn = record->EndRecPtr;    /* stamped on every page we modify */
     285         202 :     char       *ptr = XLogRecGetData(record);
     286         202 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     287             :     char       *innerTuple;
     288             :     SpGistInnerTupleData innerTupleHdr;
     289             :     SpGistState state;
     290             :     Buffer      buffer;
     291             :     Page        page;
     292             :     XLogRedoAction action;
     293             :     /* the new inner tuple image follows the fixed-size record header */
     294         202 :     ptr += sizeof(spgxlogAddNode);
     295         202 :     innerTuple = ptr;
     296             :     /* the tuple is unaligned, so make a copy to access its header */
     297         202 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     298             : 
     299         202 :     fillFakeState(&state, xldata->stateSrc);
     300             :     /* the record references block 1 only when the tuple moves to another page */
     301         202 :     if (!XLogRecHasBlockRef(record, 1))
     302             :     {
     303             :         /* update in place */
     304             :         Assert(xldata->parentBlk == -1);    /* this branch does not update any parent downlink */
     305         200 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     306             :         {
     307         200 :             page = BufferGetPage(buffer);
     308             :             /* swap the old tuple for the new one at the same offset */
     309         200 :             PageIndexTupleDelete(page, xldata->offnum);
     310         200 :             if (PageAddItem(page, innerTuple, innerTupleHdr.size,
     311             :                             xldata->offnum,
     312         200 :                             false, false) != xldata->offnum)
     313           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     314             :                      innerTupleHdr.size);
     315             : 
     316         200 :             PageSetLSN(page, lsn);
     317         200 :             MarkBufferDirty(buffer);
     318             :         }
     319         200 :         if (BufferIsValid(buffer))
     320         200 :             UnlockReleaseBuffer(buffer);
     321             :     }
     322             :     else
     323             :     {
     324             :         BlockNumber blkno;
     325             :         BlockNumber blknoNew;
     326             :         /* block 0 holds the old tuple, block 1 receives the new one */
     327           2 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     328           2 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     329             : 
     330             :         /*
     331             :          * In normal operation we would have all three pages (source, dest,
     332             :          * and parent) locked simultaneously; but in WAL replay it should be
     333             :          * safe to update them one at a time, as long as we do it in the right
     334             :          * order. We must insert the new tuple before replacing the old tuple
     335             :          * with the redirect tuple.
     336             :          */
     337             : 
     338             :         /* Install new tuple first so redirect is valid */
     339           2 :         if (xldata->newPage)
     340             :         {
     341             :             /* AddNode is not used for nulls pages */
     342           2 :             buffer = XLogInitBufferForRedo(record, 1);
     343           2 :             SpGistInitBuffer(buffer, 0);
     344           2 :             action = BLK_NEEDS_REDO;
     345             :         }
     346             :         else
     347           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     348           2 :         if (action == BLK_NEEDS_REDO)
     349             :         {
     350           2 :             page = BufferGetPage(buffer);
     351             : 
     352           2 :             addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
     353             : 
     354             :             /*
     355             :              * If parent is in this same page, update it now.
     356             :              */
     357           2 :             if (xldata->parentBlk == 1)
     358             :             {
     359             :                 SpGistInnerTuple parentTuple;
     360             : 
     361           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     362           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     363             : 
     364           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     365           0 :                                   blknoNew, xldata->offnumNew);
     366             :             }
     367           2 :             PageSetLSN(page, lsn);
     368           2 :             MarkBufferDirty(buffer);
     369             :         }
     370           2 :         if (BufferIsValid(buffer))
     371           2 :             UnlockReleaseBuffer(buffer);
     372             : 
     373             :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     374           2 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     375             :         {
     376             :             SpGistDeadTuple dt;
     377             : 
     378           2 :             page = BufferGetPage(buffer);
     379             :             /* during an index build leave a placeholder; otherwise a redirect to the tuple's new location */
     380           2 :             if (state.isBuild)
     381           0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     382             :                                       InvalidBlockNumber,
     383             :                                       InvalidOffsetNumber);
     384             :             else
     385           2 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     386             :                                       blknoNew,
     387           2 :                                       xldata->offnumNew);
     388             :             /* put the dead tuple at the old tuple's offset */
     389           2 :             PageIndexTupleDelete(page, xldata->offnum);
     390           2 :             if (PageAddItem(page, dt, dt->size,
     391             :                             xldata->offnum,
     392           2 :                             false, false) != xldata->offnum)
     393           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     394             :                      dt->size);
     395             :             /* keep the page's dead-tuple counters in step with what was just added */
     396           2 :             if (state.isBuild)
     397           0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     398             :             else
     399           2 :                 SpGistPageGetOpaque(page)->nRedirection++;
     400             : 
     401             :             /*
     402             :              * If parent is in this same page, update it now.
     403             :              */
     404           2 :             if (xldata->parentBlk == 0)
     405             :             {
     406             :                 SpGistInnerTuple parentTuple;
     407             : 
     408           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     409           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     410             : 
     411           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     412           0 :                                   blknoNew, xldata->offnumNew);
     413             :             }
     414           2 :             PageSetLSN(page, lsn);
     415           2 :             MarkBufferDirty(buffer);
     416             :         }
     417           2 :         if (BufferIsValid(buffer))
     418           2 :             UnlockReleaseBuffer(buffer);
     419             : 
     420             :         /*
     421             :          * Update parent downlink (if we didn't do it as part of the source or
     422             :          * destination page update already).
     423             :          */
     424           2 :         if (xldata->parentBlk == 2)
     425             :         {
     426           2 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     427             :             {
     428             :                 SpGistInnerTuple parentTuple;
     429             : 
     430           0 :                 page = BufferGetPage(buffer);
     431             : 
     432           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     433           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     434             : 
     435           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     436           0 :                                   blknoNew, xldata->offnumNew);
     437             : 
     438           0 :                 PageSetLSN(page, lsn);
     439           0 :                 MarkBufferDirty(buffer);
     440             :             }
     441           2 :             if (BufferIsValid(buffer))
     442           2 :                 UnlockReleaseBuffer(buffer);
     443             :         }
     444             :     }
     445         202 : }
     446             : /* Replay a "split tuple" record (spgxlogSplitTuple): add the postfix tuple (on block 1, or on block 0 if postfixBlkSame) and replace the tuple at offnumPrefix on block 0 with the prefix tuple. */
     447             : static void
     448         202 : spgRedoSplitTuple(XLogReaderState *record)
     449             : {
     450         202 :     XLogRecPtr  lsn = record->EndRecPtr;    /* stamped on every page we modify */
     451         202 :     char       *ptr = XLogRecGetData(record);
     452         202 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     453             :     char       *prefixTuple;
     454             :     SpGistInnerTupleData prefixTupleHdr;
     455             :     char       *postfixTuple;
     456             :     SpGistInnerTupleData postfixTupleHdr;
     457             :     Buffer      buffer;
     458             :     Page        page;
     459             :     XLogRedoAction action;
     460             :     /* record layout: header, prefix tuple, then postfix tuple right after it */
     461         202 :     ptr += sizeof(spgxlogSplitTuple);
     462         202 :     prefixTuple = ptr;
     463             :     /* the prefix tuple is unaligned, so make a copy to access its header */
     464         202 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     465         202 :     ptr += prefixTupleHdr.size;
     466         202 :     postfixTuple = ptr;
     467             :     /* postfix tuple is also unaligned */
     468         202 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     469             : 
     470             :     /*
     471             :      * In normal operation we would have both pages locked simultaneously; but
     472             :      * in WAL replay it should be safe to update them one at a time, as long
     473             :      * as we do it in the right order.
     474             :      */
     475             : 
     476             :     /* insert postfix tuple first to avoid dangling link */
     477         202 :     if (!xldata->postfixBlkSame)    /* postfix goes to a different page: block 1 */
     478             :     {
     479          54 :         if (xldata->newPage)
     480             :         {
     481           2 :             buffer = XLogInitBufferForRedo(record, 1);
     482             :             /* SplitTuple is not used for nulls pages */
     483           2 :             SpGistInitBuffer(buffer, 0);
     484           2 :             action = BLK_NEEDS_REDO;
     485             :         }
     486             :         else
     487          52 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     488          54 :         if (action == BLK_NEEDS_REDO)
     489             :         {
     490          54 :             page = BufferGetPage(buffer);
     491             : 
     492          54 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     493             : 
     494          54 :             PageSetLSN(page, lsn);
     495          54 :             MarkBufferDirty(buffer);
     496             :         }
     497          54 :         if (BufferIsValid(buffer))
     498          54 :             UnlockReleaseBuffer(buffer);
     499             :     }
     500             : 
     501             :     /* now handle the original page */
     502         202 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     503             :     {
     504         200 :         page = BufferGetPage(buffer);
     505             :         /* swap the original tuple for the prefix tuple at the same offset */
     506         200 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     507         200 :         if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
     508         200 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     509           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     510             :                  prefixTupleHdr.size);
     511             :         /* postfix lives on this same page: add it now, after the prefix is in place */
     512         200 :         if (xldata->postfixBlkSame)
     513         146 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     514             : 
     515         200 :         PageSetLSN(page, lsn);
     516         200 :         MarkBufferDirty(buffer);
     517             :     }
     518         202 :     if (BufferIsValid(buffer))
     519         202 :         UnlockReleaseBuffer(buffer);
     520         202 : }
     521             : 
     522             : static void
     523         408 : spgRedoPickSplit(XLogReaderState *record)
     524             : {
     525         408 :     XLogRecPtr  lsn = record->EndRecPtr;
     526         408 :     char       *ptr = XLogRecGetData(record);
     527         408 :     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
     528             :     char       *innerTuple;
     529             :     SpGistInnerTupleData innerTupleHdr;
     530             :     SpGistState state;
     531             :     OffsetNumber *toDelete;
     532             :     OffsetNumber *toInsert;
     533             :     uint8      *leafPageSelect;
     534             :     Buffer      srcBuffer;
     535             :     Buffer      destBuffer;
     536             :     Buffer      innerBuffer;
     537             :     Page        srcPage;
     538             :     Page        destPage;
     539             :     Page        page;
     540             :     int         i;
     541             :     BlockNumber blknoInner;
     542             :     XLogRedoAction action;
     543             : 
     544         408 :     XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
     545             : 
     546         408 :     fillFakeState(&state, xldata->stateSrc);
     547             : 
     548         408 :     ptr += SizeOfSpgxlogPickSplit;
     549         408 :     toDelete = (OffsetNumber *) ptr;
     550         408 :     ptr += sizeof(OffsetNumber) * xldata->nDelete;
     551         408 :     toInsert = (OffsetNumber *) ptr;
     552         408 :     ptr += sizeof(OffsetNumber) * xldata->nInsert;
     553         408 :     leafPageSelect = (uint8 *) ptr;
     554         408 :     ptr += sizeof(uint8) * xldata->nInsert;
     555             : 
     556         408 :     innerTuple = ptr;
     557             :     /* the inner tuple is unaligned, so make a copy to access its header */
     558         408 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     559         408 :     ptr += innerTupleHdr.size;
     560             : 
     561             :     /* now ptr points to the list of leaf tuples */
     562             : 
     563         408 :     if (xldata->isRootSplit)
     564             :     {
     565             :         /* when splitting root, we touch it only in the guise of new inner */
     566           6 :         srcBuffer = InvalidBuffer;
     567           6 :         srcPage = NULL;
     568             :     }
     569         402 :     else if (xldata->initSrc)
     570             :     {
     571             :         /* just re-init the source page */
     572           0 :         srcBuffer = XLogInitBufferForRedo(record, 0);
     573           0 :         srcPage = BufferGetPage(srcBuffer);
     574             : 
     575           0 :         SpGistInitBuffer(srcBuffer,
     576           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     577             :         /* don't update LSN etc till we're done with it */
     578             :     }
     579             :     else
     580             :     {
     581             :         /*
     582             :          * Delete the specified tuples from source page.  (In case we're in
     583             :          * Hot Standby, we need to hold lock on the page till we're done
     584             :          * inserting leaf tuples and the new inner tuple, else the added
     585             :          * redirect tuple will be a dangling link.)
     586             :          */
     587         402 :         srcPage = NULL;
     588         402 :         if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
     589             :         {
     590         402 :             srcPage = BufferGetPage(srcBuffer);
     591             : 
     592             :             /*
     593             :              * We have it a bit easier here than in doPickSplit(), because we
     594             :              * know the inner tuple's location already, so we can inject the
     595             :              * correct redirection tuple now.
     596             :              */
     597         402 :             if (!state.isBuild)
     598         402 :                 spgPageIndexMultiDelete(&state, srcPage,
     599         402 :                                         toDelete, xldata->nDelete,
     600             :                                         SPGIST_REDIRECT,
     601             :                                         SPGIST_PLACEHOLDER,
     602             :                                         blknoInner,
     603         402 :                                         xldata->offnumInner);
     604             :             else
     605           0 :                 spgPageIndexMultiDelete(&state, srcPage,
     606           0 :                                         toDelete, xldata->nDelete,
     607             :                                         SPGIST_PLACEHOLDER,
     608             :                                         SPGIST_PLACEHOLDER,
     609             :                                         InvalidBlockNumber,
     610             :                                         InvalidOffsetNumber);
     611             : 
     612             :             /* don't update LSN etc till we're done with it */
     613             :         }
     614             :     }
     615             : 
     616             :     /* try to access dest page if any */
     617         408 :     if (!XLogRecHasBlockRef(record, 1))
     618             :     {
     619           0 :         destBuffer = InvalidBuffer;
     620           0 :         destPage = NULL;
     621             :     }
     622         408 :     else if (xldata->initDest)
     623             :     {
     624             :         /* just re-init the dest page */
     625         380 :         destBuffer = XLogInitBufferForRedo(record, 1);
     626         380 :         destPage = BufferGetPage(destBuffer);
     627             : 
     628         380 :         SpGistInitBuffer(destBuffer,
     629         380 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     630             :         /* don't update LSN etc till we're done with it */
     631             :     }
     632             :     else
     633             :     {
     634             :         /*
     635             :          * We could probably release the page lock immediately in the
     636             :          * full-page-image case, but for safety let's hold it till later.
     637             :          */
     638          28 :         if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
     639          26 :             destPage = BufferGetPage(destBuffer);
     640             :         else
     641           2 :             destPage = NULL;    /* don't do any page updates */
     642             :     }
     643             : 
     644             :     /* restore leaf tuples to src and/or dest page */
     645       57200 :     for (i = 0; i < xldata->nInsert; i++)
     646             :     {
     647             :         char       *leafTuple;
     648             :         SpGistLeafTupleData leafTupleHdr;
     649             : 
     650             :         /* the tuples are not aligned, so must copy to access the size field. */
     651       56792 :         leafTuple = ptr;
     652       56792 :         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     653       56792 :         ptr += leafTupleHdr.size;
     654             : 
     655       56792 :         page = leafPageSelect[i] ? destPage : srcPage;
     656       56792 :         if (page == NULL)
     657          96 :             continue;           /* no need to touch this page */
     658             : 
     659       56696 :         addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     660             :     }
     661             : 
     662             :     /* Now update src and dest page LSNs if needed */
     663         408 :     if (srcPage != NULL)
     664             :     {
     665         402 :         PageSetLSN(srcPage, lsn);
     666         402 :         MarkBufferDirty(srcBuffer);
     667             :     }
     668         408 :     if (destPage != NULL)
     669             :     {
     670         406 :         PageSetLSN(destPage, lsn);
     671         406 :         MarkBufferDirty(destBuffer);
     672             :     }
     673             : 
     674             :     /* restore new inner tuple */
     675         408 :     if (xldata->initInner)
     676             :     {
     677          12 :         innerBuffer = XLogInitBufferForRedo(record, 2);
     678          12 :         SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
     679          12 :         action = BLK_NEEDS_REDO;
     680             :     }
     681             :     else
     682         396 :         action = XLogReadBufferForRedo(record, 2, &innerBuffer);
     683             : 
     684         408 :     if (action == BLK_NEEDS_REDO)
     685             :     {
     686         370 :         page = BufferGetPage(innerBuffer);
     687             : 
     688         370 :         addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
     689             : 
     690             :         /* if inner is also parent, update link while we're here */
     691         370 :         if (xldata->innerIsParent)
     692             :         {
     693             :             SpGistInnerTuple parent;
     694             : 
     695         338 :             parent = (SpGistInnerTuple) PageGetItem(page,
     696         338 :                                                     PageGetItemId(page, xldata->offnumParent));
     697         338 :             spgUpdateNodeLink(parent, xldata->nodeI,
     698         338 :                               blknoInner, xldata->offnumInner);
     699             :         }
     700             : 
     701         370 :         PageSetLSN(page, lsn);
     702         370 :         MarkBufferDirty(innerBuffer);
     703             :     }
     704         408 :     if (BufferIsValid(innerBuffer))
     705         408 :         UnlockReleaseBuffer(innerBuffer);
     706             : 
     707             :     /*
     708             :      * Now we can release the leaf-page locks.  It's okay to do this before
     709             :      * updating the parent downlink.
     710             :      */
     711         408 :     if (BufferIsValid(srcBuffer))
     712         402 :         UnlockReleaseBuffer(srcBuffer);
     713         408 :     if (BufferIsValid(destBuffer))
     714         408 :         UnlockReleaseBuffer(destBuffer);
     715             : 
     716             :     /* update parent downlink, unless we did it above */
     717         408 :     if (XLogRecHasBlockRef(record, 3))
     718             :     {
     719             :         Buffer      parentBuffer;
     720             : 
     721          30 :         if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
     722             :         {
     723             :             SpGistInnerTuple parent;
     724             : 
     725          24 :             page = BufferGetPage(parentBuffer);
     726             : 
     727          24 :             parent = (SpGistInnerTuple) PageGetItem(page,
     728          24 :                                                     PageGetItemId(page, xldata->offnumParent));
     729          24 :             spgUpdateNodeLink(parent, xldata->nodeI,
     730          24 :                               blknoInner, xldata->offnumInner);
     731             : 
     732          24 :             PageSetLSN(page, lsn);
     733          24 :             MarkBufferDirty(parentBuffer);
     734             :         }
     735          30 :         if (BufferIsValid(parentBuffer))
     736          30 :             UnlockReleaseBuffer(parentBuffer);
     737             :     }
     738             :     else
     739             :         Assert(xldata->innerIsParent || xldata->isRootSplit);
     740         408 : }
     741             : /* Replay XLOG_SPGIST_VACUUM_LEAF on block 0: process the toDead and toPlaceholder lists via spgPageIndexMultiDelete, apply the logged tuple moves, and update chain links. */
     742             : static void
     743          52 : spgRedoVacuumLeaf(XLogReaderState *record)
     744             : {
     745          52 :     XLogRecPtr  lsn = record->EndRecPtr;
     746          52 :     char       *ptr = XLogRecGetData(record);
     747          52 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     748             :     OffsetNumber *toDead;
     749             :     OffsetNumber *toPlaceholder;
     750             :     OffsetNumber *moveSrc;
     751             :     OffsetNumber *moveDest;
     752             :     OffsetNumber *chainSrc;
     753             :     OffsetNumber *chainDest;
     754             :     SpGistState state;
     755             :     Buffer      buffer;
     756             :     Page        page;
     757             :     int         i;
     758             : 
     759          52 :     fillFakeState(&state, xldata->stateSrc);
     760             :     /* six offset arrays follow the fixed-size header back to back, in the order set up below */
     761          52 :     ptr += SizeOfSpgxlogVacuumLeaf;
     762          52 :     toDead = (OffsetNumber *) ptr;
     763          52 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     764          52 :     toPlaceholder = (OffsetNumber *) ptr;
     765          52 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     766          52 :     moveSrc = (OffsetNumber *) ptr;
     767          52 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     768          52 :     moveDest = (OffsetNumber *) ptr;
     769          52 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     770          52 :     chainSrc = (OffsetNumber *) ptr;
     771          52 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     772          52 :     chainDest = (OffsetNumber *) ptr;
     773             :     /* apply the changes only if XLogReadBufferForRedo says block 0 needs redo */
     774          52 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     775             :     {
     776          36 :         page = BufferGetPage(buffer);
     777             : 
     778          36 :         spgPageIndexMultiDelete(&state, page,
     779          36 :                                 toDead, xldata->nDead,
     780             :                                 SPGIST_DEAD, SPGIST_DEAD,
     781             :                                 InvalidBlockNumber,
     782             :                                 InvalidOffsetNumber);
     783             : 
     784          36 :         spgPageIndexMultiDelete(&state, page,
     785          36 :                                 toPlaceholder, xldata->nPlaceholder,
     786             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     787             :                                 InvalidBlockNumber,
     788             :                                 InvalidOffsetNumber);
     789             : 
     790             :         /* swap the line pointers of each (moveSrc[i], moveDest[i]) pair; see comments in vacuumLeafPage() */
     791          72 :         for (i = 0; i < xldata->nMove; i++)
     792             :         {
     793          36 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     794          36 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     795             :             ItemIdData  tmp;
     796             : 
     797          36 :             tmp = *idSrc;
     798          36 :             *idSrc = *idDest;
     799          36 :             *idDest = tmp;
     800             :         }
     801             :         /* after the swap, pass the moveSrc offsets to spgPageIndexMultiDelete with SPGIST_PLACEHOLDER states */
     802          36 :         spgPageIndexMultiDelete(&state, page,
     803          36 :                                 moveSrc, xldata->nMove,
     804             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     805             :                                 InvalidBlockNumber,
     806             :                                 InvalidOffsetNumber);
     807             :         /* point each chainSrc[i] tuple's next-offset link at chainDest[i] */
     808          82 :         for (i = 0; i < xldata->nChain; i++)
     809             :         {
     810             :             SpGistLeafTuple lt;
     811             : 
     812          46 :             lt = (SpGistLeafTuple) PageGetItem(page,
     813          46 :                                                PageGetItemId(page, chainSrc[i]));
     814             :             Assert(lt->tupstate == SPGIST_LIVE);
     815          46 :             SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
     816             :         }
     817             :         /* stamp the page with the record's end LSN and mark the buffer dirty */
     818          36 :         PageSetLSN(page, lsn);
     819          36 :         MarkBufferDirty(buffer);
     820             :     }
     821          52 :     if (BufferIsValid(buffer))
     822          52 :         UnlockReleaseBuffer(buffer);
     823          52 : }
     824             : /* Replay XLOG_SPGIST_VACUUM_ROOT: remove the nDelete offsets listed in the record from block 0 using PageIndexMultiDelete. */
     825             : static void
     826           0 : spgRedoVacuumRoot(XLogReaderState *record)
     827             : {
     828           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     829           0 :     char       *ptr = XLogRecGetData(record);
     830           0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     831             :     OffsetNumber *toDelete;
     832             :     Buffer      buffer;
     833             :     Page        page;
     834             : 
     835           0 :     toDelete = xldata->offsets;
     836             :     /* modify the page only if XLogReadBufferForRedo says block 0 needs redo */
     837           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     838             :     {
     839           0 :         page = BufferGetPage(buffer);
     840             : 
     841             :         /* The tuple numbers are in order, so they can be passed straight to PageIndexMultiDelete */
     842           0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     843             :         /* stamp the page with the record's end LSN and mark the buffer dirty */
     844           0 :         PageSetLSN(page, lsn);
     845           0 :         MarkBufferDirty(buffer);
     846             :     }
     847           0 :     if (BufferIsValid(buffer))
     848           0 :         UnlockReleaseBuffer(buffer);
     849           0 : }
     850             : /* Replay XLOG_SPGIST_VACUUM_REDIRECT: resolve Hot Standby snapshot conflicts, turn the listed redirect tuples into placeholders, and remove trailing placeholders from block 0. */
     851             : static void
     852         798 : spgRedoVacuumRedirect(XLogReaderState *record)
     853             : {
     854         798 :     XLogRecPtr  lsn = record->EndRecPtr;
     855         798 :     char       *ptr = XLogRecGetData(record);
     856         798 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     857             :     OffsetNumber *itemToPlaceholder;
     858             :     Buffer      buffer;
     859             : 
     860         798 :     itemToPlaceholder = xldata->offsets;
     861             : 
     862             :     /*
     863             :      * If any redirection tuples are being removed, make sure there are no
     864             :      * live Hot Standby transactions that might need to see them.
     865             :      */
     866         798 :     if (InHotStandby)
     867             :     {
     868             :         RelFileLocator locator;
     869             : 
     870         798 :         XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
     871         798 :         ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
     872         798 :                                             xldata->isCatalogRel,
     873             :                                             locator);
     874             :     }
     875             :     /* the page itself is changed only if XLogReadBufferForRedo says block 0 needs redo */
     876         798 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     877             :     {
     878          62 :         Page        page = BufferGetPage(buffer);
     879          62 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     880             :         int         i;
     881             : 
     882             :         /* Convert redirect pointers to plain placeholders */
     883          68 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     884             :         {
     885             :             SpGistDeadTuple dt;
     886             : 
     887           6 :             dt = (SpGistDeadTuple) PageGetItem(page,
     888           6 :                                                PageGetItemId(page, itemToPlaceholder[i]));
     889             :             Assert(dt->tupstate == SPGIST_REDIRECT);
     890           6 :             dt->tupstate = SPGIST_PLACEHOLDER;
     891           6 :             ItemPointerSetInvalid(&dt->pointer);
     892             :         }
     893             :         /* keep the page's redirection/placeholder counters in step with the conversions above */
     894             :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     895          62 :         opaque->nRedirection -= xldata->nToPlaceholder;
     896          62 :         opaque->nPlaceholder += xldata->nToPlaceholder;
     897             : 
     898             :         /* Remove placeholder tuples at end of page */
     899          62 :         if (xldata->firstPlaceholder != InvalidOffsetNumber)
     900             :         {
     901          62 :             int         max = PageGetMaxOffsetNumber(page);
     902             :             OffsetNumber *toDelete;
     903             : 
     904          62 :             toDelete = palloc(sizeof(OffsetNumber) * max);
     905             :             /* list every offset from firstPlaceholder through max, in ascending order */
     906        2622 :             for (i = xldata->firstPlaceholder; i <= max; i++)
     907        2560 :                 toDelete[i - xldata->firstPlaceholder] = i;
     908             :             /* reuse i as the number of offsets just listed */
     909          62 :             i = max - xldata->firstPlaceholder + 1;
     910             :             Assert(opaque->nPlaceholder >= i);
     911          62 :             opaque->nPlaceholder -= i;
     912             : 
     913             :             /* The array is sorted, so can use PageIndexMultiDelete */
     914          62 :             PageIndexMultiDelete(page, toDelete, i);
     915             : 
     916          62 :             pfree(toDelete);
     917             :         }
     918             :         /* stamp the page with the record's end LSN and mark the buffer dirty */
     919          62 :         PageSetLSN(page, lsn);
     920          62 :         MarkBufferDirty(buffer);
     921             :     }
     922         798 :     if (BufferIsValid(buffer))
     923         798 :         UnlockReleaseBuffer(buffer);
     924         798 : }
     925             : /* SP-GiST redo entry point: dispatch on the record's op code to the matching spgRedo* routine, running it inside opCtx and resetting that context afterwards. */
     926             : void
     927       79750 : spg_redo(XLogReaderState *record)
     928             : {
     929       79750 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     930             :     MemoryContext oldCxt;
     931             :     /* switch to opCtx for the duration of the handler; it is reset at the bottom of this function */
     932       79750 :     oldCxt = MemoryContextSwitchTo(opCtx);
     933       79750 :     switch (info)
     934             :     {
     935       77936 :         case XLOG_SPGIST_ADD_LEAF:
     936       77936 :             spgRedoAddLeaf(record);
     937       77936 :             break;
     938         152 :         case XLOG_SPGIST_MOVE_LEAFS:
     939         152 :             spgRedoMoveLeafs(record);
     940         152 :             break;
     941         202 :         case XLOG_SPGIST_ADD_NODE:
     942         202 :             spgRedoAddNode(record);
     943         202 :             break;
     944         202 :         case XLOG_SPGIST_SPLIT_TUPLE:
     945         202 :             spgRedoSplitTuple(record);
     946         202 :             break;
     947         408 :         case XLOG_SPGIST_PICKSPLIT:
     948         408 :             spgRedoPickSplit(record);
     949         408 :             break;
     950          52 :         case XLOG_SPGIST_VACUUM_LEAF:
     951          52 :             spgRedoVacuumLeaf(record);
     952          52 :             break;
     953           0 :         case XLOG_SPGIST_VACUUM_ROOT:
     954           0 :             spgRedoVacuumRoot(record);
     955           0 :             break;
     956         798 :         case XLOG_SPGIST_VACUUM_REDIRECT:
     957         798 :             spgRedoVacuumRedirect(record);
     958         798 :             break;
     959           0 :         default:
     960           0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
     961             :     }
     962             :     /* switch back to the caller's context, then reset the working context */
     963       79750 :     MemoryContextSwitchTo(oldCxt);
     964       79750 :     MemoryContextReset(opCtx);
     965       79750 : }
     966             : /* Create opCtx, the working memory context that spg_redo switches into, under CurrentMemoryContext. */
     967             : void
     968         412 : spg_xlog_startup(void)
     969             : {
     970         412 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     971             :                                   "SP-GiST temporary context",
     972             :                                   ALLOCSET_DEFAULT_SIZES);
     973         412 : }
     974             : /* Delete the working memory context and set opCtx to NULL so no stale pointer is left behind. */
     975             : void
     976         298 : spg_xlog_cleanup(void)
     977             : {
     978         298 :     MemoryContextDelete(opCtx);
     979         298 :     opCtx = NULL;
     980         298 : }
     981             : 
     982             : /*
     983             :  * Mask a SpGist page before performing consistency checks on it.
     984             :  * Masks the page LSN and checksum, the hint bits, and (when pd_lower looks sane) the unused space; blkno is not referenced. */
     985             : void
     986      160032 : spg_mask(char *pagedata, BlockNumber blkno)
     987             : {
     988      160032 :     Page        page = (Page) pagedata;
     989      160032 :     PageHeader  pagehdr = (PageHeader) page;
     990             : 
     991      160032 :     mask_page_lsn_and_checksum(page);
     992             : 
     993      160032 :     mask_page_hint_bits(page);
     994             : 
     995             :     /*
     996             :      * Mask the unused space, but only if the page's pd_lower appears to have
     997             :      * been set correctly.
     998             :      */
     999      160032 :     if (pagehdr->pd_lower >= SizeOfPageHeaderData)
    1000      160032 :         mask_unused_space(page);
    1001      160032 : }

Generated by: LCOV version 1.16