LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 7 429 1.6 %
Date: 2019-09-19 17:07:13 Functions: 2 14 14.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgxlog.c
       4             :  *    WAL replay logic for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/spgist/spgxlog.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/bufmask.h"
      18             : #include "access/spgist_private.h"
      19             : #include "access/spgxlog.h"
      20             : #include "access/transam.h"
      21             : #include "access/xlog.h"
      22             : #include "access/xlogutils.h"
      23             : #include "storage/standby.h"
      24             : #include "utils/memutils.h"
      25             : 
      26             : 
      27             : static MemoryContext opCtx;     /* working memory for operations */
      28             : 
      29             : 
      30             : /*
      31             :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      32             :  *
      33             :  * At present, all we need is enough info to support spgFormDeadTuple(),
      34             :  * plus the isBuild flag.
      35             :  */
      36             : static void
      37           0 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      38             : {
      39           0 :     memset(state, 0, sizeof(*state));
      40             : 
      41           0 :     state->myXid = stateSrc.myXid;
      42           0 :     state->isBuild = stateSrc.isBuild;
      43           0 :     state->deadTupleStorage = palloc0(SGDTSIZE);
      44           0 : }
      45             : 
      46             : /*
      47             :  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
      48             :  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
      49             :  * existing tuple, it had better be a placeholder tuple.
      50             :  */
      51             : static void
      52           0 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
      53             : {
      54           0 :     if (offset <= PageGetMaxOffsetNumber(page))
      55             :     {
      56           0 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      57             :                                                            PageGetItemId(page, offset));
      58             : 
      59           0 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      60           0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      61             : 
      62             :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      63           0 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      64             : 
      65           0 :         PageIndexTupleDelete(page, offset);
      66             :     }
      67             : 
      68             :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      69             : 
      70           0 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      71           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      72             :              size);
      73           0 : }
      74             : 
      75             : static void
      76           0 : spgRedoAddLeaf(XLogReaderState *record)
      77             : {
      78           0 :     XLogRecPtr  lsn = record->EndRecPtr;
      79           0 :     char       *ptr = XLogRecGetData(record);
      80           0 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
      81             :     char       *leafTuple;
      82             :     SpGistLeafTupleData leafTupleHdr;
      83             :     Buffer      buffer;
      84             :     Page        page;
      85             :     XLogRedoAction action;
      86             : 
      87           0 :     ptr += sizeof(spgxlogAddLeaf);
      88           0 :     leafTuple = ptr;
      89             :     /* the leaf tuple is unaligned, so make a copy to access its header */
      90           0 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
      91             : 
      92             :     /*
      93             :      * In normal operation we would have both current and parent pages locked
      94             :      * simultaneously; but in WAL replay it should be safe to update the leaf
      95             :      * page before updating the parent.
      96             :      */
      97           0 :     if (xldata->newPage)
      98             :     {
      99           0 :         buffer = XLogInitBufferForRedo(record, 0);
     100           0 :         SpGistInitBuffer(buffer,
     101           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     102           0 :         action = BLK_NEEDS_REDO;
     103             :     }
     104             :     else
     105           0 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     106             : 
     107           0 :     if (action == BLK_NEEDS_REDO)
     108             :     {
     109           0 :         page = BufferGetPage(buffer);
     110             : 
     111             :         /* insert new tuple */
     112           0 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     113             :         {
     114             :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     115           0 :             addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
     116           0 :                               xldata->offnumLeaf);
     117             : 
     118             :             /* update head tuple's chain link if needed */
     119           0 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     120             :             {
     121             :                 SpGistLeafTuple head;
     122             : 
     123           0 :                 head = (SpGistLeafTuple) PageGetItem(page,
     124             :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
     125             :                 Assert(head->nextOffset == leafTupleHdr.nextOffset);
     126           0 :                 head->nextOffset = xldata->offnumLeaf;
     127             :             }
     128             :         }
     129             :         else
     130             :         {
     131             :             /* replacing a DEAD tuple */
     132           0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     133           0 :             if (PageAddItem(page,
     134             :                             (Item) leafTuple, leafTupleHdr.size,
     135           0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     136           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     137             :                      leafTupleHdr.size);
     138             :         }
     139             : 
     140           0 :         PageSetLSN(page, lsn);
     141           0 :         MarkBufferDirty(buffer);
     142             :     }
     143           0 :     if (BufferIsValid(buffer))
     144           0 :         UnlockReleaseBuffer(buffer);
     145             : 
     146             :     /* update parent downlink if necessary */
     147           0 :     if (xldata->offnumParent != InvalidOffsetNumber)
     148             :     {
     149           0 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     150             :         {
     151             :             SpGistInnerTuple tuple;
     152             :             BlockNumber blknoLeaf;
     153             : 
     154           0 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     155             : 
     156           0 :             page = BufferGetPage(buffer);
     157             : 
     158           0 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     159             :                                                    PageGetItemId(page, xldata->offnumParent));
     160             : 
     161           0 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     162           0 :                               blknoLeaf, xldata->offnumLeaf);
     163             : 
     164           0 :             PageSetLSN(page, lsn);
     165           0 :             MarkBufferDirty(buffer);
     166             :         }
     167           0 :         if (BufferIsValid(buffer))
     168           0 :             UnlockReleaseBuffer(buffer);
     169             :     }
     170           0 : }
     171             : 
     172             : static void
     173           0 : spgRedoMoveLeafs(XLogReaderState *record)
     174             : {
     175           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     176           0 :     char       *ptr = XLogRecGetData(record);
     177           0 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     178             :     SpGistState state;
     179             :     OffsetNumber *toDelete;
     180             :     OffsetNumber *toInsert;
     181             :     int         nInsert;
     182             :     Buffer      buffer;
     183             :     Page        page;
     184             :     XLogRedoAction action;
     185             :     BlockNumber blknoDst;
     186             : 
     187           0 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
     188             : 
     189           0 :     fillFakeState(&state, xldata->stateSrc);
     190             : 
     191           0 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     192             : 
     193           0 :     ptr += SizeOfSpgxlogMoveLeafs;
     194           0 :     toDelete = (OffsetNumber *) ptr;
     195           0 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     196           0 :     toInsert = (OffsetNumber *) ptr;
     197           0 :     ptr += sizeof(OffsetNumber) * nInsert;
     198             : 
     199             :     /* now ptr points to the list of leaf tuples */
     200             : 
     201             :     /*
     202             :      * In normal operation we would have all three pages (source, dest, and
     203             :      * parent) locked simultaneously; but in WAL replay it should be safe to
     204             :      * update them one at a time, as long as we do it in the right order.
     205             :      */
     206             : 
     207             :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     208           0 :     if (xldata->newPage)
     209             :     {
     210           0 :         buffer = XLogInitBufferForRedo(record, 1);
     211           0 :         SpGistInitBuffer(buffer,
     212           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     213           0 :         action = BLK_NEEDS_REDO;
     214             :     }
     215             :     else
     216           0 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     217             : 
     218           0 :     if (action == BLK_NEEDS_REDO)
     219             :     {
     220             :         int         i;
     221             : 
     222           0 :         page = BufferGetPage(buffer);
     223             : 
     224           0 :         for (i = 0; i < nInsert; i++)
     225             :         {
     226             :             char       *leafTuple;
     227             :             SpGistLeafTupleData leafTupleHdr;
     228             : 
     229             :             /*
     230             :              * the tuples are not aligned, so must copy to access the size
     231             :              * field.
     232             :              */
     233           0 :             leafTuple = ptr;
     234           0 :             memcpy(&leafTupleHdr, leafTuple,
     235             :                    sizeof(SpGistLeafTupleData));
     236             : 
     237           0 :             addOrReplaceTuple(page, (Item) leafTuple,
     238           0 :                               leafTupleHdr.size, toInsert[i]);
     239           0 :             ptr += leafTupleHdr.size;
     240             :         }
     241             : 
     242           0 :         PageSetLSN(page, lsn);
     243           0 :         MarkBufferDirty(buffer);
     244             :     }
     245           0 :     if (BufferIsValid(buffer))
     246           0 :         UnlockReleaseBuffer(buffer);
     247             : 
     248             :     /* Delete tuples from the source page, inserting a redirection pointer */
     249           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     250             :     {
     251           0 :         page = BufferGetPage(buffer);
     252             : 
     253           0 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     254           0 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     255             :                                 SPGIST_PLACEHOLDER,
     256             :                                 blknoDst,
     257           0 :                                 toInsert[nInsert - 1]);
     258             : 
     259           0 :         PageSetLSN(page, lsn);
     260           0 :         MarkBufferDirty(buffer);
     261             :     }
     262           0 :     if (BufferIsValid(buffer))
     263           0 :         UnlockReleaseBuffer(buffer);
     264             : 
     265             :     /* And update the parent downlink */
     266           0 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     267             :     {
     268             :         SpGistInnerTuple tuple;
     269             : 
     270           0 :         page = BufferGetPage(buffer);
     271             : 
     272           0 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     273             :                                                PageGetItemId(page, xldata->offnumParent));
     274             : 
     275           0 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     276           0 :                           blknoDst, toInsert[nInsert - 1]);
     277             : 
     278           0 :         PageSetLSN(page, lsn);
     279           0 :         MarkBufferDirty(buffer);
     280             :     }
     281           0 :     if (BufferIsValid(buffer))
     282           0 :         UnlockReleaseBuffer(buffer);
     283           0 : }
     284             : 
     285             : static void
     286           0 : spgRedoAddNode(XLogReaderState *record)
     287             : {
     288           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     289           0 :     char       *ptr = XLogRecGetData(record);
     290           0 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     291             :     char       *innerTuple;
     292             :     SpGistInnerTupleData innerTupleHdr;
     293             :     SpGistState state;
     294             :     Buffer      buffer;
     295             :     Page        page;
     296             :     XLogRedoAction action;
     297             : 
     298           0 :     ptr += sizeof(spgxlogAddNode);
     299           0 :     innerTuple = ptr;
     300             :     /* the tuple is unaligned, so make a copy to access its header */
     301           0 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     302             : 
     303           0 :     fillFakeState(&state, xldata->stateSrc);
     304             : 
     305           0 :     if (!XLogRecHasBlockRef(record, 1))
     306             :     {
     307             :         /* update in place */
     308             :         Assert(xldata->parentBlk == -1);
     309           0 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     310             :         {
     311           0 :             page = BufferGetPage(buffer);
     312             : 
     313           0 :             PageIndexTupleDelete(page, xldata->offnum);
     314           0 :             if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
     315             :                             xldata->offnum,
     316           0 :                             false, false) != xldata->offnum)
     317           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     318             :                      innerTupleHdr.size);
     319             : 
     320           0 :             PageSetLSN(page, lsn);
     321           0 :             MarkBufferDirty(buffer);
     322             :         }
     323           0 :         if (BufferIsValid(buffer))
     324           0 :             UnlockReleaseBuffer(buffer);
     325             :     }
     326             :     else
     327             :     {
     328             :         BlockNumber blkno;
     329             :         BlockNumber blknoNew;
     330             : 
     331           0 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     332           0 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     333             : 
     334             :         /*
     335             :          * In normal operation we would have all three pages (source, dest,
     336             :          * and parent) locked simultaneously; but in WAL replay it should be
     337             :          * safe to update them one at a time, as long as we do it in the right
     338             :          * order. We must insert the new tuple before replacing the old tuple
     339             :          * with the redirect tuple.
     340             :          */
     341             : 
     342             :         /* Install new tuple first so redirect is valid */
     343           0 :         if (xldata->newPage)
     344             :         {
     345             :             /* AddNode is not used for nulls pages */
     346           0 :             buffer = XLogInitBufferForRedo(record, 1);
     347           0 :             SpGistInitBuffer(buffer, 0);
     348           0 :             action = BLK_NEEDS_REDO;
     349             :         }
     350             :         else
     351           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     352           0 :         if (action == BLK_NEEDS_REDO)
     353             :         {
     354           0 :             page = BufferGetPage(buffer);
     355             : 
     356           0 :             addOrReplaceTuple(page, (Item) innerTuple,
     357           0 :                               innerTupleHdr.size, xldata->offnumNew);
     358             : 
     359             :             /*
     360             :              * If parent is in this same page, update it now.
     361             :              */
     362           0 :             if (xldata->parentBlk == 1)
     363             :             {
     364             :                 SpGistInnerTuple parentTuple;
     365             : 
     366           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     367             :                                                              PageGetItemId(page, xldata->offnumParent));
     368             : 
     369           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     370           0 :                                   blknoNew, xldata->offnumNew);
     371             :             }
     372           0 :             PageSetLSN(page, lsn);
     373           0 :             MarkBufferDirty(buffer);
     374             :         }
     375           0 :         if (BufferIsValid(buffer))
     376           0 :             UnlockReleaseBuffer(buffer);
     377             : 
     378             :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     379           0 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     380             :         {
     381             :             SpGistDeadTuple dt;
     382             : 
     383           0 :             page = BufferGetPage(buffer);
     384             : 
     385           0 :             if (state.isBuild)
     386           0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     387             :                                       InvalidBlockNumber,
     388             :                                       InvalidOffsetNumber);
     389             :             else
     390           0 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     391             :                                       blknoNew,
     392           0 :                                       xldata->offnumNew);
     393             : 
     394           0 :             PageIndexTupleDelete(page, xldata->offnum);
     395           0 :             if (PageAddItem(page, (Item) dt, dt->size,
     396             :                             xldata->offnum,
     397           0 :                             false, false) != xldata->offnum)
     398           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     399             :                      dt->size);
     400             : 
     401           0 :             if (state.isBuild)
     402           0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     403             :             else
     404           0 :                 SpGistPageGetOpaque(page)->nRedirection++;
     405             : 
     406             :             /*
     407             :              * If parent is in this same page, update it now.
     408             :              */
     409           0 :             if (xldata->parentBlk == 0)
     410             :             {
     411             :                 SpGistInnerTuple parentTuple;
     412             : 
     413           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     414             :                                                              PageGetItemId(page, xldata->offnumParent));
     415             : 
     416           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     417           0 :                                   blknoNew, xldata->offnumNew);
     418             :             }
     419           0 :             PageSetLSN(page, lsn);
     420           0 :             MarkBufferDirty(buffer);
     421             :         }
     422           0 :         if (BufferIsValid(buffer))
     423           0 :             UnlockReleaseBuffer(buffer);
     424             : 
     425             :         /*
     426             :          * Update parent downlink (if we didn't do it as part of the source or
     427             :          * destination page update already).
     428             :          */
     429           0 :         if (xldata->parentBlk == 2)
     430             :         {
     431           0 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     432             :             {
     433             :                 SpGistInnerTuple parentTuple;
     434             : 
     435           0 :                 page = BufferGetPage(buffer);
     436             : 
     437           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     438             :                                                              PageGetItemId(page, xldata->offnumParent));
     439             : 
     440           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     441           0 :                                   blknoNew, xldata->offnumNew);
     442             : 
     443           0 :                 PageSetLSN(page, lsn);
     444           0 :                 MarkBufferDirty(buffer);
     445             :             }
     446           0 :             if (BufferIsValid(buffer))
     447           0 :                 UnlockReleaseBuffer(buffer);
     448             :         }
     449             :     }
     450           0 : }
     451             : 
     452             : static void
     453           0 : spgRedoSplitTuple(XLogReaderState *record)
     454             : {
     455           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     456           0 :     char       *ptr = XLogRecGetData(record);
     457           0 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     458             :     char       *prefixTuple;
     459             :     SpGistInnerTupleData prefixTupleHdr;
     460             :     char       *postfixTuple;
     461             :     SpGistInnerTupleData postfixTupleHdr;
     462             :     Buffer      buffer;
     463             :     Page        page;
     464             :     XLogRedoAction action;
     465             : 
     466           0 :     ptr += sizeof(spgxlogSplitTuple);
     467           0 :     prefixTuple = ptr;
     468             :     /* the prefix tuple is unaligned, so make a copy to access its header */
     469           0 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     470           0 :     ptr += prefixTupleHdr.size;
     471           0 :     postfixTuple = ptr;
     472             :     /* postfix tuple is also unaligned */
     473           0 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     474             : 
     475             :     /*
     476             :      * In normal operation we would have both pages locked simultaneously; but
     477             :      * in WAL replay it should be safe to update them one at a time, as long
     478             :      * as we do it in the right order.
     479             :      */
     480             : 
     481             :     /* insert postfix tuple first to avoid dangling link */
     482           0 :     if (!xldata->postfixBlkSame)
     483             :     {
     484           0 :         if (xldata->newPage)
     485             :         {
     486           0 :             buffer = XLogInitBufferForRedo(record, 1);
     487             :             /* SplitTuple is not used for nulls pages */
     488           0 :             SpGistInitBuffer(buffer, 0);
     489           0 :             action = BLK_NEEDS_REDO;
     490             :         }
     491             :         else
     492           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     493           0 :         if (action == BLK_NEEDS_REDO)
     494             :         {
     495           0 :             page = BufferGetPage(buffer);
     496             : 
     497           0 :             addOrReplaceTuple(page, (Item) postfixTuple,
     498           0 :                               postfixTupleHdr.size, xldata->offnumPostfix);
     499             : 
     500           0 :             PageSetLSN(page, lsn);
     501           0 :             MarkBufferDirty(buffer);
     502             :         }
     503           0 :         if (BufferIsValid(buffer))
     504           0 :             UnlockReleaseBuffer(buffer);
     505             :     }
     506             : 
     507             :     /* now handle the original page */
     508           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     509             :     {
     510           0 :         page = BufferGetPage(buffer);
     511             : 
     512           0 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     513           0 :         if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
     514           0 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     515           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     516             :                  prefixTupleHdr.size);
     517             : 
     518           0 :         if (xldata->postfixBlkSame)
     519           0 :             addOrReplaceTuple(page, (Item) postfixTuple,
     520           0 :                               postfixTupleHdr.size,
     521           0 :                               xldata->offnumPostfix);
     522             : 
     523           0 :         PageSetLSN(page, lsn);
     524           0 :         MarkBufferDirty(buffer);
     525             :     }
     526           0 :     if (BufferIsValid(buffer))
     527           0 :         UnlockReleaseBuffer(buffer);
     528           0 : }
     529             : 
     530             : static void
     531           0 : spgRedoPickSplit(XLogReaderState *record)
     532             : {
     533           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     534           0 :     char       *ptr = XLogRecGetData(record);
     535           0 :     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
     536             :     char       *innerTuple;
     537             :     SpGistInnerTupleData innerTupleHdr;
     538             :     SpGistState state;
     539             :     OffsetNumber *toDelete;
     540             :     OffsetNumber *toInsert;
     541             :     uint8      *leafPageSelect;
     542             :     Buffer      srcBuffer;
     543             :     Buffer      destBuffer;
     544             :     Buffer      innerBuffer;
     545             :     Page        srcPage;
     546             :     Page        destPage;
     547             :     Page        page;
     548             :     int         i;
     549             :     BlockNumber blknoInner;
     550             :     XLogRedoAction action;
     551             : 
     552           0 :     XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
     553             : 
     554           0 :     fillFakeState(&state, xldata->stateSrc);
     555             : 
     556           0 :     ptr += SizeOfSpgxlogPickSplit;
     557           0 :     toDelete = (OffsetNumber *) ptr;
     558           0 :     ptr += sizeof(OffsetNumber) * xldata->nDelete;
     559           0 :     toInsert = (OffsetNumber *) ptr;
     560           0 :     ptr += sizeof(OffsetNumber) * xldata->nInsert;
     561           0 :     leafPageSelect = (uint8 *) ptr;
     562           0 :     ptr += sizeof(uint8) * xldata->nInsert;
     563             : 
     564           0 :     innerTuple = ptr;
     565             :     /* the inner tuple is unaligned, so make a copy to access its header */
     566           0 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     567           0 :     ptr += innerTupleHdr.size;
     568             : 
     569             :     /* now ptr points to the list of leaf tuples */
     570             : 
     571           0 :     if (xldata->isRootSplit)
     572             :     {
     573             :         /* when splitting root, we touch it only in the guise of new inner */
     574           0 :         srcBuffer = InvalidBuffer;
     575           0 :         srcPage = NULL;
     576             :     }
     577           0 :     else if (xldata->initSrc)
     578             :     {
     579             :         /* just re-init the source page */
     580           0 :         srcBuffer = XLogInitBufferForRedo(record, 0);
     581           0 :         srcPage = (Page) BufferGetPage(srcBuffer);
     582             : 
     583           0 :         SpGistInitBuffer(srcBuffer,
     584           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     585             :         /* don't update LSN etc till we're done with it */
     586             :     }
     587             :     else
     588             :     {
     589             :         /*
     590             :          * Delete the specified tuples from source page.  (In case we're in
     591             :          * Hot Standby, we need to hold lock on the page till we're done
     592             :          * inserting leaf tuples and the new inner tuple, else the added
     593             :          * redirect tuple will be a dangling link.)
     594             :          */
     595           0 :         srcPage = NULL;
     596           0 :         if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
     597             :         {
     598           0 :             srcPage = BufferGetPage(srcBuffer);
     599             : 
     600             :             /*
     601             :              * We have it a bit easier here than in doPickSplit(), because we
     602             :              * know the inner tuple's location already, so we can inject the
     603             :              * correct redirection tuple now.
     604             :              */
     605           0 :             if (!state.isBuild)
     606           0 :                 spgPageIndexMultiDelete(&state, srcPage,
     607           0 :                                         toDelete, xldata->nDelete,
     608             :                                         SPGIST_REDIRECT,
     609             :                                         SPGIST_PLACEHOLDER,
     610             :                                         blknoInner,
     611           0 :                                         xldata->offnumInner);
     612             :             else
     613           0 :                 spgPageIndexMultiDelete(&state, srcPage,
     614           0 :                                         toDelete, xldata->nDelete,
     615             :                                         SPGIST_PLACEHOLDER,
     616             :                                         SPGIST_PLACEHOLDER,
     617             :                                         InvalidBlockNumber,
     618             :                                         InvalidOffsetNumber);
     619             : 
     620             :             /* don't update LSN etc till we're done with it */
     621             :         }
     622             :     }
     623             : 
     624             :     /* try to access dest page if any */
     625           0 :     if (!XLogRecHasBlockRef(record, 1))
     626             :     {
     627           0 :         destBuffer = InvalidBuffer;
     628           0 :         destPage = NULL;
     629             :     }
     630           0 :     else if (xldata->initDest)
     631             :     {
     632             :         /* just re-init the dest page */
     633           0 :         destBuffer = XLogInitBufferForRedo(record, 1);
     634           0 :         destPage = (Page) BufferGetPage(destBuffer);
     635             : 
     636           0 :         SpGistInitBuffer(destBuffer,
     637           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     638             :         /* don't update LSN etc till we're done with it */
     639             :     }
     640             :     else
     641             :     {
     642             :         /*
     643             :          * We could probably release the page lock immediately in the
     644             :          * full-page-image case, but for safety let's hold it till later.
     645             :          */
     646           0 :         if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
     647           0 :             destPage = (Page) BufferGetPage(destBuffer);
     648             :         else
     649           0 :             destPage = NULL;    /* don't do any page updates */
     650             :     }
     651             : 
     652             :     /* restore leaf tuples to src and/or dest page */
     653           0 :     for (i = 0; i < xldata->nInsert; i++)
     654             :     {
     655             :         char       *leafTuple;
     656             :         SpGistLeafTupleData leafTupleHdr;
     657             : 
     658             :         /* the tuples are not aligned, so must copy to access the size field. */
     659           0 :         leafTuple = ptr;
     660           0 :         memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     661           0 :         ptr += leafTupleHdr.size;
     662             : 
     663           0 :         page = leafPageSelect[i] ? destPage : srcPage;
     664           0 :         if (page == NULL)
     665           0 :             continue;           /* no need to touch this page */
     666             : 
     667           0 :         addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
     668           0 :                           toInsert[i]);
     669             :     }
     670             : 
     671             :     /* Now update src and dest page LSNs if needed */
     672           0 :     if (srcPage != NULL)
     673             :     {
     674           0 :         PageSetLSN(srcPage, lsn);
     675           0 :         MarkBufferDirty(srcBuffer);
     676             :     }
     677           0 :     if (destPage != NULL)
     678             :     {
     679           0 :         PageSetLSN(destPage, lsn);
     680           0 :         MarkBufferDirty(destBuffer);
     681             :     }
     682             : 
     683             :     /* restore new inner tuple */
     684           0 :     if (xldata->initInner)
     685             :     {
     686           0 :         innerBuffer = XLogInitBufferForRedo(record, 2);
     687           0 :         SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
     688           0 :         action = BLK_NEEDS_REDO;
     689             :     }
     690             :     else
     691           0 :         action = XLogReadBufferForRedo(record, 2, &innerBuffer);
     692             : 
     693           0 :     if (action == BLK_NEEDS_REDO)
     694             :     {
     695           0 :         page = BufferGetPage(innerBuffer);
     696             : 
     697           0 :         addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
     698           0 :                           xldata->offnumInner);
     699             : 
     700             :         /* if inner is also parent, update link while we're here */
     701           0 :         if (xldata->innerIsParent)
     702             :         {
     703             :             SpGistInnerTuple parent;
     704             : 
     705           0 :             parent = (SpGistInnerTuple) PageGetItem(page,
     706             :                                                     PageGetItemId(page, xldata->offnumParent));
     707           0 :             spgUpdateNodeLink(parent, xldata->nodeI,
     708           0 :                               blknoInner, xldata->offnumInner);
     709             :         }
     710             : 
     711           0 :         PageSetLSN(page, lsn);
     712           0 :         MarkBufferDirty(innerBuffer);
     713             :     }
     714           0 :     if (BufferIsValid(innerBuffer))
     715           0 :         UnlockReleaseBuffer(innerBuffer);
     716             : 
     717             :     /*
     718             :      * Now we can release the leaf-page locks.  It's okay to do this before
     719             :      * updating the parent downlink.
     720             :      */
     721           0 :     if (BufferIsValid(srcBuffer))
     722           0 :         UnlockReleaseBuffer(srcBuffer);
     723           0 :     if (BufferIsValid(destBuffer))
     724           0 :         UnlockReleaseBuffer(destBuffer);
     725             : 
     726             :     /* update parent downlink, unless we did it above */
     727           0 :     if (XLogRecHasBlockRef(record, 3))
     728             :     {
     729             :         Buffer      parentBuffer;
     730             : 
     731           0 :         if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
     732             :         {
     733             :             SpGistInnerTuple parent;
     734             : 
     735           0 :             page = BufferGetPage(parentBuffer);
     736             : 
     737           0 :             parent = (SpGistInnerTuple) PageGetItem(page,
     738             :                                                     PageGetItemId(page, xldata->offnumParent));
     739           0 :             spgUpdateNodeLink(parent, xldata->nodeI,
     740           0 :                               blknoInner, xldata->offnumInner);
     741             : 
     742           0 :             PageSetLSN(page, lsn);
     743           0 :             MarkBufferDirty(parentBuffer);
     744             :         }
     745           0 :         if (BufferIsValid(parentBuffer))
     746           0 :             UnlockReleaseBuffer(parentBuffer);
     747             :     }
     748             :     else
     749             :         Assert(xldata->innerIsParent || xldata->isRootSplit);
     750           0 : }
     751             : 
     752             : static void
     753           0 : spgRedoVacuumLeaf(XLogReaderState *record)
     754             : {
     755           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     756           0 :     char       *ptr = XLogRecGetData(record);
     757           0 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     758             :     OffsetNumber *toDead;
     759             :     OffsetNumber *toPlaceholder;
     760             :     OffsetNumber *moveSrc;
     761             :     OffsetNumber *moveDest;
     762             :     OffsetNumber *chainSrc;
     763             :     OffsetNumber *chainDest;
     764             :     SpGistState state;
     765             :     Buffer      buffer;
     766             :     Page        page;
     767             :     int         i;
     768             : 
     769           0 :     fillFakeState(&state, xldata->stateSrc);
     770             : 
     771           0 :     ptr += SizeOfSpgxlogVacuumLeaf;
     772           0 :     toDead = (OffsetNumber *) ptr;
     773           0 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     774           0 :     toPlaceholder = (OffsetNumber *) ptr;
     775           0 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     776           0 :     moveSrc = (OffsetNumber *) ptr;
     777           0 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     778           0 :     moveDest = (OffsetNumber *) ptr;
     779           0 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     780           0 :     chainSrc = (OffsetNumber *) ptr;
     781           0 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     782           0 :     chainDest = (OffsetNumber *) ptr;
     783             : 
     784           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     785             :     {
     786           0 :         page = BufferGetPage(buffer);
     787             : 
     788           0 :         spgPageIndexMultiDelete(&state, page,
     789           0 :                                 toDead, xldata->nDead,
     790             :                                 SPGIST_DEAD, SPGIST_DEAD,
     791             :                                 InvalidBlockNumber,
     792             :                                 InvalidOffsetNumber);
     793             : 
     794           0 :         spgPageIndexMultiDelete(&state, page,
     795           0 :                                 toPlaceholder, xldata->nPlaceholder,
     796             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     797             :                                 InvalidBlockNumber,
     798             :                                 InvalidOffsetNumber);
     799             : 
     800             :         /* see comments in vacuumLeafPage() */
     801           0 :         for (i = 0; i < xldata->nMove; i++)
     802             :         {
     803           0 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     804           0 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     805             :             ItemIdData  tmp;
     806             : 
     807           0 :             tmp = *idSrc;
     808           0 :             *idSrc = *idDest;
     809           0 :             *idDest = tmp;
     810             :         }
     811             : 
     812           0 :         spgPageIndexMultiDelete(&state, page,
     813           0 :                                 moveSrc, xldata->nMove,
     814             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     815             :                                 InvalidBlockNumber,
     816             :                                 InvalidOffsetNumber);
     817             : 
     818           0 :         for (i = 0; i < xldata->nChain; i++)
     819             :         {
     820             :             SpGistLeafTuple lt;
     821             : 
     822           0 :             lt = (SpGistLeafTuple) PageGetItem(page,
     823             :                                                PageGetItemId(page, chainSrc[i]));
     824             :             Assert(lt->tupstate == SPGIST_LIVE);
     825           0 :             lt->nextOffset = chainDest[i];
     826             :         }
     827             : 
     828           0 :         PageSetLSN(page, lsn);
     829           0 :         MarkBufferDirty(buffer);
     830             :     }
     831           0 :     if (BufferIsValid(buffer))
     832           0 :         UnlockReleaseBuffer(buffer);
     833           0 : }
     834             : 
     835             : static void
     836           0 : spgRedoVacuumRoot(XLogReaderState *record)
     837             : {
     838           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     839           0 :     char       *ptr = XLogRecGetData(record);
     840           0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     841             :     OffsetNumber *toDelete;
     842             :     Buffer      buffer;
     843             :     Page        page;
     844             : 
     845           0 :     toDelete = xldata->offsets;
     846             : 
     847           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     848             :     {
     849           0 :         page = BufferGetPage(buffer);
     850             : 
     851             :         /* The tuple numbers are in order */
     852           0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     853             : 
     854           0 :         PageSetLSN(page, lsn);
     855           0 :         MarkBufferDirty(buffer);
     856             :     }
     857           0 :     if (BufferIsValid(buffer))
     858           0 :         UnlockReleaseBuffer(buffer);
     859           0 : }
     860             : 
     861             : static void
     862           0 : spgRedoVacuumRedirect(XLogReaderState *record)
     863             : {
     864           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     865           0 :     char       *ptr = XLogRecGetData(record);
     866           0 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     867             :     OffsetNumber *itemToPlaceholder;
     868             :     Buffer      buffer;
     869             : 
     870           0 :     itemToPlaceholder = xldata->offsets;
     871             : 
     872             :     /*
     873             :      * If any redirection tuples are being removed, make sure there are no
     874             :      * live Hot Standby transactions that might need to see them.
     875             :      */
     876           0 :     if (InHotStandby)
     877             :     {
     878           0 :         if (TransactionIdIsValid(xldata->newestRedirectXid))
     879             :         {
     880             :             RelFileNode node;
     881             : 
     882           0 :             XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
     883           0 :             ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
     884             :                                                 node);
     885             :         }
     886             :     }
     887             : 
     888           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     889             :     {
     890           0 :         Page        page = BufferGetPage(buffer);
     891           0 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     892             :         int         i;
     893             : 
     894             :         /* Convert redirect pointers to plain placeholders */
     895           0 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     896             :         {
     897             :             SpGistDeadTuple dt;
     898             : 
     899           0 :             dt = (SpGistDeadTuple) PageGetItem(page,
     900             :                                                PageGetItemId(page, itemToPlaceholder[i]));
     901             :             Assert(dt->tupstate == SPGIST_REDIRECT);
     902           0 :             dt->tupstate = SPGIST_PLACEHOLDER;
     903           0 :             ItemPointerSetInvalid(&dt->pointer);
     904             :         }
     905             : 
     906             :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     907           0 :         opaque->nRedirection -= xldata->nToPlaceholder;
     908           0 :         opaque->nPlaceholder += xldata->nToPlaceholder;
     909             : 
     910             :         /* Remove placeholder tuples at end of page */
     911           0 :         if (xldata->firstPlaceholder != InvalidOffsetNumber)
     912             :         {
     913           0 :             int         max = PageGetMaxOffsetNumber(page);
     914             :             OffsetNumber *toDelete;
     915             : 
     916           0 :             toDelete = palloc(sizeof(OffsetNumber) * max);
     917             : 
     918           0 :             for (i = xldata->firstPlaceholder; i <= max; i++)
     919           0 :                 toDelete[i - xldata->firstPlaceholder] = i;
     920             : 
     921           0 :             i = max - xldata->firstPlaceholder + 1;
     922             :             Assert(opaque->nPlaceholder >= i);
     923           0 :             opaque->nPlaceholder -= i;
     924             : 
     925             :             /* The array is sorted, so can use PageIndexMultiDelete */
     926           0 :             PageIndexMultiDelete(page, toDelete, i);
     927             : 
     928           0 :             pfree(toDelete);
     929             :         }
     930             : 
     931           0 :         PageSetLSN(page, lsn);
     932           0 :         MarkBufferDirty(buffer);
     933             :     }
     934           0 :     if (BufferIsValid(buffer))
     935           0 :         UnlockReleaseBuffer(buffer);
     936           0 : }
     937             : 
     938             : void
     939           0 : spg_redo(XLogReaderState *record)
     940             : {
     941           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     942             :     MemoryContext oldCxt;
     943             : 
     944           0 :     oldCxt = MemoryContextSwitchTo(opCtx);
     945           0 :     switch (info)
     946             :     {
     947             :         case XLOG_SPGIST_ADD_LEAF:
     948           0 :             spgRedoAddLeaf(record);
     949           0 :             break;
     950             :         case XLOG_SPGIST_MOVE_LEAFS:
     951           0 :             spgRedoMoveLeafs(record);
     952           0 :             break;
     953             :         case XLOG_SPGIST_ADD_NODE:
     954           0 :             spgRedoAddNode(record);
     955           0 :             break;
     956             :         case XLOG_SPGIST_SPLIT_TUPLE:
     957           0 :             spgRedoSplitTuple(record);
     958           0 :             break;
     959             :         case XLOG_SPGIST_PICKSPLIT:
     960           0 :             spgRedoPickSplit(record);
     961           0 :             break;
     962             :         case XLOG_SPGIST_VACUUM_LEAF:
     963           0 :             spgRedoVacuumLeaf(record);
     964           0 :             break;
     965             :         case XLOG_SPGIST_VACUUM_ROOT:
     966           0 :             spgRedoVacuumRoot(record);
     967           0 :             break;
     968             :         case XLOG_SPGIST_VACUUM_REDIRECT:
     969           0 :             spgRedoVacuumRedirect(record);
     970           0 :             break;
     971             :         default:
     972           0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
     973             :     }
     974             : 
     975           0 :     MemoryContextSwitchTo(oldCxt);
     976           0 :     MemoryContextReset(opCtx);
     977           0 : }
     978             : 
     979             : void
     980          96 : spg_xlog_startup(void)
     981             : {
     982          96 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     983             :                                   "SP-GiST temporary context",
     984             :                                   ALLOCSET_DEFAULT_SIZES);
     985          96 : }
     986             : 
     987             : void
     988          66 : spg_xlog_cleanup(void)
     989             : {
     990          66 :     MemoryContextDelete(opCtx);
     991          66 :     opCtx = NULL;
     992          66 : }
     993             : 
     994             : /*
     995             :  * Mask a SpGist page before performing consistency checks on it.
     996             :  */
     997             : void
     998           0 : spg_mask(char *pagedata, BlockNumber blkno)
     999             : {
    1000           0 :     Page        page = (Page) pagedata;
    1001           0 :     PageHeader  pagehdr = (PageHeader) page;
    1002             : 
    1003           0 :     mask_page_lsn_and_checksum(page);
    1004             : 
    1005           0 :     mask_page_hint_bits(page);
    1006             : 
    1007             :     /*
    1008             :      * Mask the unused space, but only if the page's pd_lower appears to have
    1009             :      * been set correctly.
    1010             :      */
    1011           0 :     if (pagehdr->pd_lower > SizeOfPageHeaderData)
    1012           0 :         mask_unused_space(page);
    1013           0 : }

Generated by: LCOV version 1.13