LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions)
Test: PostgreSQL 15devel
Date: 2021-12-03 04:09:03

                  Hit    Total    Coverage
    Lines:        476      541      88.0 %
    Functions:     25       27      92.6 %

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/relation.h"
      23             : #include "access/reloptions.h"
      24             : #include "access/relscan.h"
      25             : #include "access/table.h"
      26             : #include "access/tableam.h"
      27             : #include "access/xloginsert.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/pg_am.h"
      30             : #include "commands/vacuum.h"
      31             : #include "miscadmin.h"
      32             : #include "pgstat.h"
      33             : #include "postmaster/autovacuum.h"
      34             : #include "storage/bufmgr.h"
      35             : #include "storage/freespace.h"
      36             : #include "utils/acl.h"
      37             : #include "utils/builtins.h"
      38             : #include "utils/datum.h"
      39             : #include "utils/index_selfuncs.h"
      40             : #include "utils/memutils.h"
      41             : #include "utils/rel.h"
      42             : 
      43             : 
      44             : /*
      45             :  * We use a BrinBuildState during initial construction of a BRIN index.
      46             :  * The running state is kept in a BrinMemTuple.
      47             :  */
      48             : typedef struct BrinBuildState
      49             : {
      50             :     Relation    bs_irel;
      51             :     int         bs_numtuples;
      52             :     Buffer      bs_currentInsertBuf;
      53             :     BlockNumber bs_pagesPerRange;
      54             :     BlockNumber bs_currRangeStart;
      55             :     BrinRevmap *bs_rmAccess;
      56             :     BrinDesc   *bs_bdesc;
      57             :     BrinMemTuple *bs_dtuple;
      58             : } BrinBuildState;
      59             : 
      60             : /*
      61             :  * Struct used as "opaque" during index scans
      62             :  */
      63             : typedef struct BrinOpaque
      64             : {
      65             :     BlockNumber bo_pagesPerRange;
      66             :     BrinRevmap *bo_rmAccess;
      67             :     BrinDesc   *bo_bdesc;
      68             : } BrinOpaque;
      69             : 
      70             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber
      71             : 
      72             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      73             :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      74             : static void terminate_brin_buildstate(BrinBuildState *state);
      75             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      76             :                           bool include_partial, double *numSummarized, double *numExisting);
      77             : static void form_and_insert_tuple(BrinBuildState *state);
      78             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      79             :                          BrinTuple *b);
      80             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      81             : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
      82             :                                 BrinMemTuple *dtup, Datum *values, bool *nulls);
      83             : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
      84             : 
      85             : /*
      86             :  * BRIN handler function: return IndexAmRoutine with access method parameters
      87             :  * and callbacks.
      88             :  */
      89             : Datum
      90        1236 : brinhandler(PG_FUNCTION_ARGS)
      91             : {
      92        1236 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      93             : 
      94        1236 :     amroutine->amstrategies = 0;
      95        1236 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      96        1236 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
      97        1236 :     amroutine->amcanorder = false;
      98        1236 :     amroutine->amcanorderbyop = false;
      99        1236 :     amroutine->amcanbackward = false;
     100        1236 :     amroutine->amcanunique = false;
     101        1236 :     amroutine->amcanmulticol = true;
     102        1236 :     amroutine->amoptionalkey = true;
     103        1236 :     amroutine->amsearcharray = false;
     104        1236 :     amroutine->amsearchnulls = true;
     105        1236 :     amroutine->amstorage = true;
     106        1236 :     amroutine->amclusterable = false;
     107        1236 :     amroutine->ampredlocks = false;
     108        1236 :     amroutine->amcanparallel = false;
     109        1236 :     amroutine->amcaninclude = false;
     110        1236 :     amroutine->amusemaintenanceworkmem = false;
     111        1236 :     amroutine->amhotblocking = false;
     112        1236 :     amroutine->amparallelvacuumoptions =
     113             :         VACUUM_OPTION_PARALLEL_CLEANUP;
     114        1236 :     amroutine->amkeytype = InvalidOid;
     115             : 
     116        1236 :     amroutine->ambuild = brinbuild;
     117        1236 :     amroutine->ambuildempty = brinbuildempty;
     118        1236 :     amroutine->aminsert = brininsert;
     119        1236 :     amroutine->ambulkdelete = brinbulkdelete;
     120        1236 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     121        1236 :     amroutine->amcanreturn = NULL;
     122        1236 :     amroutine->amcostestimate = brincostestimate;
     123        1236 :     amroutine->amoptions = brinoptions;
     124        1236 :     amroutine->amproperty = NULL;
     125        1236 :     amroutine->ambuildphasename = NULL;
     126        1236 :     amroutine->amvalidate = brinvalidate;
     127        1236 :     amroutine->amadjustmembers = NULL;
     128        1236 :     amroutine->ambeginscan = brinbeginscan;
     129        1236 :     amroutine->amrescan = brinrescan;
     130        1236 :     amroutine->amgettuple = NULL;
     131        1236 :     amroutine->amgetbitmap = bringetbitmap;
     132        1236 :     amroutine->amendscan = brinendscan;
     133        1236 :     amroutine->ammarkpos = NULL;
     134        1236 :     amroutine->amrestrpos = NULL;
     135        1236 :     amroutine->amestimateparallelscan = NULL;
     136        1236 :     amroutine->aminitparallelscan = NULL;
     137        1236 :     amroutine->amparallelrescan = NULL;
     138             : 
     139        1236 :     PG_RETURN_POINTER(amroutine);
     140             : }
     141             : 
     142             : /*
     143             :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
     144             :  * we need to obtain the relevant index tuple and compare its stored values
     145             :  * with those of the new tuple.  If the tuple values are not consistent with
     146             :  * the summary tuple, we need to update the index tuple.
     147             :  *
     148             :  * If autosummarization is enabled, check if we need to summarize the previous
     149             :  * page range.
     150             :  *
     151             :  * If the range is not currently summarized (i.e. the revmap returns NULL for
     152             :  * it), there's nothing to do for this tuple.
     153             :  */
     154             : bool
     155        8728 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     156             :            ItemPointer heaptid, Relation heapRel,
     157             :            IndexUniqueCheck checkUnique,
     158             :            bool indexUnchanged,
     159             :            IndexInfo *indexInfo)
     160             : {
     161             :     BlockNumber pagesPerRange;
     162             :     BlockNumber origHeapBlk;
     163             :     BlockNumber heapBlk;
     164        8728 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     165             :     BrinRevmap *revmap;
     166        8728 :     Buffer      buf = InvalidBuffer;
     167        8728 :     MemoryContext tupcxt = NULL;
     168        8728 :     MemoryContext oldcxt = CurrentMemoryContext;
     169        8728 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     170             : 
     171        8728 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     172             : 
     173             :     /*
     174             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     175             :      * is the first block in the corresponding page range.
     176             :      */
     177        8728 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     178        8728 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     179             : 
     180             :     for (;;)
     181           0 :     {
     182        8728 :         bool        need_insert = false;
     183             :         OffsetNumber off;
     184             :         BrinTuple  *brtup;
     185             :         BrinMemTuple *dtup;
     186             : 
     187        8728 :         CHECK_FOR_INTERRUPTS();
     188             : 
     189             :         /*
     190             :          * If auto-summarization is enabled and we just inserted the first
     191             :          * tuple into the first block of a new non-first page range, request a
     192             :          * summarization run of the previous range.
     193             :          */
     194        8728 :         if (autosummarize &&
     195         156 :             heapBlk > 0 &&
     196         156 :             heapBlk == origHeapBlk &&
     197         156 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     198             :         {
     199           8 :             BlockNumber lastPageRange = heapBlk - 1;
     200             :             BrinTuple  *lastPageTuple;
     201             : 
     202             :             lastPageTuple =
     203           8 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     204             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     205           8 :             if (!lastPageTuple)
     206             :             {
     207             :                 bool        recorded;
     208             : 
     209           6 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     210             :                                                  RelationGetRelid(idxRel),
     211             :                                                  lastPageRange);
     212           6 :                 if (!recorded)
     213           0 :                     ereport(LOG,
     214             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     215             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     216             :                                     RelationGetRelationName(idxRel),
     217             :                                     lastPageRange)));
     218             :             }
     219             :             else
     220           2 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     221             :         }
     222             : 
     223        8728 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     224             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     225             : 
     226             :         /* if range is unsummarized, there's nothing to do */
     227        8728 :         if (!brtup)
     228         236 :             break;
     229             : 
     230             :         /* First time through in this statement? */
     231        8492 :         if (bdesc == NULL)
     232             :         {
     233         654 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     234         654 :             bdesc = brin_build_desc(idxRel);
     235         654 :             indexInfo->ii_AmCache = (void *) bdesc;
     236         654 :             MemoryContextSwitchTo(oldcxt);
     237             :         }
     238             :         /* First time through in this brininsert call? */
     239        8492 :         if (tupcxt == NULL)
     240             :         {
     241        8492 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     242             :                                            "brininsert cxt",
     243             :                                            ALLOCSET_DEFAULT_SIZES);
     244        8492 :             MemoryContextSwitchTo(tupcxt);
     245             :         }
     246             : 
     247        8492 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     248             : 
     249        8492 :         need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
     250             : 
     251        8492 :         if (!need_insert)
     252             :         {
     253             :             /*
     254             :              * The tuple is consistent with the new values, so there's nothing
     255             :              * to do.
     256             :              */
     257        8404 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     258             :         }
     259             :         else
     260             :         {
     261          88 :             Page        page = BufferGetPage(buf);
     262          88 :             ItemId      lp = PageGetItemId(page, off);
     263             :             Size        origsz;
     264             :             BrinTuple  *origtup;
     265             :             Size        newsz;
     266             :             BrinTuple  *newtup;
     267             :             bool        samepage;
     268             : 
     269             :             /*
     270             :              * Make a copy of the old tuple, so that we can compare it after
     271             :              * re-acquiring the lock.
     272             :              */
     273          88 :             origsz = ItemIdGetLength(lp);
     274          88 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     275             : 
     276             :             /*
     277             :              * Before releasing the lock, check if we can attempt a same-page
     278             :              * update.  Another process could insert a tuple concurrently in
     279             :              * the same page though, so downstream we must be prepared to cope
     280             :              * if this turns out to not be possible after all.
     281             :              */
     282          88 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     283          88 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     284          88 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     285             : 
     286             :             /*
     287             :              * Try to update the tuple.  If this doesn't work for whatever
     288             :              * reason, we need to restart from the top; the revmap might be
     289             :              * pointing at a different tuple for this block now, so we need to
     290             :              * recompute to ensure both our new heap tuple and the other
     291             :              * inserter's are covered by the combined tuple.  It might be that
     292             :              * we don't need to update at all.
     293             :              */
     294          88 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     295             :                                buf, off, origtup, origsz, newtup, newsz,
     296             :                                samepage))
     297             :             {
     298             :                 /* no luck; start over */
     299           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     300           0 :                 continue;
     301             :             }
     302             :         }
     303             : 
     304             :         /* success! */
     305        8492 :         break;
     306             :     }
     307             : 
     308        8728 :     brinRevmapTerminate(revmap);
     309        8728 :     if (BufferIsValid(buf))
     310        8494 :         ReleaseBuffer(buf);
     311        8728 :     MemoryContextSwitchTo(oldcxt);
     312        8728 :     if (tupcxt != NULL)
     313        8492 :         MemoryContextDelete(tupcxt);
     314             : 
     315        8728 :     return false;
     316             : }
     317             : 
     318             : /*
     319             :  * Initialize state for a BRIN index scan.
     320             :  *
     321             :  * We read the metapage here to determine the pages-per-range number that this
     322             :  * index was built with.  Note that since this cannot be changed while we're
     323             :  * holding lock on index, it's not necessary to recompute it during brinrescan.
     324             :  */
     325             : IndexScanDesc
     326        1716 : brinbeginscan(Relation r, int nkeys, int norderbys)
     327             : {
     328             :     IndexScanDesc scan;
     329             :     BrinOpaque *opaque;
     330             : 
     331        1716 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     332             : 
     333        1716 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     334        1716 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     335             :                                                scan->xs_snapshot);
     336        1716 :     opaque->bo_bdesc = brin_build_desc(r);
     337        1716 :     scan->opaque = opaque;
     338             : 
     339        1716 :     return scan;
     340             : }
     341             : 
     342             : /*
     343             :  * Execute the index scan.
     344             :  *
     345             :  * This works by reading index TIDs from the revmap, and obtaining the index
     346             :  * tuples pointed to by them; the summary values in the index tuples are
     347             :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     348             :  * ranges corresponding to index tuples that match the scan keys.
     349             :  *
     350             :  * If a TID from the revmap is read as InvalidTID, we know that range is
     351             :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     352             :  * keys.
     353             :  */
     354             : int64
     355        1716 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     356             : {
     357        1716 :     Relation    idxRel = scan->indexRelation;
     358        1716 :     Buffer      buf = InvalidBuffer;
     359             :     BrinDesc   *bdesc;
     360             :     Oid         heapOid;
     361             :     Relation    heapRel;
     362             :     BrinOpaque *opaque;
     363             :     BlockNumber nblocks;
     364             :     BlockNumber heapBlk;
     365        1716 :     int         totalpages = 0;
     366             :     FmgrInfo   *consistentFn;
     367             :     MemoryContext oldcxt;
     368             :     MemoryContext perRangeCxt;
     369             :     BrinMemTuple *dtup;
     370        1716 :     BrinTuple  *btup = NULL;
     371        1716 :     Size        btupsz = 0;
     372             :     ScanKey   **keys,
     373             :               **nullkeys;
     374             :     int        *nkeys,
     375             :                *nnullkeys;
     376             :     int         keyno;
     377             :     char       *ptr;
     378             :     Size        len;
     379             :     char       *tmp PG_USED_FOR_ASSERTS_ONLY;
     380             : 
     381        1716 :     opaque = (BrinOpaque *) scan->opaque;
     382        1716 :     bdesc = opaque->bo_bdesc;
     383        1716 :     pgstat_count_index_scan(idxRel);
     384             : 
     385             :     /*
     386             :      * We need to know the size of the table so that we know how long to
     387             :      * iterate on the revmap.
     388             :      */
     389        1716 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     390        1716 :     heapRel = table_open(heapOid, AccessShareLock);
     391        1716 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     392        1716 :     table_close(heapRel, AccessShareLock);
     393             : 
     394             :     /*
     395             :      * Make room for the consistent support procedures of indexed columns.  We
     396             :      * don't look them up here; we do that lazily the first time we see a scan
     397             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     398             :      */
     399        1716 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     400             : 
     401             :     /*
     402             :      * Make room for per-attribute lists of scan keys that we'll pass to the
     403             :      * consistent support procedure. We don't know which attributes have scan
     404             :      * keys, so we allocate space for all attributes. That may use more memory
     405             :      * but it's probably cheaper than determining which attributes are used.
     406             :      *
     407             :      * We keep null and regular keys separate, so that we can pass just the
     408             :      * regular keys to the consistent function easily.
     409             :      *
     410             :      * To reduce the allocation overhead, we allocate one big chunk and then
     411             :      * carve it into smaller arrays ourselves. All the pieces have exactly the
     412             :      * same lifetime, so that's OK.
     413             :      *
     414             :      * XXX The widest index can have 32 attributes, so the amount of wasted
     415             :      * memory is negligible. We could invent a more compact approach (with
     416             :      * just space for used attributes) but that would make the matching more
     417             :      * complex so it's not a good trade-off.
     418             :      */
     419        1716 :     len =
     420        1716 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* regular keys */
     421        1716 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     422        1716 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
     423        1716 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* NULL keys */
     424        1716 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     425        1716 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     426             : 
     427        1716 :     ptr = palloc(len);
     428        1716 :     tmp = ptr;
     429             : 
     430        1716 :     keys = (ScanKey **) ptr;
     431        1716 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     432             : 
     433        1716 :     nullkeys = (ScanKey **) ptr;
     434        1716 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     435             : 
     436        1716 :     nkeys = (int *) ptr;
     437        1716 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     438             : 
     439        1716 :     nnullkeys = (int *) ptr;
     440        1716 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     441             : 
     442       46156 :     for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
     443             :     {
     444       44440 :         keys[i] = (ScanKey *) ptr;
     445       44440 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     446             : 
     447       44440 :         nullkeys[i] = (ScanKey *) ptr;
     448       44440 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     449             :     }
     450             : 
     451             :     Assert(tmp + len == ptr);
     452             : 
     453             :     /* zero the number of keys */
     454        1716 :     memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     455        1716 :     memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     456             : 
     457             :     /* Preprocess the scan keys - split them into per-attribute arrays. */
     458        3432 :     for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     459             :     {
     460        1716 :         ScanKey     key = &scan->keyData[keyno];
     461        1716 :         AttrNumber  keyattno = key->sk_attno;
     462             : 
     463             :         /*
     464             :          * The collation of the scan key must match the collation used in the
     465             :          * index column (but only if the search is not IS NULL/ IS NOT NULL).
     466             :          * Otherwise we shouldn't be using this index ...
     467             :          */
     468             :         Assert((key->sk_flags & SK_ISNULL) ||
     469             :                (key->sk_collation ==
     470             :                 TupleDescAttr(bdesc->bd_tupdesc,
     471             :                               keyattno - 1)->attcollation));
     472             : 
     473             :         /*
     474             :          * First time we see this index attribute, so init as needed.
     475             :          *
      476             :          * This is a bit of overkill - we don't know how many scan keys there
      477             :          * are for this attribute, so we simply allocate the largest number
     478             :          * possible (as if all keys were for this attribute). This may waste a
      479             :          * bit of memory, but we only expect a small number of scan keys in
     480             :          * general, so this should be negligible, and repeated repalloc calls
     481             :          * are not free either.
     482             :          */
     483        1716 :         if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     484             :         {
     485             :             FmgrInfo   *tmp;
     486             : 
     487             :             /* First time we see this attribute, so no key/null keys. */
     488             :             Assert(nkeys[keyattno - 1] == 0);
     489             :             Assert(nnullkeys[keyattno - 1] == 0);
     490             : 
     491        1716 :             tmp = index_getprocinfo(idxRel, keyattno,
     492             :                                     BRIN_PROCNUM_CONSISTENT);
     493        1716 :             fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     494             :                            CurrentMemoryContext);
     495             :         }
     496             : 
     497             :         /* Add key to the proper per-attribute array. */
     498        1716 :         if (key->sk_flags & SK_ISNULL)
     499             :         {
     500          24 :             nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
     501          24 :             nnullkeys[keyattno - 1]++;
     502             :         }
     503             :         else
     504             :         {
     505        1692 :             keys[keyattno - 1][nkeys[keyattno - 1]] = key;
     506        1692 :             nkeys[keyattno - 1]++;
     507             :         }
     508             :     }
     509             : 
     510             :     /* allocate an initial in-memory tuple, out of the per-range memcxt */
     511        1716 :     dtup = brin_new_memtuple(bdesc);
     512             : 
     513             :     /*
     514             :      * Setup and use a per-range memory context, which is reset every time we
     515             :      * loop below.  This avoids having to free the tuples within the loop.
     516             :      */
     517        1716 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     518             :                                         "bringetbitmap cxt",
     519             :                                         ALLOCSET_DEFAULT_SIZES);
     520        1716 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     521             : 
     522             :     /*
     523             :      * Now scan the revmap.  We start by querying for heap page 0,
     524             :      * incrementing by the number of pages per range; this gives us a full
     525             :      * view of the table.
     526             :      */
     527      126872 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     528             :     {
     529             :         bool        addrange;
     530      125156 :         bool        gottuple = false;
     531             :         BrinTuple  *tup;
     532             :         OffsetNumber off;
     533             :         Size        size;
     534             : 
     535      125156 :         CHECK_FOR_INTERRUPTS();
     536             : 
     537      125156 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     538             : 
     539      125156 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     540             :                                        &off, &size, BUFFER_LOCK_SHARE,
     541             :                                        scan->xs_snapshot);
     542      125156 :         if (tup)
     543             :         {
     544      125156 :             gottuple = true;
     545      125156 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     546      125156 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     547             :         }
     548             : 
     549             :         /*
     550             :          * For page ranges with no indexed tuple, we must return the whole
     551             :          * range; otherwise, compare it to the scan keys.
     552             :          */
     553      125156 :         if (!gottuple)
     554             :         {
     555           0 :             addrange = true;
     556             :         }
     557             :         else
     558             :         {
     559      125156 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     560      125156 :             if (dtup->bt_placeholder)
     561             :             {
     562             :                 /*
     563             :                  * Placeholder tuples are always returned, regardless of the
     564             :                  * values stored in them.
     565             :                  */
     566           0 :                 addrange = true;
     567             :             }
     568             :             else
     569             :             {
     570             :                 int         attno;
     571             : 
     572             :                 /*
     573             :                  * Compare scan keys with summary values stored for the range.
     574             :                  * If scan keys are matched, the page range must be added to
     575             :                  * the bitmap.  We initially assume the range needs to be
     576             :                  * added; in particular this serves the case where there are
     577             :                  * no keys.
     578             :                  */
     579      125156 :                 addrange = true;
     580     3634500 :                 for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
     581             :                 {
     582             :                     BrinValues *bval;
     583             :                     Datum       add;
     584             :                     Oid         collation;
     585             : 
     586             :                     /*
     587             :                      * skip attributes without any scan keys (both regular and
     588             :                      * IS [NOT] NULL)
     589             :                      */
     590     3510260 :                     if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
     591     3385104 :                         continue;
     592             : 
     593      125156 :                     bval = &dtup->bt_columns[attno - 1];
     594             : 
     595             :                     /*
     596             :                      * First check if there are any IS [NOT] NULL scan keys,
     597             :                      * and if we're violating them. In that case we can
     598             :                      * terminate early, without invoking the support function.
     599             :                      *
     600             :                      * As there may be more keys, we can only determine
     601             :                      * mismatch within this loop.
     602             :                      */
     603      125156 :                     if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
     604      125156 :                         !check_null_keys(bval, nullkeys[attno - 1],
     605      125156 :                                          nnullkeys[attno - 1]))
     606             :                     {
     607             :                         /*
     608             :                          * If any of the IS [NOT] NULL keys failed, the page
     609             :                          * range as a whole can't pass. So terminate the loop.
     610             :                          */
     611         664 :                         addrange = false;
     612         664 :                         break;
     613             :                     }
     614             : 
     615             :                     /*
     616             :                      * So either there are no IS [NOT] NULL keys, or all
     617             :                      * passed. If there are no regular scan keys, we're done -
     618             :                      * the page range matches. If there are regular keys, but
     619             :                      * the page range is marked as 'all nulls' it can't
     620             :                      * possibly pass (we're assuming the operators are
     621             :                      * strict).
     622             :                      */
     623             : 
     624             :                     /* No regular scan keys - page range as a whole passes. */
     625      124492 :                     if (!nkeys[attno - 1])
     626         824 :                         continue;
     627             : 
     628             :                     Assert((nkeys[attno - 1] > 0) &&
     629             :                            (nkeys[attno - 1] <= scan->numberOfKeys));
     630             : 
     631             :                     /* If it is all nulls, it cannot possibly be consistent. */
     632      123668 :                     if (bval->bv_allnulls)
     633             :                     {
     634         252 :                         addrange = false;
     635         252 :                         break;
     636             :                     }
     637             : 
     638             :                     /*
     639             :                      * Collation from the first key (has to be the same for
     640             :                      * all keys for the same attribute).
     641             :                      */
     642      123416 :                     collation = keys[attno - 1][0]->sk_collation;
     643             : 
     644             :                     /*
     645             :                      * Check whether the scan key is consistent with the page
     646             :                      * range values; if so, have the pages in the range added
     647             :                      * to the output bitmap.
     648             :                      *
     649             :                      * The opclass may or may not support processing of
     650             :                      * multiple scan keys. We can determine that based on the
     651             :                      * number of arguments - functions with extra parameter
     652             :                      * (number of scan keys) do support this, otherwise we
     653             :                      * have to simply pass the scan keys one by one.
     654             :                      */
     655      123416 :                     if (consistentFn[attno - 1].fn_nargs >= 4)
     656             :                     {
     657             :                         /* Check all keys at once */
     658       25008 :                         add = FunctionCall4Coll(&consistentFn[attno - 1],
     659             :                                                 collation,
     660             :                                                 PointerGetDatum(bdesc),
     661             :                                                 PointerGetDatum(bval),
     662       25008 :                                                 PointerGetDatum(keys[attno - 1]),
     663       25008 :                                                 Int32GetDatum(nkeys[attno - 1]));
     664       25008 :                         addrange = DatumGetBool(add);
     665             :                     }
     666             :                     else
     667             :                     {
     668             :                         /*
     669             :                          * Check keys one by one
     670             :                          *
     671             :                          * When there are multiple scan keys, failure to meet
     672             :                          * the criteria for a single one of them is enough to
     673             :                          * discard the range as a whole, so break out of the
     674             :                          * loop as soon as a false return value is obtained.
     675             :                          */
     676             :                         int         keyno;
     677             : 
     678      171892 :                         for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
     679             :                         {
     680       98408 :                             add = FunctionCall3Coll(&consistentFn[attno - 1],
     681       98408 :                                                     keys[attno - 1][keyno]->sk_collation,
     682             :                                                     PointerGetDatum(bdesc),
     683             :                                                     PointerGetDatum(bval),
     684       98408 :                                                     PointerGetDatum(keys[attno - 1][keyno]));
     685       98408 :                             addrange = DatumGetBool(add);
     686       98408 :                             if (!addrange)
     687       24924 :                                 break;
     688             :                         }
     689             :                     }
     690             :                 }
     691             :             }
     692             :         }
     693             : 
     694             :         /* add the pages in the range to the output bitmap, if needed */
     695      125156 :         if (addrange)
     696             :         {
     697             :             BlockNumber pageno;
     698             : 
     699       90168 :             for (pageno = heapBlk;
     700      180336 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
     701       90168 :                  pageno++)
     702             :             {
     703       90168 :                 MemoryContextSwitchTo(oldcxt);
     704       90168 :                 tbm_add_page(tbm, pageno);
     705       90168 :                 totalpages++;
     706       90168 :                 MemoryContextSwitchTo(perRangeCxt);
     707             :             }
     708             :         }
     709             :     }
     710             : 
     711        1716 :     MemoryContextSwitchTo(oldcxt);
     712        1716 :     MemoryContextDelete(perRangeCxt);
     713             : 
     714        1716 :     if (buf != InvalidBuffer)
     715        1716 :         ReleaseBuffer(buf);
     716             : 
     717             :     /*
     718             :      * XXX We have an approximation of the number of *pages* that our scan
     719             :      * returns, but we don't have a precise idea of the number of heap tuples
     720             :      * involved.
     721             :      */
     722        1716 :     return totalpages * 10;
     723             : }
     724             : 
     725             : /*
     726             :  * Re-initialize state for a BRIN index scan
     727             :  */
     728             : void
     729        1716 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     730             :            ScanKey orderbys, int norderbys)
     731             : {
     732             :     /*
     733             :      * Other index AMs preprocess the scan keys at this point, or sometime
     734             :      * early during the scan; this lets them optimize by removing redundant
     735             :      * keys, or doing early returns when they are impossible to satisfy; see
     736             :      * _bt_preprocess_keys for an example.  Something like that could be added
     737             :      * here someday, too.
     738             :      */
     739             : 
     740        1716 :     if (scankey && scan->numberOfKeys > 0)
     741        1716 :         memmove(scan->keyData, scankey,
     742        1716 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     743        1716 : }
     744             : 
     745             : /*
     746             :  * Close down a BRIN index scan
     747             :  */
     748             : void
     749        1716 : brinendscan(IndexScanDesc scan)
     750             : {
     751        1716 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     752             : 
     753        1716 :     brinRevmapTerminate(opaque->bo_rmAccess);
     754        1716 :     brin_free_desc(opaque->bo_bdesc);
     755        1716 :     pfree(opaque);
     756        1716 : }
     757             : 
     758             : /*
     759             :  * Per-heap-tuple callback for table_index_build_scan.
     760             :  *
     761             :  * Note we don't worry about the page range at the end of the table here; it is
     762             :  * present in the build state struct after we're called the last time, but not
     763             :  * inserted into the index.  Caller must ensure to do so, if appropriate.
     764             :  */
     765             : static void
     766      487466 : brinbuildCallback(Relation index,
     767             :                   ItemPointer tid,
     768             :                   Datum *values,
     769             :                   bool *isnull,
     770             :                   bool tupleIsAlive,
     771             :                   void *brstate)
     772             : {
     773      487466 :     BrinBuildState *state = (BrinBuildState *) brstate;
     774             :     BlockNumber thisblock;
     775             : 
     776      487466 :     thisblock = ItemPointerGetBlockNumber(tid);
     777             : 
     778             :     /*
     779             :      * If we're in a block that belongs to a future range, summarize what
     780             :      * we've got and start afresh.  Note the scan might have skipped many
     781             :      * pages, if they were devoid of live tuples; make sure to insert index
     782             :      * tuples for those too.
     783             :      */
     784      488732 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     785             :     {
     786             : 
     787             :         BRIN_elog((DEBUG2,
     788             :                    "brinbuildCallback: completed a range: %u--%u",
     789             :                    state->bs_currRangeStart,
     790             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     791             : 
     792             :         /* create the index tuple and insert it */
     793        1266 :         form_and_insert_tuple(state);
     794             : 
     795             :         /* set state to correspond to the next range */
     796        1266 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     797             : 
     798             :         /* re-initialize state for it */
     799        1266 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     800             :     }
     801             : 
     802             :     /* Accumulate the current tuple into the running state */
     803      487466 :     (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
     804             :                                values, isnull);
     805      487466 : }
     806             : 
     807             : /*
     808             :  * brinbuild() -- build a new BRIN index.
     809             :  */
     810             : IndexBuildResult *
     811         172 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     812             : {
     813             :     IndexBuildResult *result;
     814             :     double      reltuples;
     815             :     double      idxtuples;
     816             :     BrinRevmap *revmap;
     817             :     BrinBuildState *state;
     818             :     Buffer      meta;
     819             :     BlockNumber pagesPerRange;
     820             : 
     821             :     /*
     822             :      * We expect to be called exactly once for any index relation.
     823             :      */
     824         172 :     if (RelationGetNumberOfBlocks(index) != 0)
     825           0 :         elog(ERROR, "index \"%s\" already contains data",
     826             :              RelationGetRelationName(index));
     827             : 
     828             :     /*
     829             :      * Critical section not required, because on error the creation of the
     830             :      * whole relation will be rolled back.
     831             :      */
     832             : 
     833         172 :     meta = ReadBuffer(index, P_NEW);
     834             :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     835         172 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     836             : 
     837         172 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     838             :                        BRIN_CURRENT_VERSION);
     839         172 :     MarkBufferDirty(meta);
     840             : 
     841         172 :     if (RelationNeedsWAL(index))
     842             :     {
     843             :         xl_brin_createidx xlrec;
     844             :         XLogRecPtr  recptr;
     845             :         Page        page;
     846             : 
     847         110 :         xlrec.version = BRIN_CURRENT_VERSION;
     848         110 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     849             : 
     850         110 :         XLogBeginInsert();
     851         110 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     852         110 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     853             : 
     854         110 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     855             : 
     856         110 :         page = BufferGetPage(meta);
     857         110 :         PageSetLSN(page, recptr);
     858             :     }
     859             : 
     860         172 :     UnlockReleaseBuffer(meta);
     861             : 
     862             :     /*
     863             :      * Initialize our state, including the deformed tuple state.
     864             :      */
     865         172 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     866         172 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     867             : 
     868             :     /*
     869             :      * Now scan the relation.  No syncscan allowed here because we want the
     870             :      * heap blocks in physical order.
     871             :      */
     872         172 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     873             :                                        brinbuildCallback, (void *) state, NULL);
     874             : 
     875             :     /* process the final batch */
     876         172 :     form_and_insert_tuple(state);
     877             : 
     878             :     /* release resources */
     879         172 :     idxtuples = state->bs_numtuples;
     880         172 :     brinRevmapTerminate(state->bs_rmAccess);
     881         172 :     terminate_brin_buildstate(state);
     882             : 
     883             :     /*
     884             :      * Return statistics
     885             :      */
     886         172 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     887             : 
     888         172 :     result->heap_tuples = reltuples;
     889         172 :     result->index_tuples = idxtuples;
     890             : 
     891         172 :     return result;
     892             : }
     893             : 
     894             : void
     895           0 : brinbuildempty(Relation index)
     896             : {
     897             :     Buffer      metabuf;
     898             : 
     899             :     /* An empty BRIN index has a metapage only. */
     900             :     metabuf =
     901           0 :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     902           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     903             : 
     904             :     /* Initialize and xlog metabuffer. */
     905           0 :     START_CRIT_SECTION();
     906           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     907             :                        BRIN_CURRENT_VERSION);
     908           0 :     MarkBufferDirty(metabuf);
     909           0 :     log_newpage_buffer(metabuf, true);
     910           0 :     END_CRIT_SECTION();
     911             : 
     912           0 :     UnlockReleaseBuffer(metabuf);
     913           0 : }
     914             : 
     915             : /*
     916             :  * brinbulkdelete
     917             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     918             :  *      there's not a lot we can do here.
     919             :  *
     920             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     921             :  * tuple is deleted), meaning the need to re-run summarization on the affected
     922             :  * range.  Would need to add an extra flag in brintuples for that.
     923             :  */
     924             : IndexBulkDeleteResult *
     925           8 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     926             :                IndexBulkDeleteCallback callback, void *callback_state)
     927             : {
     928             :     /* allocate stats if first time through, else re-use existing struct */
     929           8 :     if (stats == NULL)
     930           8 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     931             : 
     932           8 :     return stats;
     933             : }
     934             : 
     935             : /*
     936             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     937             :  * ranges that are currently unsummarized.
     938             :  */
     939             : IndexBulkDeleteResult *
     940          56 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     941             : {
     942             :     Relation    heapRel;
     943             : 
     944             :     /* No-op in ANALYZE ONLY mode */
     945          56 :     if (info->analyze_only)
     946           2 :         return stats;
     947             : 
     948          54 :     if (!stats)
     949          46 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     950          54 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     951             :     /* rest of stats is initialized by zeroing */
     952             : 
     953          54 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     954             :                          AccessShareLock);
     955             : 
     956          54 :     brin_vacuum_scan(info->index, info->strategy);
     957             : 
     958          54 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     959             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     960             : 
     961          54 :     table_close(heapRel, AccessShareLock);
     962             : 
     963          54 :     return stats;
     964             : }
     965             : 
     966             : /*
     967             :  * reloptions processor for BRIN indexes
     968             :  */
     969             : bytea *
     970         376 : brinoptions(Datum reloptions, bool validate)
     971             : {
     972             :     static const relopt_parse_elt tab[] = {
     973             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     974             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     975             :     };
     976             : 
     977         376 :     return (bytea *) build_reloptions(reloptions, validate,
     978             :                                       RELOPT_KIND_BRIN,
     979             :                                       sizeof(BrinOptions),
     980             :                                       tab, lengthof(tab));
     981             : }
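
The two options parsed above can be read back from an opened index relation
through accessor macros declared in access/brin.h.  The fragment below is a
minimal sketch, not part of brin.c: it assumes the BrinGetPagesPerRange and
BrinGetAutoSummarize macros from that header, and report_brin_options is a
hypothetical helper name.

    #include "postgres.h"
    #include "access/brin.h"
    #include "access/relation.h"
    #include "catalog/pg_am.h"
    #include "utils/rel.h"

    /* Sketch: report a BRIN index's effective reloptions. */
    static void
    report_brin_options(Oid indexoid)
    {
        Relation    idxRel = relation_open(indexoid, AccessShareLock);
        BlockNumber pagesPerRange = BrinGetPagesPerRange(idxRel);
        bool        autosummarize = BrinGetAutoSummarize(idxRel);

        elog(INFO, "pages_per_range = %u, autosummarize = %s",
             pagesPerRange, autosummarize ? "on" : "off");

        relation_close(idxRel, AccessShareLock);
    }
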
     982             : 
     983             : /*
     984             :  * SQL-callable function to scan through an index and summarize all ranges
     985             :  * that are not currently summarized.
     986             :  */
     987             : Datum
     988          38 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     989             : {
     990          38 :     Datum       relation = PG_GETARG_DATUM(0);
     991             : 
     992          38 :     return DirectFunctionCall2(brin_summarize_range,
     993             :                                relation,
     994             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     995             : }
     996             : 
     997             : /*
     998             :  * SQL-callable function to summarize the indicated page range, if not already
     999             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
    1000             :  * unsummarized ranges are summarized.
    1001             :  */
    1002             : Datum
    1003         120 : brin_summarize_range(PG_FUNCTION_ARGS)
    1004             : {
    1005         120 :     Oid         indexoid = PG_GETARG_OID(0);
    1006         120 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1007             :     BlockNumber heapBlk;
    1008             :     Oid         heapoid;
    1009             :     Relation    indexRel;
    1010             :     Relation    heapRel;
    1011         120 :     double      numSummarized = 0;
    1012             : 
    1013         120 :     if (RecoveryInProgress())
    1014           0 :         ereport(ERROR,
    1015             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1016             :                  errmsg("recovery is in progress"),
    1017             :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1018             : 
    1019         120 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
    1020             :     {
    1021          24 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
    1022             : 
    1023          24 :         ereport(ERROR,
    1024             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1025             :                  errmsg("block number out of range: %s", blk)));
    1026             :     }
    1027          96 :     heapBlk = (BlockNumber) heapBlk64;
    1028             : 
    1029             :     /*
    1030             :      * We must lock table before index to avoid deadlocks.  However, if the
    1031             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1032             :      * Rather than emitting a not-very-helpful error message, postpone
    1033             :      * complaining, expecting that the is-it-an-index test below will fail.
    1034             :      */
    1035          96 :     heapoid = IndexGetRelation(indexoid, true);
    1036          96 :     if (OidIsValid(heapoid))
    1037          84 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1038             :     else
    1039          12 :         heapRel = NULL;
    1040             : 
    1041          96 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1042             : 
    1043             :     /* Must be a BRIN index */
    1044          84 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1045          84 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1046          12 :         ereport(ERROR,
    1047             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1048             :                  errmsg("\"%s\" is not a BRIN index",
    1049             :                         RelationGetRelationName(indexRel))));
    1050             : 
    1051             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1052          72 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1053           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1054           0 :                        RelationGetRelationName(indexRel));
    1055             : 
    1056             :     /*
    1057             :      * Since we did the IndexGetRelation call above without any lock, it's
    1058             :      * barely possible that a race against an index drop/recreation could have
    1059             :      * netted us the wrong table.  Recheck.
    1060             :      */
    1061          72 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1062           0 :         ereport(ERROR,
    1063             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1064             :                  errmsg("could not open parent table of index \"%s\"",
    1065             :                         RelationGetRelationName(indexRel))));
    1066             : 
    1067             :     /* OK, do it */
    1068          72 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
    1069             : 
    1070          72 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1071          72 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1072             : 
    1073          72 :     PG_RETURN_INT32((int32) numSummarized);
    1074             : }
    1075             : 
    1076             : /*
    1077             :  * SQL-callable interface to mark a range as no longer summarized
    1078             :  */
    1079             : Datum
    1080          64 : brin_desummarize_range(PG_FUNCTION_ARGS)
    1081             : {
    1082          64 :     Oid         indexoid = PG_GETARG_OID(0);
    1083          64 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1084             :     BlockNumber heapBlk;
    1085             :     Oid         heapoid;
    1086             :     Relation    heapRel;
    1087             :     Relation    indexRel;
    1088             :     bool        done;
    1089             : 
    1090          64 :     if (RecoveryInProgress())
    1091           0 :         ereport(ERROR,
    1092             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1093             :                  errmsg("recovery is in progress"),
    1094             :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1095             : 
    1096          64 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
    1097             :     {
    1098          12 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
    1099             : 
    1100          12 :         ereport(ERROR,
    1101             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1102             :                  errmsg("block number out of range: %s", blk)));
    1103             :     }
    1104          52 :     heapBlk = (BlockNumber) heapBlk64;
    1105             : 
    1106             :     /*
    1107             :      * We must lock table before index to avoid deadlocks.  However, if the
    1108             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1109             :      * Rather than emitting a not-very-helpful error message, postpone
    1110             :      * complaining, expecting that the is-it-an-index test below will fail.
    1111             :      */
    1112          52 :     heapoid = IndexGetRelation(indexoid, true);
    1113          52 :     if (OidIsValid(heapoid))
    1114          52 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1115             :     else
    1116           0 :         heapRel = NULL;
    1117             : 
    1118          52 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1119             : 
    1120             :     /* Must be a BRIN index */
    1121          52 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1122          52 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1123           0 :         ereport(ERROR,
    1124             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1125             :                  errmsg("\"%s\" is not a BRIN index",
    1126             :                         RelationGetRelationName(indexRel))));
    1127             : 
    1128             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1129          52 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1130           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1131           0 :                        RelationGetRelationName(indexRel));
    1132             : 
    1133             :     /*
    1134             :      * Since we did the IndexGetRelation call above without any lock, it's
    1135             :      * barely possible that a race against an index drop/recreation could have
    1136             :      * netted us the wrong table.  Recheck.
    1137             :      */
    1138          52 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1139           0 :         ereport(ERROR,
    1140             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1141             :                  errmsg("could not open parent table of index \"%s\"",
    1142             :                         RelationGetRelationName(indexRel))));
    1143             : 
    1144             :     /* the revmap does the hard work */
    1145             :     do
    1146             :     {
    1147          52 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1148             :     }
    1149          52 :     while (!done);
    1150             : 
    1151          52 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1152          52 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1153             : 
    1154          52 :     PG_RETURN_VOID();
    1155             : }
    1156             : 
    1157             : /*
    1158             :  * Build a BrinDesc used to create or scan a BRIN index
    1159             :  */
    1160             : BrinDesc *
    1161        2604 : brin_build_desc(Relation rel)
    1162             : {
    1163             :     BrinOpcInfo **opcinfo;
    1164             :     BrinDesc   *bdesc;
    1165             :     TupleDesc   tupdesc;
    1166        2604 :     int         totalstored = 0;
    1167             :     int         keyno;
    1168             :     long        totalsize;
    1169             :     MemoryContext cxt;
    1170             :     MemoryContext oldcxt;
    1171             : 
    1172        2604 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1173             :                                 "brin desc cxt",
    1174             :                                 ALLOCSET_SMALL_SIZES);
    1175        2604 :     oldcxt = MemoryContextSwitchTo(cxt);
    1176        2604 :     tupdesc = RelationGetDescr(rel);
    1177             : 
    1178             :     /*
    1179             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1180             :      * the number of columns stored, since the number is opclass-defined.
    1181             :      */
    1182        2604 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1183       48984 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1184             :     {
    1185             :         FmgrInfo   *opcInfoFn;
    1186       46380 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1187             : 
    1188       46380 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1189             : 
    1190       46380 :         opcinfo[keyno] = (BrinOpcInfo *)
    1191       46380 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1192       46380 :         totalstored += opcinfo[keyno]->oi_nstored;
    1193             :     }
    1194             : 
    1195             :     /* Allocate our result struct and fill it in */
    1196        2604 :     totalsize = offsetof(BrinDesc, bd_info) +
    1197        2604 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1198             : 
    1199        2604 :     bdesc = palloc(totalsize);
    1200        2604 :     bdesc->bd_context = cxt;
    1201        2604 :     bdesc->bd_index = rel;
    1202        2604 :     bdesc->bd_tupdesc = tupdesc;
    1203        2604 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1204        2604 :     bdesc->bd_totalstored = totalstored;
    1205             : 
    1206       48984 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1207       46380 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1208        2604 :     pfree(opcinfo);
    1209             : 
    1210        2604 :     MemoryContextSwitchTo(oldcxt);
    1211             : 
    1212        2604 :     return bdesc;
    1213             : }
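
A typical consumer builds the descriptor once, uses it to interpret the
index's summary tuples, and then releases it with brin_free_desc() below.  A
minimal sketch of that lifecycle (indexoid is a hypothetical variable holding
the index's OID; this is illustrative, not an excerpt from a real caller):

    Relation    idxRel = index_open(indexoid, AccessShareLock);
    BrinDesc   *bdesc = brin_build_desc(idxRel);

    /* ... interpret tuples via bdesc->bd_tupdesc and bdesc->bd_info[] ... */

    brin_free_desc(bdesc);      /* drops the descriptor's memory context */
    index_close(idxRel, AccessShareLock);
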
    1214             : 
    1215             : void
    1216        1950 : brin_free_desc(BrinDesc *bdesc)
    1217             : {
    1218             :     /* make sure the tupdesc is still valid */
    1219             :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1220             :     /* no need for retail pfree */
    1221        1950 :     MemoryContextDelete(bdesc->bd_context);
    1222        1950 : }
    1223             : 
    1224             : /*
    1225             :  * Fetch index's statistical data into *stats
    1226             :  */
    1227             : void
    1228        6896 : brinGetStats(Relation index, BrinStatsData *stats)
    1229             : {
    1230             :     Buffer      metabuffer;
    1231             :     Page        metapage;
    1232             :     BrinMetaPageData *metadata;
    1233             : 
    1234        6896 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1235        6896 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1236        6896 :     metapage = BufferGetPage(metabuffer);
    1237        6896 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1238             : 
    1239        6896 :     stats->pagesPerRange = metadata->pagesPerRange;
    1240        6896 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1241             : 
    1242        6896 :     UnlockReleaseBuffer(metabuffer);
    1243        6896 : }
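
One caller of this routine is the planner's BRIN cost estimator.  The
following is a condensed sketch of such a call, loosely modeled on
brincostestimate() in selfuncs.c (details there differ; indexoid is a
hypothetical variable):

    BrinStatsData statsData;
    Relation      indexRel = index_open(indexoid, AccessShareLock);

    brinGetStats(indexRel, &statsData);
    /* statsData.pagesPerRange : heap pages covered by each range     */
    /* statsData.revmapNumPages: revmap pages currently in the index  */
    index_close(indexRel, AccessShareLock);
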
    1244             : 
    1245             : /*
    1246             :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
    1247             :  */
    1248             : static BrinBuildState *
    1249         210 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1250             :                            BlockNumber pagesPerRange)
    1251             : {
    1252             :     BrinBuildState *state;
    1253             : 
    1254         210 :     state = palloc(sizeof(BrinBuildState));
    1255             : 
    1256         210 :     state->bs_irel = idxRel;
    1257         210 :     state->bs_numtuples = 0;
    1258         210 :     state->bs_currentInsertBuf = InvalidBuffer;
    1259         210 :     state->bs_pagesPerRange = pagesPerRange;
    1260         210 :     state->bs_currRangeStart = 0;
    1261         210 :     state->bs_rmAccess = revmap;
    1262         210 :     state->bs_bdesc = brin_build_desc(idxRel);
    1263         210 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1264             : 
    1265         210 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1266             : 
    1267         210 :     return state;
    1268             : }
    1269             : 
    1270             : /*
    1271             :  * Release resources associated with a BrinBuildState.
    1272             :  */
    1273             : static void
    1274         210 : terminate_brin_buildstate(BrinBuildState *state)
    1275             : {
    1276             :     /*
    1277             :      * Release the last index buffer used.  We might as well ensure that
    1278             :      * whatever free space remains in that page is available in FSM, too.
    1279             :      */
    1280         210 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1281             :     {
    1282             :         Page        page;
    1283             :         Size        freespace;
    1284             :         BlockNumber blk;
    1285             : 
    1286         172 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1287         172 :         freespace = PageGetFreeSpace(page);
    1288         172 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
    1289         172 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1290         172 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1291         172 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
    1292             :     }
    1293             : 
    1294         210 :     brin_free_desc(state->bs_bdesc);
    1295         210 :     pfree(state->bs_dtuple);
    1296         210 :     pfree(state);
    1297         210 : }
    1298             : 
    1299             : /*
    1300             :  * On the given BRIN index, summarize the heap page range that corresponds
    1301             :  * to the heap block number given.
    1302             :  *
    1303             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1304             :  * missing those values from the summary tuple, we first insert a placeholder
    1305             :  * index tuple into the index, then execute the heap scan; transactions
    1306             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1307             :  * union the placeholder tuple with the one computed by this routine.  The
    1308             :  * update of the index value happens in a loop, so that if somebody updates
    1309             :  * the placeholder tuple after we read it, we detect the case and try again.
    1310             :  * This ensures that the concurrently inserted tuples are not lost.
    1311             :  *
    1312             :  * A further corner case is this routine being asked to summarize the partial
    1313             :  * range at the end of the table.  heapNumBlocks is the (possibly outdated)
    1314             :  * table size; if we notice that the requested range lies beyond that size,
    1315             :  * we re-compute the table size after inserting the placeholder tuple, to
    1316             :  * avoid missing pages that were appended recently.
    1317             :  */
    1318             : static void
    1319          86 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1320             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1321             : {
    1322             :     Buffer      phbuf;
    1323             :     BrinTuple  *phtup;
    1324             :     Size        phsz;
    1325             :     OffsetNumber offset;
    1326             :     BlockNumber scanNumBlks;
    1327             : 
    1328             :     /*
    1329             :      * Insert the placeholder tuple
    1330             :      */
    1331          86 :     phbuf = InvalidBuffer;
    1332          86 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1333          86 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1334             :                            state->bs_rmAccess, &phbuf,
    1335             :                            heapBlk, phtup, phsz);
    1336             : 
    1337             :     /*
    1338             :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1339             :      * cannot shrink concurrently (but it can grow).
    1340             :      */
    1341             :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1342          86 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1343             :     {
    1344             :         /*
    1345             :          * If we're asked to scan what we believe to be the final range on the
    1346             :          * table (i.e. a range that might be partial) we need to recompute our
    1347             :          * idea of what the latest page is after inserting the placeholder
    1348             :          * tuple.  Anyone that grows the table later will update the
    1349             :          * placeholder tuple, so it doesn't matter that we won't scan these
    1350             :          * pages ourselves.  Careful: the table might have been extended
    1351             :          * beyond the current range, so clamp our result.
    1352             :          *
    1353             :          * Fortunately, this should occur infrequently.
    1354             :          */
    1355          12 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1356             :                           state->bs_pagesPerRange);
    1357             :     }
    1358             :     else
    1359             :     {
    1360             :         /* Easy case: range is known to be complete */
    1361          74 :         scanNumBlks = state->bs_pagesPerRange;
    1362             :     }
    1363             : 
    1364             :     /*
    1365             :      * Execute the partial heap scan covering the heap blocks in the specified
    1366             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1367             :      * short of brinbuildCallback creating the new index entry.
    1368             :      *
    1369             :      * Note that it is critical we use the "any visible" mode of
    1370             :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1371             :      * inserted by transactions that are still in progress, among other corner
    1372             :      * cases.
    1373             :      */
    1374          86 :     state->bs_currRangeStart = heapBlk;
    1375          86 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1376             :                                  heapBlk, scanNumBlks,
    1377             :                                  brinbuildCallback, (void *) state, NULL);
    1378             : 
    1379             :     /*
    1380             :      * Now we update the values obtained by the scan with the placeholder
    1381             :      * tuple.  We do this in a loop which only terminates if we're able to
    1382             :      * update the placeholder tuple successfully; if we are not, this means
    1383             :      * somebody else modified the placeholder tuple after we read it.
    1384             :      */
    1385             :     for (;;)
    1386           0 :     {
    1387             :         BrinTuple  *newtup;
    1388             :         Size        newsize;
    1389             :         bool        didupdate;
    1390             :         bool        samepage;
    1391             : 
    1392          86 :         CHECK_FOR_INTERRUPTS();
    1393             : 
    1394             :         /*
     1395             :          * Form the updated summary tuple and try to replace the placeholder.
    1396             :          */
    1397          86 :         newtup = brin_form_tuple(state->bs_bdesc,
    1398             :                                  heapBlk, state->bs_dtuple, &newsize);
    1399          86 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1400             :         didupdate =
    1401          86 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1402             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1403             :                           phtup, phsz, newtup, newsize, samepage);
    1404          86 :         brin_free_tuple(phtup);
    1405          86 :         brin_free_tuple(newtup);
    1406             : 
    1407             :         /* If the update succeeded, we're done. */
    1408          86 :         if (didupdate)
    1409          86 :             break;
    1410             : 
    1411             :         /*
    1412             :          * If the update didn't work, it might be because somebody updated the
    1413             :          * placeholder tuple concurrently.  Extract the new version, union it
    1414             :          * with the values we have from the scan, and start over.  (There are
    1415             :          * other reasons for the update to fail, but it's simple to treat them
    1416             :          * the same.)
    1417             :          */
    1418           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1419             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1420             :                                          NULL);
    1421             :         /* the placeholder tuple must exist */
    1422           0 :         if (phtup == NULL)
    1423           0 :             elog(ERROR, "missing placeholder tuple");
    1424           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
    1425           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1426             : 
    1427             :         /* merge it into the tuple from the heap scan */
    1428           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1429             :     }
    1430             : 
    1431          86 :     ReleaseBuffer(phbuf);
    1432          86 : }
    1433             : 
    1434             : /*
    1435             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1436             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1437             :  * page range containing the given heap page number is scanned.
    1438             :  * If include_partial is true, then the partial range at the end of the table
    1439             :  * is summarized, otherwise not.
    1440             :  *
    1441             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1442             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1443             :  * incremented.
    1444             :  */
    1445             : static void
    1446         126 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1447             :               bool include_partial, double *numSummarized, double *numExisting)
    1448             : {
    1449             :     BrinRevmap *revmap;
    1450         126 :     BrinBuildState *state = NULL;
    1451         126 :     IndexInfo  *indexInfo = NULL;
    1452             :     BlockNumber heapNumBlocks;
    1453             :     BlockNumber pagesPerRange;
    1454             :     Buffer      buf;
    1455             :     BlockNumber startBlk;
    1456             : 
    1457         126 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1458             : 
    1459             :     /* determine range of pages to process */
    1460         126 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1461         126 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1462          80 :         startBlk = 0;
    1463             :     else
    1464             :     {
    1465          46 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1466          46 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
    1467             :     }
    1468         126 :     if (startBlk > heapNumBlocks)
    1469             :     {
    1470             :         /* Nothing to do if start point is beyond end of table */
    1471           0 :         brinRevmapTerminate(revmap);
    1472           0 :         return;
    1473             :     }
    1474             : 
    1475             :     /*
    1476             :      * Scan the revmap to find unsummarized items.
    1477             :      */
    1478         126 :     buf = InvalidBuffer;
    1479        2316 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1480             :     {
    1481             :         BrinTuple  *tup;
    1482             :         OffsetNumber off;
    1483             : 
    1484             :         /*
    1485             :          * Unless requested to summarize even a partial range, go away now if
     1486             :          * we think the next range is partial.  Callers pass true when this is
     1487             :          * run explicitly once bulk data loading is done
     1488             :          * (brin_summarize_new_values), and false when it is the result of an
     1489             :          * arbitrarily-scheduled maintenance command such as vacuuming.
    1490             :          */
    1491        2230 :         if (!include_partial &&
    1492        1364 :             (startBlk + pagesPerRange > heapNumBlocks))
    1493          40 :             break;
    1494             : 
    1495        2190 :         CHECK_FOR_INTERRUPTS();
    1496             : 
    1497        2190 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1498             :                                        BUFFER_LOCK_SHARE, NULL);
    1499        2190 :         if (tup == NULL)
    1500             :         {
    1501             :             /* no revmap entry for this heap range. Summarize it. */
    1502          86 :             if (state == NULL)
    1503             :             {
    1504             :                 /* first time through */
    1505             :                 Assert(!indexInfo);
    1506          38 :                 state = initialize_brin_buildstate(index, revmap,
    1507             :                                                    pagesPerRange);
    1508          38 :                 indexInfo = BuildIndexInfo(index);
    1509             :             }
    1510          86 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1511             : 
    1512             :             /* and re-initialize state for the next range */
    1513          86 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1514             : 
    1515          86 :             if (numSummarized)
    1516          86 :                 *numSummarized += 1.0;
    1517             :         }
    1518             :         else
    1519             :         {
    1520        2104 :             if (numExisting)
    1521        1262 :                 *numExisting += 1.0;
    1522        2104 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    1523             :         }
    1524             :     }
    1525             : 
    1526         126 :     if (BufferIsValid(buf))
    1527          88 :         ReleaseBuffer(buf);
    1528             : 
    1529             :     /* free resources */
    1530         126 :     brinRevmapTerminate(revmap);
    1531         126 :     if (state)
    1532             :     {
    1533          38 :         terminate_brin_buildstate(state);
    1534          38 :         pfree(indexInfo);
    1535             :     }
    1536             : }
    1537             : 
    1538             : /*
    1539             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1540             :  * format and insert it into the index, making the revmap point to it.
    1541             :  */
    1542             : static void
    1543        1438 : form_and_insert_tuple(BrinBuildState *state)
    1544             : {
    1545             :     BrinTuple  *tup;
    1546             :     Size        size;
    1547             : 
    1548        1438 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1549             :                           state->bs_dtuple, &size);
    1550        1438 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1551             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1552             :                   tup, size);
    1553        1438 :     state->bs_numtuples++;
    1554             : 
    1555        1438 :     pfree(tup);
    1556        1438 : }
    1557             : 
    1558             : /*
     1559             :  * Given a deformed tuple "a" and an on-disk tuple "b", adjust "a" so that it
     1560             :  * is consistent with the summary values in both.
    1561             :  */
    1562             : static void
    1563           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1564             : {
    1565             :     int         keyno;
    1566             :     BrinMemTuple *db;
    1567             :     MemoryContext cxt;
    1568             :     MemoryContext oldcxt;
    1569             : 
    1570             :     /* Use our own memory context to avoid retail pfree */
    1571           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1572             :                                 "brin union",
    1573             :                                 ALLOCSET_DEFAULT_SIZES);
    1574           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1575           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1576           0 :     MemoryContextSwitchTo(oldcxt);
    1577             : 
    1578           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1579             :     {
    1580             :         FmgrInfo   *unionFn;
    1581           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1582           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1583           0 :         BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
    1584             : 
    1585           0 :         if (opcinfo->oi_regular_nulls)
    1586             :         {
    1587             :             /* Adjust "hasnulls". */
    1588           0 :             if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
    1589           0 :                 col_a->bv_hasnulls = true;
    1590             : 
    1591             :             /* If there are no values in B, there's nothing left to do. */
    1592           0 :             if (col_b->bv_allnulls)
    1593           0 :                 continue;
    1594             : 
    1595             :             /*
    1596             :              * Adjust "allnulls".  If A doesn't have values, just copy the
    1597             :              * values from B into A, and we're done.  We cannot run the
    1598             :              * operators in this case, because values in A might contain
    1599             :              * garbage.  Note we already established that B contains values.
    1600             :              */
    1601           0 :             if (col_a->bv_allnulls)
    1602             :             {
    1603             :                 int         i;
    1604             : 
    1605           0 :                 col_a->bv_allnulls = false;
    1606             : 
    1607           0 :                 for (i = 0; i < opcinfo->oi_nstored; i++)
    1608           0 :                     col_a->bv_values[i] =
    1609           0 :                         datumCopy(col_b->bv_values[i],
    1610           0 :                                   opcinfo->oi_typcache[i]->typbyval,
    1611           0 :                                   opcinfo->oi_typcache[i]->typlen);
    1612             : 
    1613           0 :                 continue;
    1614             :             }
    1615             :         }
    1616             : 
    1617           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1618             :                                     BRIN_PROCNUM_UNION);
    1619           0 :         FunctionCall3Coll(unionFn,
    1620           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1621             :                           PointerGetDatum(bdesc),
    1622             :                           PointerGetDatum(col_a),
    1623             :                           PointerGetDatum(col_b));
    1624             :     }
    1625             : 
    1626           0 :     MemoryContextDelete(cxt);
    1627           0 : }
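
The per-column union operation delegated to BRIN_PROCNUM_UNION above is
opclass-specific.  Purely as an illustration, a stripped-down union support
function in the spirit of brin_minmax_union() (brin_minmax.c) might widen the
first column's min/max to cover the second's.  The sketch assumes a
pass-by-value int4 opclass, uses the hypothetical name toy_minmax_union, and
relies on the caller having already handled the allnulls/hasnulls cases, as
union_tuples() does above.

    #include "postgres.h"
    #include "access/brin_internal.h"
    #include "access/brin_tuple.h"
    #include "fmgr.h"

    Datum
    toy_minmax_union(PG_FUNCTION_ARGS)
    {
        BrinValues *col_a = (BrinValues *) PG_GETARG_POINTER(1);
        BrinValues *col_b = (BrinValues *) PG_GETARG_POINTER(2);

        /* Widen a's stored minimum and maximum so they also cover b's. */
        if (DatumGetInt32(col_b->bv_values[0]) < DatumGetInt32(col_a->bv_values[0]))
            col_a->bv_values[0] = col_b->bv_values[0];
        if (DatumGetInt32(col_b->bv_values[1]) > DatumGetInt32(col_a->bv_values[1]))
            col_a->bv_values[1] = col_b->bv_values[1];

        PG_RETURN_VOID();
    }
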
    1628             : 
    1629             : /*
    1630             :  * brin_vacuum_scan
    1631             :  *      Do a complete scan of the index during VACUUM.
    1632             :  *
    1633             :  * This routine scans the complete index looking for uncatalogued index pages,
    1634             :  * i.e. those that might have been lost due to a crash after index extension
    1635             :  * and such.
    1636             :  */
    1637             : static void
    1638          54 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1639             : {
    1640             :     BlockNumber nblocks;
    1641             :     BlockNumber blkno;
    1642             : 
    1643             :     /*
    1644             :      * Scan the index in physical order, and clean up any possible mess in
    1645             :      * each page.
    1646             :      */
    1647          54 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1648         292 :     for (blkno = 0; blkno < nblocks; blkno++)
    1649             :     {
    1650             :         Buffer      buf;
    1651             : 
    1652         238 :         CHECK_FOR_INTERRUPTS();
    1653             : 
    1654         238 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1655             :                                  RBM_NORMAL, strategy);
    1656             : 
    1657         238 :         brin_page_cleanup(idxrel, buf);
    1658             : 
    1659         238 :         ReleaseBuffer(buf);
    1660             :     }
    1661             : 
    1662             :     /*
    1663             :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1664             :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1665             :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1666             :      */
    1667          54 :     FreeSpaceMapVacuum(idxrel);
    1668          54 : }
    1669             : 
    1670             : static bool
    1671      495958 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
    1672             :                     Datum *values, bool *nulls)
    1673             : {
    1674             :     int         keyno;
    1675      495958 :     bool        modified = false;
    1676             : 
    1677             :     /*
    1678             :      * Compare the key values of the new tuple to the stored index values; our
    1679             :      * deformed tuple will get updated if the new tuple doesn't fit the
    1680             :      * original range (note this means we can't break out of the loop early).
    1681             :      * Make a note of whether this happens, so that we know to insert the
    1682             :      * modified tuple later.
    1683             :      */
    1684     1039692 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1685             :     {
    1686             :         Datum       result;
    1687             :         BrinValues *bval;
    1688             :         FmgrInfo   *addValue;
    1689             : 
    1690      543734 :         bval = &dtup->bt_columns[keyno];
    1691             : 
    1692      543734 :         if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
    1693             :         {
    1694             :             /*
    1695             :              * If the new value is null, we record that we saw it if it's the
    1696             :              * first one; otherwise, there's nothing to do.
    1697             :              */
    1698        9296 :             if (!bval->bv_hasnulls)
    1699             :             {
    1700        2240 :                 bval->bv_hasnulls = true;
    1701        2240 :                 modified = true;
    1702             :             }
    1703             : 
    1704        9296 :             continue;
    1705             :         }
    1706             : 
    1707      534438 :         addValue = index_getprocinfo(idxRel, keyno + 1,
    1708             :                                      BRIN_PROCNUM_ADDVALUE);
    1709      534438 :         result = FunctionCall4Coll(addValue,
    1710      534438 :                                    idxRel->rd_indcollation[keyno],
    1711             :                                    PointerGetDatum(bdesc),
    1712             :                                    PointerGetDatum(bval),
    1713      534438 :                                    values[keyno],
    1714      534438 :                                    nulls[keyno]);
    1715             :         /* if that returned true, we need to insert the updated tuple */
    1716      534438 :         modified |= DatumGetBool(result);
    1717             :     }
    1718             : 
    1719      495958 :     return modified;
    1720             : }
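
The BRIN_PROCNUM_ADDVALUE support procedure invoked above is likewise supplied
by the operator class.  Purely as an illustration, a stripped-down add-value
function in the spirit of brin_minmax_add_value() (brin_minmax.c) could look
like the sketch below; it hard-codes a pass-by-value int4 comparison, uses the
hypothetical name toy_minmax_add_value, and skips the collation, type-cache
and datumCopy() handling that real opclass functions need.

    #include "postgres.h"
    #include "access/brin_internal.h"
    #include "access/brin_tuple.h"
    #include "fmgr.h"

    /*
     * Sketch of an "add value" support function: widen the stored minimum
     * (bv_values[0]) and maximum (bv_values[1]) to cover newval, returning
     * true iff the stored summary was modified.
     */
    Datum
    toy_minmax_add_value(PG_FUNCTION_ARGS)
    {
        BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
        BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
        Datum       newval = PG_GETARG_DATUM(2);
        bool        isnull = PG_GETARG_BOOL(3);
        bool        updated = false;

        (void) bdesc;           /* real opclasses use this for type lookups */

        if (isnull)
            PG_RETURN_BOOL(false);  /* caller normally filters NULLs first */

        if (column->bv_allnulls)
        {
            /* First non-null value in this range: it is both min and max. */
            column->bv_values[0] = newval;
            column->bv_values[1] = newval;
            column->bv_allnulls = false;
            PG_RETURN_BOOL(true);
        }

        if (DatumGetInt32(newval) < DatumGetInt32(column->bv_values[0]))
        {
            column->bv_values[0] = newval;
            updated = true;
        }
        if (DatumGetInt32(newval) > DatumGetInt32(column->bv_values[1]))
        {
            column->bv_values[1] = newval;
            updated = true;
        }

        PG_RETURN_BOOL(updated);
    }
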
    1721             : 
    1722             : static bool
    1723      125156 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
    1724             : {
    1725             :     int         keyno;
    1726             : 
    1727             :     /*
    1728             :      * First check if there are any IS [NOT] NULL scan keys, and if we're
    1729             :      * violating them.
    1730             :      */
    1731      125980 :     for (keyno = 0; keyno < nnullkeys; keyno++)
    1732             :     {
    1733        1488 :         ScanKey     key = nullkeys[keyno];
    1734             : 
    1735             :         Assert(key->sk_attno == bval->bv_attno);
    1736             : 
    1737             :         /* Handle only IS NULL/IS NOT NULL tests */
    1738        1488 :         if (!(key->sk_flags & SK_ISNULL))
    1739           0 :             continue;
    1740             : 
    1741        1488 :         if (key->sk_flags & SK_SEARCHNULL)
    1742             :         {
    1743             :             /* IS NULL scan key, but range has no NULLs */
    1744         744 :             if (!bval->bv_allnulls && !bval->bv_hasnulls)
    1745         652 :                 return false;
    1746             :         }
    1747         744 :         else if (key->sk_flags & SK_SEARCHNOTNULL)
    1748             :         {
    1749             :             /*
    1750             :              * For IS NOT NULL, we can only skip ranges that are known to have
    1751             :              * only nulls.
    1752             :              */
    1753         744 :             if (bval->bv_allnulls)
    1754          12 :                 return false;
    1755             :         }
    1756             :         else
    1757             :         {
    1758             :             /*
    1759             :              * Neither IS NULL nor IS NOT NULL was used; assume all indexable
     1760             :              * operators are strict and thus cannot match a NULL value in
     1761             :              * the scan key, so the range cannot match.
    1762             :              */
    1763           0 :             return false;
    1764             :         }
    1765             :     }
    1766             : 
    1767      124492 :     return true;
    1768             : }
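
The nullkeys array consulted here is built by the executor's scan-key
machinery from IS NULL / IS NOT NULL quals, not by this file.  Purely as an
illustration, in the spirit of ExecIndexBuildScanKeys(), an "IS NULL" key for
index attribute 1 could be initialized as in the sketch below (not an excerpt
from the executor).

    #include "postgres.h"
    #include "access/skey.h"

    ScanKeyData nullkey;

    ScanKeyEntryInitialize(&nullkey,
                           SK_ISNULL | SK_SEARCHNULL,
                           1,               /* index attribute number */
                           InvalidStrategy, /* no operator strategy */
                           InvalidOid,      /* no strategy subtype */
                           InvalidOid,      /* no collation */
                           InvalidOid,      /* no comparison procedure */
                           (Datum) 0);      /* no argument */
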

Generated by: LCOV version 1.14