LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 398 448 88.8 %
Date: 2025-01-18 04:15:08 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgxlog.c
       4             :  *    WAL replay logic for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/spgist/spgxlog.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/bufmask.h"
      18             : #include "access/spgist_private.h"
      19             : #include "access/spgxlog.h"
      20             : #include "access/xlogutils.h"
      21             : #include "storage/standby.h"
      22             : #include "utils/memutils.h"
      23             : 
      24             : 
      25             : static MemoryContext opCtx;     /* working memory for operations */
      26             : 
      27             : 
      28             : /*
      29             :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      30             :  *
      31             :  * At present, all we need is enough info to support spgFormDeadTuple(),
      32             :  * plus the isBuild flag.
                        *
                        * state:    caller-provided struct; it is zeroed in full before the three
                        *           fields below are filled in.
                        * stateSrc: state taken from the WAL record by the callers (passed by
                        *           value); only its redirectXid and isBuild members are read.
                        *
                        * deadTupleStorage is a zeroed palloc0() chunk of SGDTSIZE bytes that is
                        * not freed here; presumably it goes away with the current memory
                        * context -- TODO confirm against the redo entry point.
      33             :  */
      34             : static void
      35         828 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      36             : {
      37         828 :     memset(state, 0, sizeof(*state));
      38             : 
      39         828 :     state->redirectXid = stateSrc.redirectXid;
      40         828 :     state->isBuild = stateSrc.isBuild;
      41         828 :     state->deadTupleStorage = palloc0(SGDTSIZE);
      42         828 : }
      43             : 
      44             : /*
      45             :  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
      46             :  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
      47             :  * existing tuple, it had better be a placeholder tuple.
                        *
                        * page:   page that is modified in place.
                        * tuple:  tuple image to store; it and size are handed to PageAddItem()
                        *         unchanged.
                        * offset: target offset number.  It may be at most one past the page's
                        *         current max offset once any placeholder has been removed
                        *         (asserted below).
                        *
                        * Raises ERROR if the tuple currently at "offset" is not a placeholder,
                        * or if PageAddItem() does not put the new tuple at exactly "offset".
                        * When a placeholder is replaced, the page's nPlaceholder counter is
                        * decremented before the old tuple is deleted.
                        *
                        * NOTE(review): size is declared int but is printed with %u in the
                        * failure message below -- confirm whether that is intended.
      48             :  */
      49             : static void
      50      145196 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
      51             : {
      52      145196 :     if (offset <= PageGetMaxOffsetNumber(page))
      53             :     {
      54       36648 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      55             :                                                            PageGetItemId(page, offset));
      56             : 
      57       36648 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      58           0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      59             : 
      60             :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      61       36648 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      62             : 
      63       36648 :         PageIndexTupleDelete(page, offset);
      64             :     }
      65             : 
      66             :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      67             : 
      68      145196 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      69           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      70             :              size);
      71      145196 : }
      72             : 
                       /*
                        * Replay an spgxlogAddLeaf record: put one leaf tuple on the leaf page
                        * (block reference 0) and, when offnumParent is valid, repoint the parent
                        * inner tuple's node link (block reference 1) at the new leaf tuple.
                        *
                        * The record's main data is the spgxlogAddLeaf struct immediately followed
                        * by the leaf tuple image.
                        */
      73             : static void
      74       78000 : spgRedoAddLeaf(XLogReaderState *record)
      75             : {
      76       78000 :     XLogRecPtr  lsn = record->EndRecPtr;
      77       78000 :     char       *ptr = XLogRecGetData(record);
      78       78000 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
      79             :     char       *leafTuple;
      80             :     SpGistLeafTupleData leafTupleHdr;
      81             :     Buffer      buffer;
      82             :     Page        page;
      83             :     XLogRedoAction action;
      84             : 
      85       78000 :     ptr += sizeof(spgxlogAddLeaf);
      86       78000 :     leafTuple = ptr;
      87             :     /* the leaf tuple is unaligned, so make a copy to access its header */
      88       78000 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
      89             : 
      90             :     /*
      91             :      * In normal operation we would have both current and parent pages locked
      92             :      * simultaneously; but in WAL replay it should be safe to update the leaf
      93             :      * page before updating the parent.
      94             :      */
      95       78000 :     if (xldata->newPage)
      96             :     {
      97           0 :         buffer = XLogInitBufferForRedo(record, 0);
      98           0 :         SpGistInitBuffer(buffer,
      99           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     100           0 :         action = BLK_NEEDS_REDO;
     101             :     }
     102             :     else
     103       78000 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     104             : 
     105       78000 :     if (action == BLK_NEEDS_REDO)
     106             :     {
     107       77642 :         page = BufferGetPage(buffer);
     108             : 
     109             :         /* insert new tuple */
     110       77642 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     111             :         {
     112             :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     113       77642 :             addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
     114       77642 :                               xldata->offnumLeaf);
     115             : 
     116             :             /* update head tuple's chain link if needed */
                                   /*
                                    * The Assert below checks that the new tuple's next-offset
                                    * already equals the head's current one, so pointing the head
                                    * at the new tuple places it directly after the head.
                                    */
     117       77642 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     118             :             {
     119             :                 SpGistLeafTuple head;
     120             : 
     121       76362 :                 head = (SpGistLeafTuple) PageGetItem(page,
     122       76362 :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
     123             :                 Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
     124       76362 :                 SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
     125             :             }
     126             :         }
     127             :         else
     128             :         {
     129             :             /* replacing a DEAD tuple */
                                   /*
                                    * offnumLeaf == offnumHeadLeaf here: the existing item is
                                    * deleted and the new tuple is stored at that same offset.
                                    */
     130           0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     131           0 :             if (PageAddItem(page,
     132             :                             (Item) leafTuple, leafTupleHdr.size,
     133           0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     134           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     135             :                      leafTupleHdr.size);
     136             :         }
     137             : 
     138       77642 :         PageSetLSN(page, lsn);
     139       77642 :         MarkBufferDirty(buffer);
     140             :     }
     141       78000 :     if (BufferIsValid(buffer))
     142       78000 :         UnlockReleaseBuffer(buffer);
     143             : 
     144             :     /* update parent downlink if necessary */
     145       78000 :     if (xldata->offnumParent != InvalidOffsetNumber)
     146             :     {
     147         240 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     148             :         {
     149             :             SpGistInnerTuple tuple;
     150             :             BlockNumber blknoLeaf;
     151             : 
                                   /* the link needs the leaf page's block number (block ref 0) */
     152         240 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     153             : 
     154         240 :             page = BufferGetPage(buffer);
     155             : 
     156         240 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     157         240 :                                                    PageGetItemId(page, xldata->offnumParent));
     158             : 
     159         240 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     160         240 :                               blknoLeaf, xldata->offnumLeaf);
     161             : 
     162         240 :             PageSetLSN(page, lsn);
     163         240 :             MarkBufferDirty(buffer);
     164             :         }
     165         240 :         if (BufferIsValid(buffer))
     166         240 :             UnlockReleaseBuffer(buffer);
     167             :     }
     168       78000 : }
     169             : 
                       /*
                        * Replay an spgxlogMoveLeafs record.  Three pages are touched, in this
                        * order: nInsert leaf tuples are added to the destination page (block
                        * reference 1), the toDelete[] tuples on the source page (block reference
                        * 0) are processed by spgPageIndexMultiDelete(), and the parent inner
                        * tuple's node link (block reference 2) is pointed at the destination.
                        *
                        * Main data layout after the fixed header (SizeOfSpgxlogMoveLeafs):
                        *     OffsetNumber toDelete[nMoves]
                        *     OffsetNumber toInsert[nInsert]
                        *     nInsert leaf tuple images, back to back and unaligned
                        * where nInsert is 1 if replaceDead is set, else nMoves + 1.
                        */
     170             : static void
     171         152 : spgRedoMoveLeafs(XLogReaderState *record)
     172             : {
     173         152 :     XLogRecPtr  lsn = record->EndRecPtr;
     174         152 :     char       *ptr = XLogRecGetData(record);
     175         152 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     176             :     SpGistState state;
     177             :     OffsetNumber *toDelete;
     178             :     OffsetNumber *toInsert;
     179             :     int         nInsert;
     180             :     Buffer      buffer;
     181             :     Page        page;
     182             :     XLogRedoAction action;
     183             :     BlockNumber blknoDst;
     184             : 
     185         152 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
     186             : 
     187         152 :     fillFakeState(&state, xldata->stateSrc);
     188             : 
     189         152 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     190             : 
     191         152 :     ptr += SizeOfSpgxlogMoveLeafs;
     192         152 :     toDelete = (OffsetNumber *) ptr;
     193         152 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     194         152 :     toInsert = (OffsetNumber *) ptr;
     195         152 :     ptr += sizeof(OffsetNumber) * nInsert;
     196             : 
     197             :     /* now ptr points to the list of leaf tuples */
     198             : 
     199             :     /*
     200             :      * In normal operation we would have all three pages (source, dest, and
     201             :      * parent) locked simultaneously; but in WAL replay it should be safe to
     202             :      * update them one at a time, as long as we do it in the right order.
     203             :      */
     204             : 
     205             :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     206         152 :     if (xldata->newPage)
     207             :     {
     208          64 :         buffer = XLogInitBufferForRedo(record, 1);
     209          64 :         SpGistInitBuffer(buffer,
     210          64 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     211          64 :         action = BLK_NEEDS_REDO;
     212             :     }
     213             :     else
     214          88 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     215             : 
     216         152 :     if (action == BLK_NEEDS_REDO)
     217             :     {
     218             :         int         i;
     219             : 
     220         150 :         page = BufferGetPage(buffer);
     221             : 
     222        6638 :         for (i = 0; i < nInsert; i++)
     223             :         {
     224             :             char       *leafTuple;
     225             :             SpGistLeafTupleData leafTupleHdr;
     226             : 
     227             :             /*
     228             :              * the tuples are not aligned, so must copy to access the size
     229             :              * field.
     230             :              */
     231        6488 :             leafTuple = ptr;
     232        6488 :             memcpy(&leafTupleHdr, leafTuple,
     233             :                    sizeof(SpGistLeafTupleData));
     234             : 
     235        6488 :             addOrReplaceTuple(page, (Item) leafTuple,
     236        6488 :                               leafTupleHdr.size, toInsert[i]);
                                   /* step to the next tuple image using this one's recorded size */
     237        6488 :             ptr += leafTupleHdr.size;
     238             :         }
     239             : 
     240         150 :         PageSetLSN(page, lsn);
     241         150 :         MarkBufferDirty(buffer);
     242             :     }
     243         152 :     if (BufferIsValid(buffer))
     244         152 :         UnlockReleaseBuffer(buffer);
     245             : 
     246             :     /* Delete tuples from the source page, inserting a redirection pointer */
                           /*
                            * toInsert[nInsert - 1], the last offset in the insert list, is passed
                            * here together with blknoDst, and is reused below as the target of the
                            * parent's node link.
                            */
     247         152 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     248             :     {
     249         152 :         page = BufferGetPage(buffer);
     250             : 
     251         152 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     252         152 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     253             :                                 SPGIST_PLACEHOLDER,
     254             :                                 blknoDst,
     255         152 :                                 toInsert[nInsert - 1]);
     256             : 
     257         152 :         PageSetLSN(page, lsn);
     258         152 :         MarkBufferDirty(buffer);
     259             :     }
     260         152 :     if (BufferIsValid(buffer))
     261         152 :         UnlockReleaseBuffer(buffer);
     262             : 
     263             :     /* And update the parent downlink */
     264         152 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     265             :     {
     266             :         SpGistInnerTuple tuple;
     267             : 
     268         148 :         page = BufferGetPage(buffer);
     269             : 
     270         148 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     271         148 :                                                PageGetItemId(page, xldata->offnumParent));
     272             : 
     273         148 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     274         148 :                           blknoDst, toInsert[nInsert - 1]);
     275             : 
     276         148 :         PageSetLSN(page, lsn);
     277         148 :         MarkBufferDirty(buffer);
     278             :     }
     279         152 :     if (BufferIsValid(buffer))
     280         152 :         UnlockReleaseBuffer(buffer);
     281         152 : }
     282             : 
                       /*
                        * Replay an spgxlogAddNode record, whose main data is the spgxlogAddNode
                        * struct immediately followed by the new inner tuple image.
                        *
                        * Two cases, distinguished by whether the record has block reference 1:
                        *
                        *  - no block 1: the tuple at xldata->offnum on block 0 is deleted and the
                        *    new image is stored at that same offset (update in place);
                        *
                        *  - block 1 present: the new image is added on block 1 at offnumNew, the
                        *    old tuple on block 0 is replaced by a dead tuple (a placeholder when
                        *    state.isBuild is set, otherwise a redirect to the new location), and
                        *    the parent's node link is updated on whichever of blocks 0, 1 or 2
                        *    xldata->parentBlk names.
                        */
     283             : static void
     284         202 : spgRedoAddNode(XLogReaderState *record)
     285             : {
     286         202 :     XLogRecPtr  lsn = record->EndRecPtr;
     287         202 :     char       *ptr = XLogRecGetData(record);
     288         202 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     289             :     char       *innerTuple;
     290             :     SpGistInnerTupleData innerTupleHdr;
     291             :     SpGistState state;
     292             :     Buffer      buffer;
     293             :     Page        page;
     294             :     XLogRedoAction action;
     295             : 
     296         202 :     ptr += sizeof(spgxlogAddNode);
     297         202 :     innerTuple = ptr;
     298             :     /* the tuple is unaligned, so make a copy to access its header */
     299         202 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     300             : 
     301         202 :     fillFakeState(&state, xldata->stateSrc);
     302             : 
     303         202 :     if (!XLogRecHasBlockRef(record, 1))
     304             :     {
     305             :         /* update in place */
                               /* parentBlk is expected to be -1 here; no parent link is touched */
     306             :         Assert(xldata->parentBlk == -1);
     307         200 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     308             :         {
     309         200 :             page = BufferGetPage(buffer);
     310             : 
     311         200 :             PageIndexTupleDelete(page, xldata->offnum);
     312         200 :             if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
     313             :                             xldata->offnum,
     314         200 :                             false, false) != xldata->offnum)
     315           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     316             :                      innerTupleHdr.size);
     317             : 
     318         200 :             PageSetLSN(page, lsn);
     319         200 :             MarkBufferDirty(buffer);
     320             :         }
     321         200 :         if (BufferIsValid(buffer))
     322         200 :             UnlockReleaseBuffer(buffer);
     323             :     }
     324             :     else
     325             :     {
     326             :         BlockNumber blkno;
     327             :         BlockNumber blknoNew;
     328             : 
     329           2 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     330           2 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     331             : 
     332             :         /*
     333             :          * In normal operation we would have all three pages (source, dest,
     334             :          * and parent) locked simultaneously; but in WAL replay it should be
     335             :          * safe to update them one at a time, as long as we do it in the right
     336             :          * order. We must insert the new tuple before replacing the old tuple
     337             :          * with the redirect tuple.
     338             :          */
     339             : 
     340             :         /* Install new tuple first so redirect is valid */
     341           2 :         if (xldata->newPage)
     342             :         {
     343             :             /* AddNode is not used for nulls pages */
     344           2 :             buffer = XLogInitBufferForRedo(record, 1);
     345           2 :             SpGistInitBuffer(buffer, 0);
     346           2 :             action = BLK_NEEDS_REDO;
     347             :         }
     348             :         else
     349           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     350           2 :         if (action == BLK_NEEDS_REDO)
     351             :         {
     352           2 :             page = BufferGetPage(buffer);
     353             : 
     354           2 :             addOrReplaceTuple(page, (Item) innerTuple,
     355           2 :                               innerTupleHdr.size, xldata->offnumNew);
     356             : 
     357             :             /*
     358             :              * If parent is in this same page, update it now.
     359             :              */
     360           2 :             if (xldata->parentBlk == 1)
     361             :             {
     362             :                 SpGistInnerTuple parentTuple;
     363             : 
     364           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     365           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     366             : 
     367           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     368           0 :                                   blknoNew, xldata->offnumNew);
     369             :             }
     370           2 :             PageSetLSN(page, lsn);
     371           2 :             MarkBufferDirty(buffer);
     372             :         }
     373           2 :         if (BufferIsValid(buffer))
     374           2 :             UnlockReleaseBuffer(buffer);
     375             : 
     376             :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     377           2 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     378             :         {
     379             :             SpGistDeadTuple dt;
     380             : 
     381           2 :             page = BufferGetPage(buffer);
     382             : 
     383           2 :             if (state.isBuild)
     384           0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     385             :                                       InvalidBlockNumber,
     386             :                                       InvalidOffsetNumber);
     387             :             else
     388           2 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     389             :                                       blknoNew,
     390           2 :                                       xldata->offnumNew);
     391             : 
     392           2 :             PageIndexTupleDelete(page, xldata->offnum);
     393           2 :             if (PageAddItem(page, (Item) dt, dt->size,
     394             :                             xldata->offnum,
     395           2 :                             false, false) != xldata->offnum)
     396           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     397             :                      dt->size);
     398             : 
                                   /* keep the page's dead-tuple counters in step with the tuple kind */
     399           2 :             if (state.isBuild)
     400           0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     401             :             else
     402           2 :                 SpGistPageGetOpaque(page)->nRedirection++;
     403             : 
     404             :             /*
     405             :              * If parent is in this same page, update it now.
     406             :              */
     407           2 :             if (xldata->parentBlk == 0)
     408             :             {
     409             :                 SpGistInnerTuple parentTuple;
     410             : 
     411           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     412           0 :                                                              PageGetItemId(page, xldata->offnumParent));
     413             : 
     414           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     415           0 :                                   blknoNew, xldata->offnumNew);
     416             :             }
     417           2 :             PageSetLSN(page, lsn);
     418           2 :             MarkBufferDirty(buffer);
     419             :         }
     420           2 :         if (BufferIsValid(buffer))
     421           2 :             UnlockReleaseBuffer(buffer);
     422             : 
     423             :         /*
     424             :          * Update parent downlink (if we didn't do it as part of the source or
     425             :          * destination page update already).
     426             :          */
     427           2 :         if (xldata->parentBlk == 2)
     428             :         {
     429           2 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     430             :             {
     431             :                 SpGistInnerTuple parentTuple;
     432             : 
     433           2 :                 page = BufferGetPage(buffer);
     434             : 
     435           2 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     436           2 :                                                              PageGetItemId(page, xldata->offnumParent));
     437             : 
     438           2 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     439           2 :                                   blknoNew, xldata->offnumNew);
     440             : 
     441           2 :                 PageSetLSN(page, lsn);
     442           2 :                 MarkBufferDirty(buffer);
     443             :             }
     444           2 :             if (BufferIsValid(buffer))
     445           2 :                 UnlockReleaseBuffer(buffer);
     446             :         }
     447             :     }
     448         202 : }
     449             : 
                       /*
                        * Replay an spgxlogSplitTuple record.  The main data holds the
                        * spgxlogSplitTuple struct, then the prefix tuple image, then the postfix
                        * tuple image; the postfix image is located by skipping the size recorded
                        * in the prefix tuple's header.
                        *
                        * The prefix tuple replaces the tuple at offnumPrefix on block reference 0.
                        * The postfix tuple is stored at offnumPostfix, either on that same page
                        * (postfixBlkSame set) or on block reference 1, in which case block 1 is
                        * handled before block 0.
                        */
     450             : static void
     451         202 : spgRedoSplitTuple(XLogReaderState *record)
     452             : {
     453         202 :     XLogRecPtr  lsn = record->EndRecPtr;
     454         202 :     char       *ptr = XLogRecGetData(record);
     455         202 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     456             :     char       *prefixTuple;
     457             :     SpGistInnerTupleData prefixTupleHdr;
     458             :     char       *postfixTuple;
     459             :     SpGistInnerTupleData postfixTupleHdr;
     460             :     Buffer      buffer;
     461             :     Page        page;
     462             :     XLogRedoAction action;
     463             : 
     464         202 :     ptr += sizeof(spgxlogSplitTuple);
     465         202 :     prefixTuple = ptr;
     466             :     /* the prefix tuple is unaligned, so make a copy to access its header */
     467         202 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     468         202 :     ptr += prefixTupleHdr.size;
     469         202 :     postfixTuple = ptr;
     470             :     /* postfix tuple is also unaligned */
     471         202 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     472             : 
     473             :     /*
     474             :      * In normal operation we would have both pages locked simultaneously; but
     475             :      * in WAL replay it should be safe to update them one at a time, as long
     476             :      * as we do it in the right order.
     477             :      */
     478             : 
     479             :     /* insert postfix tuple first to avoid dangling link */
     480         202 :     if (!xldata->postfixBlkSame)
     481             :     {
     482          54 :         if (xldata->newPage)
     483             :         {
     484           2 :             buffer = XLogInitBufferForRedo(record, 1);
     485             :             /* SplitTuple is not used for nulls pages */
     486           2 :             SpGistInitBuffer(buffer, 0);
     487           2 :             action = BLK_NEEDS_REDO;
     488             :         }
     489             :         else
     490          52 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     491          54 :         if (action == BLK_NEEDS_REDO)
     492             :         {
     493          54 :             page = BufferGetPage(buffer);
     494             : 
     495          54 :             addOrReplaceTuple(page, (Item) postfixTuple,
     496          54 :                               postfixTupleHdr.size, xldata->offnumPostfix);
     497             : 
     498          54 :             PageSetLSN(page, lsn);
     499          54 :             MarkBufferDirty(buffer);
     500             :         }
     501          54 :         if (BufferIsValid(buffer))
     502          54 :             UnlockReleaseBuffer(buffer);
     503             :     }
     504             : 
     505             :     /* now handle the original page */
     506         202 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     507             :     {
     508         202 :         page = BufferGetPage(buffer);
     509             : 
     510         202 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     511         202 :         if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
     512         202 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     513           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     514             :                  prefixTupleHdr.size);
     515             : 
                               /* same-page postfix is added here, before this page's LSN is set */
     516         202 :         if (xldata->postfixBlkSame)
     517         148 :             addOrReplaceTuple(page, (Item) postfixTuple,
     518         148 :                               postfixTupleHdr.size,
     519         148 :                               xldata->offnumPostfix);
     520             : 
     521         202 :         PageSetLSN(page, lsn);
     522         202 :         MarkBufferDirty(buffer);
     523             :     }
     524         202 :     if (BufferIsValid(buffer))
     525         202 :         UnlockReleaseBuffer(buffer);
     526         202 : }
     527             : 
     528             : static void
     529         434 : spgRedoPickSplit(XLogReaderState *record)
     530             : {
     531         434 :     XLogRecPtr  lsn = record->EndRecPtr;
     532         434 :     char       *ptr = XLogRecGetData(record);
     533         434 :     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
     534             :     char       *innerTuple;
     535             :     SpGistInnerTupleData innerTupleHdr;
     536             :     SpGistState state;
     537             :     OffsetNumber *toDelete;
     538             :     OffsetNumber *toInsert;
     539             :     uint8      *leafPageSelect;
     540             :     Buffer      srcBuffer;
     541             :     Buffer      destBuffer;
     542             :     Buffer      innerBuffer;
     543             :     Page        srcPage;
     544             :     Page        destPage;
     545             :     Page        page;
     546             :     int         i;
     547             :     BlockNumber blknoInner;
     548             :     XLogRedoAction action;
     549             : 
     550         434 :     XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
     551             : 
     552         434 :     fillFakeState(&state, xldata->stateSrc);
     553             : 
     554         434 :     ptr += SizeOfSpgxlogPickSplit;
     555         434 :     toDelete = (OffsetNumber *) ptr;
     556         434 :     ptr += sizeof(OffsetNumber) * xldata->nDelete;
     557         434 :     toInsert = (OffsetNumber *) ptr;
     558         434 :     ptr += sizeof(OffsetNumber) * xldata->nInsert;
     559         434 :     leafPageSelect = (uint8 *) ptr;
     560         434 :     ptr += sizeof(uint8) * xldata->nInsert;
     561             : 
     562         434 :     innerTuple = ptr;
     563             :     /* the inner tuple is unaligned, so make a copy to access its header */
     564         434 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     565         434 :     ptr += innerTupleHdr.size;
     566             : 
     567             :     /* now ptr points to the list of leaf tuples */
     568             : 
     569         434 :     if (xldata->isRootSplit)
     570             :     {
     571             :         /* when splitting root, we touch it only in the guise of new inner */
     572           6 :         srcBuffer = InvalidBuffer;
     573           6 :         srcPage = NULL;
     574             :     }
     575         428 :     else if (xldata->initSrc)
     576             :     {
     577             :         /* just re-init the source page */
     578           0 :         srcBuffer = XLogInitBufferForRedo(record, 0);
     579           0 :         srcPage = (Page) BufferGetPage(srcBuffer);
     580             : 
     581           0 :         SpGistInitBuffer(srcBuffer,
     582           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     583             :         /* don't update LSN etc till we're done with it */
     584             :     }
     585             :     else
     586             :     {
     587             :         /*
     588             :          * Delete the specified tuples from source page.  (In case we're in
     589             :          * Hot Standby, we need to hold lock on the page till we're done
     590             :          * inserting leaf tuples and the new inner tuple, else the added
     591             :          * redirect tuple will be a dangling link.)
     592             :          */
     593         428 :         srcPage = NULL;
     594         428 :         if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
     595             :         {
     596         424 :             srcPage = BufferGetPage(srcBuffer);
     597             : 
     598             :             /*
     599             :              * We have it a bit easier here than in doPickSplit(), because we
     600             :              * know the inner tuple's location already, so we can inject the
     601             :              * correct redirection tuple now.
     602             :              */
     603         424 :             if (!state.isBuild)
     604         424 :                 spgPageIndexMultiDelete(&state, srcPage,
     605         424 :                                         toDelete, xldata->nDelete,
     606             :                                         SPGIST_REDIRECT,
     607             :                                         SPGIST_PLACEHOLDER,
     608             :                                         blknoInner,
     609         424 :                                         xldata->offnumInner);
     610             :             else
     611           0 :                 spgPageIndexMultiDelete(&state, srcPage,
     612           0 :                                         toDelete, xldata->nDelete,
     613             :                                         SPGIST_PLACEHOLDER,
     614             :                                         SPGIST_PLACEHOLDER,
     615             :                                         InvalidBlockNumber,
     616             :                                         InvalidOffsetNumber);
     617             : 
     618             :             /* don't update LSN etc till we're done with it */
     619             :         }
     620             :     }
     621             : 
     622             :     /* try to access dest page if any */
     623         434 :     if (!XLogRecHasBlockRef(record, 1))
     624             :     {
     625           0 :         destBuffer = InvalidBuffer;
     626           0 :         destPage = NULL;
     627             :     }
     628         434 :     else if (xldata->initDest)
     629             :     {
     630             :         /* just re-init the dest page */
     631         404 :         destBuffer = XLogInitBufferForRedo(record, 1);
     632         404 :         destPage = (Page) BufferGetPage(destBuffer);
     633             : 
     634         404 :         SpGistInitBuffer(destBuffer,
     635         404 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     636             :         /* don't update LSN etc till we're done with it */
     637             :     }
     638             :     else
     639             :     {
     640             :         /*
     641             :          * We could probably release the page lock immediately in the
     642             :          * full-page-image case, but for safety let's hold it till later.
     643             :          */
     644          30 :         if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
     645          26 :             destPage = (Page) BufferGetPage(destBuffer);
     646             :         else
     647           4 :             destPage = NULL;    /* don't do any page updates */
     648             :     }
     649             : 
     650             :     /* restore leaf tuples to src and/or dest page */
     651       61208 :     for (i = 0; i < xldata->nInsert; i++)
     652             :     {
     653             :         char       *leafTuple;
     654             :         SpGistLeafTupleData leafTupleHdr;
     655             : 
     656             :         /* the tuples are not aligned, so must copy to access the size field. */
     657       60774 :         leafTuple = ptr;
     658       60774 :         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     659       60774 :         ptr += leafTupleHdr.size;
     660             : 
     661       60774 :         page = leafPageSelect[i] ? destPage : srcPage;
     662       60774 :         if (page == NULL)
     663         314 :             continue;           /* no need to touch this page */
     664             : 
     665       60460 :         addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
     666       60460 :                           toInsert[i]);
     667             :     }
     668             : 
     669             :     /* Now update src and dest page LSNs if needed */
     670         434 :     if (srcPage != NULL)
     671             :     {
     672         424 :         PageSetLSN(srcPage, lsn);
     673         424 :         MarkBufferDirty(srcBuffer);
     674             :     }
     675         434 :     if (destPage != NULL)
     676             :     {
     677         430 :         PageSetLSN(destPage, lsn);
     678         430 :         MarkBufferDirty(destBuffer);
     679             :     }
     680             : 
     681             :     /* restore new inner tuple */
     682         434 :     if (xldata->initInner)
     683             :     {
     684          12 :         innerBuffer = XLogInitBufferForRedo(record, 2);
     685          12 :         SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
     686          12 :         action = BLK_NEEDS_REDO;
     687             :     }
     688             :     else
     689         422 :         action = XLogReadBufferForRedo(record, 2, &innerBuffer);
     690             : 
     691         434 :     if (action == BLK_NEEDS_REDO)
     692             :     {
     693         402 :         page = BufferGetPage(innerBuffer);
     694             : 
     695         402 :         addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
     696         402 :                           xldata->offnumInner);
     697             : 
     698             :         /* if inner is also parent, update link while we're here */
     699         402 :         if (xldata->innerIsParent)
     700             :         {
     701             :             SpGistInnerTuple parent;
     702             : 
     703         364 :             parent = (SpGistInnerTuple) PageGetItem(page,
     704         364 :                                                     PageGetItemId(page, xldata->offnumParent));
     705         364 :             spgUpdateNodeLink(parent, xldata->nodeI,
     706         364 :                               blknoInner, xldata->offnumInner);
     707             :         }
     708             : 
     709         402 :         PageSetLSN(page, lsn);
     710         402 :         MarkBufferDirty(innerBuffer);
     711             :     }
     712         434 :     if (BufferIsValid(innerBuffer))
     713         434 :         UnlockReleaseBuffer(innerBuffer);
     714             : 
     715             :     /*
     716             :      * Now we can release the leaf-page locks.  It's okay to do this before
     717             :      * updating the parent downlink.
     718             :      */
     719         434 :     if (BufferIsValid(srcBuffer))
     720         428 :         UnlockReleaseBuffer(srcBuffer);
     721         434 :     if (BufferIsValid(destBuffer))
     722         434 :         UnlockReleaseBuffer(destBuffer);
     723             : 
     724             :     /* update parent downlink, unless we did it above */
     725         434 :     if (XLogRecHasBlockRef(record, 3))
     726             :     {
     727             :         Buffer      parentBuffer;
     728             : 
     729          32 :         if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
     730             :         {
     731             :             SpGistInnerTuple parent;
     732             : 
     733          26 :             page = BufferGetPage(parentBuffer);
     734             : 
     735          26 :             parent = (SpGistInnerTuple) PageGetItem(page,
     736          26 :                                                     PageGetItemId(page, xldata->offnumParent));
     737          26 :             spgUpdateNodeLink(parent, xldata->nodeI,
     738          26 :                               blknoInner, xldata->offnumInner);
     739             : 
     740          26 :             PageSetLSN(page, lsn);
     741          26 :             MarkBufferDirty(parentBuffer);
     742             :         }
     743          32 :         if (BufferIsValid(parentBuffer))
     744          32 :             UnlockReleaseBuffer(parentBuffer);
     745             :     }
     746             :     else
     747             :         Assert(xldata->innerIsParent || xldata->isRootSplit);
     748         434 : }
     749             : 
                       /*
                        * Replay an XLOG_SPGIST_VACUUM_LEAF record.
                        *
                        * The record's main data is a spgxlogVacuumLeaf header followed by six
                        * OffsetNumber arrays laid out back to back, in this order:
                        *   toDead[nDead], toPlaceholder[nPlaceholder],
                        *   moveSrc[nMove], moveDest[nMove],
                        *   chainSrc[nChain], chainDest[nChain].
                        *
                        * All page changes are applied to block reference 0, and only when
                        * XLogReadBufferForRedo() reports BLK_NEEDS_REDO.
                        */
     750             : static void
     751          40 : spgRedoVacuumLeaf(XLogReaderState *record)
     752             : {
     753          40 :     XLogRecPtr  lsn = record->EndRecPtr;
     754          40 :     char       *ptr = XLogRecGetData(record);
     755          40 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     756             :     OffsetNumber *toDead;
     757             :     OffsetNumber *toPlaceholder;
     758             :     OffsetNumber *moveSrc;
     759             :     OffsetNumber *moveDest;
     760             :     OffsetNumber *chainSrc;
     761             :     OffsetNumber *chainDest;
     762             :     SpGistState state;
     763             :     Buffer      buffer;
     764             :     Page        page;
     765             :     int         i;
     766             : 
     767          40 :     fillFakeState(&state, xldata->stateSrc);
     768             : 
                           /* step past the fixed header, then slice out each trailing array */
     769          40 :     ptr += SizeOfSpgxlogVacuumLeaf;
     770          40 :     toDead = (OffsetNumber *) ptr;
     771          40 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     772          40 :     toPlaceholder = (OffsetNumber *) ptr;
     773          40 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     774          40 :     moveSrc = (OffsetNumber *) ptr;
     775          40 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     776          40 :     moveDest = (OffsetNumber *) ptr;
     777          40 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     778          40 :     chainSrc = (OffsetNumber *) ptr;
     779          40 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     780          40 :     chainDest = (OffsetNumber *) ptr;
     781             : 
     782          40 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     783             :     {
     784          14 :         page = BufferGetPage(buffer);
     785             : 
                               /* toDead offsets are processed with state SPGIST_DEAD */
     786          14 :         spgPageIndexMultiDelete(&state, page,
     787          14 :                                 toDead, xldata->nDead,
     788             :                                 SPGIST_DEAD, SPGIST_DEAD,
     789             :                                 InvalidBlockNumber,
     790             :                                 InvalidOffsetNumber);
     791             : 
                               /* toPlaceholder offsets are processed with state SPGIST_PLACEHOLDER */
     792          14 :         spgPageIndexMultiDelete(&state, page,
     793          14 :                                 toPlaceholder, xldata->nPlaceholder,
     794             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     795             :                                 InvalidBlockNumber,
     796             :                                 InvalidOffsetNumber);
     797             : 
     798             :         /* see comments in vacuumLeafPage() */
                               /*
                                * Exchange the line pointers (ItemIdData) of each moveSrc/moveDest
                                * pair; only the item ids are swapped, no tuple data is copied.
                                */
     799          28 :         for (i = 0; i < xldata->nMove; i++)
     800             :         {
     801          14 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     802          14 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     803             :             ItemIdData  tmp;
     804             : 
     805          14 :             tmp = *idSrc;
     806          14 :             *idSrc = *idDest;
     807          14 :             *idDest = tmp;
     808             :         }
     809             : 
                               /* after the swap, the moveSrc offsets are processed as placeholders */
     810          14 :         spgPageIndexMultiDelete(&state, page,
     811          14 :                                 moveSrc, xldata->nMove,
     812             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     813             :                                 InvalidBlockNumber,
     814             :                                 InvalidOffsetNumber);
     815             : 
                               /* point each chainSrc tuple's next-offset at the matching chainDest */
     816          28 :         for (i = 0; i < xldata->nChain; i++)
     817             :         {
     818             :             SpGistLeafTuple lt;
     819             : 
     820          14 :             lt = (SpGistLeafTuple) PageGetItem(page,
     821          14 :                                                PageGetItemId(page, chainSrc[i]));
     822             :             Assert(lt->tupstate == SPGIST_LIVE);
     823          14 :             SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
     824             :         }
     825             : 
     826          14 :         PageSetLSN(page, lsn);
     827          14 :         MarkBufferDirty(buffer);
     828             :     }
                           /* the buffer is released whether or not the page needed redo */
     829          40 :     if (BufferIsValid(buffer))
     830          40 :         UnlockReleaseBuffer(buffer);
     831          40 : }
     832             : 
                       /*
                        * Replay an XLOG_SPGIST_VACUUM_ROOT record.
                        *
                        * The offsets to remove are read from xldata->offsets (nDelete entries)
                        * and deleted from block reference 0 with PageIndexMultiDelete(), but
                        * only when XLogReadBufferForRedo() reports BLK_NEEDS_REDO.
                        */
     833             : static void
     834           0 : spgRedoVacuumRoot(XLogReaderState *record)
     835             : {
     836           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     837           0 :     char       *ptr = XLogRecGetData(record);
     838           0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     839             :     OffsetNumber *toDelete;
     840             :     Buffer      buffer;
     841             :     Page        page;
     842             : 
     843           0 :     toDelete = xldata->offsets;
     844             : 
     845           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     846             :     {
     847           0 :         page = BufferGetPage(buffer);
     848             : 
     849             :         /* The tuple numbers are in order */
     850           0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     851             : 
     852           0 :         PageSetLSN(page, lsn);
     853           0 :         MarkBufferDirty(buffer);
     854             :     }
                           /* the buffer is released whether or not the page needed redo */
     855           0 :     if (BufferIsValid(buffer))
     856           0 :         UnlockReleaseBuffer(buffer);
     857           0 : }
     858             : 
                       /*
                        * Replay an XLOG_SPGIST_VACUUM_REDIRECT record.
                        *
                        * Steps, in the order performed below:
                        *   1. When InHotStandby, resolve recovery conflicts against
                        *      xldata->snapshotConflictHorizon before touching the page.
                        *   2. Turn each tuple listed in xldata->offsets (nToPlaceholder entries)
                        *      from SPGIST_REDIRECT into SPGIST_PLACEHOLDER and adjust the page's
                        *      nRedirection/nPlaceholder counters to match.
                        *   3. If xldata->firstPlaceholder is valid, delete every item from that
                        *      offset through the page's current maximum offset.
                        *
                        * Steps 2 and 3 apply to block reference 0 and run only when
                        * XLogReadBufferForRedo() reports BLK_NEEDS_REDO.
                        */
     859             : static void
     860         736 : spgRedoVacuumRedirect(XLogReaderState *record)
     861             : {
     862         736 :     XLogRecPtr  lsn = record->EndRecPtr;
     863         736 :     char       *ptr = XLogRecGetData(record);
     864         736 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     865             :     OffsetNumber *itemToPlaceholder;
     866             :     Buffer      buffer;
     867             : 
     868         736 :     itemToPlaceholder = xldata->offsets;
     869             : 
     870             :     /*
     871             :      * If any redirection tuples are being removed, make sure there are no
     872             :      * live Hot Standby transactions that might need to see them.
     873             :      */
     874         736 :     if (InHotStandby)
     875             :     {
     876             :         RelFileLocator locator;
     877             : 
     878         736 :         XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
     879         736 :         ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
     880         736 :                                             xldata->isCatalogRel,
     881             :                                             locator);
     882             :     }
     883             : 
     884         736 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     885             :     {
     886          60 :         Page        page = BufferGetPage(buffer);
     887          60 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     888             :         int         i;
     889             : 
     890             :         /* Convert redirect pointers to plain placeholders */
     891          84 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     892             :         {
     893             :             SpGistDeadTuple dt;
     894             : 
     895          24 :             dt = (SpGistDeadTuple) PageGetItem(page,
     896          24 :                                                PageGetItemId(page, itemToPlaceholder[i]));
     897             :             Assert(dt->tupstate == SPGIST_REDIRECT);
     898          24 :             dt->tupstate = SPGIST_PLACEHOLDER;
     899          24 :             ItemPointerSetInvalid(&dt->pointer);
     900             :         }
     901             : 
                               /* keep the page's per-state counters in step with the conversion */
     902             :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     903          60 :         opaque->nRedirection -= xldata->nToPlaceholder;
     904          60 :         opaque->nPlaceholder += xldata->nToPlaceholder;
     905             : 
     906             :         /* Remove placeholder tuples at end of page */
     907          60 :         if (xldata->firstPlaceholder != InvalidOffsetNumber)
     908             :         {
     909          60 :             int         max = PageGetMaxOffsetNumber(page);
     910             :             OffsetNumber *toDelete;
     911             : 
                                   /*
                                    * The array is sized for max entries, although only
                                    * (max - firstPlaceholder + 1) of them are filled in below.
                                    */
     912          60 :             toDelete = palloc(sizeof(OffsetNumber) * max);
     913             : 
                                   /* fill with the consecutive offsets firstPlaceholder .. max */
     914        4056 :             for (i = xldata->firstPlaceholder; i <= max; i++)
     915        3996 :                 toDelete[i - xldata->firstPlaceholder] = i;
     916             : 
                                   /* from here on, i is the number of items being deleted */
     917          60 :             i = max - xldata->firstPlaceholder + 1;
     918             :             Assert(opaque->nPlaceholder >= i);
     919          60 :             opaque->nPlaceholder -= i;
     920             : 
     921             :             /* The array is sorted, so can use PageIndexMultiDelete */
     922          60 :             PageIndexMultiDelete(page, toDelete, i);
     923             : 
     924          60 :             pfree(toDelete);
     925             :         }
     926             : 
     927          60 :         PageSetLSN(page, lsn);
     928          60 :         MarkBufferDirty(buffer);
     929             :     }
                           /* the buffer is released whether or not the page needed redo */
     930         736 :     if (BufferIsValid(buffer))
     931         736 :         UnlockReleaseBuffer(buffer);
     932         736 : }
     933             : 
                       /*
                        * Replay one SP-GiST WAL record.
                        *
                        * The operation code is taken from the record's info byte with the
                        * XLR_INFO_MASK bits cleared, and the record is handed to the matching
                        * spgRedo* routine.  The dispatch runs with opCtx as the current memory
                        * context; afterwards the previous context is restored and opCtx is
                        * reset.  An unrecognized operation code is reported with elog(PANIC).
                        */
     934             : void
     935       79766 : spg_redo(XLogReaderState *record)
     936             : {
     937       79766 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     938             :     MemoryContext oldCxt;
     939             : 
     940       79766 :     oldCxt = MemoryContextSwitchTo(opCtx);
     941       79766 :     switch (info)
     942             :     {
     943       78000 :         case XLOG_SPGIST_ADD_LEAF:
     944       78000 :             spgRedoAddLeaf(record);
     945       78000 :             break;
     946         152 :         case XLOG_SPGIST_MOVE_LEAFS:
     947         152 :             spgRedoMoveLeafs(record);
     948         152 :             break;
     949         202 :         case XLOG_SPGIST_ADD_NODE:
     950         202 :             spgRedoAddNode(record);
     951         202 :             break;
     952         202 :         case XLOG_SPGIST_SPLIT_TUPLE:
     953         202 :             spgRedoSplitTuple(record);
     954         202 :             break;
     955         434 :         case XLOG_SPGIST_PICKSPLIT:
     956         434 :             spgRedoPickSplit(record);
     957         434 :             break;
     958          40 :         case XLOG_SPGIST_VACUUM_LEAF:
     959          40 :             spgRedoVacuumLeaf(record);
     960          40 :             break;
     961           0 :         case XLOG_SPGIST_VACUUM_ROOT:
     962           0 :             spgRedoVacuumRoot(record);
     963           0 :             break;
     964         736 :         case XLOG_SPGIST_VACUUM_REDIRECT:
     965         736 :             spgRedoVacuumRedirect(record);
     966         736 :             break;
     967           0 :         default:
     968           0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
     969             :     }
     970             : 
                           /* restore the caller's context, then reset opCtx for the next record */
     971       79766 :     MemoryContextSwitchTo(oldCxt);
     972       79766 :     MemoryContextReset(opCtx);
     973       79766 : }
     974             : 
                       /*
                        * Create opCtx, the working memory context that spg_redo() switches into
                        * while replaying a record.  It is created as a child of whatever
                        * CurrentMemoryContext is at the time of the call.
                        */
     975             : void
     976         392 : spg_xlog_startup(void)
     977             : {
     978         392 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     979             :                                   "SP-GiST temporary context",
     980             :                                   ALLOCSET_DEFAULT_SIZES);
     981         392 : }
     982             : 
                       /*
                        * Delete the opCtx working memory context and clear the static pointer
                        * so that it does not dangle.
                        */
     983             : void
     984         288 : spg_xlog_cleanup(void)
     985             : {
     986         288 :     MemoryContextDelete(opCtx);
     987         288 :     opCtx = NULL;
     988         288 : }
     989             : 
     990             : /*
     991             :  * Mask a SpGist page before performing consistency checks on it.
                        *
                        * The page LSN and checksum and the page hint bits are always masked.
                        * The unused space is masked only when pd_lower is at least
                        * SizeOfPageHeaderData.  The blkno argument is not used here.
     992             :  */
     993             : void
     994      160300 : spg_mask(char *pagedata, BlockNumber blkno)
     995             : {
     996      160300 :     Page        page = (Page) pagedata;
     997      160300 :     PageHeader  pagehdr = (PageHeader) page;
     998             : 
     999      160300 :     mask_page_lsn_and_checksum(page);
    1000             : 
    1001      160300 :     mask_page_hint_bits(page);
    1002             : 
    1003             :     /*
    1004             :      * Mask the unused space, but only if the page's pd_lower appears to have
    1005             :      * been set correctly.
    1006             :      */
    1007      160300 :     if (pagehdr->pd_lower >= SizeOfPageHeaderData)
    1008      160300 :         mask_unused_space(page);
    1009      160300 : }

Generated by: LCOV version 1.14