LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 409 458 89.3 %
Date: 2020-06-03 10:06:28 Functions: 23 25 92.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/relation.h"
      23             : #include "access/reloptions.h"
      24             : #include "access/relscan.h"
      25             : #include "access/table.h"
      26             : #include "access/tableam.h"
      27             : #include "access/xloginsert.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/pg_am.h"
      30             : #include "commands/vacuum.h"
      31             : #include "miscadmin.h"
      32             : #include "pgstat.h"
      33             : #include "postmaster/autovacuum.h"
      34             : #include "storage/bufmgr.h"
      35             : #include "storage/freespace.h"
      36             : #include "utils/acl.h"
      37             : #include "utils/builtins.h"
      38             : #include "utils/index_selfuncs.h"
      39             : #include "utils/memutils.h"
      40             : #include "utils/rel.h"
      41             : 
      42             : 
      43             : /*
      44             :  * We use a BrinBuildState during initial construction of a BRIN index.
      45             :  * The running state is kept in a BrinMemTuple.
      46             :  */
      47             : typedef struct BrinBuildState
      48             : {
      49             :     Relation    bs_irel;            /* presumably the index being built -- TODO confirm */
      50             :     int         bs_numtuples;       /* NOTE(review): looks like a running tuple count -- confirm */
      51             :     Buffer      bs_currentInsertBuf;    /* presumably the buffer new index tuples go into -- confirm */
      52             :     BlockNumber bs_pagesPerRange;   /* range size in heap blocks, going by the name -- confirm */
      53             :     BlockNumber bs_currRangeStart;  /* presumably first heap block of the current range -- confirm */
      54             :     BrinRevmap *bs_rmAccess;        /* revmap access handle */
      55             :     BrinDesc   *bs_bdesc;           /* BRIN descriptor */
      56             :     BrinMemTuple *bs_dtuple;        /* in-memory tuple holding the running state (see above) */
      57             : } BrinBuildState;
      58             : 
      59             : /*
      60             :  * Struct used as "opaque" during index scans
      61             :  */
      62             : typedef struct BrinOpaque
      63             : {
      64             :     BlockNumber bo_pagesPerRange;   /* filled in by brinRevmapInitialize in brinbeginscan */
      65             :     BrinRevmap *bo_rmAccess;        /* revmap handle returned by brinRevmapInitialize */
      66             :     BrinDesc   *bo_bdesc;           /* descriptor from brin_build_desc on the scanned index */
      67             : } BrinOpaque;
      68             : 
      69             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber  /* sentinel; presumably "every range" when passed as a page range -- confirm at callers */
      70             : 
      71             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      72             :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      73             : static void terminate_brin_buildstate(BrinBuildState *state);
      74             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      75             :                           bool include_partial, double *numSummarized, double *numExisting);
      76             : static void form_and_insert_tuple(BrinBuildState *state);
      77             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      78             :                          BrinTuple *b);
      79             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      80             : 
      81             : 
      82             : /*
      83             :  * BRIN handler function: return IndexAmRoutine with access method parameters
      84             :  * and callbacks.
      85             :  */
      86             : Datum
      87         486 : brinhandler(PG_FUNCTION_ARGS)
      88             : {
      89         486 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      90             : 
      91         486 :     amroutine->amstrategies = 0;
      92         486 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      93         486 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
      94         486 :     amroutine->amcanorder = false;
      95         486 :     amroutine->amcanorderbyop = false;
      96         486 :     amroutine->amcanbackward = false;
      97         486 :     amroutine->amcanunique = false;
      98         486 :     amroutine->amcanmulticol = true;
      99         486 :     amroutine->amoptionalkey = true;
     100         486 :     amroutine->amsearcharray = false;   /* see the TODO in the file header */
     101         486 :     amroutine->amsearchnulls = true;
     102         486 :     amroutine->amstorage = true;
     103         486 :     amroutine->amclusterable = false;
     104         486 :     amroutine->ampredlocks = false;
     105         486 :     amroutine->amcanparallel = false;
     106         486 :     amroutine->amcaninclude = false;
     107         486 :     amroutine->amusemaintenanceworkmem = false;
     108         486 :     amroutine->amparallelvacuumoptions =
     109             :         VACUUM_OPTION_PARALLEL_CLEANUP;
     110         486 :     amroutine->amkeytype = InvalidOid;
     111             : 
     112             :     /* callbacks; NULL entries are ones this handler does not provide */
     112         486 :     amroutine->ambuild = brinbuild;
     113         486 :     amroutine->ambuildempty = brinbuildempty;
     114         486 :     amroutine->aminsert = brininsert;
     115         486 :     amroutine->ambulkdelete = brinbulkdelete;
     116         486 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     117         486 :     amroutine->amcanreturn = NULL;
     118         486 :     amroutine->amcostestimate = brincostestimate;
     119         486 :     amroutine->amoptions = brinoptions;
     120         486 :     amroutine->amproperty = NULL;
     121         486 :     amroutine->ambuildphasename = NULL;
     122         486 :     amroutine->amvalidate = brinvalidate;
     123         486 :     amroutine->ambeginscan = brinbeginscan;
     124         486 :     amroutine->amrescan = brinrescan;
     125         486 :     amroutine->amgettuple = NULL;       /* no tuple-at-a-time scan; only amgetbitmap is set */
     126         486 :     amroutine->amgetbitmap = bringetbitmap;
     127         486 :     amroutine->amendscan = brinendscan;
     128         486 :     amroutine->ammarkpos = NULL;
     129         486 :     amroutine->amrestrpos = NULL;
     130         486 :     amroutine->amestimateparallelscan = NULL;
     131         486 :     amroutine->aminitparallelscan = NULL;
     132         486 :     amroutine->amparallelrescan = NULL;
     133             : 
     134         486 :     PG_RETURN_POINTER(amroutine);
     135             : }
     136             : 
     137             : /*
     138             :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
     139             :  * we need to obtain the relevant index tuple and compare its stored values
     140             :  * with those of the new tuple.  If the tuple values are not consistent with
     141             :  * the summary tuple, we need to update the index tuple.
     142             :  *
     143             :  * If autosummarization is enabled, check if we need to summarize the previous
     144             :  * page range.
     145             :  *
     146             :  * If the range is not currently summarized (i.e. the revmap returns NULL for
     147             :  * it), there's nothing to do for this tuple.
     148             :  */
     149             : bool
     150        9286 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     151             :            ItemPointer heaptid, Relation heapRel,
     152             :            IndexUniqueCheck checkUnique,
     153             :            IndexInfo *indexInfo)
     154             : {
     155             :     BlockNumber pagesPerRange;
     156             :     BlockNumber origHeapBlk;
     157             :     BlockNumber heapBlk;
     158        9286 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;   /* cached descriptor; NULL until built below */
     159             :     BrinRevmap *revmap;
     160        9286 :     Buffer      buf = InvalidBuffer;    /* handed to every revmap lookup; released once at the end */
     161        9286 :     MemoryContext tupcxt = NULL;        /* created lazily, only if a summarized range is found */
     162        9286 :     MemoryContext oldcxt = CurrentMemoryContext;
     163        9286 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     164             : 
     165        9286 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     166             : 
     167             :     /*
     168             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     169             :      * is the first block in the corresponding page range.
     170             :      */
     171        9286 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     172        9286 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     173             : 
     174             :     for (;;)                            /* re-entered via "continue" when brin_doupdate fails */
     175           0 :     {
     176        9286 :         bool        need_insert = false;
     177             :         OffsetNumber off;
     178             :         BrinTuple  *brtup;
     179             :         BrinMemTuple *dtup;
     180             :         int         keyno;
     181             : 
     182        9286 :         CHECK_FOR_INTERRUPTS();
     183             : 
     184             :         /*
     185             :          * If auto-summarization is enabled and we just inserted the first
     186             :          * tuple into the first block of a new non-first page range, request a
     187             :          * summarization run of the previous range.
     188             :          */
     189        9286 :         if (autosummarize &&            /* NOTE(review): this test sits inside the retry loop, so it is re-evaluated on each retry */
     190         156 :             heapBlk > 0 &&
     191         156 :             heapBlk == origHeapBlk &&
     192         156 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     193             :         {
     194           8 :             BlockNumber lastPageRange = heapBlk - 1;    /* a block inside the preceding range */
     195             :             BrinTuple  *lastPageTuple;
     196             : 
     197             :             lastPageTuple =
     198           8 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     199             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     200           8 :             if (!lastPageTuple)
     201             :             {
     202             :                 bool        recorded;
     203             : 
     204           6 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     205             :                                                  RelationGetRelid(idxRel),
     206             :                                                  lastPageRange);
     207           6 :                 if (!recorded)          /* only logged; the insert itself carries on */
     208           0 :                     ereport(LOG,
     209             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     210             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     211             :                                     RelationGetRelationName(idxRel),
     212             :                                     lastPageRange)));
     213             :             }
     214             :             else
     215           2 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     216             :         }
     217             : 
     218        9286 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     219             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     220             : 
     221             :         /* if range is unsummarized, there's nothing to do */
     222        9286 :         if (!brtup)
     223         188 :             break;
     224             : 
     225             :         /* First time through in this statement? */
     226        9098 :         if (bdesc == NULL)
     227             :         {
     228         294 :             MemoryContextSwitchTo(indexInfo->ii_Context);   /* build the cached descriptor in ii_Context */
     229         294 :             bdesc = brin_build_desc(idxRel);
     230         294 :             indexInfo->ii_AmCache = (void *) bdesc;
     231         294 :             MemoryContextSwitchTo(oldcxt);
     232             :         }
     233             :         /* First time through in this brininsert call? */
     234        9098 :         if (tupcxt == NULL)
     235             :         {
     236        9098 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     237             :                                            "brininsert cxt",
     238             :                                            ALLOCSET_DEFAULT_SIZES);
     239        9098 :             MemoryContextSwitchTo(tupcxt);
     240             :         }
     241             : 
     242        9098 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     243             : 
     244             :         /*
     245             :          * Compare the key values of the new tuple to the stored index values;
     246             :          * our deformed tuple will get updated if the new tuple doesn't fit
     247             :          * the original range (note this means we can't break out of the loop
     248             :          * early). Make a note of whether this happens, so that we know to
     249             :          * insert the modified tuple later.
     250             :          */
     251       42556 :         for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
     252             :         {
     253             :             Datum       result;
     254             :             BrinValues *bval;
     255             :             FmgrInfo   *addValue;
     256             : 
     257       33458 :             bval = &dtup->bt_columns[keyno];
     258       33458 :             addValue = index_getprocinfo(idxRel, keyno + 1,
     259             :                                          BRIN_PROCNUM_ADDVALUE);
     260      100374 :             result = FunctionCall4Coll(addValue,
     261       33458 :                                        idxRel->rd_indcollation[keyno],
     262             :                                        PointerGetDatum(bdesc),
     263             :                                        PointerGetDatum(bval),
     264       33458 :                                        values[keyno],
     265       33458 :                                        nulls[keyno]);
     266             :             /* if that returned true, we need to insert the updated tuple */
     267       33458 :             need_insert |= DatumGetBool(result);
     268             :         }
     269             : 
     270        9098 :         if (!need_insert)
     271             :         {
     272             :             /*
     273             :              * The tuple is consistent with the new values, so there's nothing
     274             :              * to do.
     275             :              */
     276        8194 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     277             :         }
     278             :         else
     279             :         {
     280         904 :             Page        page = BufferGetPage(buf);
     281         904 :             ItemId      lp = PageGetItemId(page, off);
     282             :             Size        origsz;
     283             :             BrinTuple  *origtup;
     284             :             Size        newsz;
     285             :             BrinTuple  *newtup;
     286             :             bool        samepage;
     287             : 
     288             :             /*
     289             :              * Make a copy of the old tuple, so that we can compare it after
     290             :              * re-acquiring the lock.
     291             :              */
     292         904 :             origsz = ItemIdGetLength(lp);
     293         904 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     294             : 
     295             :             /*
     296             :              * Before releasing the lock, check if we can attempt a same-page
     297             :              * update.  Another process could insert a tuple concurrently in
     298             :              * the same page though, so downstream we must be prepared to cope
     299             :              * if this turns out to not be possible after all.
     300             :              */
     301         904 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     302         904 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     303         904 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     304             : 
     305             :             /*
     306             :              * Try to update the tuple.  If this doesn't work for whatever
     307             :              * reason, we need to restart from the top; the revmap might be
     308             :              * pointing at a different tuple for this block now, so we need to
     309             :              * recompute to ensure both our new heap tuple and the other
     310             :              * inserter's are covered by the combined tuple.  It might be that
     311             :              * we don't need to update at all.
     312             :              */
     313         904 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     314             :                                buf, off, origtup, origsz, newtup, newsz,
     315             :                                samepage))
     316             :             {
     317             :                 /* no luck; start over */
     318           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);    /* discard this attempt's tuples before retrying */
     319           0 :                 continue;
     320             :             }
     321             :         }
     322             : 
     323             :         /* success! */
     324        9098 :         break;
     325             :     }
     326             : 
     327        9286 :     brinRevmapTerminate(revmap);
     328        9286 :     if (BufferIsValid(buf))
     329        9100 :         ReleaseBuffer(buf);
     330        9286 :     MemoryContextSwitchTo(oldcxt);      /* harmless no-op if tupcxt was never created */
     331        9286 :     if (tupcxt != NULL)
     332        9098 :         MemoryContextDelete(tupcxt);
     333             : 
     334        9286 :     return false;                       /* every path through this function returns false */
     335             : }
     336             : 
     337             : /*
     338             :  * Initialize state for a BRIN index scan.
     339             :  *
     340             :  * We read the metapage here to determine the pages-per-range number that this
     341             :  * index was built with.  Note that since this cannot be changed while we're
     342             :  * holding lock on index, it's not necessary to recompute it during brinrescan.
     343             :  */
     344             : IndexScanDesc
     345         992 : brinbeginscan(Relation r, int nkeys, int norderbys)
     346             : {
     347             :     IndexScanDesc scan;
     348             :     BrinOpaque *opaque;
     349             : 
     350         992 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     351             : 
     352             :     /* private scan state: revmap handle, pages-per-range and descriptor */
     352         992 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     353         992 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     354             :                                                scan->xs_snapshot);
     355         992 :     opaque->bo_bdesc = brin_build_desc(r);
     356         992 :     scan->opaque = opaque;              /* read back by bringetbitmap through scan->opaque */
     357             : 
     358         992 :     return scan;
     359             : }
     360             : 
     361             : /*
     362             :  * Execute the index scan.
     363             :  *
     364             :  * This works by reading index TIDs from the revmap, and obtaining the index
     365             :  * tuples pointed to by them; the summary values in the index tuples are
     366             :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     367             :  * ranges corresponding to index tuples that match the scan keys.
     368             :  *
     369             :  * If a TID from the revmap is read as InvalidTID, we know that range is
     370             :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     371             :  * keys.
     372             :  */
     373             : int64
     374         992 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     375             : {
     376         992 :     Relation    idxRel = scan->indexRelation;
     377         992 :     Buffer      buf = InvalidBuffer;    /* handed to every revmap lookup; released after the loop */
     378             :     BrinDesc   *bdesc;
     379             :     Oid         heapOid;
     380             :     Relation    heapRel;
     381             :     BrinOpaque *opaque;
     382             :     BlockNumber nblocks;
     383             :     BlockNumber heapBlk;
     384         992 :     int64       totalpages = 0;         /* int64 so that "totalpages * 10" below cannot overflow int */
     385             :     FmgrInfo   *consistentFn;
     386             :     MemoryContext oldcxt;
     387             :     MemoryContext perRangeCxt;
     388             :     BrinMemTuple *dtup;
     389         992 :     BrinTuple  *btup = NULL;            /* copy buffer reused across ranges by brin_copy_tuple */
     390         992 :     Size        btupsz = 0;
     391             : 
     392         992 :     opaque = (BrinOpaque *) scan->opaque;
     393         992 :     bdesc = opaque->bo_bdesc;
     394         992 :     pgstat_count_index_scan(idxRel);
     395             : 
     396             :     /*
     397             :      * We need to know the size of the table so that we know how long to
     398             :      * iterate on the revmap.
     399             :      */
     400         992 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     401         992 :     heapRel = table_open(heapOid, AccessShareLock);
     402         992 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     403         992 :     table_close(heapRel, AccessShareLock);
     404             : 
     405             :     /*
     406             :      * Make room for the consistent support procedures of indexed columns.  We
     407             :      * don't look them up here; we do that lazily the first time we see a scan
     408             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     409             :      */
     410         992 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     411             : 
     412             :     /* allocate an initial in-memory tuple, outside the per-range memcxt */
     413         992 :     dtup = brin_new_memtuple(bdesc);
     414             : 
     415             :     /*
     416             :      * Setup and use a per-range memory context, which is reset every time we
     417             :      * loop below.  This avoids having to free the tuples within the loop.
     418             :      */
     419         992 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     420             :                                         "bringetbitmap cxt",
     421             :                                         ALLOCSET_DEFAULT_SIZES);
     422         992 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     423             : 
     424             :     /*
     425             :      * Now scan the revmap.  We start by querying for heap page 0,
     426             :      * incrementing by the number of pages per range; this gives us a full
     427             :      * view of the table.
     428             :      */
     429      100192 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     430             :     {
     431             :         bool        addrange;
     432       99200 :         bool        gottuple = false;
     433             :         BrinTuple  *tup;
     434             :         OffsetNumber off;
     435             :         Size        size;
     436             : 
     437       99200 :         CHECK_FOR_INTERRUPTS();
     438             : 
     439       99200 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     440             : 
     441       99200 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     442             :                                        &off, &size, BUFFER_LOCK_SHARE,
     443             :                                        scan->xs_snapshot);
     444       99200 :         if (tup)
     445             :         {
     446       99200 :             gottuple = true;
     447       99200 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);  /* copy first, then drop the buffer lock */
     448       99200 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     449             :         }
     450             : 
     451             :         /*
     452             :          * For page ranges with no indexed tuple, we must return the whole
     453             :          * range; otherwise, compare it to the scan keys.
     454             :          */
     455       99200 :         if (!gottuple)
     456             :         {
     457           0 :             addrange = true;
     458             :         }
     459             :         else
     460             :         {
     461       99200 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     462       99200 :             if (dtup->bt_placeholder)
     463             :             {
     464             :                 /*
     465             :                  * Placeholder tuples are always returned, regardless of the
     466             :                  * values stored in them.
     467             :                  */
     468           0 :                 addrange = true;
     469             :             }
     470             :             else
     471             :             {
     472             :                 int         keyno;
     473             : 
     474             :                 /*
     475             :                  * Compare scan keys with summary values stored for the range.
     476             :                  * If scan keys are matched, the page range must be added to
     477             :                  * the bitmap.  We initially assume the range needs to be
     478             :                  * added; in particular this serves the case where there are
     479             :                  * no keys.
     480             :                  */
     481       99200 :                 addrange = true;
     482      173108 :                 for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     483             :                 {
     484       99200 :                     ScanKey     key = &scan->keyData[keyno];
     485       99200 :                     AttrNumber  keyattno = key->sk_attno;
     486       99200 :                     BrinValues *bval = &dtup->bt_columns[keyattno - 1];
     487             :                     Datum       add;
     488             : 
     489             :                     /*
     490             :                      * The collation of the scan key must match the collation
     491             :                      * used in the index column (but only if the search is not
     492             :                      * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
     493             :                      * this index ...
     494             :                      */
     495             :                     Assert((key->sk_flags & SK_ISNULL) ||
     496             :                            (key->sk_collation ==
     497             :                             TupleDescAttr(bdesc->bd_tupdesc,
     498             :                                           keyattno - 1)->attcollation));
     499             : 
     500             :                     /* First time this column? look up consistent function */
     501       99200 :                     if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     502             :                     {
     503             :                         FmgrInfo   *tmp;
     504             : 
     505         992 :                         tmp = index_getprocinfo(idxRel, keyattno,
     506             :                                                 BRIN_PROCNUM_CONSISTENT);
     507         992 :                         fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     508             :                                        oldcxt);    /* not the per-range context: the entry is reused for the whole scan, that context is reset every iteration */
     509             :                     }
     510             : 
     511             :                     /*
     512             :                      * Check whether the scan key is consistent with the page
     513             :                      * range values; if so, have the pages in the range added
     514             :                      * to the output bitmap.
     515             :                      *
     516             :                      * When there are multiple scan keys, failure to meet the
     517             :                      * criteria for a single one of them is enough to discard
     518             :                      * the range as a whole, so break out of the loop as soon
     519             :                      * as a false return value is obtained.
     520             :                      */
     521       99200 :                     add = FunctionCall3Coll(&consistentFn[keyattno - 1],
     522             :                                             key->sk_collation,
     523             :                                             PointerGetDatum(bdesc),
     524             :                                             PointerGetDatum(bval),
     525             :                                             PointerGetDatum(key));
     526       99200 :                     addrange = DatumGetBool(add);
     527       99200 :                     if (!addrange)
     528       25292 :                         break;
     529             :                 }
     530             :             }
     531             :         }
     532             : 
     533             :         /* add the pages in the range to the output bitmap, if needed; the last range is clamped to the table size */
     534       99200 :         if (addrange)
     535             :         {
     536             :             BlockNumber pageno;
     537             : 
     538       73908 :             for (pageno = heapBlk;
     539      147816 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
     540       73908 :                  pageno++)
     541             :             {
     542       73908 :                 MemoryContextSwitchTo(oldcxt);      /* bitmap entries must outlive the per-range context */
     543       73908 :                 tbm_add_page(tbm, pageno);
     544       73908 :                 totalpages++;
     545       73908 :                 MemoryContextSwitchTo(perRangeCxt);
     546             :             }
     547             :         }
     548             :     }
     549             : 
     550         992 :     MemoryContextSwitchTo(oldcxt);
     551         992 :     MemoryContextDelete(perRangeCxt);
     552             : 
     553         992 :     if (buf != InvalidBuffer)
     554         992 :         ReleaseBuffer(buf);
     555             : 
     556             :     /*
     557             :      * XXX We have an approximation of the number of *pages* that our scan
     558             :      * returns, but we don't have a precise idea of the number of heap tuples
     559             :      * involved.
     560             :      */
     561         992 :     return totalpages * 10;             /* rough tuple estimate: 10 per page added, computed in int64 */
     562             : }
     563             : 
     564             : /*
     565             :  * Re-initialize state for a BRIN index scan: copy scankey into scan->keyData.
     566             :  */
     567             : void
     568         992 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     569             :            ScanKey orderbys, int norderbys)
     570             : {
     571             :     /*
     572             :      * Other index AMs preprocess the scan keys at this point, or sometime
     573             :      * early during the scan; this lets them optimize by removing redundant
     574             :      * keys, or doing early returns when they are impossible to satisfy; see
     575             :      * _bt_preprocess_keys for an example.  Something like that could be added
     576             :      * here someday, too.
     577             :      */
     578             :     /* The copy length is scan->numberOfKeys; nscankeys, orderbys and norderbys are unused. */
     579         992 :     if (scankey && scan->numberOfKeys > 0)
     580         992 :         memmove(scan->keyData, scankey,
     581         992 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     582         992 : }
     583             : 
     584             : /*
     585             :  * Close down a BRIN index scan: terminate revmap access, free the BrinDesc and the opaque struct.
     586             :  */
     587             : void
     588         992 : brinendscan(IndexScanDesc scan)
     589             : {
     590         992 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     591             : 
     592         992 :     brinRevmapTerminate(opaque->bo_rmAccess);
     593         992 :     brin_free_desc(opaque->bo_bdesc);
     594         992 :     pfree(opaque);
     595         992 : }
     596             : 
     597             : /*
     598             :  * Per-heap-tuple callback for table_index_build_scan.
     599             :  *
     600             :  * Note we don't worry about the page range at the end of the table here; it is
     601             :  * present in the build state struct after we're called the last time, but not
     602             :  * inserted into the index.  The caller must insert it itself, if appropriate.
     603             :  */
     604             : static void
     605      284714 : brinbuildCallback(Relation index,
     606             :                   ItemPointer tid,
     607             :                   Datum *values,
     608             :                   bool *isnull,
     609             :                   bool tupleIsAlive,
     610             :                   void *brstate)
     611             : {
     612      284714 :     BrinBuildState *state = (BrinBuildState *) brstate;
     613             :     BlockNumber thisblock;
     614             :     int         i;
     615             : 
     616      284714 :     thisblock = ItemPointerGetBlockNumber(tid);
     617             : 
     618             :     /*
     619             :      * If we're in a block that belongs to a future range, summarize what
     620             :      * we've got and start afresh.  Note the scan might have skipped many
     621             :      * pages, if they were devoid of live tuples; make sure to insert index
     622             :      * tuples for those too.
     623             :      */
     624      285292 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     625             :     {
     626             :         /* note: the upper bound logged below is one past the range's last block */
     627             :         BRIN_elog((DEBUG2,
     628             :                    "brinbuildCallback: completed a range: %u--%u",
     629             :                    state->bs_currRangeStart,
     630             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     631             : 
     632             :         /* create the index tuple and insert it */
     633         578 :         form_and_insert_tuple(state);
     634             : 
     635             :         /* set state to correspond to the next range */
     636         578 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     637             : 
     638             :         /* re-initialize state for it */
     639         578 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     640             :     }
     641             : 
     642             :     /* Accumulate the current tuple into the running state, one indexed column at a time */
     643      585900 :     for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
     644             :     {
     645             :         FmgrInfo   *addValue;
     646             :         BrinValues *col;
     647      301186 :         Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
     648             : 
     649      301186 :         col = &state->bs_dtuple->bt_columns[i];
     650      301186 :         addValue = index_getprocinfo(index, i + 1,
     651             :                                      BRIN_PROCNUM_ADDVALUE);
     652             : 
     653             :         /*
     654             :          * Update dtuple state, if and as necessary.
     655             :          */
     656      903558 :         FunctionCall4Coll(addValue,
     657             :                           attr->attcollation,
     658      301186 :                           PointerGetDatum(state->bs_bdesc),
     659             :                           PointerGetDatum(col),
     660      301186 :                           values[i], isnull[i]);
     661             :     }
     662      284714 : }
     663             : 
     664             : /*
     665             :  * brinbuild() -- build a new BRIN index.
     666             :  */
     667             : IndexBuildResult *
     668          42 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     669             : {
     670             :     IndexBuildResult *result;
     671             :     double      reltuples;
     672             :     double      idxtuples;
     673             :     BrinRevmap *revmap;
     674             :     BrinBuildState *state;
     675             :     Buffer      meta;
     676             :     BlockNumber pagesPerRange;
     677             : 
     678             :     /*
     679             :      * We expect to be called exactly once for any index relation.
     680             :      */
     681          42 :     if (RelationGetNumberOfBlocks(index) != 0)
     682           0 :         elog(ERROR, "index \"%s\" already contains data",
     683             :              RelationGetRelationName(index));
     684             : 
     685             :     /*
     686             :      * Critical section not required, because on error the creation of the
     687             :      * whole relation will be rolled back.
     688             :      */
     689             : 
     690          42 :     meta = ReadBuffer(index, P_NEW);
     691             :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     692          42 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     693             : 
     694          42 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     695             :                        BRIN_CURRENT_VERSION);
     696          42 :     MarkBufferDirty(meta);
     697             : 
     698          42 :     if (RelationNeedsWAL(index))
     699             :     {
     700             :         xl_brin_createidx xlrec;
     701             :         XLogRecPtr  recptr;
     702             :         Page        page;
     703             : 
     704          40 :         xlrec.version = BRIN_CURRENT_VERSION;
     705          40 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     706             : 
     707          40 :         XLogBeginInsert();
     708          40 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     709          40 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     710             : 
     711          40 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     712             : 
     713          40 :         page = BufferGetPage(meta);
     714          40 :         PageSetLSN(page, recptr);
     715             :     }
     716             : 
     717          42 :     UnlockReleaseBuffer(meta);
     718             : 
     719             :     /*
     720             :      * Initialize our state, including the deformed tuple state.
     721             :      */
     722          42 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     723          42 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     724             : 
     725             :     /*
     726             :      * Now scan the relation.  No syncscan allowed here because we want the
     727             :      * heap blocks in physical order.
     728             :      */
     729          42 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     730             :                                        brinbuildCallback, (void *) state, NULL);
     731             : 
     732             :     /* process the final batch: the callback never inserts the last range itself */
     733          42 :     form_and_insert_tuple(state);
     734             : 
     735             :     /* save the tuple count before the state is freed, then release resources */
     736          42 :     idxtuples = state->bs_numtuples;
     737          42 :     brinRevmapTerminate(state->bs_rmAccess);
     738          42 :     terminate_brin_buildstate(state);
     739             : 
     740             :     /*
     741             :      * Return statistics
     742             :      */
     743          42 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     744             : 
     745          42 :     result->heap_tuples = reltuples;
     746          42 :     result->index_tuples = idxtuples;
     747             : 
     748          42 :     return result;
     749             : }
     750             : 
     751             : void
     752           0 : brinbuildempty(Relation index)
     753             : {
     754             :     Buffer      metabuf;
     755             : 
     756             :     /* An empty BRIN index has a metapage only; create it in the init fork. */
     757             :     metabuf =
     758           0 :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     759           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     760             : 
     761             :     /* Initialize and xlog metabuffer, all inside one critical section. */
     762           0 :     START_CRIT_SECTION();
     763           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     764             :                        BRIN_CURRENT_VERSION);
     765           0 :     MarkBufferDirty(metabuf);
     766           0 :     log_newpage_buffer(metabuf, true);
     767           0 :     END_CRIT_SECTION();
     768             : 
     769           0 :     UnlockReleaseBuffer(metabuf);
     770           0 : }
     771             : 
     772             : /*
     773             :  * brinbulkdelete
     774             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     775             :  *      there's not a lot we can do here.
     776             :  *
     777             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     778             :  * tuple is deleted), to signal that summarization must be re-run on the
     779             :  * affected range.  That would need an extra flag in brintuples.
     780             :  */
     781             : IndexBulkDeleteResult *
     782           6 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     783             :                IndexBulkDeleteCallback callback, void *callback_state)
     784             : {
     785             :     /* allocate stats if first time through, else re-use existing struct */
     786           6 :     if (stats == NULL)
     787           6 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     788             :     /* nothing is deleted: callback and callback_state are never used */
     789           6 :     return stats;
     790             : }
     791             : 
     792             : /*
     793             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     794             :  * ranges that are currently unsummarized.
     795             :  */
     796             : IndexBulkDeleteResult *
     797          66 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     798             : {
     799             :     Relation    heapRel;
     800             : 
     801             :     /* No-op in ANALYZE ONLY mode; stats is returned unchanged (possibly NULL) */
     802          66 :     if (info->analyze_only)
     803          26 :         return stats;
     804             : 
     805          40 :     if (!stats)
     806          34 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     807          40 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     808             :     /* rest of stats is initialized by zeroing */
     809             : 
     810          40 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     811             :                          AccessShareLock);
     812             : 
     813          40 :     brin_vacuum_scan(info->index, info->strategy);
     814             :     /* NOTE(review): num_index_tuples is passed for both output counters below -- confirm intended */
     815          40 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     816             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     817             : 
     818          40 :     table_close(heapRel, AccessShareLock);
     819             : 
     820          40 :     return stats;
     821             : }
     822             : 
     823             : /*
     824             :  * reloptions processor for BRIN indexes (pages_per_range, autosummarize)
     825             :  */
     826             : bytea *
     827         192 : brinoptions(Datum reloptions, bool validate)
     828             : {
     829             :     static const relopt_parse_elt tab[] = {
     830             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     831             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     832             :     };
     833             :     /* parsing and validation are delegated to build_reloptions() */
     834         192 :     return (bytea *) build_reloptions(reloptions, validate,
     835             :                                       RELOPT_KIND_BRIN,
     836             :                                       sizeof(BrinOptions),
     837             :                                       tab, lengthof(tab));
     838             : }
     839             : 
     840             : /*
     841             :  * SQL-callable function to scan through an index and summarize all ranges
     842             :  * that are not currently summarized.
     843             :  */
     844             : Datum
     845          14 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     846             : {
     847          14 :     Datum       relation = PG_GETARG_DATUM(0);
     848             :     /* delegate to brin_summarize_range, asking for all block ranges */
     849          14 :     return DirectFunctionCall2(brin_summarize_range,
     850             :                                relation,
     851             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     852             : }
     853             : 
     854             : /*
     855             :  * SQL-callable function to summarize the indicated page range, if not already
     856             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
     857             :  * unsummarized ranges are summarized.
     858             :  */
     859             : Datum
     860          48 : brin_summarize_range(PG_FUNCTION_ARGS)
     861             : {
     862          48 :     Oid         indexoid = PG_GETARG_OID(0);
     863          48 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     864             :     BlockNumber heapBlk;
     865             :     Oid         heapoid;
     866             :     Relation    indexRel;
     867             :     Relation    heapRel;
     868          48 :     double      numSummarized = 0;
     869             : 
     870          48 :     if (RecoveryInProgress())
     871           0 :         ereport(ERROR,
     872             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     873             :                  errmsg("recovery is in progress"),
     874             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     875             : 
     876          48 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
     877             :     {
     878           8 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     879             : 
     880           8 :         ereport(ERROR,
     881             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     882             :                  errmsg("block number out of range: %s", blk)));
     883             :     }
     884          40 :     heapBlk = (BlockNumber) heapBlk64;
     885             : 
     886             :     /*
     887             :      * We must lock table before index to avoid deadlocks.  However, if the
     888             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     889             :      * Rather than emitting a not-very-helpful error message, postpone
     890             :      * complaining, expecting that the is-it-an-index test below will fail.
     891             :      */
     892          40 :     heapoid = IndexGetRelation(indexoid, true);
     893          40 :     if (OidIsValid(heapoid))
     894          36 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     895             :     else
     896           4 :         heapRel = NULL;
     897             : 
     898          40 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     899             : 
     900             :     /* Must be a BRIN index */
     901          36 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     902          36 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     903           4 :         ereport(ERROR,
     904             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     905             :                  errmsg("\"%s\" is not a BRIN index",
     906             :                         RelationGetRelationName(indexRel))));
     907             : 
     908             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     909          32 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     910           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     911           0 :                        RelationGetRelationName(indexRel));
     912             : 
     913             :     /*
     914             :      * Since we did the IndexGetRelation call above without any lock, it's
     915             :      * barely possible that a race against an index drop/recreation could have
     916             :      * netted us the wrong table.  Recheck.
     917             :      */
     918          32 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     919           0 :         ereport(ERROR,
     920             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     921             :                  errmsg("could not open parent table of index %s",
     922             :                         RelationGetRelationName(indexRel))));
     923             : 
     924             :     /* OK, do it */
     925          32 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
     926             : 
     927          32 :     relation_close(indexRel, ShareUpdateExclusiveLock);
     928          32 :     relation_close(heapRel, ShareUpdateExclusiveLock);
     929             :     /* the count is accumulated as a double but returned as int32 */
     930          32 :     PG_RETURN_INT32((int32) numSummarized);
     931             : }
     932             : 
     933             : /*
     934             :  * SQL-callable interface to mark a range as no longer summarized
     935             :  */
     936             : Datum
     937          24 : brin_desummarize_range(PG_FUNCTION_ARGS)
     938             : {
     939          24 :     Oid         indexoid = PG_GETARG_OID(0);
     940          24 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     941             :     BlockNumber heapBlk;
     942             :     Oid         heapoid;
     943             :     Relation    heapRel;
     944             :     Relation    indexRel;
     945             :     bool        done;
     946             : 
     947          24 :     if (RecoveryInProgress())
     948           0 :         ereport(ERROR,
     949             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     950             :                  errmsg("recovery is in progress"),
     951             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     952             : 
     953          24 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
     954             :     {
     955           4 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     956             : 
     957           4 :         ereport(ERROR,
     958             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     959             :                  errmsg("block number out of range: %s", blk)));
     960             :     }
     961          20 :     heapBlk = (BlockNumber) heapBlk64;
     962             : 
     963             :     /*
     964             :      * We must lock table before index to avoid deadlocks.  However, if the
     965             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     966             :      * Rather than emitting a not-very-helpful error message, postpone
     967             :      * complaining, expecting that the is-it-an-index test below will fail.
     968             :      */
     969          20 :     heapoid = IndexGetRelation(indexoid, true);
     970          20 :     if (OidIsValid(heapoid))
     971          20 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     972             :     else
     973           0 :         heapRel = NULL;
     974             : 
     975          20 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     976             : 
     977             :     /* Must be a BRIN index */
     978          20 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     979          20 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     980           0 :         ereport(ERROR,
     981             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     982             :                  errmsg("\"%s\" is not a BRIN index",
     983             :                         RelationGetRelationName(indexRel))));
     984             : 
     985             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     986          20 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     987           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     988           0 :                        RelationGetRelationName(indexRel));
     989             : 
     990             :     /*
     991             :      * Since we did the IndexGetRelation call above without any lock, it's
     992             :      * barely possible that a race against an index drop/recreation could have
     993             :      * netted us the wrong table.  Recheck.
     994             :      */
     995          20 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     996           0 :         ereport(ERROR,
     997             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     998             :                  errmsg("could not open parent table of index %s",
     999             :                         RelationGetRelationName(indexRel))));
    1000             : 
    1001             :     /* the revmap does the hard work; keep retrying until it reports it is done */
    1002             :     do
    1003             :     {
    1004          20 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1005             :     }
    1006          20 :     while (!done);
    1007             : 
    1008          20 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1009          20 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1010             : 
    1011          20 :     PG_RETURN_VOID();
    1012             : }
    1013             : 
    1014             : /*
    1015             :  * Build a BrinDesc used to create or scan a BRIN index
    1016             :  */
    1017             : BrinDesc *
    1018        1372 : brin_build_desc(Relation rel)
    1019             : {
    1020             :     BrinOpcInfo **opcinfo;
    1021             :     BrinDesc   *bdesc;
    1022             :     TupleDesc   tupdesc;
    1023        1372 :     int         totalstored = 0;
    1024             :     int         keyno;
    1025             :     long        totalsize;
    1026             :     MemoryContext cxt;
    1027             :     MemoryContext oldcxt;
    1028             :     /* allocations below happen in a private context, remembered in bd_context */
    1029        1372 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1030             :                                 "brin desc cxt",
    1031             :                                 ALLOCSET_SMALL_SIZES);
    1032        1372 :     oldcxt = MemoryContextSwitchTo(cxt);
    1033        1372 :     tupdesc = RelationGetDescr(rel);
    1034             : 
    1035             :     /*
    1036             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1037             :      * the number of columns stored, since the number is opclass-defined.
    1038             :      */
    1039        1372 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1040       32150 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1041             :     {
    1042             :         FmgrInfo   *opcInfoFn;
    1043       30778 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1044             : 
    1045       30778 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1046             : 
    1047       30778 :         opcinfo[keyno] = (BrinOpcInfo *)
    1048       30778 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1049       30778 :         totalstored += opcinfo[keyno]->oi_nstored;
    1050             :     }
    1051             : 
    1052             :     /* Allocate our result struct (header plus one bd_info slot per column) and fill it in */
    1053        1372 :     totalsize = offsetof(BrinDesc, bd_info) +
    1054        1372 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1055             : 
    1056        1372 :     bdesc = palloc(totalsize);
    1057        1372 :     bdesc->bd_context = cxt;
    1058        1372 :     bdesc->bd_index = rel;
    1059        1372 :     bdesc->bd_tupdesc = tupdesc;
    1060        1372 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1061        1372 :     bdesc->bd_totalstored = totalstored;
    1062             : 
    1063       32150 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1064       30778 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1065        1372 :     pfree(opcinfo);
    1066             : 
    1067        1372 :     MemoryContextSwitchTo(oldcxt);
    1068             : 
    1069        1372 :     return bdesc;
    1070             : }
    1071             : 
    1072             : void
    1073        1078 : brin_free_desc(BrinDesc *bdesc)
    1074             : {
    1075             :     /* make sure the tupdesc is still valid */
    1076             :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1077             :     /* no need for retail pfree: deleting the context releases the descriptor too */
    1078        1078 :     MemoryContextDelete(bdesc->bd_context);
    1079        1078 : }
    1080             : 
    1081             : /*
    1082             :  * Fetch index's statistical data into *stats
    1083             :  */
    1084             : void
    1085        3988 : brinGetStats(Relation index, BrinStatsData *stats)
    1086             : {
    1087             :     Buffer      metabuffer;
    1088             :     Page        metapage;
    1089             :     BrinMetaPageData *metadata;
    1090             : 
    1091        3988 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1092        3988 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1093        3988 :     metapage = BufferGetPage(metabuffer);
    1094        3988 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1095             :     /* copy the fields out while the metapage is share-locked */
    1096        3988 :     stats->pagesPerRange = metadata->pagesPerRange;
    1097        3988 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1098             : 
    1099        3988 :     UnlockReleaseBuffer(metabuffer);
    1100        3988 : }
    1101             : 
    1102             : /*
    1103             :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
    1104             :  */
    1105             : static BrinBuildState *
    1106          64 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1107             :                            BlockNumber pagesPerRange)
    1108             : {
    1109             :     BrinBuildState *state;
    1110             : 
    1111          64 :     state = palloc(sizeof(BrinBuildState));
    1112             :     /* palloc does not zero memory, so every field is set explicitly below */
    1113          64 :     state->bs_irel = idxRel;
    1114          64 :     state->bs_numtuples = 0;
    1115          64 :     state->bs_currentInsertBuf = InvalidBuffer;
    1116          64 :     state->bs_pagesPerRange = pagesPerRange;
    1117          64 :     state->bs_currRangeStart = 0;
    1118          64 :     state->bs_rmAccess = revmap;
    1119          64 :     state->bs_bdesc = brin_build_desc(idxRel);
    1120          64 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1121             : 
    1122          64 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1123             : 
    1124          64 :     return state;
    1125             : }
    1126             : 
    1127             : /*
    1128             :  * Release resources associated with a BrinBuildState.
                     :  *
                     :  * Frees the BRIN descriptor, the deformed tuple and the state struct
                     :  * itself, so the pointer must not be used afterwards.  The revmap and the
                     :  * index relation referenced by the state are left alone.
    1129             :  */
    1130             : static void
    1131          64 : terminate_brin_buildstate(BrinBuildState *state)
    1132             : {
    1133             :     /*
    1134             :      * Release the last index buffer used.  We might as well ensure that
    1135             :      * whatever free space remains in that page is available in FSM, too.
    1136             :      */
    1137          64 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1138             :     {
    1139             :         Page        page;
    1140             :         Size        freespace;
    1141             :         BlockNumber blk;
    1142             : 
                     :         /*
                     :          * Fetch the free space and block number first: they are read
                     :          * through the buffer, which is released right after.
                     :          */
    1143          42 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1144          42 :         freespace = PageGetFreeSpace(page);
    1145          42 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
    1146          42 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1147          42 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1148          42 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);  /* presumably covers just this block -- confirm end is exclusive */
    1149             :     }
    1150             : 
    1151          64 :     brin_free_desc(state->bs_bdesc);
    1152          64 :     pfree(state->bs_dtuple);
    1153          64 :     pfree(state);
    1154          64 : }
    1155             : 
    1156             : /*
    1157             :  * On the given BRIN index, summarize the heap page range that corresponds
    1158             :  * to the heap block number given.
    1159             :  *
    1160             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1161             :  * missing those values from the summary tuple, we first insert a placeholder
    1162             :  * index tuple into the index, then execute the heap scan; transactions
    1163             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1164             :  * union the placeholder tuple with the one computed by this routine.  The
    1165             :  * update of the index value happens in a loop, so that if somebody updates
    1166             :  * the placeholder tuple after we read it, we detect the case and try again.
    1167             :  * This ensures that the concurrently inserted tuples are not lost.
    1168             :  *
    1169             :  * A further corner case is this routine being asked to summarize the partial
    1170             :  * range at the end of the table.  heapNumBlks is the (possibly outdated)
    1171             :  * table size; if we notice that the requested range lies beyond that size,
    1172             :  * we re-compute the table size after inserting the placeholder tuple, to
    1173             :  * avoid missing pages that were appended recently.
                     :  *
                     :  * indexInfo and state are handed to the heap scan (state as the argument
                     :  * for brinbuildCallback); state->bs_dtuple holds the values that end up in
                     :  * the summary tuple.  heapBlk must be the first block of a page range
                     :  * (asserted below).  This routine does not reset state->bs_dtuple
                     :  * afterwards; brinsummarize does that before moving to the next range.
    1174             :  */
    1175             : static void
    1176          42 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1177             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1178             : {
    1179             :     Buffer      phbuf;
    1180             :     BrinTuple  *phtup;
    1181             :     Size        phsz;
    1182             :     OffsetNumber offset;
    1183             :     BlockNumber scanNumBlks;
    1184             : 
    1185             :     /*
    1186             :      * Insert the placeholder tuple
                     :      *
                     :      * phbuf, offset and phsz identify where the placeholder went and how
                     :      * big it is; they are needed again for the update attempt below.
    1187             :      */
    1188          42 :     phbuf = InvalidBuffer;
    1189          42 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1190          42 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1191             :                            state->bs_rmAccess, &phbuf,
    1192             :                            heapBlk, phtup, phsz);
    1193             : 
    1194             :     /*
    1195             :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1196             :      * cannot shrink concurrently (but it can grow).
    1197             :      */
    1198             :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1199          42 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1200             :     {
    1201             :         /*
    1202             :          * If we're asked to scan what we believe to be the final range on the
    1203             :          * table (i.e. a range that might be partial) we need to recompute our
    1204             :          * idea of what the latest page is after inserting the placeholder
    1205             :          * tuple.  Anyone that grows the table later will update the
    1206             :          * placeholder tuple, so it doesn't matter that we won't scan these
    1207             :          * pages ourselves.  Careful: the table might have been extended
    1208             :          * beyond the current range, so clamp our result.
    1209             :          *
    1210             :          * Fortunately, this should occur infrequently.
    1211             :          */
    1212           4 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1213             :                           state->bs_pagesPerRange);
    1214             :     }
    1215             :     else
    1216             :     {
    1217             :         /* Easy case: range is known to be complete */
    1218          38 :         scanNumBlks = state->bs_pagesPerRange;
    1219             :     }
    1220             : 
    1221             :     /*
    1222             :      * Execute the partial heap scan covering the heap blocks in the specified
    1223             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1224             :      * short of brinbuildCallback creating the new index entry.
    1225             :      *
    1226             :      * Note that it is critical we use the "any visible" mode of
    1227             :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1228             :      * inserted by transactions that are still in progress, among other corner
    1229             :      * cases.
                     :      *
                     :      * NOTE(review): the three boolean arguments are presumably
                     :      * allow_sync = false, anyvisible = true, progress = false -- confirm
                     :      * against the declaration in access/tableam.h.
    1230             :      */
    1231          42 :     state->bs_currRangeStart = heapBlk;
    1232          42 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1233             :                                  heapBlk, scanNumBlks,
    1234             :                                  brinbuildCallback, (void *) state, NULL);
    1235             : 
    1236             :     /*
    1237             :      * Now we update the values obtained by the scan with the placeholder
    1238             :      * tuple.  We do this in a loop which only terminates if we're able to
    1239             :      * update the placeholder tuple successfully; if we are not, this means
    1240             :      * somebody else modified the placeholder tuple after we read it.
    1241             :      */
    1242             :     for (;;)
    1243           0 :     {
    1244             :         BrinTuple  *newtup;
    1245             :         Size        newsize;
    1246             :         bool        didupdate;
    1247             :         bool        samepage;
    1248             : 
    1249          42 :         CHECK_FOR_INTERRUPTS();
    1250             : 
    1251             :         /*
    1252             :          * Form the on-disk summary tuple from the values accumulated in
                     :          * state->bs_dtuple and try to put it in place of the placeholder.
    1253             :          */
    1254          42 :         newtup = brin_form_tuple(state->bs_bdesc,
    1255             :                                  heapBlk, state->bs_dtuple, &newsize);
    1256          42 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1257             :         didupdate =
    1258          42 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1259             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1260             :                           phtup, phsz, newtup, newsize, samepage);
    1261          42 :         brin_free_tuple(phtup);     /* both tuples are freed on every attempt, */
    1262          42 :         brin_free_tuple(newtup);    /* whether or not the update succeeded */
    1263             : 
    1264             :         /* If the update succeeded, we're done. */
    1265          42 :         if (didupdate)
    1266          42 :             break;
    1267             : 
    1268             :         /*
    1269             :          * If the update didn't work, it might be because somebody updated the
    1270             :          * placeholder tuple concurrently.  Extract the new version, union it
    1271             :          * with the values we have from the scan, and start over.  (There are
    1272             :          * other reasons for the update to fail, but it's simple to treat them
    1273             :          * the same.)
                     :          *
                     :          * The lookup refreshes phbuf, offset and phsz, which the next
                     :          * update attempt uses.
    1274             :          */
    1275           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1276             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1277             :                                          NULL);
    1278             :         /* the placeholder tuple must exist */
    1279           0 :         if (phtup == NULL)
    1280           0 :             elog(ERROR, "missing placeholder tuple");
    1281           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);   /* private copy, taken before unlocking */
    1282           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);  /* unlock only; phbuf is released after the loop */
    1283             : 
    1284             :         /* merge it into the tuple from the heap scan */
    1285           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1286             :     }
    1287             : 
    1288          42 :     ReleaseBuffer(phbuf);
    1289          42 : }
    1290             : 
    1291             : /*
    1292             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1293             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1294             :  * page range containing the given heap page number is scanned.
    1295             :  * If include_partial is true, then the partial range at the end of the table
    1296             :  * is summarized, otherwise not.
    1297             :  *
    1298             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1299             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1300             :  * incremented.
                     :  *
                     :  * The counters are only ever added to, never reset, so the caller chooses
                     :  * their starting values.  The build state and IndexInfo are created
                     :  * lazily, on the first range found to be unsummarized, and are released
                     :  * again before returning.
    1301             :  */
    1302             : static void
    1303          72 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1304             :               bool include_partial, double *numSummarized, double *numExisting)
    1305             : {
    1306             :     BrinRevmap *revmap;
    1307          72 :     BrinBuildState *state = NULL;
    1308          72 :     IndexInfo  *indexInfo = NULL;
    1309             :     BlockNumber heapNumBlocks;
    1310             :     BlockNumber pagesPerRange;
    1311             :     Buffer      buf;
    1312             :     BlockNumber startBlk;
    1313             : 
    1314          72 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1315             : 
    1316             :     /* determine range of pages to process */
    1317          72 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1318          72 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1319          50 :         startBlk = 0;
    1320             :     else
    1321             :     {
    1322          22 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange; /* round down to the start of its range */
    1323          22 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);   /* limit the loop to that one range */
    1324             :     }
    1325          72 :     if (startBlk > heapNumBlocks)
    1326             :     {
    1327             :         /* Nothing to do if start point is beyond end of table */
    1328           0 :         brinRevmapTerminate(revmap);
    1329           0 :         return;
    1330             :     }
    1331             : 
    1332             :     /*
    1333             :      * Scan the revmap to find unsummarized items.
                     :      *
                     :      * buf is reused across lookups and released once after the loop.
    1334             :      */
    1335          72 :     buf = InvalidBuffer;
    1336        1932 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1337             :     {
    1338             :         BrinTuple  *tup;
    1339             :         OffsetNumber off;
    1340             : 
    1341             :         /*
    1342             :          * Unless requested to summarize even a partial range, go away now if
    1343             :          * we think the next range is partial.  Caller would pass true when it
    1344             :          * is typically run once bulk data loading is done
    1345             :          * (brin_summarize_new_values), and false when it is typically the
    1346             :          * result of arbitrarily-scheduled maintenance command (vacuuming).
    1347             :          */
    1348        1888 :         if (!include_partial &&
    1349        1434 :             (startBlk + pagesPerRange > heapNumBlocks))
    1350          28 :             break;
    1351             : 
    1352        1860 :         CHECK_FOR_INTERRUPTS();
    1353             : 
    1354        1860 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1355             :                                        BUFFER_LOCK_SHARE, NULL);
    1356        1860 :         if (tup == NULL)
    1357             :         {
    1358             :             /* no revmap entry for this heap range. Summarize it. */
    1359          42 :             if (state == NULL)
    1360             :             {
    1361             :                 /* first time through */
    1362             :                 Assert(!indexInfo);
    1363          22 :                 state = initialize_brin_buildstate(index, revmap,
    1364             :                                                    pagesPerRange);
    1365          22 :                 indexInfo = BuildIndexInfo(index);
    1366             :             }
    1367          42 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1368             : 
    1369             :             /* and re-initialize state for the next range */
    1370          42 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1371             : 
    1372          42 :             if (numSummarized)
    1373          42 :                 *numSummarized += 1.0;
    1374             :         }
    1375             :         else
    1376             :         {
    1377        1818 :             if (numExisting)
    1378        1380 :                 *numExisting += 1.0;
    1379        1818 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);    /* tuple found: drop the share lock requested by the lookup */
    1380             :         }
    1381             :     }
    1382             : 
    1383          72 :     if (BufferIsValid(buf))
    1384          46 :         ReleaseBuffer(buf);
    1385             : 
    1386             :     /* free resources */
    1387          72 :     brinRevmapTerminate(revmap);
    1388          72 :     if (state)
    1389             :     {
    1390          22 :         terminate_brin_buildstate(state);
    1391          22 :         pfree(indexInfo);   /* allocated together with state, so non-NULL here */
    1392             :     }
    1393             : }
    1394             : 
    1395             : /*
    1396             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1397             :  * format and insert it into the index, making the revmap point to it.
                     :  *
                     :  * The tuple is formed for the range starting at state->bs_currRangeStart
                     :  * and inserted using state->bs_currentInsertBuf, which brin_doinsert may
                     :  * update since it is passed by address.  state->bs_numtuples is
                     :  * incremented.  state->bs_dtuple is not reset here.
    1398             :  */
    1399             : static void
    1400         620 : form_and_insert_tuple(BrinBuildState *state)
    1401             : {
    1402             :     BrinTuple  *tup;
    1403             :     Size        size;
    1404             : 
    1405         620 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1406             :                           state->bs_dtuple, &size);
    1407         620 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1408             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1409             :                   tup, size);
    1410         620 :     state->bs_numtuples++;
    1411             : 
    1412         620 :     pfree(tup);     /* the formed on-disk tuple is not needed after the insert */
    1413         620 : }
    1414             : 
    1415             : /*
    1416             :  * Given two deformed tuples, adjust the first one so that it's consistent
    1417             :  * with the summary values in both.
                     :  *
                     :  * "b" arrives in on-disk form and is deformed here into a temporary memory
                     :  * context, which is deleted before returning.  For every index column the
                     :  * opclass's BRIN_PROCNUM_UNION support function is called with bdesc and
                     :  * the corresponding column values of "a" and "b", under that column's
                     :  * collation.  "b" itself is not freed.
    1418             :  */
    1419             : static void
    1420           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1421             : {
    1422             :     int         keyno;
    1423             :     BrinMemTuple *db;
    1424             :     MemoryContext cxt;
    1425             :     MemoryContext oldcxt;
    1426             : 
    1427             :     /* Use our own memory context to avoid retail pfree */
    1428           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1429             :                                 "brin union",
    1430             :                                 ALLOCSET_DEFAULT_SIZES);
    1431           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1432           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1433           0 :     MemoryContextSwitchTo(oldcxt);  /* the union calls below run in the caller's context */
    1434             : 
    1435           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1436             :     {
    1437             :         FmgrInfo   *unionFn;
    1438           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1439           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1440             : 
    1441           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1, /* keyno is 0-based; the lookup takes keyno + 1 */
    1442             :                                     BRIN_PROCNUM_UNION);
    1443           0 :         FunctionCall3Coll(unionFn,
    1444           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1445             :                           PointerGetDatum(bdesc),
    1446             :                           PointerGetDatum(col_a),
    1447             :                           PointerGetDatum(col_b));
    1448             :     }
    1449             : 
    1450           0 :     MemoryContextDelete(cxt);   /* discards db and anything else allocated in cxt */
    1451           0 : }
    1452             : 
    1453             : /*
    1454             :  * brin_vacuum_scan
    1455             :  *      Do a complete scan of the index during VACUUM.
    1456             :  *
    1457             :  * This routine scans the complete index looking for uncatalogued index pages,
    1458             :  * i.e. those that might have been lost due to a crash after index extension
    1459             :  * and such.
                     :  *
                     :  * Every block of idxrel's main fork is read using the given buffer access
                     :  * strategy and handed to brin_page_cleanup.  The block count is taken once
                     :  * up front, so blocks added while the scan runs are not visited.
    1460             :  */
    1461             : static void
    1462          40 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1463             : {
    1464             :     BlockNumber nblocks;
    1465             :     BlockNumber blkno;
    1466             : 
    1467             :     /*
    1468             :      * Scan the index in physical order, and clean up any possible mess in
    1469             :      * each page.
    1470             :      */
    1471          40 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1472         256 :     for (blkno = 0; blkno < nblocks; blkno++)
    1473             :     {
    1474             :         Buffer      buf;
    1475             : 
    1476         216 :         CHECK_FOR_INTERRUPTS();
    1477             : 
    1478         216 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1479             :                                  RBM_NORMAL, strategy);
    1480             : 
    1481         216 :         brin_page_cleanup(idxrel, buf); /* buf is passed without a lock taken here */
    1482             : 
    1483         216 :         ReleaseBuffer(buf);
    1484             :     }
    1485             : 
    1486             :     /*
    1487             :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1488             :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1489             :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1490             :      */
    1491          40 :     FreeSpaceMapVacuum(idxrel);
    1492          40 : }

Generated by: LCOV version 1.13