LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgutils.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 328 372 88.2 %
Date: 2019-06-19 16:07:09 Functions: 22 22 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgutils.c
       4             :  *    various support functions for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *          src/backend/access/spgist/spgutils.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/amvalidate.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/reloptions.h"
      21             : #include "access/spgist_private.h"
      22             : #include "access/transam.h"
      23             : #include "access/xact.h"
      24             : #include "catalog/pg_amop.h"
      25             : #include "storage/bufmgr.h"
      26             : #include "storage/indexfsm.h"
      27             : #include "storage/lmgr.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/catcache.h"
      30             : #include "utils/index_selfuncs.h"
      31             : #include "utils/lsyscache.h"
      32             : #include "utils/syscache.h"
      33             : 
      34             : 
      35             : /*
      36             :  * SP-GiST handler function: return IndexAmRoutine with access method parameters
      37             :  * and callbacks.
                     :  *
                     :  * A fresh IndexAmRoutine node is built with makeNode() on each call and
                     :  * returned as a pointer Datum.  Of the boolean capability flags, only
                     :  * amcanorderbyop, amoptionalkey and amsearchnulls are set true; all the
                     :  * others are false.  Callbacks that are not provided here
                     :  * (ambuildphasename, ammarkpos, amrestrpos and the three parallel-scan
                     :  * hooks) are left NULL.
      38             :  */
      39             : Datum
      40         606 : spghandler(PG_FUNCTION_ARGS)
      41             : {
      42         606 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      43             : 
                           /* Fixed parameters and capability flags */
      44         606 :     amroutine->amstrategies = 0;
      45         606 :     amroutine->amsupport = SPGISTNProc;
      46         606 :     amroutine->amcanorder = false;
      47         606 :     amroutine->amcanorderbyop = true;
      48         606 :     amroutine->amcanbackward = false;
      49         606 :     amroutine->amcanunique = false;
      50         606 :     amroutine->amcanmulticol = false;
      51         606 :     amroutine->amoptionalkey = true;
      52         606 :     amroutine->amsearcharray = false;
      53         606 :     amroutine->amsearchnulls = true;
      54         606 :     amroutine->amstorage = false;
      55         606 :     amroutine->amclusterable = false;
      56         606 :     amroutine->ampredlocks = false;
      57         606 :     amroutine->amcanparallel = false;
      58         606 :     amroutine->amcaninclude = false;
      59         606 :     amroutine->amkeytype = InvalidOid;
      60             : 
                           /* Callback functions; NULL means the callback is not provided */
      61         606 :     amroutine->ambuild = spgbuild;
      62         606 :     amroutine->ambuildempty = spgbuildempty;
      63         606 :     amroutine->aminsert = spginsert;
      64         606 :     amroutine->ambulkdelete = spgbulkdelete;
      65         606 :     amroutine->amvacuumcleanup = spgvacuumcleanup;
      66         606 :     amroutine->amcanreturn = spgcanreturn;
      67         606 :     amroutine->amcostestimate = spgcostestimate;
      68         606 :     amroutine->amoptions = spgoptions;
      69         606 :     amroutine->amproperty = spgproperty;
      70         606 :     amroutine->ambuildphasename = NULL;
      71         606 :     amroutine->amvalidate = spgvalidate;
      72         606 :     amroutine->ambeginscan = spgbeginscan;
      73         606 :     amroutine->amrescan = spgrescan;
      74         606 :     amroutine->amgettuple = spggettuple;
      75         606 :     amroutine->amgetbitmap = spggetbitmap;
      76         606 :     amroutine->amendscan = spgendscan;
      77         606 :     amroutine->ammarkpos = NULL;
      78         606 :     amroutine->amrestrpos = NULL;
      79         606 :     amroutine->amestimateparallelscan = NULL;
      80         606 :     amroutine->aminitparallelscan = NULL;
      81         606 :     amroutine->amparallelrescan = NULL;
      82             : 
      83         606 :     PG_RETURN_POINTER(amroutine);
      84             : }
      85             : 
      86             : /*
                     :  * Fill in a SpGistTypeDesc struct with info about the specified data type.
                     :  *
                     :  * desc: descriptor to fill; its type, attlen and attbyval fields are set.
                     :  * type: OID of the data type.  It is stored in desc->type and passed to
                     :  *      get_typlenbyval(), which is handed pointers to the attlen and
                     :  *      attbyval fields.
                     :  */
      87             : static void
      88         492 : fillTypeDesc(SpGistTypeDesc *desc, Oid type)
      89             : {
      90         492 :     desc->type = type;
      91         492 :     get_typlenbyval(type, &desc->attlen, &desc->attbyval);
      92         492 : }
      93             : 
      94             : /*
      95             :  * Fetch local cache of AM-specific info about the index, initializing it
      96             :  * if necessary
                     :  *
                     :  * If index->rd_amcache is already set, it is returned as-is.  Otherwise a
                     :  * zeroed SpGistCache is allocated in index->rd_indexcxt, the opclass config
                     :  * function (support proc SPGIST_CONFIG_PROC) is called to fill cache->config,
                     :  * type descriptors are filled in for the attribute, leaf, prefix and label
                     :  * types, lastUsedPages is copied from the metapage, and the result is
                     :  * stored in index->rd_amcache.
                     :  *
                     :  * Raises ERROR if the opclass reports a leaf type different from the column
                     :  * type without defining a compress proc, or if the metapage does not carry
                     :  * SPGIST_MAGIC_NUMBER.
      97             :  */
      98             : SpGistCache *
      99     1569694 : spgGetCache(Relation index)
     100             : {
     101             :     SpGistCache *cache;
     102             : 
     103     1569694 :     if (index->rd_amcache == NULL)
     104             :     {
     105             :         Oid         atttype;
     106             :         spgConfigIn in;
     107             :         FmgrInfo   *procinfo;
     108             :         Buffer      metabuffer;
     109             :         SpGistMetaPageData *metadata;
     110             : 
     111         164 :         cache = MemoryContextAllocZero(index->rd_indexcxt,
     112             :                                        sizeof(SpGistCache));
     113             : 
     114             :         /* SPGiST doesn't support multi-column indexes */
     115             :         Assert(index->rd_att->natts == 1);
     116             : 
     117             :         /*
     118             :          * Get the actual data type of the indexed column from the index
     119             :          * tupdesc.  We pass this to the opclass config function so that
     120             :          * polymorphic opclasses are possible.
     121             :          */
     122         164 :         atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
     123             : 
     124             :         /* Call the config function to get config info for the opclass */
     125         164 :         in.attType = atttype;
     126             : 
     127         164 :         procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
     128         328 :         FunctionCall2Coll(procinfo,
     129         164 :                           index->rd_indcollation[0],
     130             :                           PointerGetDatum(&in),
     131         164 :                           PointerGetDatum(&cache->config));
     132             : 
     133             :         /* Get the information we need about each relevant datatype */
     134         164 :         fillTypeDesc(&cache->attType, atttype);
     135             : 
                               /*
                                * A leaf type distinct from the column type is accepted only
                                * when the opclass also supplies a compress proc.
                                */
     136         178 :         if (OidIsValid(cache->config.leafType) &&
     137          14 :             cache->config.leafType != atttype)
     138             :         {
     139           0 :             if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
     140           0 :                 ereport(ERROR,
     141             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     142             :                          errmsg("compress method must be defined when leaf type is different from input type")));
     143             : 
     144           0 :             fillTypeDesc(&cache->attLeafType, cache->config.leafType);
     145             :         }
     146             :         else
     147             :         {
                                   /* No distinct leaf type: reuse the column's type descriptor */
     148         164 :             cache->attLeafType = cache->attType;
     149             :         }
     150             : 
     151         164 :         fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
     152         164 :         fillTypeDesc(&cache->attLabelType, cache->config.labelType);
     153             : 
     154             :         /* Last, get the lastUsedPages data from the metapage */
     155         164 :         metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
                               /* The metapage is only read here, so a share lock is requested */
     156         164 :         LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
     157             : 
     158         164 :         metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
     159             : 
     160         164 :         if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
     161           0 :             elog(ERROR, "index \"%s\" is not an SP-GiST index",
     162             :                  RelationGetRelationName(index));
     163             : 
     164         164 :         cache->lastUsedPages = metadata->lastUsedPages;
     165             : 
     166         164 :         UnlockReleaseBuffer(metabuffer);
     167             : 
                               /* Publish the cache only after it has been fully filled in */
     168         164 :         index->rd_amcache = (void *) cache;
     169             :     }
     170             :     else
     171             :     {
     172             :         /* assume it's up to date */
     173     1569530 :         cache = (SpGistCache *) index->rd_amcache;
     174             :     }
     175             : 
     176     1569694 :     return cache;
     177             : }
     178             : 
     179             : /*
                     :  * Initialize SpGistState for working with the given index
                     :  *
                     :  * state: caller-supplied struct to fill in.
                     :  * index: the index relation; its cached info is obtained via spgGetCache().
                     :  *
                     :  * The config and the four type descriptors are copied by value from the
                     :  * cache.  A zeroed workspace of SGDTSIZE bytes is palloc0'd on every call
                     :  * and stored in state->deadTupleStorage; nothing in this function frees it.
                     :  */
     180             : void
     181      149940 : initSpGistState(SpGistState *state, Relation index)
     182             : {
     183             :     SpGistCache *cache;
     184             : 
     185             :     /* Get cached static information about index */
     186      149940 :     cache = spgGetCache(index);
     187             : 
     188      149940 :     state->config = cache->config;
     189      149940 :     state->attType = cache->attType;
     190      149940 :     state->attLeafType = cache->attLeafType;
     191      149940 :     state->attPrefixType = cache->attPrefixType;
     192      149940 :     state->attLabelType = cache->attLabelType;
     193             : 
     194             :     /* Make workspace for constructing dead tuples */
     195      149940 :     state->deadTupleStorage = palloc0(SGDTSIZE);
     196             : 
     197             :     /* Set XID to use in redirection tuples */
     198      149940 :     state->myXid = GetTopTransactionIdIfAny();
     199             : 
     200             :     /* Assume we're not in an index build (spgbuild will override) */
     201      149940 :     state->isBuild = false;
     202      149940 : }
     203             : 
     204             : /*
     205             :  * Allocate a new page (either by recycling, or by extending the index file).
     206             :  *
     207             :  * The returned buffer is already pinned and exclusive-locked.
     208             :  * Caller is responsible for initializing the page by calling SpGistInitBuffer.
                     :  *
                     :  * Candidate blocks are taken from GetFreeIndexPage() until it returns
                     :  * InvalidBlockNumber.  A candidate is skipped if it is a fixed block, if its
                     :  * buffer cannot be locked without waiting, or if its page is neither new,
                     :  * deleted, nor empty.  When no candidate is usable, the relation is extended
                     :  * with ReadBuffer(index, P_NEW); the relation extension lock is taken around
                     :  * that call unless RELATION_IS_LOCAL(index).
     209             :  */
     210             : Buffer
     211        3658 : SpGistNewBuffer(Relation index)
     212             : {
     213             :     Buffer      buffer;
     214             :     bool        needLock;
     215             : 
     216             :     /* First, try to get a page from FSM */
     217             :     for (;;)
     218           0 :     {
     219        3658 :         BlockNumber blkno = GetFreeIndexPage(index);
     220             : 
     221        3658 :         if (blkno == InvalidBlockNumber)
     222        3658 :             break;              /* nothing known to FSM */
     223             : 
     224             :         /*
     225             :          * The fixed pages shouldn't ever be listed in FSM, but just in case
     226             :          * one is, ignore it.
     227             :          */
     228           0 :         if (SpGistBlockIsFixed(blkno))
     229           0 :             continue;
     230             : 
     231           0 :         buffer = ReadBuffer(index, blkno);
     232             : 
     233             :         /*
     234             :          * We have to guard against the possibility that someone else already
     235             :          * recycled this page; the buffer may be locked if so.
     236             :          */
     237           0 :         if (ConditionalLockBuffer(buffer))
     238             :         {
     239           0 :             Page        page = BufferGetPage(buffer);
     240             : 
     241           0 :             if (PageIsNew(page))
     242           0 :                 return buffer;  /* OK to use, if never initialized */
     243             : 
     244           0 :             if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
     245           0 :                 return buffer;  /* OK to use */
     246             : 
                                   /* Page is in use: drop the lock here, the pin is released below */
     247           0 :             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     248             :         }
     249             : 
     250             :         /* Can't use it, so release buffer and try again */
     251           0 :         ReleaseBuffer(buffer);
     252             :     }
     253             : 
     254             :     /* Must extend the file */
     255        3658 :     needLock = !RELATION_IS_LOCAL(index);
     256        3658 :     if (needLock)
     257        1466 :         LockRelationForExtension(index, ExclusiveLock);
     258             : 
     259        3658 :     buffer = ReadBuffer(index, P_NEW);
     260        3658 :     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     261             : 
                           /* The extension lock is dropped only after the new buffer is locked */
     262        3658 :     if (needLock)
     263        1466 :         UnlockRelationForExtension(index, ExclusiveLock);
     264             : 
     265        3658 :     return buffer;
     266             : }
     267             : 
     268             : /*
     269             :  * Update index metapage's lastUsedPages info from local cache, if possible
     270             :  *
                     :  * Nothing is done if the index has no local cache (rd_amcache is NULL).
                     :  *
     271             :  * Updating the metapage isn't critical for the index to work, so we
     272             :  * 1. use ConditionalLockBuffer to improve concurrency (the update is simply
                     :  *    skipped when the lock cannot be obtained immediately), and
     273             :  * 2. don't WAL-log metabuffer changes, to decrease WAL traffic.
     274             :  */
     275             : void
     276      149372 : SpGistUpdateMetaPage(Relation index)
     277             : {
     278      149372 :     SpGistCache *cache = (SpGistCache *) index->rd_amcache;
     279             : 
     280      149372 :     if (cache != NULL)
     281             :     {
     282             :         Buffer      metabuffer;
     283             : 
     284      149372 :         metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
     285             : 
     286      149372 :         if (ConditionalLockBuffer(metabuffer))
     287             :         {
     288      149372 :             Page        metapage = BufferGetPage(metabuffer);
     289      149372 :             SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
     290             : 
     291      149372 :             metadata->lastUsedPages = cache->lastUsedPages;
     292             : 
     293             :             /*
     294             :              * Set pd_lower just past the end of the metadata.  This is
     295             :              * essential, because without doing so, metadata will be lost if
     296             :              * xlog.c compresses the page.  (We must do this here because
     297             :              * pre-v11 versions of PG did not set the metapage's pd_lower
     298             :              * correctly, so a pg_upgraded index might contain the wrong
     299             :              * value.)
     300             :              */
     301      149372 :             ((PageHeader) metapage)->pd_lower =
     302      149372 :                 ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
     303             : 
     304      149372 :             MarkBufferDirty(metabuffer);
     305      149372 :             UnlockReleaseBuffer(metabuffer);
     306             :         }
     307             :         else
     308             :         {
                                   /* Lock not available: skip the update and just drop the pin */
     309           0 :             ReleaseBuffer(metabuffer);
     310             :         }
     311             :     }
     312      149372 : }
     313             : 
     314             : /* Macro to select proper element of lastUsedPages cache depending on flags; yields a pointer to the cachedPage[] entry of cache c */
     315             : /* Reducing flags modulo SPGIST_CACHED_PAGES (after casting to unsigned int) is just for paranoia's sake */
     316             : #define GET_LUP(c, f)  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
     317             : 
     318             : /*
     319             :  * Allocate and initialize a new buffer of the type and parity specified by
     320             :  * flags.  The returned buffer is already pinned and exclusive-locked.
     321             :  *
                     :  * index: the SP-GiST index relation.
                     :  * flags: GBUF_* request flags; GBUF_REQ_LEAF and GBUF_REQ_NULLS decide the
                     :  *      page flags used for initialization, and for inner pages the bits
                     :  *      under GBUF_PARITY_MASK give the required parity.
                     :  *
     322             :  * When requesting an inner page, if we get one with the wrong parity,
     323             :  * we just release the buffer and try again.  We will get a different page
     324             :  * because GetFreeIndexPage will have marked the page used in FSM.  The page
     325             :  * is entered in our local lastUsedPages cache, so there's some hope of
     326             :  * making use of it later in this session, but otherwise we rely on VACUUM
     327             :  * to eventually re-enter the page in FSM, making it available for recycling.
     328             :  * Note that such a page does not get marked dirty here, so unless it's used
     329             :  * fairly soon, the buffer will just get discarded and the page will remain
     330             :  * as it was on disk.
     331             :  *
     332             :  * When we return a buffer to the caller, the page is *not* entered into
     333             :  * the lastUsedPages cache; we expect the caller will do so after it's taken
     334             :  * whatever space it will use.  This is because after the caller has used up
     335             :  * some space, the page might have less space than whatever was cached already
     336             :  * so we'd rather not trash the old cache entry.
     337             :  */
     338             : static Buffer
     339        3404 : allocNewBuffer(Relation index, int flags)
     340             : {
     341        3404 :     SpGistCache *cache = spgGetCache(index);
     342        3404 :     uint16      pageflags = 0;
     343             : 
                           /* Translate the GBUF_* request into on-page SPGIST_* flags */
     344        3404 :     if (GBUF_REQ_LEAF(flags))
     345        3344 :         pageflags |= SPGIST_LEAF;
     346        3404 :     if (GBUF_REQ_NULLS(flags))
     347           0 :         pageflags |= SPGIST_NULLS;
     348             : 
     349             :     for (;;)
     350          44 :     {
     351             :         Buffer      buffer;
     352             : 
     353        3448 :         buffer = SpGistNewBuffer(index);
     354        3448 :         SpGistInitBuffer(buffer, pageflags);
     355             : 
     356        3448 :         if (pageflags & SPGIST_LEAF)
     357             :         {
     358             :             /* Leaf pages have no parity concerns, so just use it */
     359        3344 :             return buffer;
     360             :         }
     361             :         else
     362             :         {
     363         104 :             BlockNumber blkno = BufferGetBlockNumber(buffer);
     364         104 :             int         blkFlags = GBUF_INNER_PARITY(blkno);
     365             : 
     366         104 :             if ((flags & GBUF_PARITY_MASK) == blkFlags)
     367             :             {
     368             :                 /* Page has right parity, use it */
     369          60 :                 return buffer;
     370             :             }
     371             :             else
     372             :             {
     373             :                 /* Page has wrong parity, record it in cache and try again */
                                       /*
                                        * The slot is chosen by the page's own parity (plus
                                        * GBUF_NULLS if applicable), not by the requested one.
                                        */
     374          44 :                 if (pageflags & SPGIST_NULLS)
     375           0 :                     blkFlags |= GBUF_NULLS;
     376          44 :                 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
     377          44 :                 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
     378          44 :                     PageGetExactFreeSpace(BufferGetPage(buffer));
     379          44 :                 UnlockReleaseBuffer(buffer);
     380             :             }
     381             :         }
     382             :     }
     383             : }
     384             : 
     385             : /*
     386             :  * Get a buffer of the type and parity specified by flags, having at least
     387             :  * as much free space as indicated by needSpace.  We use the lastUsedPages
     388             :  * cache to assign the same buffer previously requested when possible.
     389             :  * The returned buffer is already pinned and exclusive-locked.
     390             :  *
                     :  * index: the SP-GiST index relation.
                     :  * flags: GBUF_* request flags; they select the lastUsedPages slot and the
                     :  *      page type (leaf vs. inner, nulls vs. non-nulls) that is acceptable.
                     :  * needSpace: bytes required.  ERROR is raised if this exceeds
                     :  *      SPGIST_PAGE_CAPACITY.  Internally it is increased by the relation's
                     :  *      target free space and then clamped to SPGIST_PAGE_CAPACITY.
                     :  *
     391             :  * *isNew is set true if the page was initialized here, false if it was
     392             :  * already valid.
                     :  *
                     :  * When the cached page is handed back, the slot's freeSpace is set to the
                     :  * page's exact free space minus the (padded) needSpace.
     393             :  */
     394             : Buffer
     395        6248 : SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
     396             : {
     397        6248 :     SpGistCache *cache = spgGetCache(index);
     398             :     SpGistLastUsedPage *lup;
     399             : 
     400             :     /* Bail out if even an empty page wouldn't meet the demand */
     401        6248 :     if (needSpace > SPGIST_PAGE_CAPACITY)
     402           0 :         elog(ERROR, "desired SPGiST tuple size is too big");
     403             : 
     404             :     /*
     405             :      * If possible, increase the space request to include relation's
     406             :      * fillfactor.  This ensures that when we add unrelated tuples to a page,
     407             :      * we try to keep 100-fillfactor% available for adding tuples that are
     408             :      * related to the ones already on it.  But fillfactor mustn't cause an
     409             :      * error for requests that would otherwise be legal.
     410             :      */
     411        6248 :     needSpace += RelationGetTargetPageFreeSpace(index,
     412             :                                                 SPGIST_DEFAULT_FILLFACTOR);
     413        6248 :     needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
     414             : 
     415             :     /* Get the cache entry for this flags setting */
     416        6248 :     lup = GET_LUP(cache, flags);
     417             : 
     418             :     /* If we have nothing cached, just turn it over to allocNewBuffer */
     419        6248 :     if (lup->blkno == InvalidBlockNumber)
     420             :     {
     421         100 :         *isNew = true;
     422         100 :         return allocNewBuffer(index, flags);
     423             :     }
     424             : 
     425             :     /* fixed pages should never be in cache */
     426             :     Assert(!SpGistBlockIsFixed(lup->blkno));
     427             : 
     428             :     /* If cached freeSpace isn't enough, don't bother looking at the page */
     429        6148 :     if (lup->freeSpace >= needSpace)
     430             :     {
     431             :         Buffer      buffer;
     432             :         Page        page;
     433             : 
     434        2844 :         buffer = ReadBuffer(index, lup->blkno);
     435             : 
     436        2844 :         if (!ConditionalLockBuffer(buffer))
     437             :         {
     438             :             /*
     439             :              * buffer is locked by another process, so return a new buffer
     440             :              */
     441           0 :             ReleaseBuffer(buffer);
     442           0 :             *isNew = true;
     443           0 :             return allocNewBuffer(index, flags);
     444             :         }
     445             : 
     446        2844 :         page = BufferGetPage(buffer);
     447             : 
     448        2844 :         if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
     449             :         {
     450             :             /* OK to initialize the page */
     451          96 :             uint16      pageflags = 0;
     452             : 
     453          96 :             if (GBUF_REQ_LEAF(flags))
     454          92 :                 pageflags |= SPGIST_LEAF;
     455          96 :             if (GBUF_REQ_NULLS(flags))
     456           0 :                 pageflags |= SPGIST_NULLS;
     457          96 :             SpGistInitBuffer(buffer, pageflags);
                                   /* Record the freshly initialized page's free space less the padded request */
     458          96 :             lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
     459          96 :             *isNew = true;
     460          96 :             return buffer;
     461             :         }
     462             : 
     463             :         /*
     464             :          * Check that page is of right type and has enough space.  We must
     465             :          * recheck this since our cache isn't necessarily up to date.
     466             :          */
     467        8244 :         if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
     468        5496 :             (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
     469             :         {
     470        2748 :             int         freeSpace = PageGetExactFreeSpace(page);
     471             : 
     472        2748 :             if (freeSpace >= needSpace)
     473             :             {
     474             :                 /* Success, update freespace info and return the buffer */
     475        2748 :                 lup->freeSpace = freeSpace - needSpace;
     476        2748 :                 *isNew = false;
     477        2748 :                 return buffer;
     478             :             }
     479             :         }
     480             : 
     481             :         /*
     482             :          * fallback to allocation of new buffer
     483             :          */
     484           0 :         UnlockReleaseBuffer(buffer);
     485             :     }
     486             : 
     487             :     /* No success with cache, so return a new buffer */
     488        3304 :     *isNew = true;
     489        3304 :     return allocNewBuffer(index, flags);
     490             : }
     491             : 
     492             : /*
     493             :  * Update lastUsedPages cache when done modifying a page.
     494             :  *
     495             :  * We update the appropriate cache entry if it already contained this page
     496             :  * (its freeSpace is likely obsolete), or if this page has more space than
     497             :  * whatever we had cached.
                     :  *
                     :  * index: the SP-GiST index relation.
                     :  * buffer: buffer holding the page that was modified.
                     :  *
                     :  * The cache slot is derived from the page itself: GBUF_LEAF for leaf pages,
                     :  * GBUF_INNER_PARITY(blkno) for inner pages, with GBUF_NULLS OR'ed in when the
                     :  * page stores nulls.  Fixed blocks are never entered.  An empty slot
                     :  * (blkno == InvalidBlockNumber) is always filled.
     498             :  */
     499             : void
     500     1408974 : SpGistSetLastUsedPage(Relation index, Buffer buffer)
     501             : {
     502     1408974 :     SpGistCache *cache = spgGetCache(index);
     503             :     SpGistLastUsedPage *lup;
     504             :     int         freeSpace;
     505     1408974 :     Page        page = BufferGetPage(buffer);
     506     1408974 :     BlockNumber blkno = BufferGetBlockNumber(buffer);
     507             :     int         flags;
     508             : 
     509             :     /* Never enter fixed pages (root pages) in cache, though */
     510     1408974 :     if (SpGistBlockIsFixed(blkno))
     511      471424 :         return;
     512             : 
                           /* Work out which cache slot this page belongs to */
     513      937550 :     if (SpGistPageIsLeaf(page))
     514      470992 :         flags = GBUF_LEAF;
     515             :     else
     516      466558 :         flags = GBUF_INNER_PARITY(blkno);
     517      937550 :     if (SpGistPageStoresNulls(page))
     518           0 :         flags |= GBUF_NULLS;
     519             : 
     520      937550 :     lup = GET_LUP(cache, flags);
     521             : 
     522      937550 :     freeSpace = PageGetExactFreeSpace(page);
                           /*
                            * Replace the slot if it is empty, already tracks this page, or
                            * tracks a page with less free space than this one.
                            */
     523     1179974 :     if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
     524      242424 :         lup->freeSpace < freeSpace)
     525             :     {
     526      700078 :         lup->blkno = blkno;
     527      700078 :         lup->freeSpace = freeSpace;
     528             :     }
     529             : }
     530             : 
     531             : /*
     532             :  * Initialize an SPGiST page to empty, with specified flags
                     :  *
                     :  * page: the page to initialize; it is set up with PageInit() as a BLCKSZ
                     :  *      page whose special space is MAXALIGN(sizeof(SpGistPageOpaqueData)).
                     :  * f: value stored in the opaque area's flags field.
                     :  *
                     :  * The opaque area is zeroed before flags and spgist_page_id
                     :  * (SPGIST_PAGE_ID) are set.
     533             :  */
     534             : void
     535        3994 : SpGistInitPage(Page page, uint16 f)
     536             : {
     537             :     SpGistPageOpaque opaque;
     538             : 
     539        3994 :     PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
     540        3994 :     opaque = SpGistPageGetOpaque(page);
     541        3994 :     memset(opaque, 0, sizeof(SpGistPageOpaqueData));
     542        3994 :     opaque->flags = f;
     543        3994 :     opaque->spgist_page_id = SPGIST_PAGE_ID;
     544        3994 : }
     545             : 
     546             : /*
     547             :  * Initialize a buffer's page to empty, with specified flags
                     :  *
                     :  * b: buffer whose page is initialized; its page size is asserted to be
                     :  *      BLCKSZ.
                     :  * f: page flags, passed through to SpGistInitPage().
                     :  *
                     :  * This function itself neither locks the buffer nor marks it dirty.
     548             :  */
     549             : void
     550        3924 : SpGistInitBuffer(Buffer b, uint16 f)
     551             : {
     552             :     Assert(BufferGetPageSize(b) == BLCKSZ);
     553        3924 :     SpGistInitPage(BufferGetPage(b), f);
     554        3924 : }
     555             : 
     556             : /*
     557             :  * Initialize metadata page
                     :  *
                     :  * page: the page to set up.  It is initialized with the SPGIST_META flag,
                     :  *      its metadata area is zeroed and stamped with SPGIST_MAGIC_NUMBER,
                     :  *      every lastUsedPages slot gets blkno = InvalidBlockNumber, and
                     :  *      pd_lower is moved to just past the metadata.
     558             :  */
     559             : void
     560          70 : SpGistInitMetapage(Page page)
     561             : {
     562             :     SpGistMetaPageData *metadata;
     563             :     int         i;
     564             : 
     565          70 :     SpGistInitPage(page, SPGIST_META);
     566          70 :     metadata = SpGistPageGetMeta(page);
     567          70 :     memset(metadata, 0, sizeof(SpGistMetaPageData));
     568          70 :     metadata->magicNumber = SPGIST_MAGIC_NUMBER;
     569             : 
     570             :     /* initialize last-used-page cache to empty */
     571         630 :     for (i = 0; i < SPGIST_CACHED_PAGES; i++)
     572         560 :         metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
     573             : 
     574             :     /*
     575             :      * Set pd_lower just past the end of the metadata.  This is essential,
     576             :      * because without doing so, metadata will be lost if xlog.c compresses
     577             :      * the page.
     578             :      */
     579          70 :     ((PageHeader) page)->pd_lower =
     580          70 :         ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
     581          70 : }
     582             : 
     583             : /*
     584             :  * reloptions processing for SPGiST: both arguments are passed straight through to default_reloptions() with kind RELOPT_KIND_SPGIST, and its result is returned unchanged.
     585             :  */
     586             : bytea *
     587          76 : spgoptions(Datum reloptions, bool validate)
     588             : {
     589          76 :     return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
     590             : }
     591             : 
     592             : /*
     593             :  * Get the space needed to store a non-null datum of the indicated type.
     594             :  * Note the result is already rounded up to a MAXALIGN boundary.
     595             :  * Also, we follow the SPGiST convention that pass-by-val types are
     596             :  * just stored in their Datum representation (compare memcpyDatum).
     597             :  */
     598             : unsigned int
     599    13877162 : SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
     600             : {
     601             :     unsigned int size;
     602             : 
     603    13877162 :     if (att->attbyval)
     604        3704 :         size = sizeof(Datum);       /* pass-by-value: a whole Datum is stored, whatever attlen says */
     605    13873458 :     else if (att->attlen > 0)
     606    12646294 :         size = att->attlen;         /* by-reference with a positive, fixed attlen */
     607             :     else
     608     1227164 :         size = VARSIZE_ANY(datum);  /* non-positive attlen: take the length from the datum itself */
     609             : 
     610    13877162 :     return MAXALIGN(size);          /* callers rely on the result being maxaligned */
     611             : }
     612             : 
     613             : /*
     614             :  * Copy the given non-null datum to *target.  The three storage cases mirror SpGistGetTypeSize, but the copy length is the unpadded size (no MAXALIGN rounding here).
     615             :  */
     616             : static void
     617      907078 : memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
     618             : {
     619             :     unsigned int size;
     620             : 
     621      907078 :     if (att->attbyval)
     622             :     {
     623        3704 :         memcpy(target, &datum, sizeof(Datum));      /* pass-by-value: copy the Datum word itself */
     624             :     }
     625             :     else
     626             :     {
     627      903374 :         size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);    /* fixed length, else length read from the datum */
     628      903374 :         memcpy(target, DatumGetPointer(datum), size);   /* by-reference: copy the pointed-to bytes */
     629             :     }
     630      907078 : }
     631             : 
     632             : /*
     633             :  * Construct a leaf tuple containing the given heap TID and datum value.  The tuple is freshly palloc0'd; when isnull is true no datum bytes are stored and datum is not examined.
     634             :  */
     635             : SpGistLeafTuple
     636      899892 : spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
     637             :                  Datum datum, bool isnull)
     638             : {
     639             :     SpGistLeafTuple tup;
     640             :     unsigned int size;
     641             : 
     642             :     /* compute space needed (note result is already maxaligned) */
     643      899892 :     size = SGLTHDRSZ;
     644      899892 :     if (!isnull)
     645      899844 :         size += SpGistGetTypeSize(&state->attLeafType, datum);
     646             : 
     647             :     /*
     648             :      * Ensure that we can replace the tuple with a dead tuple later.  This
     649             :      * test is unnecessary when !isnull, but let's be safe.
     650             :      */
     651      899892 :     if (size < SGDTSIZE)
     652           0 :         size = SGDTSIZE;
     653             : 
     654             :     /* OK, form the tuple */
     655      899892 :     tup = (SpGistLeafTuple) palloc0(size);      /* zero-filled, so bytes not set below stay zero */
     656             : 
     657      899892 :     tup->size = size;
     658      899892 :     tup->nextOffset = InvalidOffsetNumber;      /* no successor recorded in the tuple at creation */
     659      899892 :     tup->heapPtr = *heapPtr;
     660      899892 :     if (!isnull)
     661      899844 :         memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
     662             : 
     663      899892 :     return tup;
     664             : }
     665             : 
     666             : /*
     667             :  * Construct a node (to go into an inner tuple) containing the given label
     668             :  *
     669             :  * Note that the node's downlink is just set invalid here.  Caller will fill
     670             :  * it in later.  Raises ERROR if the computed size cannot be represented in t_info.
     671             :  */
     672             : SpGistNodeTuple
     673       25148 : spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
     674             : {
     675             :     SpGistNodeTuple tup;
     676             :     unsigned int size;
     677       25148 :     unsigned short infomask = 0;
     678             : 
     679             :     /* compute space needed (note result is already maxaligned) */
     680       25148 :     size = SGNTHDRSZ;
     681       25148 :     if (!isnull)
     682        3244 :         size += SpGistGetTypeSize(&state->attLabelType, label);
     683             : 
     684             :     /*
     685             :      * Here we make sure that the size will fit in the field reserved for it
     686             :      * in t_info.
     687             :      */
     688       25148 :     if ((size & INDEX_SIZE_MASK) != size)
     689           0 :         ereport(ERROR,
     690             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     691             :                  errmsg("index row requires %zu bytes, maximum size is %zu",
     692             :                         (Size) size, (Size) INDEX_SIZE_MASK)));
     693             : 
     694       25148 :     tup = (SpGistNodeTuple) palloc0(size);
     695             : 
     696       25148 :     if (isnull)
     697       21904 :         infomask |= INDEX_NULL_MASK;    /* a null label is recorded only as this flag bit */
     698             :     /* we don't bother setting the INDEX_VAR_MASK bit */
     699       25148 :     infomask |= size;                   /* size shares t_info with the flag bits; the mask test above ensures it fits */
     700       25148 :     tup->t_info = infomask;
     701             : 
     702             :     /* The TID field will be filled in later */
     703       25148 :     ItemPointerSetInvalid(&tup->t_tid);
     704             : 
     705       25148 :     if (!isnull)
     706        3244 :         memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
     707             : 
     708       25148 :     return tup;
     709             : }
     710             : 
     711             : /*
     712             :  * Construct an inner tuple containing the given prefix and node array.  The nNodes node tuples are copied into the new palloc0'd tuple; ERROR is raised if the result would not fit on a page or in the header fields.
     713             :  */
     714             : SpGistInnerTuple
     715        5120 : spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
     716             :                   int nNodes, SpGistNodeTuple *nodes)
     717             : {
     718             :     SpGistInnerTuple tup;
     719             :     unsigned int size;
     720             :     unsigned int prefixSize;
     721             :     int         i;
     722             :     char       *ptr;
     723             : 
     724             :     /* Compute size needed */
     725        5120 :     if (hasPrefix)
     726        3990 :         prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
     727             :     else
     728        1130 :         prefixSize = 0;
     729             : 
     730        5120 :     size = SGITHDRSZ + prefixSize;
     731             : 
     732             :     /* Note: we rely on node tuple sizes to be maxaligned already */
     733       35516 :     for (i = 0; i < nNodes; i++)
     734       30396 :         size += IndexTupleSize(nodes[i]);
     735             : 
     736             :     /*
     737             :      * Ensure that we can replace the tuple with a dead tuple later.  This
     738             :      * test is unnecessary given current tuple layouts, but let's be safe.
     739             :      */
     740        5120 :     if (size < SGDTSIZE)
     741           0 :         size = SGDTSIZE;
     742             : 
     743             :     /*
     744             :      * Inner tuple should be small enough to fit on a page
     745             :      */
     746        5120 :     if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
     747           0 :         ereport(ERROR,
     748             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     749             :                  errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
     750             :                         (Size) size,
     751             :                         SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
     752             :                  errhint("Values larger than a buffer page cannot be indexed.")));
     753             : 
     754             :     /*
     755             :      * Check for overflow of header fields --- probably can't fail if the
     756             :      * above succeeded, but let's be paranoid
     757             :      */
     758        5120 :     if (size > SGITMAXSIZE ||
     759        5120 :         prefixSize > SGITMAXPREFIXSIZE ||
     760             :         nNodes > SGITMAXNNODES)
     761           0 :         elog(ERROR, "SPGiST inner tuple header field is too small");
     762             : 
     763             :     /* OK, form the tuple */
     764        5120 :     tup = (SpGistInnerTuple) palloc0(size);
     765             : 
     766        5120 :     tup->nNodes = nNodes;
     767        5120 :     tup->prefixSize = prefixSize;
     768        5120 :     tup->size = size;
     769             : 
     770        5120 :     if (hasPrefix)
     771        3990 :         memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
     772             : 
     773        5120 :     ptr = (char *) SGITNODEPTR(tup);    /* write cursor, starting at the tuple's node area */
     774             : 
     775       35516 :     for (i = 0; i < nNodes; i++)
     776             :     {
     777       30396 :         SpGistNodeTuple node = nodes[i];
     778             : 
     779       30396 :         memcpy(ptr, node, IndexTupleSize(node));
     780       30396 :         ptr += IndexTupleSize(node);    /* nodes are packed back to back, each at its own recorded size */
     781             :     }
     782             : 
     783        5120 :     return tup;
     784             : }
     785             : 
     786             : /*
     787             :  * Construct a "dead" tuple to replace a tuple being deleted.
     788             :  *
     789             :  * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
     790             :  * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
     791             :  * the xid field is filled in automatically.
     792             :  *
     793             :  * This is called in critical sections, so we don't use palloc; the tuple
     794             :  * is built in preallocated storage.  It should be copied before another
     795             :  * call with different parameters can occur.
     796             :  */
     797             : SpGistDeadTuple
     798        7088 : spgFormDeadTuple(SpGistState *state, int tupstate,
     799             :                  BlockNumber blkno, OffsetNumber offnum)
     800             : {
     801        7088 :     SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;    /* reused scratch area; the result aliases it */
     802             : 
     803        7088 :     tuple->tupstate = tupstate;
     804        7088 :     tuple->size = SGDTSIZE;             /* dead tuples always have this fixed size */
     805        7088 :     tuple->nextOffset = InvalidOffsetNumber;
     806             : 
     807        7088 :     if (tupstate == SPGIST_REDIRECT)
     808             :     {
     809        1190 :         ItemPointerSet(&tuple->pointer, blkno, offnum);
     810             :         Assert(TransactionIdIsValid(state->myXid));
     811        1190 :         tuple->xid = state->myXid;      /* taken from the state, not from an argument */
     812             :     }
     813             :     else
     814             :     {
     815        5898 :         ItemPointerSetInvalid(&tuple->pointer);     /* blkno/offnum are ignored for non-redirect states */
     816        5898 :         tuple->xid = InvalidTransactionId;
     817             :     }
     818             : 
     819        7088 :     return tuple;
     820             : }
     821             : 
     822             : /*
     823             :  * Extract the label datums of the nodes within innerTuple
     824             :  *
     825             :  * Returns NULL if label datums are NULLs; otherwise a palloc'd array of innerTuple->nNodes Datums.  A mix of null and non-null labels raises ERROR.
     826             :  */
     827             : Datum *
     828    12516414 : spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
     829             : {
     830             :     Datum      *nodeLabels;
     831             :     int         i;
     832             :     SpGistNodeTuple node;
     833             : 
     834             :     /* Either all the labels must be NULL, or none. */
     835    12516414 :     node = SGITNODEPTR(innerTuple);     /* the first node decides which of the two cases applies */
     836    12516414 :     if (IndexTupleHasNulls(node))
     837             :     {
     838    67220076 :         SGITITERATE(innerTuple, i, node)
     839             :         {
     840    54844796 :             if (!IndexTupleHasNulls(node))
     841           0 :                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
     842             :         }
     843             :         /* They're all null, so just return NULL */
     844    12375280 :         return NULL;
     845             :     }
     846             :     else
     847             :     {
     848      141134 :         nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
     849     1525946 :         SGITITERATE(innerTuple, i, node)
     850             :         {
     851     1384812 :             if (IndexTupleHasNulls(node))
     852           0 :                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
     853     1384812 :             nodeLabels[i] = SGNTDATUM(node, state);     /* slot i holds the label of the i'th node */
     854             :         }
     855      141134 :         return nodeLabels;
     856             :     }
     857             : }
     858             : 
     859             : /*
     860             :  * Add a new item to the page, replacing a PLACEHOLDER item if possible.
     861             :  * Return the location it's inserted at, or InvalidOffsetNumber on failure.
     862             :  *
     863             :  * If startOffset isn't NULL, we start searching for placeholders at
     864             :  * *startOffset, and update that to the next place to search.  This is just
     865             :  * an optimization for repeated insertions.
     866             :  *
     867             :  * If errorOK is false, we throw error when there's not enough room,
     868             :  * rather than returning InvalidOffsetNumber.  (A failure after a placeholder has already been deleted is always a PANIC, whatever errorOK says.)
     869             :  */
     870             : OffsetNumber
     871      944040 : SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
     872             :                      OffsetNumber *startOffset, bool errorOK)
     873             : {
     874      944040 :     SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     875             :     OffsetNumber i,
     876             :                 maxoff,
     877             :                 offnum;
     878             : 
     879     1215088 :     if (opaque->nPlaceholder > 0 &&
     880      271048 :         PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))   /* the SGDTSIZE bytes of the placeholder to be removed count as available room */
     881             :     {
     882             :         /* Try to replace a placeholder */
     883      271048 :         maxoff = PageGetMaxOffsetNumber(page);
     884      271048 :         offnum = InvalidOffsetNumber;
     885             : 
     886             :         for (;;)    /* at most two passes: from the hint, then from the first offset */
     887             :         {
     888      271048 :             if (startOffset && *startOffset != InvalidOffsetNumber)
     889       77140 :                 i = *startOffset;
     890             :             else
     891      193908 :                 i = FirstOffsetNumber;
     892    18686444 :             for (; i <= maxoff; i++)
     893             :             {
     894    18686444 :                 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
     895             :                                                                    PageGetItemId(page, i));
     896             : 
     897    18686444 :                 if (it->tupstate == SPGIST_PLACEHOLDER)
     898             :                 {
     899      271048 :                     offnum = i;
     900      271048 :                     break;
     901             :                 }
     902             :             }
     903             : 
     904             :             /* Done if we found a placeholder */
     905      271048 :             if (offnum != InvalidOffsetNumber)
     906      271048 :                 break;
     907             : 
     908           0 :             if (startOffset && *startOffset != InvalidOffsetNumber)
     909             :             {
     910             :                 /* Hint was no good, re-search from beginning */
     911           0 :                 *startOffset = InvalidOffsetNumber;
     912           0 :                 continue;
     913             :             }
     914             : 
     915             :             /* Hmm, no placeholder found? */
     916           0 :             opaque->nPlaceholder = 0;   /* the count was wrong; reset it so later calls skip the search */
     917           0 :             break;
     918             :         }
     919             : 
     920      271048 :         if (offnum != InvalidOffsetNumber)
     921             :         {
     922             :             /* Replace the placeholder tuple */
     923      271048 :             PageIndexTupleDelete(page, offnum);
     924             : 
     925      271048 :             offnum = PageAddItem(page, item, size, offnum, false, false);
     926             : 
     927             :             /*
     928             :              * We should not have failed given the size check at the top of
     929             :              * the function, but test anyway.  If we did fail, we must PANIC
     930             :              * because we've already deleted the placeholder tuple, and
     931             :              * there's no other way to keep the damage from getting to disk.
     932             :              */
     933      271048 :             if (offnum != InvalidOffsetNumber)
     934             :             {
     935             :                 Assert(opaque->nPlaceholder > 0);
     936      271048 :                 opaque->nPlaceholder--;
     937      271048 :                 if (startOffset)
     938       78872 :                     *startOffset = offnum + 1;  /* next search resumes just past the slot we filled */
     939             :             }
     940             :             else
     941           0 :                 elog(PANIC, "failed to add item of size %zu to SPGiST index page",
     942             :                      size);
     943             : 
     944      271048 :             return offnum;
     945             :         }
     946             :     }
     947             : 
     948             :     /* No luck in replacing a placeholder, so just add it to the page */
     949      672992 :     offnum = PageAddItem(page, item, size,
     950             :                          InvalidOffsetNumber, false, false);
     951             : 
     952      672992 :     if (offnum == InvalidOffsetNumber && !errorOK)
     953           0 :         elog(ERROR, "failed to add item of size %zu to SPGiST index page",
     954             :              size);
     955             : 
     956      672992 :     return offnum;
     957             : }
     958             : 
     959             : /*
     960             :  *  spgproperty() -- Check boolean properties of indexes.
     961             :  *
     962             :  * This is optional for most AMs, but is required for SP-GiST because the core
     963             :  * property code doesn't support AMPROP_DISTANCE_ORDERABLE.  Only that property, at column level, is answered here (return true, with *res / *isnull set); every other inquiry returns false.
     964             :  */
     965             : bool
     966         124 : spgproperty(Oid index_oid, int attno,
     967             :             IndexAMProperty prop, const char *propname,
     968             :             bool *res, bool *isnull)
     969             : {
     970             :     Oid         opclass,
     971             :                 opfamily,
     972             :                 opcintype;
     973             :     CatCList   *catlist;
     974             :     int         i;
     975             : 
     976             :     /* Only answer column-level inquiries */
     977         124 :     if (attno == 0)
     978          44 :         return false;
     979             : 
     980          80 :     switch (prop)
     981             :     {
     982             :         case AMPROP_DISTANCE_ORDERABLE:
     983           8 :             break;
     984             :         default:
     985          72 :             return false;           /* any other property is not answered here */
     986             :     }
     987             : 
     988             :     /*
     989             :      * Currently, SP-GiST distance-ordered scans require that there be a
     990             :      * distance operator in the opclass with the default types. So we assume
     991             :      * that if such an operator exists, then there's a reason for it.
     992             :      */
     993             : 
     994             :     /* First we need to know the column's opclass. */
     995           8 :     opclass = get_index_column_opclass(index_oid, attno);
     996           8 :     if (!OidIsValid(opclass))
     997             :     {
     998           0 :         *isnull = true;             /* no opclass found: report a NULL result */
     999           0 :         return true;
    1000             :     }
    1001             : 
    1002             :     /* Now look up the opclass family and input datatype. */
    1003           8 :     if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    1004             :     {
    1005           0 :         *isnull = true;             /* lookup failed: report a NULL result */
    1006           0 :         return true;
    1007             :     }
    1008             : 
    1009             :     /* And now we can check whether the operator is provided. */
    1010           8 :     catlist = SearchSysCacheList1(AMOPSTRATEGY,
    1011             :                                   ObjectIdGetDatum(opfamily));
    1012             : 
    1013           8 :     *res = false;                   /* stays false unless a qualifying ordering operator is found below */
    1014             : 
    1015          68 :     for (i = 0; i < catlist->n_members; i++)
    1016             :     {
    1017          64 :         HeapTuple   amoptup = &catlist->members[i]->tuple;
    1018          64 :         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
    1019             : 
    1020          68 :         if (amopform->amoppurpose == AMOP_ORDER &&
    1021           4 :             (amopform->amoplefttype == opcintype ||
    1022           4 :              amopform->amoprighttype == opcintype) &&
    1023           4 :             opfamily_can_sort_type(amopform->amopsortfamily,
    1024             :                                    get_op_rettype(amopform->amopopr)))
    1025             :         {
    1026           4 :             *res = true;
    1027           4 :             break;                  /* one match is enough */
    1028             :         }
    1029             :     }
    1030             : 
    1031           8 :     ReleaseSysCacheList(catlist);   /* pairs with the SearchSysCacheList1 call above */
    1032             : 
    1033           8 :     *isnull = false;
    1034             : 
    1035           8 :     return true;
    1036             : }

Generated by: LCOV version 1.13