LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 89.1 % 441 393
Test Date: 2026-03-01 15:14:58 Functions: 92.9 % 14 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * spgxlog.c
       4              :  *    WAL replay logic for SP-GiST
       5              :  *
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994, Regents of the University of California
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *           src/backend/access/spgist/spgxlog.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/bufmask.h"
      18              : #include "access/spgist_private.h"
      19              : #include "access/spgxlog.h"
      20              : #include "access/xlogutils.h"
      21              : #include "storage/standby.h"
      22              : #include "utils/memutils.h"
      23              : 
      24              : 
      25              : static MemoryContext opCtx;     /* working memory for operations */
      26              : 
      27              : 
      28              : /*
      29              :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      30              :  *
      31              :  * At present, all we need is enough info to support spgFormDeadTuple(),
      32              :  * plus the isBuild flag.
      33              :  */
      34              : static void
      35          525 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      36              : {
      37          525 :     memset(state, 0, sizeof(*state));
      38              : 
      39          525 :     state->redirectXid = stateSrc.redirectXid;
      40          525 :     state->isBuild = stateSrc.isBuild;
      41          525 :     state->deadTupleStorage = palloc0(SGDTSIZE);
      42          525 : }
      43              : 
      44              : /*
      45              :  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
      46              :  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
      47              :  * existing tuple, it had better be a placeholder tuple.
      48              :  */
      49              : static void
      50        70570 : addOrReplaceTuple(Page page, const void *tuple, int size, OffsetNumber offset)
      51              : {
      52        70570 :     if (offset <= PageGetMaxOffsetNumber(page))
      53              :     {
      54        19076 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      55        19076 :                                                            PageGetItemId(page, offset));
      56              : 
      57        19076 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      58            0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      59              : 
      60              :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      61        19076 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      62              : 
      63        19076 :         PageIndexTupleDelete(page, offset);
      64              :     }
      65              : 
      66              :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      67              : 
      68        70570 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      69            0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      70              :              size);
      71        70570 : }
      72              : 
      73              : static void
      74        38969 : spgRedoAddLeaf(XLogReaderState *record)
      75              : {
      76        38969 :     XLogRecPtr  lsn = record->EndRecPtr;
      77        38969 :     char       *ptr = XLogRecGetData(record);
      78        38969 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
      79              :     char       *leafTuple;
      80              :     SpGistLeafTupleData leafTupleHdr;
      81              :     Buffer      buffer;
      82              :     Page        page;
      83              :     XLogRedoAction action;
      84              : 
      85        38969 :     ptr += sizeof(spgxlogAddLeaf);
      86        38969 :     leafTuple = ptr;
      87              :     /* the leaf tuple is unaligned, so make a copy to access its header */
      88        38969 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
      89              : 
      90              :     /*
      91              :      * In normal operation we would have both current and parent pages locked
      92              :      * simultaneously; but in WAL replay it should be safe to update the leaf
      93              :      * page before updating the parent.
      94              :      */
      95        38969 :     if (xldata->newPage)
      96              :     {
      97            1 :         buffer = XLogInitBufferForRedo(record, 0);
      98            1 :         SpGistInitBuffer(buffer,
      99            1 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     100            1 :         action = BLK_NEEDS_REDO;
     101              :     }
     102              :     else
     103        38968 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     104              : 
     105        38969 :     if (action == BLK_NEEDS_REDO)
     106              :     {
     107        38787 :         page = BufferGetPage(buffer);
     108              : 
     109              :         /* insert new tuple */
     110        38787 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     111              :         {
     112              :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     113        38787 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, xldata->offnumLeaf);
     114              : 
     115              :             /* update head tuple's chain link if needed */
     116        38787 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     117              :             {
     118              :                 SpGistLeafTuple head;
     119              : 
     120        38146 :                 head = (SpGistLeafTuple) PageGetItem(page,
     121        38146 :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
     122              :                 Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
     123        38146 :                 SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
     124              :             }
     125              :         }
     126              :         else
     127              :         {
     128              :             /* replacing a DEAD tuple */
     129            0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     130            0 :             if (PageAddItem(page,
     131              :                             leafTuple, leafTupleHdr.size,
     132            0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     133            0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     134              :                      leafTupleHdr.size);
     135              :         }
     136              : 
     137        38787 :         PageSetLSN(page, lsn);
     138        38787 :         MarkBufferDirty(buffer);
     139              :     }
     140        38969 :     if (BufferIsValid(buffer))
     141        38969 :         UnlockReleaseBuffer(buffer);
     142              : 
     143              :     /* update parent downlink if necessary */
     144        38969 :     if (xldata->offnumParent != InvalidOffsetNumber)
     145              :     {
     146          120 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     147              :         {
     148              :             SpGistInnerTuple tuple;
     149              :             BlockNumber blknoLeaf;
     150              : 
     151          120 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     152              : 
     153          120 :             page = BufferGetPage(buffer);
     154              : 
     155          120 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     156          120 :                                                    PageGetItemId(page, xldata->offnumParent));
     157              : 
     158          120 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     159          120 :                               blknoLeaf, xldata->offnumLeaf);
     160              : 
     161          120 :             PageSetLSN(page, lsn);
     162          120 :             MarkBufferDirty(buffer);
     163              :         }
     164          120 :         if (BufferIsValid(buffer))
     165          120 :             UnlockReleaseBuffer(buffer);
     166              :     }
     167        38969 : }
     168              : 
     169              : static void
     170           76 : spgRedoMoveLeafs(XLogReaderState *record)
     171              : {
     172           76 :     XLogRecPtr  lsn = record->EndRecPtr;
     173           76 :     char       *ptr = XLogRecGetData(record);
     174           76 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     175              :     SpGistState state;
     176              :     OffsetNumber *toDelete;
     177              :     OffsetNumber *toInsert;
     178              :     int         nInsert;
     179              :     Buffer      buffer;
     180              :     Page        page;
     181              :     XLogRedoAction action;
     182              :     BlockNumber blknoDst;
     183              : 
     184           76 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
     185              : 
     186           76 :     fillFakeState(&state, xldata->stateSrc);
     187              : 
     188           76 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     189              : 
     190           76 :     ptr += SizeOfSpgxlogMoveLeafs;
     191           76 :     toDelete = (OffsetNumber *) ptr;
     192           76 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     193           76 :     toInsert = (OffsetNumber *) ptr;
     194           76 :     ptr += sizeof(OffsetNumber) * nInsert;
     195              : 
     196              :     /* now ptr points to the list of leaf tuples */
     197              : 
     198              :     /*
     199              :      * In normal operation we would have all three pages (source, dest, and
     200              :      * parent) locked simultaneously; but in WAL replay it should be safe to
     201              :      * update them one at a time, as long as we do it in the right order.
     202              :      */
     203              : 
     204              :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     205           76 :     if (xldata->newPage)
     206              :     {
     207           33 :         buffer = XLogInitBufferForRedo(record, 1);
     208           33 :         SpGistInitBuffer(buffer,
     209           33 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     210           33 :         action = BLK_NEEDS_REDO;
     211              :     }
     212              :     else
     213           43 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     214              : 
     215           76 :     if (action == BLK_NEEDS_REDO)
     216              :     {
     217              :         int         i;
     218              : 
     219           74 :         page = BufferGetPage(buffer);
     220              : 
     221         3284 :         for (i = 0; i < nInsert; i++)
     222              :         {
     223              :             char       *leafTuple;
     224              :             SpGistLeafTupleData leafTupleHdr;
     225              : 
     226              :             /*
     227              :              * the tuples are not aligned, so must copy to access the size
     228              :              * field.
     229              :              */
     230         3210 :             leafTuple = ptr;
     231         3210 :             memcpy(&leafTupleHdr, leafTuple,
     232              :                    sizeof(SpGistLeafTupleData));
     233              : 
     234         3210 :             addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     235         3210 :             ptr += leafTupleHdr.size;
     236              :         }
     237              : 
     238           74 :         PageSetLSN(page, lsn);
     239           74 :         MarkBufferDirty(buffer);
     240              :     }
     241           76 :     if (BufferIsValid(buffer))
     242           76 :         UnlockReleaseBuffer(buffer);
     243              : 
     244              :     /* Delete tuples from the source page, inserting a redirection pointer */
     245           76 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     246              :     {
     247           76 :         page = BufferGetPage(buffer);
     248              : 
     249           76 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     250           76 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     251              :                                 SPGIST_PLACEHOLDER,
     252              :                                 blknoDst,
     253           76 :                                 toInsert[nInsert - 1]);
     254              : 
     255           76 :         PageSetLSN(page, lsn);
     256           76 :         MarkBufferDirty(buffer);
     257              :     }
     258           76 :     if (BufferIsValid(buffer))
     259           76 :         UnlockReleaseBuffer(buffer);
     260              : 
     261              :     /* And update the parent downlink */
     262           76 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     263              :     {
     264              :         SpGistInnerTuple tuple;
     265              : 
     266           74 :         page = BufferGetPage(buffer);
     267              : 
     268           74 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     269           74 :                                                PageGetItemId(page, xldata->offnumParent));
     270              : 
     271           74 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     272           74 :                           blknoDst, toInsert[nInsert - 1]);
     273              : 
     274           74 :         PageSetLSN(page, lsn);
     275           74 :         MarkBufferDirty(buffer);
     276              :     }
     277           76 :     if (BufferIsValid(buffer))
     278           76 :         UnlockReleaseBuffer(buffer);
     279           76 : }
     280              : 
     281              : static void
     282          101 : spgRedoAddNode(XLogReaderState *record)
     283              : {
     284          101 :     XLogRecPtr  lsn = record->EndRecPtr;
     285          101 :     char       *ptr = XLogRecGetData(record);
     286          101 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     287              :     char       *innerTuple;
     288              :     SpGistInnerTupleData innerTupleHdr;
     289              :     SpGistState state;
     290              :     Buffer      buffer;
     291              :     Page        page;
     292              :     XLogRedoAction action;
     293              : 
     294          101 :     ptr += sizeof(spgxlogAddNode);
     295          101 :     innerTuple = ptr;
     296              :     /* the tuple is unaligned, so make a copy to access its header */
     297          101 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     298              : 
     299          101 :     fillFakeState(&state, xldata->stateSrc);
     300              : 
     301          101 :     if (!XLogRecHasBlockRef(record, 1))
     302              :     {
     303              :         /* update in place */
     304              :         Assert(xldata->parentBlk == -1);
     305          100 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     306              :         {
     307          100 :             page = BufferGetPage(buffer);
     308              : 
     309          100 :             PageIndexTupleDelete(page, xldata->offnum);
     310          100 :             if (PageAddItem(page, innerTuple, innerTupleHdr.size,
     311              :                             xldata->offnum,
     312          100 :                             false, false) != xldata->offnum)
     313            0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     314              :                      innerTupleHdr.size);
     315              : 
     316          100 :             PageSetLSN(page, lsn);
     317          100 :             MarkBufferDirty(buffer);
     318              :         }
     319          200 :         if (BufferIsValid(buffer))
     320          100 :             UnlockReleaseBuffer(buffer);
     321              :     }
     322              :     else
     323              :     {
     324              :         BlockNumber blkno;
     325              :         BlockNumber blknoNew;
     326              : 
     327            1 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     328            1 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     329              : 
     330              :         /*
     331              :          * In normal operation we would have all three pages (source, dest,
     332              :          * and parent) locked simultaneously; but in WAL replay it should be
     333              :          * safe to update them one at a time, as long as we do it in the right
     334              :          * order. We must insert the new tuple before replacing the old tuple
     335              :          * with the redirect tuple.
     336              :          */
     337              : 
     338              :         /* Install new tuple first so redirect is valid */
     339            1 :         if (xldata->newPage)
     340              :         {
     341              :             /* AddNode is not used for nulls pages */
     342            1 :             buffer = XLogInitBufferForRedo(record, 1);
     343            1 :             SpGistInitBuffer(buffer, 0);
     344            1 :             action = BLK_NEEDS_REDO;
     345              :         }
     346              :         else
     347            0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     348            1 :         if (action == BLK_NEEDS_REDO)
     349              :         {
     350            1 :             page = BufferGetPage(buffer);
     351              : 
     352            1 :             addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumNew);
     353              : 
     354              :             /*
     355              :              * If parent is in this same page, update it now.
     356              :              */
     357            1 :             if (xldata->parentBlk == 1)
     358              :             {
     359              :                 SpGistInnerTuple parentTuple;
     360              : 
     361            0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     362            0 :                                                              PageGetItemId(page, xldata->offnumParent));
     363              : 
     364            0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     365            0 :                                   blknoNew, xldata->offnumNew);
     366              :             }
     367            1 :             PageSetLSN(page, lsn);
     368            1 :             MarkBufferDirty(buffer);
     369              :         }
     370            1 :         if (BufferIsValid(buffer))
     371            1 :             UnlockReleaseBuffer(buffer);
     372              : 
     373              :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     374            1 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     375              :         {
     376              :             SpGistDeadTuple dt;
     377              : 
     378            1 :             page = BufferGetPage(buffer);
     379              : 
     380            1 :             if (state.isBuild)
     381            0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     382              :                                       InvalidBlockNumber,
     383              :                                       InvalidOffsetNumber);
     384              :             else
     385            1 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     386              :                                       blknoNew,
     387            1 :                                       xldata->offnumNew);
     388              : 
     389            1 :             PageIndexTupleDelete(page, xldata->offnum);
     390            1 :             if (PageAddItem(page, dt, dt->size,
     391              :                             xldata->offnum,
     392            1 :                             false, false) != xldata->offnum)
     393            0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     394              :                      dt->size);
     395              : 
     396            1 :             if (state.isBuild)
     397            0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     398              :             else
     399            1 :                 SpGistPageGetOpaque(page)->nRedirection++;
     400              : 
     401              :             /*
     402              :              * If parent is in this same page, update it now.
     403              :              */
     404            1 :             if (xldata->parentBlk == 0)
     405              :             {
     406              :                 SpGistInnerTuple parentTuple;
     407              : 
     408            0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     409            0 :                                                              PageGetItemId(page, xldata->offnumParent));
     410              : 
     411            0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     412            0 :                                   blknoNew, xldata->offnumNew);
     413              :             }
     414            1 :             PageSetLSN(page, lsn);
     415            1 :             MarkBufferDirty(buffer);
     416              :         }
     417            1 :         if (BufferIsValid(buffer))
     418            1 :             UnlockReleaseBuffer(buffer);
     419              : 
     420              :         /*
     421              :          * Update parent downlink (if we didn't do it as part of the source or
     422              :          * destination page update already).
     423              :          */
     424            1 :         if (xldata->parentBlk == 2)
     425              :         {
     426            1 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     427              :             {
     428              :                 SpGistInnerTuple parentTuple;
     429              : 
     430            1 :                 page = BufferGetPage(buffer);
     431              : 
     432            1 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     433            1 :                                                              PageGetItemId(page, xldata->offnumParent));
     434              : 
     435            1 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     436            1 :                                   blknoNew, xldata->offnumNew);
     437              : 
     438            1 :                 PageSetLSN(page, lsn);
     439            1 :                 MarkBufferDirty(buffer);
     440              :             }
     441            1 :             if (BufferIsValid(buffer))
     442            1 :                 UnlockReleaseBuffer(buffer);
     443              :         }
     444              :     }
     445          101 : }
     446              : 
     447              : static void
     448          101 : spgRedoSplitTuple(XLogReaderState *record)
     449              : {
     450          101 :     XLogRecPtr  lsn = record->EndRecPtr;
     451          101 :     char       *ptr = XLogRecGetData(record);
     452          101 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     453              :     char       *prefixTuple;
     454              :     SpGistInnerTupleData prefixTupleHdr;
     455              :     char       *postfixTuple;
     456              :     SpGistInnerTupleData postfixTupleHdr;
     457              :     Buffer      buffer;
     458              :     Page        page;
     459              :     XLogRedoAction action;
     460              : 
     461          101 :     ptr += sizeof(spgxlogSplitTuple);
     462          101 :     prefixTuple = ptr;
     463              :     /* the prefix tuple is unaligned, so make a copy to access its header */
     464          101 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     465          101 :     ptr += prefixTupleHdr.size;
     466          101 :     postfixTuple = ptr;
     467              :     /* postfix tuple is also unaligned */
     468          101 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     469              : 
     470              :     /*
     471              :      * In normal operation we would have both pages locked simultaneously; but
     472              :      * in WAL replay it should be safe to update them one at a time, as long
     473              :      * as we do it in the right order.
     474              :      */
     475              : 
     476              :     /* insert postfix tuple first to avoid dangling link */
     477          101 :     if (!xldata->postfixBlkSame)
     478              :     {
     479           27 :         if (xldata->newPage)
     480              :         {
     481            1 :             buffer = XLogInitBufferForRedo(record, 1);
     482              :             /* SplitTuple is not used for nulls pages */
     483            1 :             SpGistInitBuffer(buffer, 0);
     484            1 :             action = BLK_NEEDS_REDO;
     485              :         }
     486              :         else
     487           26 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     488           27 :         if (action == BLK_NEEDS_REDO)
     489              :         {
     490           27 :             page = BufferGetPage(buffer);
     491              : 
     492           27 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     493              : 
     494           27 :             PageSetLSN(page, lsn);
     495           27 :             MarkBufferDirty(buffer);
     496              :         }
     497           27 :         if (BufferIsValid(buffer))
     498           27 :             UnlockReleaseBuffer(buffer);
     499              :     }
     500              : 
     501              :     /* now handle the original page */
     502          101 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     503              :     {
     504          100 :         page = BufferGetPage(buffer);
     505              : 
     506          100 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     507          100 :         if (PageAddItem(page, prefixTuple, prefixTupleHdr.size,
     508          100 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     509            0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     510              :                  prefixTupleHdr.size);
     511              : 
     512          100 :         if (xldata->postfixBlkSame)
     513           74 :             addOrReplaceTuple(page, postfixTuple, postfixTupleHdr.size, xldata->offnumPostfix);
     514              : 
     515          100 :         PageSetLSN(page, lsn);
     516          100 :         MarkBufferDirty(buffer);
     517              :     }
     518          101 :     if (BufferIsValid(buffer))
     519          101 :         UnlockReleaseBuffer(buffer);
     520          101 : }
     521              : 
     522              : static void
     523          203 : spgRedoPickSplit(XLogReaderState *record)
     524              : {
     525          203 :     XLogRecPtr  lsn = record->EndRecPtr;
     526          203 :     char       *ptr = XLogRecGetData(record);
     527          203 :     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
     528              :     char       *innerTuple;
     529              :     SpGistInnerTupleData innerTupleHdr;
     530              :     SpGistState state;
     531              :     OffsetNumber *toDelete;
     532              :     OffsetNumber *toInsert;
     533              :     uint8      *leafPageSelect;
     534              :     Buffer      srcBuffer;
     535              :     Buffer      destBuffer;
     536              :     Buffer      innerBuffer;
     537              :     Page        srcPage;
     538              :     Page        destPage;
     539              :     Page        page;
     540              :     int         i;
     541              :     BlockNumber blknoInner;
     542              :     XLogRedoAction action;
     543              : 
     544          203 :     XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
     545              : 
     546          203 :     fillFakeState(&state, xldata->stateSrc);
     547              : 
     548          203 :     ptr += SizeOfSpgxlogPickSplit;
     549          203 :     toDelete = (OffsetNumber *) ptr;
     550          203 :     ptr += sizeof(OffsetNumber) * xldata->nDelete;
     551          203 :     toInsert = (OffsetNumber *) ptr;
     552          203 :     ptr += sizeof(OffsetNumber) * xldata->nInsert;
     553          203 :     leafPageSelect = (uint8 *) ptr;
     554          203 :     ptr += sizeof(uint8) * xldata->nInsert;
     555              : 
     556          203 :     innerTuple = ptr;
     557              :     /* the inner tuple is unaligned, so make a copy to access its header */
     558          203 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     559          203 :     ptr += innerTupleHdr.size;
     560              : 
     561              :     /* now ptr points to the list of leaf tuples */
     562              : 
     563          203 :     if (xldata->isRootSplit)
     564              :     {
     565              :         /* when splitting root, we touch it only in the guise of new inner */
     566            3 :         srcBuffer = InvalidBuffer;
     567            3 :         srcPage = NULL;
     568              :     }
     569          200 :     else if (xldata->initSrc)
     570              :     {
     571              :         /* just re-init the source page */
     572            0 :         srcBuffer = XLogInitBufferForRedo(record, 0);
     573            0 :         srcPage = BufferGetPage(srcBuffer);
     574              : 
     575            0 :         SpGistInitBuffer(srcBuffer,
     576            0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     577              :         /* don't update LSN etc till we're done with it */
     578              :     }
     579              :     else
     580              :     {
     581              :         /*
     582              :          * Delete the specified tuples from source page.  (In case we're in
     583              :          * Hot Standby, we need to hold lock on the page till we're done
     584              :          * inserting leaf tuples and the new inner tuple, else the added
     585              :          * redirect tuple will be a dangling link.)
     586              :          */
     587          200 :         srcPage = NULL;
     588          200 :         if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
     589              :         {
     590          200 :             srcPage = BufferGetPage(srcBuffer);
     591              : 
     592              :             /*
     593              :              * We have it a bit easier here than in doPickSplit(), because we
     594              :              * know the inner tuple's location already, so we can inject the
     595              :              * correct redirection tuple now.
     596              :              */
     597          200 :             if (!state.isBuild)
     598          200 :                 spgPageIndexMultiDelete(&state, srcPage,
     599          200 :                                         toDelete, xldata->nDelete,
     600              :                                         SPGIST_REDIRECT,
     601              :                                         SPGIST_PLACEHOLDER,
     602              :                                         blknoInner,
     603          200 :                                         xldata->offnumInner);
     604              :             else
     605            0 :                 spgPageIndexMultiDelete(&state, srcPage,
     606            0 :                                         toDelete, xldata->nDelete,
     607              :                                         SPGIST_PLACEHOLDER,
     608              :                                         SPGIST_PLACEHOLDER,
     609              :                                         InvalidBlockNumber,
     610              :                                         InvalidOffsetNumber);
     611              : 
     612              :             /* don't update LSN etc till we're done with it */
     613              :         }
     614              :     }
     615              : 
     616              :     /* try to access dest page if any */
     617          203 :     if (!XLogRecHasBlockRef(record, 1))
     618              :     {
     619            0 :         destBuffer = InvalidBuffer;
     620            0 :         destPage = NULL;
     621              :     }
     622          203 :     else if (xldata->initDest)
     623              :     {
     624              :         /* just re-init the dest page */
     625          188 :         destBuffer = XLogInitBufferForRedo(record, 1);
     626          188 :         destPage = BufferGetPage(destBuffer);
     627              : 
     628          188 :         SpGistInitBuffer(destBuffer,
     629          188 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     630              :         /* don't update LSN etc till we're done with it */
     631              :     }
     632              :     else
     633              :     {
     634              :         /*
     635              :          * We could probably release the page lock immediately in the
     636              :          * full-page-image case, but for safety let's hold it till later.
     637              :          */
     638           15 :         if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
     639           15 :             destPage = BufferGetPage(destBuffer);
     640              :         else
     641            0 :             destPage = NULL;    /* don't do any page updates */
     642              :     }
     643              : 
     644              :     /* restore leaf tuples to src and/or dest page */
     645        28484 :     for (i = 0; i < xldata->nInsert; i++)
     646              :     {
     647              :         char       *leafTuple;
     648              :         SpGistLeafTupleData leafTupleHdr;
     649              : 
     650              :         /* the tuples are not aligned, so must copy to access the size field. */
     651        28281 :         leafTuple = ptr;
     652        28281 :         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     653        28281 :         ptr += leafTupleHdr.size;
     654              : 
     655        28281 :         page = leafPageSelect[i] ? destPage : srcPage;
     656        28281 :         if (page == NULL)
     657            0 :             continue;           /* no need to touch this page */
     658              : 
     659        28281 :         addOrReplaceTuple(page, leafTuple, leafTupleHdr.size, toInsert[i]);
     660              :     }
     661              : 
     662              :     /* Now update src and dest page LSNs if needed */
     663          203 :     if (srcPage != NULL)
     664              :     {
     665          200 :         PageSetLSN(srcPage, lsn);
     666          200 :         MarkBufferDirty(srcBuffer);
     667              :     }
     668          203 :     if (destPage != NULL)
     669              :     {
     670          203 :         PageSetLSN(destPage, lsn);
     671          203 :         MarkBufferDirty(destBuffer);
     672              :     }
     673              : 
     674              :     /* restore new inner tuple */
     675          203 :     if (xldata->initInner)
     676              :     {
     677            6 :         innerBuffer = XLogInitBufferForRedo(record, 2);
     678            6 :         SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
     679            6 :         action = BLK_NEEDS_REDO;
     680              :     }
     681              :     else
     682          197 :         action = XLogReadBufferForRedo(record, 2, &innerBuffer);
     683              : 
     684          203 :     if (action == BLK_NEEDS_REDO)
     685              :     {
     686          190 :         page = BufferGetPage(innerBuffer);
     687              : 
     688          190 :         addOrReplaceTuple(page, innerTuple, innerTupleHdr.size, xldata->offnumInner);
     689              : 
     690              :         /* if inner is also parent, update link while we're here */
     691          190 :         if (xldata->innerIsParent)
     692              :         {
     693              :             SpGistInnerTuple parent;
     694              : 
     695          172 :             parent = (SpGistInnerTuple) PageGetItem(page,
     696          172 :                                                     PageGetItemId(page, xldata->offnumParent));
     697          172 :             spgUpdateNodeLink(parent, xldata->nodeI,
     698          172 :                               blknoInner, xldata->offnumInner);
     699              :         }
     700              : 
     701          190 :         PageSetLSN(page, lsn);
     702          190 :         MarkBufferDirty(innerBuffer);
     703              :     }
     704          203 :     if (BufferIsValid(innerBuffer))
     705          203 :         UnlockReleaseBuffer(innerBuffer);
     706              : 
     707              :     /*
     708              :      * Now we can release the leaf-page locks.  It's okay to do this before
     709              :      * updating the parent downlink.
     710              :      */
     711          203 :     if (BufferIsValid(srcBuffer))
     712          200 :         UnlockReleaseBuffer(srcBuffer);
     713          203 :     if (BufferIsValid(destBuffer))
     714          203 :         UnlockReleaseBuffer(destBuffer);
     715              : 
     716              :     /* update parent downlink, unless we did it above */
     717          203 :     if (XLogRecHasBlockRef(record, 3))
     718              :     {
     719              :         Buffer      parentBuffer;
     720              : 
     721           15 :         if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
     722              :         {
     723              :             SpGistInnerTuple parent;
     724              : 
     725           14 :             page = BufferGetPage(parentBuffer);
     726              : 
     727           14 :             parent = (SpGistInnerTuple) PageGetItem(page,
     728           14 :                                                     PageGetItemId(page, xldata->offnumParent));
     729           14 :             spgUpdateNodeLink(parent, xldata->nodeI,
     730           14 :                               blknoInner, xldata->offnumInner);
     731              : 
     732           14 :             PageSetLSN(page, lsn);
     733           14 :             MarkBufferDirty(parentBuffer);
     734              :         }
     735           15 :         if (BufferIsValid(parentBuffer))
     736           15 :             UnlockReleaseBuffer(parentBuffer);
     737              :     }
     738              :     else
     739              :         Assert(xldata->innerIsParent || xldata->isRootSplit);
     740          203 : }
     741              : 
     742              : static void
     743          145 : spgRedoVacuumLeaf(XLogReaderState *record)
     744              : {
     745          145 :     XLogRecPtr  lsn = record->EndRecPtr;
     746          145 :     char       *ptr = XLogRecGetData(record);
     747          145 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     748              :     OffsetNumber *toDead;
     749              :     OffsetNumber *toPlaceholder;
     750              :     OffsetNumber *moveSrc;
     751              :     OffsetNumber *moveDest;
     752              :     OffsetNumber *chainSrc;
     753              :     OffsetNumber *chainDest;
     754              :     SpGistState state;
     755              :     Buffer      buffer;
     756              :     Page        page;
     757              :     int         i;
     758              : 
     759          145 :     fillFakeState(&state, xldata->stateSrc);
     760              : 
     761          145 :     ptr += SizeOfSpgxlogVacuumLeaf;
     762          145 :     toDead = (OffsetNumber *) ptr;
     763          145 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     764          145 :     toPlaceholder = (OffsetNumber *) ptr;
     765          145 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     766          145 :     moveSrc = (OffsetNumber *) ptr;
     767          145 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     768          145 :     moveDest = (OffsetNumber *) ptr;
     769          145 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     770          145 :     chainSrc = (OffsetNumber *) ptr;
     771          145 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     772          145 :     chainDest = (OffsetNumber *) ptr;
     773              : 
     774          145 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     775              :     {
     776            9 :         page = BufferGetPage(buffer);
     777              : 
     778            9 :         spgPageIndexMultiDelete(&state, page,
     779            9 :                                 toDead, xldata->nDead,
     780              :                                 SPGIST_DEAD, SPGIST_DEAD,
     781              :                                 InvalidBlockNumber,
     782              :                                 InvalidOffsetNumber);
     783              : 
     784            9 :         spgPageIndexMultiDelete(&state, page,
     785            9 :                                 toPlaceholder, xldata->nPlaceholder,
     786              :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     787              :                                 InvalidBlockNumber,
     788              :                                 InvalidOffsetNumber);
     789              : 
     790              :         /* see comments in vacuumLeafPage() */
     791           17 :         for (i = 0; i < xldata->nMove; i++)
     792              :         {
     793            8 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     794            8 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     795              :             ItemIdData  tmp;
     796              : 
     797            8 :             tmp = *idSrc;
     798            8 :             *idSrc = *idDest;
     799            8 :             *idDest = tmp;
     800              :         }
     801              : 
     802            9 :         spgPageIndexMultiDelete(&state, page,
     803            9 :                                 moveSrc, xldata->nMove,
     804              :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     805              :                                 InvalidBlockNumber,
     806              :                                 InvalidOffsetNumber);
     807              : 
     808           21 :         for (i = 0; i < xldata->nChain; i++)
     809              :         {
     810              :             SpGistLeafTuple lt;
     811              : 
     812           12 :             lt = (SpGistLeafTuple) PageGetItem(page,
     813           12 :                                                PageGetItemId(page, chainSrc[i]));
     814              :             Assert(lt->tupstate == SPGIST_LIVE);
     815           12 :             SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
     816              :         }
     817              : 
     818            9 :         PageSetLSN(page, lsn);
     819            9 :         MarkBufferDirty(buffer);
     820              :     }
     821          145 :     if (BufferIsValid(buffer))
     822          145 :         UnlockReleaseBuffer(buffer);
     823          145 : }
     824              : 
     825              : static void
     826            0 : spgRedoVacuumRoot(XLogReaderState *record)
     827              : {
     828            0 :     XLogRecPtr  lsn = record->EndRecPtr;
     829            0 :     char       *ptr = XLogRecGetData(record);
     830            0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     831              :     OffsetNumber *toDelete;
     832              :     Buffer      buffer;
     833              :     Page        page;
     834              : 
     835            0 :     toDelete = xldata->offsets;
     836              : 
     837            0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     838              :     {
     839            0 :         page = BufferGetPage(buffer);
     840              : 
     841              :         /* The tuple numbers are in order */
     842            0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     843              : 
     844            0 :         PageSetLSN(page, lsn);
     845            0 :         MarkBufferDirty(buffer);
     846              :     }
     847            0 :     if (BufferIsValid(buffer))
     848            0 :         UnlockReleaseBuffer(buffer);
     849            0 : }
     850              : 
     851              : static void
     852          585 : spgRedoVacuumRedirect(XLogReaderState *record)
     853              : {
     854          585 :     XLogRecPtr  lsn = record->EndRecPtr;
     855          585 :     char       *ptr = XLogRecGetData(record);
     856          585 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     857              :     OffsetNumber *itemToPlaceholder;
     858              :     Buffer      buffer;
     859              : 
     860          585 :     itemToPlaceholder = xldata->offsets;
     861              : 
     862              :     /*
     863              :      * If any redirection tuples are being removed, make sure there are no
     864              :      * live Hot Standby transactions that might need to see them.
     865              :      */
     866          585 :     if (InHotStandby)
     867              :     {
     868              :         RelFileLocator locator;
     869              : 
     870          585 :         XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
     871          585 :         ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
     872          585 :                                             xldata->isCatalogRel,
     873              :                                             locator);
     874              :     }
     875              : 
     876          585 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     877              :     {
     878           89 :         Page        page = BufferGetPage(buffer);
     879           89 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     880              :         int         i;
     881              : 
     882              :         /* Convert redirect pointers to plain placeholders */
     883           92 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     884              :         {
     885              :             SpGistDeadTuple dt;
     886              : 
     887            3 :             dt = (SpGistDeadTuple) PageGetItem(page,
     888            3 :                                                PageGetItemId(page, itemToPlaceholder[i]));
     889              :             Assert(dt->tupstate == SPGIST_REDIRECT);
     890            3 :             dt->tupstate = SPGIST_PLACEHOLDER;
     891            3 :             ItemPointerSetInvalid(&dt->pointer);
     892              :         }
     893              : 
     894              :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     895           89 :         opaque->nRedirection -= xldata->nToPlaceholder;
     896           89 :         opaque->nPlaceholder += xldata->nToPlaceholder;
     897              : 
     898              :         /* Remove placeholder tuples at end of page */
     899           89 :         if (xldata->firstPlaceholder != InvalidOffsetNumber)
     900              :         {
     901           89 :             int         max = PageGetMaxOffsetNumber(page);
     902              :             OffsetNumber *toDelete;
     903              : 
     904           89 :             toDelete = palloc_array(OffsetNumber, max);
     905              : 
     906         1247 :             for (i = xldata->firstPlaceholder; i <= max; i++)
     907         1158 :                 toDelete[i - xldata->firstPlaceholder] = i;
     908              : 
     909           89 :             i = max - xldata->firstPlaceholder + 1;
     910              :             Assert(opaque->nPlaceholder >= i);
     911           89 :             opaque->nPlaceholder -= i;
     912              : 
     913              :             /* The array is sorted, so can use PageIndexMultiDelete */
     914           89 :             PageIndexMultiDelete(page, toDelete, i);
     915              : 
     916           89 :             pfree(toDelete);
     917              :         }
     918              : 
     919           89 :         PageSetLSN(page, lsn);
     920           89 :         MarkBufferDirty(buffer);
     921              :     }
     922          585 :     if (BufferIsValid(buffer))
     923          585 :         UnlockReleaseBuffer(buffer);
     924          585 : }
     925              : 
     926              : void
     927        40180 : spg_redo(XLogReaderState *record)
     928              : {
     929        40180 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     930              :     MemoryContext oldCxt;
     931              : 
     932        40180 :     oldCxt = MemoryContextSwitchTo(opCtx);
     933        40180 :     switch (info)
     934              :     {
     935        38969 :         case XLOG_SPGIST_ADD_LEAF:
     936        38969 :             spgRedoAddLeaf(record);
     937        38969 :             break;
     938           76 :         case XLOG_SPGIST_MOVE_LEAFS:
     939           76 :             spgRedoMoveLeafs(record);
     940           76 :             break;
     941          101 :         case XLOG_SPGIST_ADD_NODE:
     942          101 :             spgRedoAddNode(record);
     943          101 :             break;
     944          101 :         case XLOG_SPGIST_SPLIT_TUPLE:
     945          101 :             spgRedoSplitTuple(record);
     946          101 :             break;
     947          203 :         case XLOG_SPGIST_PICKSPLIT:
     948          203 :             spgRedoPickSplit(record);
     949          203 :             break;
     950          145 :         case XLOG_SPGIST_VACUUM_LEAF:
     951          145 :             spgRedoVacuumLeaf(record);
     952          145 :             break;
     953            0 :         case XLOG_SPGIST_VACUUM_ROOT:
     954            0 :             spgRedoVacuumRoot(record);
     955            0 :             break;
     956          585 :         case XLOG_SPGIST_VACUUM_REDIRECT:
     957          585 :             spgRedoVacuumRedirect(record);
     958          585 :             break;
     959            0 :         default:
     960            0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
     961              :     }
     962              : 
     963        40180 :     MemoryContextSwitchTo(oldCxt);
     964        40180 :     MemoryContextReset(opCtx);
     965        40180 : }
     966              : 
     967              : void
     968          214 : spg_xlog_startup(void)
     969              : {
     970          214 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     971              :                                   "SP-GiST temporary context",
     972              :                                   ALLOCSET_DEFAULT_SIZES);
     973          214 : }
     974              : 
     975              : void
     976          154 : spg_xlog_cleanup(void)
     977              : {
     978          154 :     MemoryContextDelete(opCtx);
     979          154 :     opCtx = NULL;
     980          154 : }
     981              : 
     982              : /*
     983              :  * Mask a SpGist page before performing consistency checks on it.
     984              :  */
     985              : void
     986        80132 : spg_mask(char *pagedata, BlockNumber blkno)
     987              : {
     988        80132 :     Page        page = (Page) pagedata;
     989        80132 :     PageHeader  pagehdr = (PageHeader) page;
     990              : 
     991        80132 :     mask_page_lsn_and_checksum(page);
     992              : 
     993        80132 :     mask_page_hint_bits(page);
     994              : 
     995              :     /*
     996              :      * Mask the unused space, but only if the page's pd_lower appears to have
     997              :      * been set correctly.
     998              :      */
     999        80132 :     if (pagehdr->pd_lower >= SizeOfPageHeaderData)
    1000        80132 :         mask_unused_space(page);
    1001        80132 : }
        

Generated by: LCOV version 2.0-1