LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 410 459 89.3 %
Date: 2021-01-26 02:06:48 Functions: 23 25 92.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/relation.h"
      23             : #include "access/reloptions.h"
      24             : #include "access/relscan.h"
      25             : #include "access/table.h"
      26             : #include "access/tableam.h"
      27             : #include "access/xloginsert.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/pg_am.h"
      30             : #include "commands/vacuum.h"
      31             : #include "miscadmin.h"
      32             : #include "pgstat.h"
      33             : #include "postmaster/autovacuum.h"
      34             : #include "storage/bufmgr.h"
      35             : #include "storage/freespace.h"
      36             : #include "utils/acl.h"
      37             : #include "utils/builtins.h"
      38             : #include "utils/index_selfuncs.h"
      39             : #include "utils/memutils.h"
      40             : #include "utils/rel.h"
      41             : 
      42             : 
      43             : /*
      44             :  * We use a BrinBuildState during initial construction of a BRIN index.
      45             :  * The running state is kept in a BrinMemTuple.
      46             :  */
      47             : typedef struct BrinBuildState
      48             : {
      49             :     Relation    bs_irel;
      50             :     int         bs_numtuples;
      51             :     Buffer      bs_currentInsertBuf;
      52             :     BlockNumber bs_pagesPerRange;
      53             :     BlockNumber bs_currRangeStart;
      54             :     BrinRevmap *bs_rmAccess;
      55             :     BrinDesc   *bs_bdesc;
      56             :     BrinMemTuple *bs_dtuple;
      57             : } BrinBuildState;
      58             : 
      59             : /*
      60             :  * Struct used as "opaque" during index scans
      61             :  */
      62             : typedef struct BrinOpaque
      63             : {
      64             :     BlockNumber bo_pagesPerRange;
      65             :     BrinRevmap *bo_rmAccess;
      66             :     BrinDesc   *bo_bdesc;
      67             : } BrinOpaque;
      68             : 
      69             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber
      70             : 
      71             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      72             :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      73             : static void terminate_brin_buildstate(BrinBuildState *state);
      74             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      75             :                           bool include_partial, double *numSummarized, double *numExisting);
      76             : static void form_and_insert_tuple(BrinBuildState *state);
      77             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      78             :                          BrinTuple *b);
      79             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      80             : 
      81             : 
      82             : /*
      83             :  * BRIN handler function: return IndexAmRoutine with access method parameters
      84             :  * and callbacks.
      85             :  */
      86             : Datum
      87         468 : brinhandler(PG_FUNCTION_ARGS)
      88             : {
      89         468 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      90             : 
      91         468 :     amroutine->amstrategies = 0;
      92         468 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      93         468 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
      94         468 :     amroutine->amcanorder = false;
      95         468 :     amroutine->amcanorderbyop = false;
      96         468 :     amroutine->amcanbackward = false;
      97         468 :     amroutine->amcanunique = false;
      98         468 :     amroutine->amcanmulticol = true;
      99         468 :     amroutine->amoptionalkey = true;
     100         468 :     amroutine->amsearcharray = false;
     101         468 :     amroutine->amsearchnulls = true;
     102         468 :     amroutine->amstorage = true;
     103         468 :     amroutine->amclusterable = false;
     104         468 :     amroutine->ampredlocks = false;
     105         468 :     amroutine->amcanparallel = false;
     106         468 :     amroutine->amcaninclude = false;
     107         468 :     amroutine->amusemaintenanceworkmem = false;
     108         468 :     amroutine->amparallelvacuumoptions =
     109             :         VACUUM_OPTION_PARALLEL_CLEANUP;
     110         468 :     amroutine->amkeytype = InvalidOid;
     111             : 
     112         468 :     amroutine->ambuild = brinbuild;
     113         468 :     amroutine->ambuildempty = brinbuildempty;
     114         468 :     amroutine->aminsert = brininsert;
     115         468 :     amroutine->ambulkdelete = brinbulkdelete;
     116         468 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     117         468 :     amroutine->amcanreturn = NULL;
     118         468 :     amroutine->amcostestimate = brincostestimate;
     119         468 :     amroutine->amoptions = brinoptions;
     120         468 :     amroutine->amproperty = NULL;
     121         468 :     amroutine->ambuildphasename = NULL;
     122         468 :     amroutine->amvalidate = brinvalidate;
     123         468 :     amroutine->amadjustmembers = NULL;
     124         468 :     amroutine->ambeginscan = brinbeginscan;
     125         468 :     amroutine->amrescan = brinrescan;
     126         468 :     amroutine->amgettuple = NULL;
     127         468 :     amroutine->amgetbitmap = bringetbitmap;
     128         468 :     amroutine->amendscan = brinendscan;
     129         468 :     amroutine->ammarkpos = NULL;
     130         468 :     amroutine->amrestrpos = NULL;
     131         468 :     amroutine->amestimateparallelscan = NULL;
     132         468 :     amroutine->aminitparallelscan = NULL;
     133         468 :     amroutine->amparallelrescan = NULL;
     134             : 
     135         468 :     PG_RETURN_POINTER(amroutine);
     136             : }
     137             : 
     138             : /*
     139             :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
     140             :  * we need to obtain the relevant index tuple and compare its stored values
     141             :  * with those of the new tuple.  If the tuple values are not consistent with
     142             :  * the summary tuple, we need to update the index tuple.
     143             :  *
     144             :  * If autosummarization is enabled, check if we need to summarize the previous
     145             :  * page range.
     146             :  *
     147             :  * If the range is not currently summarized (i.e. the revmap returns NULL for
     148             :  * it), there's nothing to do for this tuple.
     149             :  */
     150             : bool
     151        9156 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     152             :            ItemPointer heaptid, Relation heapRel,
     153             :            IndexUniqueCheck checkUnique,
     154             :            bool indexUnchanged,
     155             :            IndexInfo *indexInfo)
     156             : {
     157             :     BlockNumber pagesPerRange;
     158             :     BlockNumber origHeapBlk;
     159             :     BlockNumber heapBlk;
     160        9156 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     161             :     BrinRevmap *revmap;
     162        9156 :     Buffer      buf = InvalidBuffer;
     163        9156 :     MemoryContext tupcxt = NULL;
     164        9156 :     MemoryContext oldcxt = CurrentMemoryContext;
     165        9156 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     166             : 
     167        9156 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     168             : 
     169             :     /*
     170             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     171             :      * is the first block in the corresponding page range.
     172             :      */
     173        9156 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     174        9156 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     175             : 
     176             :     for (;;)
     177           0 :     {
     178        9156 :         bool        need_insert = false;
     179             :         OffsetNumber off;
     180             :         BrinTuple  *brtup;
     181             :         BrinMemTuple *dtup;
     182             :         int         keyno;
     183             : 
     184        9156 :         CHECK_FOR_INTERRUPTS();
     185             : 
     186             :         /*
     187             :          * If auto-summarization is enabled and we just inserted the first
     188             :          * tuple into the first block of a new non-first page range, request a
     189             :          * summarization run of the previous range.
     190             :          */
     191        9156 :         if (autosummarize &&
     192         156 :             heapBlk > 0 &&
     193         156 :             heapBlk == origHeapBlk &&
     194         156 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     195             :         {
     196           8 :             BlockNumber lastPageRange = heapBlk - 1;
     197             :             BrinTuple  *lastPageTuple;
     198             : 
     199             :             lastPageTuple =
     200           8 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     201             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     202           8 :             if (!lastPageTuple)
     203             :             {
     204             :                 bool        recorded;
     205             : 
     206           6 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     207             :                                                  RelationGetRelid(idxRel),
     208             :                                                  lastPageRange);
     209           6 :                 if (!recorded)
     210           0 :                     ereport(LOG,
     211             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     212             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     213             :                                     RelationGetRelationName(idxRel),
     214             :                                     lastPageRange)));
     215             :             }
     216             :             else
     217           2 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     218             :         }
     219             : 
     220        9156 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     221             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     222             : 
     223             :         /* if range is unsummarized, there's nothing to do */
     224        9156 :         if (!brtup)
     225         188 :             break;
     226             : 
     227             :         /* First time through in this statement? */
     228        8968 :         if (bdesc == NULL)
     229             :         {
     230         298 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     231         298 :             bdesc = brin_build_desc(idxRel);
     232         298 :             indexInfo->ii_AmCache = (void *) bdesc;
     233         298 :             MemoryContextSwitchTo(oldcxt);
     234             :         }
     235             :         /* First time through in this brininsert call? */
     236        8968 :         if (tupcxt == NULL)
     237             :         {
     238        8968 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     239             :                                            "brininsert cxt",
     240             :                                            ALLOCSET_DEFAULT_SIZES);
     241        8968 :             MemoryContextSwitchTo(tupcxt);
     242             :         }
     243             : 
     244        8968 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     245             : 
     246             :         /*
     247             :          * Compare the key values of the new tuple to the stored index values;
     248             :          * our deformed tuple will get updated if the new tuple doesn't fit
     249             :          * the original range (note this means we can't break out of the loop
     250             :          * early). Make a note of whether this happens, so that we know to
     251             :          * insert the modified tuple later.
     252             :          */
     253       42300 :         for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
     254             :         {
     255             :             Datum       result;
     256             :             BrinValues *bval;
     257             :             FmgrInfo   *addValue;
     258             : 
     259       33332 :             bval = &dtup->bt_columns[keyno];
     260       33332 :             addValue = index_getprocinfo(idxRel, keyno + 1,
     261             :                                          BRIN_PROCNUM_ADDVALUE);
     262       99996 :             result = FunctionCall4Coll(addValue,
     263       33332 :                                        idxRel->rd_indcollation[keyno],
     264             :                                        PointerGetDatum(bdesc),
     265             :                                        PointerGetDatum(bval),
     266       33332 :                                        values[keyno],
     267       33332 :                                        nulls[keyno]);
     268             :             /* if that returned true, we need to insert the updated tuple */
     269       33332 :             need_insert |= DatumGetBool(result);
     270             :         }
     271             : 
     272        8968 :         if (!need_insert)
     273             :         {
     274             :             /*
     275             :              * The tuple is consistent with the new values, so there's nothing
     276             :              * to do.
     277             :              */
     278        8060 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     279             :         }
     280             :         else
     281             :         {
     282         908 :             Page        page = BufferGetPage(buf);
     283         908 :             ItemId      lp = PageGetItemId(page, off);
     284             :             Size        origsz;
     285             :             BrinTuple  *origtup;
     286             :             Size        newsz;
     287             :             BrinTuple  *newtup;
     288             :             bool        samepage;
     289             : 
     290             :             /*
     291             :              * Make a copy of the old tuple, so that we can compare it after
     292             :              * re-acquiring the lock.
     293             :              */
     294         908 :             origsz = ItemIdGetLength(lp);
     295         908 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     296             : 
     297             :             /*
     298             :              * Before releasing the lock, check if we can attempt a same-page
     299             :              * update.  Another process could insert a tuple concurrently in
     300             :              * the same page though, so downstream we must be prepared to cope
     301             :              * if this turns out to not be possible after all.
     302             :              */
     303         908 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     304         908 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     305         908 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     306             : 
     307             :             /*
     308             :              * Try to update the tuple.  If this doesn't work for whatever
     309             :              * reason, we need to restart from the top; the revmap might be
     310             :              * pointing at a different tuple for this block now, so we need to
     311             :              * recompute to ensure both our new heap tuple and the other
     312             :              * inserter's are covered by the combined tuple.  It might be that
     313             :              * we don't need to update at all.
     314             :              */
     315         908 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     316             :                                buf, off, origtup, origsz, newtup, newsz,
     317             :                                samepage))
     318             :             {
     319             :                 /* no luck; start over */
     320           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     321           0 :                 continue;
     322             :             }
     323             :         }
     324             : 
     325             :         /* success! */
     326        8968 :         break;
     327             :     }
     328             : 
     329        9156 :     brinRevmapTerminate(revmap);
     330        9156 :     if (BufferIsValid(buf))
     331        8970 :         ReleaseBuffer(buf);
     332        9156 :     MemoryContextSwitchTo(oldcxt);
     333        9156 :     if (tupcxt != NULL)
     334        8968 :         MemoryContextDelete(tupcxt);
     335             : 
     336        9156 :     return false;
     337             : }
     338             : 
     339             : /*
     340             :  * Initialize state for a BRIN index scan.
     341             :  *
     342             :  * We read the metapage here to determine the pages-per-range number that this
     343             :  * index was built with.  Note that since this cannot be changed while we're
     344             :  * holding lock on index, it's not necessary to recompute it during brinrescan.
     345             :  */
     346             : IndexScanDesc
     347         996 : brinbeginscan(Relation r, int nkeys, int norderbys)
     348             : {
     349             :     IndexScanDesc scan;
     350             :     BrinOpaque *opaque;
     351             : 
     352         996 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     353             : 
     354         996 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     355         996 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     356             :                                                scan->xs_snapshot);
     357         996 :     opaque->bo_bdesc = brin_build_desc(r);
     358         996 :     scan->opaque = opaque;
     359             : 
     360         996 :     return scan;
     361             : }
     362             : 
     363             : /*
     364             :  * Execute the index scan.
     365             :  *
     366             :  * This works by reading index TIDs from the revmap, and obtaining the index
     367             :  * tuples pointed to by them; the summary values in the index tuples are
     368             :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     369             :  * ranges corresponding to index tuples that match the scan keys.
     370             :  *
     371             :  * If a TID from the revmap is read as InvalidTID, we know that range is
     372             :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     373             :  * keys.
     374             :  */
     375             : int64
     376         996 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     377             : {
     378         996 :     Relation    idxRel = scan->indexRelation;
     379         996 :     Buffer      buf = InvalidBuffer;
     380             :     BrinDesc   *bdesc;
     381             :     Oid         heapOid;
     382             :     Relation    heapRel;
     383             :     BrinOpaque *opaque;
     384             :     BlockNumber nblocks;
     385             :     BlockNumber heapBlk;
     386         996 :     int         totalpages = 0;
     387             :     FmgrInfo   *consistentFn;
     388             :     MemoryContext oldcxt;
     389             :     MemoryContext perRangeCxt;
     390             :     BrinMemTuple *dtup;
     391         996 :     BrinTuple  *btup = NULL;
     392         996 :     Size        btupsz = 0;
     393             : 
     394         996 :     opaque = (BrinOpaque *) scan->opaque;
     395         996 :     bdesc = opaque->bo_bdesc;
     396         996 :     pgstat_count_index_scan(idxRel);
     397             : 
     398             :     /*
     399             :      * We need to know the size of the table so that we know how long to
     400             :      * iterate on the revmap.
     401             :      */
     402         996 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     403         996 :     heapRel = table_open(heapOid, AccessShareLock);
     404         996 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     405         996 :     table_close(heapRel, AccessShareLock);
     406             : 
     407             :     /*
     408             :      * Make room for the consistent support procedures of indexed columns.  We
     409             :      * don't look them up here; we do that lazily the first time we see a scan
     410             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     411             :      */
     412         996 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     413             : 
     414             :     /* allocate an initial in-memory tuple, outside of the per-range memcxt */
     415         996 :     dtup = brin_new_memtuple(bdesc);
     416             : 
     417             :     /*
     418             :      * Setup and use a per-range memory context, which is reset every time we
     419             :      * loop below.  This avoids having to free the tuples within the loop.
     420             :      */
     421         996 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     422             :                                         "bringetbitmap cxt",
     423             :                                         ALLOCSET_DEFAULT_SIZES);
     424         996 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     425             : 
     426             :     /*
     427             :      * Now scan the revmap.  We start by querying for heap page 0,
     428             :      * incrementing by the number of pages per range; this gives us a full
     429             :      * view of the table.
     430             :      */
     431      100200 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     432             :     {
     433             :         bool        addrange;
     434       99204 :         bool        gottuple = false;
     435             :         BrinTuple  *tup;
     436             :         OffsetNumber off;
     437             :         Size        size;
     438             : 
     439       99204 :         CHECK_FOR_INTERRUPTS();
     440             : 
     441       99204 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     442             : 
     443       99204 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     444             :                                        &off, &size, BUFFER_LOCK_SHARE,
     445             :                                        scan->xs_snapshot);
     446       99204 :         if (tup)
     447             :         {
     448       99204 :             gottuple = true;
     449       99204 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     450       99204 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     451             :         }
     452             : 
     453             :         /*
     454             :          * For page ranges with no indexed tuple, we must return the whole
     455             :          * range; otherwise, compare it to the scan keys.
     456             :          */
     457       99204 :         if (!gottuple)
     458             :         {
     459           0 :             addrange = true;
     460             :         }
     461             :         else
     462             :         {
     463       99204 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     464       99204 :             if (dtup->bt_placeholder)
     465             :             {
     466             :                 /*
     467             :                  * Placeholder tuples are always returned, regardless of the
     468             :                  * values stored in them.
     469             :                  */
     470           0 :                 addrange = true;
     471             :             }
     472             :             else
     473             :             {
     474             :                 int         keyno;
     475             : 
     476             :                 /*
     477             :                  * Compare scan keys with summary values stored for the range.
     478             :                  * If scan keys are matched, the page range must be added to
     479             :                  * the bitmap.  We initially assume the range needs to be
     480             :                  * added; in particular this serves the case where there are
     481             :                  * no keys.
     482             :                  */
     483       99204 :                 addrange = true;
     484      173112 :                 for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     485             :                 {
     486       99204 :                     ScanKey     key = &scan->keyData[keyno];
     487       99204 :                     AttrNumber  keyattno = key->sk_attno;
     488       99204 :                     BrinValues *bval = &dtup->bt_columns[keyattno - 1];
     489             :                     Datum       add;
     490             : 
     491             :                     /*
     492             :                      * The collation of the scan key must match the collation
     493             :                      * used in the index column (but only if the search is not
     494             :                      * IS NULL / IS NOT NULL).  Otherwise we shouldn't be using
     495             :                      * this index ...
     496             :                      */
     497             :                     Assert((key->sk_flags & SK_ISNULL) ||
     498             :                            (key->sk_collation ==
     499             :                             TupleDescAttr(bdesc->bd_tupdesc,
     500             :                                           keyattno - 1)->attcollation));
     501             : 
     502             :                     /* First time this column? look up consistent function */
     503       99204 :                     if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     504             :                     {
     505             :                         FmgrInfo   *tmp;
     506             : 
     507         996 :                         tmp = index_getprocinfo(idxRel, keyattno,
     508             :                                                 BRIN_PROCNUM_CONSISTENT);
     509         996 :                         fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     510             :                                        CurrentMemoryContext);
     511             :                     }
     512             : 
     513             :                     /*
     514             :                      * Check whether the scan key is consistent with the page
     515             :                      * range values; if so, have the pages in the range added
     516             :                      * to the output bitmap.
     517             :                      *
     518             :                      * When there are multiple scan keys, failure to meet the
     519             :                      * criteria for a single one of them is enough to discard
     520             :                      * the range as a whole, so break out of the loop as soon
     521             :                      * as a false return value is obtained.
     522             :                      */
     523       99204 :                     add = FunctionCall3Coll(&consistentFn[keyattno - 1],
     524             :                                             key->sk_collation,
     525             :                                             PointerGetDatum(bdesc),
     526             :                                             PointerGetDatum(bval),
     527             :                                             PointerGetDatum(key));
     528       99204 :                     addrange = DatumGetBool(add);
     529       99204 :                     if (!addrange)
     530       25296 :                         break;
     531             :                 }
     532             :             }
     533             :         }
     534             : 
     535             :         /* add the pages in the range to the output bitmap, if needed */
     536       99204 :         if (addrange)
     537             :         {
     538             :             BlockNumber pageno;
     539             : 
     540       73908 :             for (pageno = heapBlk;
     541      147816 :                  pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
     542       73908 :                  pageno++)
     543             :             {
     544       73908 :                 MemoryContextSwitchTo(oldcxt);
     545       73908 :                 tbm_add_page(tbm, pageno);
     546       73908 :                 totalpages++;
     547       73908 :                 MemoryContextSwitchTo(perRangeCxt);
     548             :             }
     549             :         }
     550             :     }
     551             : 
     552         996 :     MemoryContextSwitchTo(oldcxt);
     553         996 :     MemoryContextDelete(perRangeCxt);
     554             : 
     555         996 :     if (buf != InvalidBuffer)
     556         996 :         ReleaseBuffer(buf);
     557             : 
     558             :     /*
     559             :      * XXX We have an approximation of the number of *pages* that our scan
     560             :      * returns, but we don't have a precise idea of the number of heap tuples
     561             :      * involved.
     562             :      */
     563         996 :     return totalpages * 10;
     564             : }
     565             : 
     566             : /*
     567             :  * Re-initialize state for a BRIN index scan
     568             :  */
     569             : void
     570         996 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     571             :            ScanKey orderbys, int norderbys)
     572             : {
     573             :     /*
     574             :      * Other index AMs preprocess the scan keys at this point, or sometime
     575             :      * early during the scan; this lets them optimize by removing redundant
     576             :      * keys, or doing early returns when they are impossible to satisfy; see
     577             :      * _bt_preprocess_keys for an example.  Something like that could be added
     578             :      * here someday, too.
     579             :      */
     580             : 
     581         996 :     if (scankey && scan->numberOfKeys > 0)
     582         996 :         memmove(scan->keyData, scankey,
     583         996 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     584         996 : }
     585             : 
     586             : /*
     587             :  * Close down a BRIN index scan
     588             :  */
     589             : void
     590         996 : brinendscan(IndexScanDesc scan)
     591             : {
     592         996 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     593             : 
     594         996 :     brinRevmapTerminate(opaque->bo_rmAccess);
     595         996 :     brin_free_desc(opaque->bo_bdesc);
     596         996 :     pfree(opaque);
     597         996 : }
     598             : 
     599             : /*
     600             :  * Per-heap-tuple callback for table_index_build_scan.
     601             :  *
     602             :  * Note we don't worry about the page range at the end of the table here; it is
     603             :  * present in the build state struct after we're called the last time, but not
     604             :  * inserted into the index.  Caller must ensure to do so, if appropriate.
     605             :  */
     606             : static void
     607      284718 : brinbuildCallback(Relation index,
     608             :                   ItemPointer tid,
     609             :                   Datum *values,
     610             :                   bool *isnull,
     611             :                   bool tupleIsAlive,
     612             :                   void *brstate)
     613             : {
     614      284718 :     BrinBuildState *state = (BrinBuildState *) brstate;
     615             :     BlockNumber thisblock;
     616             :     int         i;
     617             : 
     618      284718 :     thisblock = ItemPointerGetBlockNumber(tid);
     619             : 
     620             :     /*
     621             :      * If we're in a block that belongs to a future range, summarize what
     622             :      * we've got and start afresh.  Note the scan might have skipped many
     623             :      * pages, if they were devoid of live tuples; make sure to insert index
     624             :      * tuples for those too.
     625             :      */
     626      285296 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     627             :     {
     628             : 
     629             :         BRIN_elog((DEBUG2,
     630             :                    "brinbuildCallback: completed a range: %u--%u",
     631             :                    state->bs_currRangeStart,
     632             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     633             : 
     634             :         /* create the index tuple and insert it */
     635         578 :         form_and_insert_tuple(state);
     636             : 
     637             :         /* set state to correspond to the next range */
     638         578 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     639             : 
     640             :         /* re-initialize state for it */
     641         578 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     642             :     }
     643             : 
     644             :     /* Accumulate the current tuple into the running state */
     645      585912 :     for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
     646             :     {
     647             :         FmgrInfo   *addValue;
     648             :         BrinValues *col;
     649      301194 :         Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
     650             : 
     651      301194 :         col = &state->bs_dtuple->bt_columns[i];
     652      301194 :         addValue = index_getprocinfo(index, i + 1,
     653             :                                      BRIN_PROCNUM_ADDVALUE);
     654             : 
     655             :         /*
     656             :          * Update dtuple state, if and as necessary.
     657             :          */
     658      903582 :         FunctionCall4Coll(addValue,
     659             :                           attr->attcollation,
     660      301194 :                           PointerGetDatum(state->bs_bdesc),
     661             :                           PointerGetDatum(col),
     662      301194 :                           values[i], isnull[i]);
     663             :     }
     664      284718 : }
     665             : 
     666             : /*
     667             :  * brinbuild() -- build a new BRIN index.
     668             :  */
     669             : IndexBuildResult *
     670          46 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     671             : {
     672             :     IndexBuildResult *result;
     673             :     double      reltuples;
     674             :     double      idxtuples;
     675             :     BrinRevmap *revmap;
     676             :     BrinBuildState *state;
     677             :     Buffer      meta;
     678             :     BlockNumber pagesPerRange;
     679             : 
     680             :     /*
     681             :      * We expect to be called exactly once for any index relation.
     682             :      */
     683          46 :     if (RelationGetNumberOfBlocks(index) != 0)
     684           0 :         elog(ERROR, "index \"%s\" already contains data",
     685             :              RelationGetRelationName(index));
     686             : 
     687             :     /*
     688             :      * Critical section not required, because on error the creation of the
     689             :      * whole relation will be rolled back.
     690             :      */
     691             : 
     692          46 :     meta = ReadBuffer(index, P_NEW);
     693             :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     694          46 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     695             : 
     696          46 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     697             :                        BRIN_CURRENT_VERSION);
     698          46 :     MarkBufferDirty(meta);
     699             : 
     700          46 :     if (RelationNeedsWAL(index))
     701             :     {
     702             :         xl_brin_createidx xlrec;
     703             :         XLogRecPtr  recptr;
     704             :         Page        page;
     705             : 
     706          44 :         xlrec.version = BRIN_CURRENT_VERSION;
     707          44 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     708             : 
     709          44 :         XLogBeginInsert();
     710          44 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     711          44 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     712             : 
     713          44 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     714             : 
     715          44 :         page = BufferGetPage(meta);
     716          44 :         PageSetLSN(page, recptr);
     717             :     }
     718             : 
     719          46 :     UnlockReleaseBuffer(meta);
     720             : 
     721             :     /*
     722             :      * Initialize our state, including the deformed tuple state.
     723             :      */
     724          46 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     725          46 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     726             : 
     727             :     /*
     728             :      * Now scan the relation.  No syncscan allowed here because we want the
     729             :      * heap blocks in physical order.
     730             :      */
     731          46 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     732             :                                        brinbuildCallback, (void *) state, NULL);
     733             : 
     734             :     /* process the final batch */
     735          46 :     form_and_insert_tuple(state);
     736             : 
     737             :     /* release resources */
     738          46 :     idxtuples = state->bs_numtuples;
     739          46 :     brinRevmapTerminate(state->bs_rmAccess);
     740          46 :     terminate_brin_buildstate(state);
     741             : 
     742             :     /*
     743             :      * Return statistics
     744             :      */
     745          46 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     746             : 
     747          46 :     result->heap_tuples = reltuples;
     748          46 :     result->index_tuples = idxtuples;
     749             : 
     750          46 :     return result;
     751             : }
     752             : 
     753             : void
     754           0 : brinbuildempty(Relation index)
     755             : {
     756             :     Buffer      metabuf;
     757             : 
     758             :     /* An empty BRIN index has a metapage only. */
     759             :     metabuf =
     760           0 :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     761           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     762             : 
     763             :     /* Initialize and xlog metabuffer. */
     764           0 :     START_CRIT_SECTION();
     765           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     766             :                        BRIN_CURRENT_VERSION);
     767           0 :     MarkBufferDirty(metabuf);
     768           0 :     log_newpage_buffer(metabuf, true);
     769           0 :     END_CRIT_SECTION();
     770             : 
     771           0 :     UnlockReleaseBuffer(metabuf);
     772           0 : }
     773             : 
     774             : /*
     775             :  * brinbulkdelete
     776             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     777             :  *      there's not a lot we can do here.
     778             :  *
     779             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     780             :  * tuple is deleted), meaning the need to re-run summarization on the affected
     781             :  * range.  Would need to add an extra flag in brintuples for that.
     782             :  */
     783             : IndexBulkDeleteResult *
     784           8 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     785             :                IndexBulkDeleteCallback callback, void *callback_state)
     786             : {
     787             :     /* allocate stats if first time through, else re-use existing struct */
     788           8 :     if (stats == NULL)
     789           8 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     790             : 
     791           8 :     return stats;
     792             : }
     793             : 
     794             : /*
     795             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     796             :  * ranges that are currently unsummarized.
     797             :  */
     798             : IndexBulkDeleteResult *
     799          32 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     800             : {
     801             :     Relation    heapRel;
     802             : 
     803             :     /* No-op in ANALYZE ONLY mode */
     804          32 :     if (info->analyze_only)
     805           2 :         return stats;
     806             : 
     807          30 :     if (!stats)
     808          22 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     809          30 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     810             :     /* rest of stats is initialized by zeroing */
     811             : 
     812          30 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     813             :                          AccessShareLock);
     814             : 
     815          30 :     brin_vacuum_scan(info->index, info->strategy);
     816             : 
     817          30 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     818             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     819             : 
     820          30 :     table_close(heapRel, AccessShareLock);
     821             : 
     822          30 :     return stats;
     823             : }
     824             : 
     825             : /*
     826             :  * reloptions processor for BRIN indexes
     827             :  */
     828             : bytea *
     829         152 : brinoptions(Datum reloptions, bool validate)
     830             : {
     831             :     static const relopt_parse_elt tab[] = {
     832             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     833             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     834             :     };
     835             : 
     836         152 :     return (bytea *) build_reloptions(reloptions, validate,
     837             :                                       RELOPT_KIND_BRIN,
     838             :                                       sizeof(BrinOptions),
     839             :                                       tab, lengthof(tab));
     840             : }
     841             : 
     842             : /*
     843             :  * SQL-callable function to scan through an index and summarize all ranges
     844             :  * that are not currently summarized.
     845             :  */
     846             : Datum
     847          14 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     848             : {
     849          14 :     Datum       relation = PG_GETARG_DATUM(0);
     850             : 
     851          14 :     return DirectFunctionCall2(brin_summarize_range,
     852             :                                relation,
     853             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     854             : }
     855             : 
     856             : /*
     857             :  * SQL-callable function to summarize the indicated page range, if not already
     858             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
     859             :  * unsummarized ranges are summarized.
     860             :  */
     861             : Datum
     862          48 : brin_summarize_range(PG_FUNCTION_ARGS)
     863             : {
     864          48 :     Oid         indexoid = PG_GETARG_OID(0);
     865          48 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     866             :     BlockNumber heapBlk;
     867             :     Oid         heapoid;
     868             :     Relation    indexRel;
     869             :     Relation    heapRel;
     870          48 :     double      numSummarized = 0;
     871             : 
     872          48 :     if (RecoveryInProgress())
     873           0 :         ereport(ERROR,
     874             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     875             :                  errmsg("recovery is in progress"),
     876             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     877             : 
     878          48 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
     879             :     {
     880           8 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     881             : 
     882           8 :         ereport(ERROR,
     883             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     884             :                  errmsg("block number out of range: %s", blk)));
     885             :     }
     886          40 :     heapBlk = (BlockNumber) heapBlk64;
     887             : 
     888             :     /*
     889             :      * We must lock table before index to avoid deadlocks.  However, if the
     890             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     891             :      * Rather than emitting a not-very-helpful error message, postpone
     892             :      * complaining, expecting that the is-it-an-index test below will fail.
     893             :      */
     894          40 :     heapoid = IndexGetRelation(indexoid, true);
     895          40 :     if (OidIsValid(heapoid))
     896          36 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     897             :     else
     898           4 :         heapRel = NULL;
     899             : 
     900          40 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     901             : 
     902             :     /* Must be a BRIN index */
     903          36 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     904          36 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     905           4 :         ereport(ERROR,
     906             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     907             :                  errmsg("\"%s\" is not a BRIN index",
     908             :                         RelationGetRelationName(indexRel))));
     909             : 
     910             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     911          32 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     912           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     913           0 :                        RelationGetRelationName(indexRel));
     914             : 
     915             :     /*
     916             :      * Since we did the IndexGetRelation call above without any lock, it's
     917             :      * barely possible that a race against an index drop/recreation could have
     918             :      * netted us the wrong table.  Recheck.
     919             :      */
     920          32 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     921           0 :         ereport(ERROR,
     922             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     923             :                  errmsg("could not open parent table of index %s",
     924             :                         RelationGetRelationName(indexRel))));
     925             : 
     926             :     /* OK, do it */
     927          32 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
     928             : 
     929          32 :     relation_close(indexRel, ShareUpdateExclusiveLock);
     930          32 :     relation_close(heapRel, ShareUpdateExclusiveLock);
     931             : 
     932          32 :     PG_RETURN_INT32((int32) numSummarized);
     933             : }
     934             : 
     935             : /*
     936             :  * SQL-callable interface to mark a range as no longer summarized
     937             :  */
     938             : Datum
     939          24 : brin_desummarize_range(PG_FUNCTION_ARGS)
     940             : {
     941          24 :     Oid         indexoid = PG_GETARG_OID(0);
     942          24 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     943             :     BlockNumber heapBlk;
     944             :     Oid         heapoid;
     945             :     Relation    heapRel;
     946             :     Relation    indexRel;
     947             :     bool        done;
     948             : 
     949          24 :     if (RecoveryInProgress())
     950           0 :         ereport(ERROR,
     951             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     952             :                  errmsg("recovery is in progress"),
     953             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     954             : 
     955          24 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
     956             :     {
     957           4 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     958             : 
     959           4 :         ereport(ERROR,
     960             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     961             :                  errmsg("block number out of range: %s", blk)));
     962             :     }
     963          20 :     heapBlk = (BlockNumber) heapBlk64;
     964             : 
     965             :     /*
     966             :      * We must lock table before index to avoid deadlocks.  However, if the
     967             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     968             :      * Rather than emitting a not-very-helpful error message, postpone
     969             :      * complaining, expecting that the is-it-an-index test below will fail.
     970             :      */
     971          20 :     heapoid = IndexGetRelation(indexoid, true);
     972          20 :     if (OidIsValid(heapoid))
     973          20 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     974             :     else
     975           0 :         heapRel = NULL;
     976             : 
     977          20 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     978             : 
     979             :     /* Must be a BRIN index */
     980          20 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     981          20 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     982           0 :         ereport(ERROR,
     983             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     984             :                  errmsg("\"%s\" is not a BRIN index",
     985             :                         RelationGetRelationName(indexRel))));
     986             : 
     987             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     988          20 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     989           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     990           0 :                        RelationGetRelationName(indexRel));
     991             : 
     992             :     /*
     993             :      * Since we did the IndexGetRelation call above without any lock, it's
     994             :      * barely possible that a race against an index drop/recreation could have
     995             :      * netted us the wrong table.  Recheck.
     996             :      */
     997          20 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     998           0 :         ereport(ERROR,
     999             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1000             :                  errmsg("could not open parent table of index %s",
    1001             :                         RelationGetRelationName(indexRel))));
    1002             : 
    1003             :     /* the revmap does the hard work */
    1004             :     do
    1005             :     {
    1006          20 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1007             :     }
    1008          20 :     while (!done);
    1009             : 
    1010          20 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1011          20 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1012             : 
    1013          20 :     PG_RETURN_VOID();
    1014             : }
    1015             : 
    1016             : /*
    1017             :  * Build a BrinDesc used to create or scan a BRIN index
    1018             :  */
    1019             : BrinDesc *
    1020        1386 : brin_build_desc(Relation rel)
    1021             : {
    1022             :     BrinOpcInfo **opcinfo;
    1023             :     BrinDesc   *bdesc;
    1024             :     TupleDesc   tupdesc;
    1025        1386 :     int         totalstored = 0;
    1026             :     int         keyno;
    1027             :     long        totalsize;
    1028             :     MemoryContext cxt;
    1029             :     MemoryContext oldcxt;
    1030             : 
    1031        1386 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1032             :                                 "brin desc cxt",
    1033             :                                 ALLOCSET_SMALL_SIZES);
    1034        1386 :     oldcxt = MemoryContextSwitchTo(cxt);
    1035        1386 :     tupdesc = RelationGetDescr(rel);
    1036             : 
    1037             :     /*
    1038             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1039             :      * the number of columns stored, since the number is opclass-defined.
    1040             :      */
    1041        1386 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1042       32190 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1043             :     {
    1044             :         FmgrInfo   *opcInfoFn;
    1045       30804 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1046             : 
    1047       30804 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1048             : 
    1049       30804 :         opcinfo[keyno] = (BrinOpcInfo *)
    1050       30804 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1051       30804 :         totalstored += opcinfo[keyno]->oi_nstored;
    1052             :     }
    1053             : 
    1054             :     /* Allocate our result struct and fill it in */
    1055        1386 :     totalsize = offsetof(BrinDesc, bd_info) +
    1056        1386 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1057             : 
    1058        1386 :     bdesc = palloc(totalsize);
    1059        1386 :     bdesc->bd_context = cxt;
    1060        1386 :     bdesc->bd_index = rel;
    1061        1386 :     bdesc->bd_tupdesc = tupdesc;
    1062        1386 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1063        1386 :     bdesc->bd_totalstored = totalstored;
    1064             : 
    1065       32190 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1066       30804 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1067        1386 :     pfree(opcinfo);
    1068             : 
    1069        1386 :     MemoryContextSwitchTo(oldcxt);
    1070             : 
    1071        1386 :     return bdesc;
    1072             : }
    1073             : 
    1074             : void
    1075        1088 : brin_free_desc(BrinDesc *bdesc)
    1076             : {
    1077             :     /* make sure the tupdesc is still valid */
    1078             :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1079             :     /* no need for retail pfree */
    1080        1088 :     MemoryContextDelete(bdesc->bd_context);
    1081        1088 : }
    1082             : 
    1083             : /*
    1084             :  * Fetch index's statistical data into *stats
    1085             :  */
    1086             : void
    1087        3996 : brinGetStats(Relation index, BrinStatsData *stats)
    1088             : {
    1089             :     Buffer      metabuffer;
    1090             :     Page        metapage;
    1091             :     BrinMetaPageData *metadata;
    1092             : 
    1093        3996 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1094        3996 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1095        3996 :     metapage = BufferGetPage(metabuffer);
    1096        3996 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1097             : 
    1098        3996 :     stats->pagesPerRange = metadata->pagesPerRange;
    1099        3996 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1100             : 
    1101        3996 :     UnlockReleaseBuffer(metabuffer);
    1102        3996 : }
    1103             : 
    1104             : /*
    1105             :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
    1106             :  */
    1107             : static BrinBuildState *
    1108          68 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1109             :                            BlockNumber pagesPerRange)
    1110             : {
    1111             :     BrinBuildState *state;
    1112             : 
    1113          68 :     state = palloc(sizeof(BrinBuildState));
    1114             : 
    1115          68 :     state->bs_irel = idxRel;
    1116          68 :     state->bs_numtuples = 0;
    1117          68 :     state->bs_currentInsertBuf = InvalidBuffer;
    1118          68 :     state->bs_pagesPerRange = pagesPerRange;
    1119          68 :     state->bs_currRangeStart = 0;
    1120          68 :     state->bs_rmAccess = revmap;
    1121          68 :     state->bs_bdesc = brin_build_desc(idxRel);
    1122          68 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1123             : 
    1124          68 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1125             : 
    1126          68 :     return state;
    1127             : }
    1128             : 
    1129             : /*
    1130             :  * Release resources associated with a BrinBuildState.
    1131             :  */
    1132             : static void
    1133          68 : terminate_brin_buildstate(BrinBuildState *state)
    1134             : {
    1135             :     /*
    1136             :      * Release the last index buffer used.  We might as well ensure that
    1137             :      * whatever free space remains in that page is available in FSM, too.
    1138             :      */
    1139          68 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1140             :     {
    1141             :         Page        page;
    1142             :         Size        freespace;
    1143             :         BlockNumber blk;
    1144             : 
    1145          46 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1146          46 :         freespace = PageGetFreeSpace(page);
    1147          46 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
    1148          46 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1149          46 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1150          46 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
    1151             :     }
    1152             : 
    1153          68 :     brin_free_desc(state->bs_bdesc);
    1154          68 :     pfree(state->bs_dtuple);
    1155          68 :     pfree(state);
    1156          68 : }
    1157             : 
    1158             : /*
    1159             :  * On the given BRIN index, summarize the heap page range that corresponds
    1160             :  * to the heap block number given.
    1161             :  *
    1162             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1163             :  * missing those values from the summary tuple, we first insert a placeholder
    1164             :  * index tuple into the index, then execute the heap scan; transactions
    1165             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1166             :  * union the placeholder tuple with the one computed by this routine.  The
    1167             :  * update of the index value happens in a loop, so that if somebody updates
    1168             :  * the placeholder tuple after we read it, we detect the case and try again.
    1169             :  * This ensures that the concurrently inserted tuples are not lost.
    1170             :  *
    1171             :  * A further corner case is this routine being asked to summarize the partial
    1172             :  * range at the end of the table.  heapNumBlks is the (possibly outdated)
    1173             :  * table size; if we notice that the requested range lies beyond that size,
    1174             :  * we re-compute the table size after inserting the placeholder tuple, to
    1175             :  * avoid missing pages that were appended recently.
    1176             :  */
    1177             : static void
    1178          42 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1179             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1180             : {
    1181             :     Buffer      phbuf;
    1182             :     BrinTuple  *phtup;
    1183             :     Size        phsz;
    1184             :     OffsetNumber offset;
    1185             :     BlockNumber scanNumBlks;
    1186             : 
    1187             :     /*
    1188             :      * Insert the placeholder tuple
    1189             :      */
    1190          42 :     phbuf = InvalidBuffer;
    1191          42 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1192          42 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1193             :                            state->bs_rmAccess, &phbuf,
    1194             :                            heapBlk, phtup, phsz);
    1195             : 
    1196             :     /*
    1197             :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1198             :      * cannot shrink concurrently (but it can grow).
    1199             :      */
    1200             :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1201          42 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1202             :     {
    1203             :         /*
    1204             :          * If we're asked to scan what we believe to be the final range on the
    1205             :          * table (i.e. a range that might be partial) we need to recompute our
    1206             :          * idea of what the latest page is after inserting the placeholder
    1207             :          * tuple.  Anyone that grows the table later will update the
    1208             :          * placeholder tuple, so it doesn't matter that we won't scan these
    1209             :          * pages ourselves.  Careful: the table might have been extended
    1210             :          * beyond the current range, so clamp our result.
    1211             :          *
    1212             :          * Fortunately, this should occur infrequently.
    1213             :          */
    1214           4 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1215             :                           state->bs_pagesPerRange);
    1216             :     }
    1217             :     else
    1218             :     {
    1219             :         /* Easy case: range is known to be complete */
    1220          38 :         scanNumBlks = state->bs_pagesPerRange;
    1221             :     }
    1222             : 
    1223             :     /*
    1224             :      * Execute the partial heap scan covering the heap blocks in the specified
    1225             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1226             :      * short of brinbuildCallback creating the new index entry.
    1227             :      *
    1228             :      * Note that it is critical we use the "any visible" mode of
    1229             :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1230             :      * inserted by transactions that are still in progress, among other corner
    1231             :      * cases.
    1232             :      */
    1233          42 :     state->bs_currRangeStart = heapBlk;
    1234          42 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1235             :                                  heapBlk, scanNumBlks,
    1236             :                                  brinbuildCallback, (void *) state, NULL);
    1237             : 
    1238             :     /*
    1239             :      * Now we update the values obtained by the scan with the placeholder
    1240             :      * tuple.  We do this in a loop which only terminates if we're able to
    1241             :      * update the placeholder tuple successfully; if we are not, this means
    1242             :      * somebody else modified the placeholder tuple after we read it.
    1243             :      */
    1244             :     for (;;)
    1245           0 :     {
    1246             :         BrinTuple  *newtup;
    1247             :         Size        newsize;
    1248             :         bool        didupdate;
    1249             :         bool        samepage;
    1250             : 
    1251          42 :         CHECK_FOR_INTERRUPTS();
    1252             : 
    1253             :         /*
    1254             :          * Form the summary tuple and try to update the placeholder with it.
    1255             :          */
    1256          42 :         newtup = brin_form_tuple(state->bs_bdesc,
    1257             :                                  heapBlk, state->bs_dtuple, &newsize);
    1258          42 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1259             :         didupdate =
    1260          42 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1261             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1262             :                           phtup, phsz, newtup, newsize, samepage);
    1263          42 :         brin_free_tuple(phtup);
    1264          42 :         brin_free_tuple(newtup);
    1265             : 
    1266             :         /* If the update succeeded, we're done. */
    1267          42 :         if (didupdate)
    1268          42 :             break;
    1269             : 
    1270             :         /*
    1271             :          * If the update didn't work, it might be because somebody updated the
    1272             :          * placeholder tuple concurrently.  Extract the new version, union it
    1273             :          * with the values we have from the scan, and start over.  (There are
    1274             :          * other reasons for the update to fail, but it's simple to treat them
    1275             :          * the same.)
    1276             :          */
    1277           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1278             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1279             :                                          NULL);
    1280             :         /* the placeholder tuple must exist */
    1281           0 :         if (phtup == NULL)
    1282           0 :             elog(ERROR, "missing placeholder tuple");
    1283           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
    1284           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1285             : 
    1286             :         /* merge it into the tuple from the heap scan */
    1287           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1288             :     }
    1289             : 
    1290          42 :     ReleaseBuffer(phbuf);
    1291          42 : }
    1292             : 
    1293             : /*
    1294             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1295             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1296             :  * page range containing the given heap page number is scanned.
    1297             :  * If include_partial is true, then the partial range at the end of the table
    1298             :  * is summarized, otherwise not.
    1299             :  *
    1300             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1301             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1302             :  * incremented.
    1303             :  */
    1304             : static void
    1305          62 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1306             :               bool include_partial, double *numSummarized, double *numExisting)
    1307             : {
    1308             :     BrinRevmap *revmap;
    1309          62 :     BrinBuildState *state = NULL;
    1310          62 :     IndexInfo  *indexInfo = NULL;
    1311             :     BlockNumber heapNumBlocks;
    1312             :     BlockNumber pagesPerRange;
    1313             :     Buffer      buf;
    1314             :     BlockNumber startBlk;
    1315             : 
    1316          62 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1317             : 
    1318             :     /* determine range of pages to process */
    1319          62 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1320          62 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1321          40 :         startBlk = 0;
    1322             :     else
    1323             :     {
    1324          22 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1325          22 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
    1326             :     }
    1327          62 :     if (startBlk > heapNumBlocks)
    1328             :     {
    1329             :         /* Nothing to do if start point is beyond end of table */
    1330           0 :         brinRevmapTerminate(revmap);
    1331           0 :         return;
    1332             :     }
    1333             : 
    1334             :     /*
    1335             :      * Scan the revmap to find unsummarized items.
    1336             :      */
    1337          62 :     buf = InvalidBuffer;
    1338        1116 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1339             :     {
    1340             :         BrinTuple  *tup;
    1341             :         OffsetNumber off;
    1342             : 
    1343             :         /*
    1344             :          * Unless requested to summarize even a partial range, stop now if
    1345             :          * we think the next range is partial.  Callers pass include_partial
    1346             :          * as true when typically run once bulk data loading is done
    1347             :          * (brin_summarize_new_values), and as false when the call is typically
    1348             :          * the result of an arbitrarily-scheduled maintenance command (vacuuming).
    1349             :          */
    1350        1078 :         if (!include_partial &&
    1351         624 :             (startBlk + pagesPerRange > heapNumBlocks))
    1352          24 :             break;
    1353             : 
    1354        1054 :         CHECK_FOR_INTERRUPTS();
    1355             : 
    1356        1054 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1357             :                                        BUFFER_LOCK_SHARE, NULL);
    1358        1054 :         if (tup == NULL)
    1359             :         {
    1360             :             /* no revmap entry for this heap range. Summarize it. */
    1361          42 :             if (state == NULL)
    1362             :             {
    1363             :                 /* first time through */
    1364             :                 Assert(!indexInfo);
    1365          22 :                 state = initialize_brin_buildstate(index, revmap,
    1366             :                                                    pagesPerRange);
    1367          22 :                 indexInfo = BuildIndexInfo(index);
    1368             :             }
    1369          42 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1370             : 
    1371             :             /* and re-initialize state for the next range */
    1372          42 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1373             : 
    1374          42 :             if (numSummarized)
    1375          42 :                 *numSummarized += 1.0;
    1376             :         }
    1377             :         else
    1378             :         {
    1379        1012 :             if (numExisting)
    1380         574 :                 *numExisting += 1.0;
    1381        1012 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    1382             :         }
    1383             :     }
    1384             : 
    1385          62 :     if (BufferIsValid(buf))
    1386          32 :         ReleaseBuffer(buf);
    1387             : 
    1388             :     /* free resources */
    1389          62 :     brinRevmapTerminate(revmap);
    1390          62 :     if (state)
    1391             :     {
    1392          22 :         terminate_brin_buildstate(state);
    1393          22 :         pfree(indexInfo);
    1394             :     }
    1395             : }
    1396             : 
    1397             : /*
    1398             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1399             :  * format and insert it into the index, making the revmap point to it.
    1400             :  */
    1401             : static void
    1402         624 : form_and_insert_tuple(BrinBuildState *state)
    1403             : {
    1404             :     BrinTuple  *tup;
    1405             :     Size        size;
    1406             : 
    1407         624 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1408             :                           state->bs_dtuple, &size);
    1409         624 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1410             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1411             :                   tup, size);
    1412         624 :     state->bs_numtuples++;
    1413             : 
    1414         624 :     pfree(tup);
    1415         624 : }
    1416             : 
    1417             : /*
    1418             :  * Given two deformed tuples, adjust the first one so that it's consistent
    1419             :  * with the summary values in both.
    1420             :  */
    1421             : static void
    1422           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1423             : {
    1424             :     int         keyno;
    1425             :     BrinMemTuple *db;
    1426             :     MemoryContext cxt;
    1427             :     MemoryContext oldcxt;
    1428             : 
    1429             :     /* Use our own memory context to avoid retail pfree */
    1430           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1431             :                                 "brin union",
    1432             :                                 ALLOCSET_DEFAULT_SIZES);
    1433           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1434           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1435           0 :     MemoryContextSwitchTo(oldcxt);
    1436             : 
    1437           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1438             :     {
    1439             :         FmgrInfo   *unionFn;
    1440           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1441           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1442             : 
    1443           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1444             :                                     BRIN_PROCNUM_UNION);
    1445           0 :         FunctionCall3Coll(unionFn,
    1446           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1447             :                           PointerGetDatum(bdesc),
    1448             :                           PointerGetDatum(col_a),
    1449             :                           PointerGetDatum(col_b));
    1450             :     }
    1451             : 
    1452           0 :     MemoryContextDelete(cxt);
    1453           0 : }
    1454             : 
    1455             : /*
    1456             :  * brin_vacuum_scan
    1457             :  *      Do a complete scan of the index during VACUUM.
    1458             :  *
    1459             :  * This routine scans the complete index looking for uncatalogued index pages,
    1460             :  * i.e. those that might have been lost due to a crash after index extension
    1461             :  * and such.
    1462             :  */
    1463             : static void
    1464          30 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1465             : {
    1466             :     BlockNumber nblocks;
    1467             :     BlockNumber blkno;
    1468             : 
    1469             :     /*
    1470             :      * Scan the index in physical order, and clean up any possible mess in
    1471             :      * each page.
    1472             :      */
    1473          30 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1474         156 :     for (blkno = 0; blkno < nblocks; blkno++)
    1475             :     {
    1476             :         Buffer      buf;
    1477             : 
    1478         126 :         CHECK_FOR_INTERRUPTS();
    1479             : 
    1480         126 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1481             :                                  RBM_NORMAL, strategy);
    1482             : 
    1483         126 :         brin_page_cleanup(idxrel, buf);
    1484             : 
    1485         126 :         ReleaseBuffer(buf);
    1486             :     }
    1487             : 
    1488             :     /*
    1489             :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1490             :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1491             :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1492             :      */
    1493          30 :     FreeSpaceMapVacuum(idxrel);
    1494          30 : }

Generated by: LCOV version 1.13