LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 412 462 89.2 %
Date: 2019-06-18 07:06:57 Functions: 23 25 92.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/relation.h"
      23             : #include "access/reloptions.h"
      24             : #include "access/relscan.h"
      25             : #include "access/table.h"
      26             : #include "access/tableam.h"
      27             : #include "access/xloginsert.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/pg_am.h"
      30             : #include "miscadmin.h"
      31             : #include "pgstat.h"
      32             : #include "postmaster/autovacuum.h"
      33             : #include "storage/bufmgr.h"
      34             : #include "storage/freespace.h"
      35             : #include "utils/builtins.h"
      36             : #include "utils/index_selfuncs.h"
      37             : #include "utils/memutils.h"
      38             : #include "utils/rel.h"
      39             : 
      40             : 
      41             : /*
      42             :  * State carried through the initial construction of a BRIN index; the running
      43             :  * state is kept in a BrinMemTuple (bs_dtuple).  NOTE(review): notes saying "presumably" are inferred from names/types only -- confirm.
      44             :  */
      45             : typedef struct BrinBuildState
      46             : {
      47             :     Relation    bs_irel;        /* index relation (presumably the index being built) */
      48             :     int         bs_numtuples;   /* tuple counter -- exact meaning to be confirmed */
      49             :     Buffer      bs_currentInsertBuf;    /* buffer, presumably the current insertion target */
      50             :     BlockNumber bs_pagesPerRange;   /* presumably heap pages per block range */
      51             :     BlockNumber bs_currRangeStart;  /* presumably first heap block of the current range */
      52             :     BrinRevmap *bs_rmAccess;    /* revmap access object */
      53             :     BrinDesc   *bs_bdesc;       /* BRIN descriptor */
      54             :     BrinMemTuple *bs_dtuple;    /* in-memory tuple holding the running state */
      55             : } BrinBuildState;
      56             : 
      57             : /*
      58             :  * Private per-scan state; brinbeginscan() allocates it and stores it in scan->opaque
      59             :  */
      60             : typedef struct BrinOpaque
      61             : {
      62             :     BlockNumber bo_pagesPerRange;   /* filled in by brinRevmapInitialize() */
      63             :     BrinRevmap *bo_rmAccess;        /* revmap access object from brinRevmapInitialize() */
      64             :     BrinDesc   *bo_bdesc;           /* descriptor built by brin_build_desc() */
      65             : } BrinOpaque;
      66             : 
      67             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber  /* NOTE(review): presumably a sentinel for "every block range"; its users are not in this excerpt -- confirm */
      68             : /* prototypes for file-local (static) helper functions */
      69             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      70             :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      71             : static void terminate_brin_buildstate(BrinBuildState *state);
      72             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      73             :                           bool include_partial, double *numSummarized, double *numExisting);
      74             : static void form_and_insert_tuple(BrinBuildState *state);
      75             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      76             :                          BrinTuple *b);
      77             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      78             : 
      79             : 
      80             : /*
      81             :  * brinhandler -- BRIN handler function (fmgr calling convention): builds and
      82             :  * returns an IndexAmRoutine node with the access method parameters and callbacks.
      83             :  */
      84             : Datum
      85         436 : brinhandler(PG_FUNCTION_ARGS)
      86             : {
      87         436 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      88             :     /* access method parameters */
      89         436 :     amroutine->amstrategies = 0;
      90         436 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      91         436 :     amroutine->amcanorder = false;
      92         436 :     amroutine->amcanorderbyop = false;
      93         436 :     amroutine->amcanbackward = false;
      94         436 :     amroutine->amcanunique = false;
      95         436 :     amroutine->amcanmulticol = true;
      96         436 :     amroutine->amoptionalkey = true;
      97         436 :     amroutine->amsearcharray = false;
      98         436 :     amroutine->amsearchnulls = true;
      99         436 :     amroutine->amstorage = true;
     100         436 :     amroutine->amclusterable = false;
     101         436 :     amroutine->ampredlocks = false;
     102         436 :     amroutine->amcanparallel = false;
     103         436 :     amroutine->amcaninclude = false;
     104         436 :     amroutine->amkeytype = InvalidOid;
     105             :     /* callbacks; NULL where BRIN supplies none */
     106         436 :     amroutine->ambuild = brinbuild;
     107         436 :     amroutine->ambuildempty = brinbuildempty;
     108         436 :     amroutine->aminsert = brininsert;
     109         436 :     amroutine->ambulkdelete = brinbulkdelete;
     110         436 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     111         436 :     amroutine->amcanreturn = NULL;
     112         436 :     amroutine->amcostestimate = brincostestimate;
     113         436 :     amroutine->amoptions = brinoptions;
     114         436 :     amroutine->amproperty = NULL;
     115         436 :     amroutine->ambuildphasename = NULL;
     116         436 :     amroutine->amvalidate = brinvalidate;
     117         436 :     amroutine->ambeginscan = brinbeginscan;
     118         436 :     amroutine->amrescan = brinrescan;
     119         436 :     amroutine->amgettuple = NULL;   /* no tuple-at-a-time callback; only amgetbitmap is set */
     120         436 :     amroutine->amgetbitmap = bringetbitmap;
     121         436 :     amroutine->amendscan = brinendscan;
     122         436 :     amroutine->ammarkpos = NULL;
     123         436 :     amroutine->amrestrpos = NULL;
     124         436 :     amroutine->amestimateparallelscan = NULL;
     125         436 :     amroutine->aminitparallelscan = NULL;
     126         436 :     amroutine->amparallelrescan = NULL;
     127             : 
     128         436 :     PG_RETURN_POINTER(amroutine);
     129             : }
     130             : 
     131             : /*
     132             :  * brininsert -- keep the BRIN summary up to date for a heap tuple being inserted.
     133             :  *
     134             :  * We fetch the index tuple summarizing the page range that contains heaptid
     135             :  * and feed values[]/nulls[] to each column's BRIN_PROCNUM_ADDVALUE support
     136             :  * function.  If any of them reports that the summary changed, the index tuple
     137             :  * is replaced through brin_doupdate(); when that fails we start over.  If the
     138             :  * range is not currently summarized (i.e. the revmap returns NULL for it),
     139             :  * there's nothing to do for this tuple.  With autosummarization enabled, we
     140             :  * also check whether to request summarization of the previous page range.
     141             :  * heapRel and checkUnique are not used here; the result is always false.
     142             :  */
     143             : bool
     144        1336 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     145             :            ItemPointer heaptid, Relation heapRel,
     146             :            IndexUniqueCheck checkUnique,
     147             :            IndexInfo *indexInfo)
     148             : {
     149             :     BlockNumber pagesPerRange;
     150             :     BlockNumber origHeapBlk;
     151             :     BlockNumber heapBlk;
     152        1336 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     153             :     BrinRevmap *revmap;
     154        1336 :     Buffer      buf = InvalidBuffer;
     155        1336 :     MemoryContext tupcxt = NULL;
     156        1336 :     MemoryContext oldcxt = CurrentMemoryContext;
     157        1336 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     158             :     /* pagesPerRange is passed by address and only used after this call */
     159        1336 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     160             : 
     161             :     /*
     162             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     163             :      * is the first block in the corresponding page range.
     164             :      */
     165        1336 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     166        1336 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     167             :     /* loop until done; only a failed brin_doupdate() goes around again */
     168             :     for (;;)
     169           0 :     {
     170        1336 :         bool        need_insert = false;
     171             :         OffsetNumber off;
     172             :         BrinTuple  *brtup;
     173             :         BrinMemTuple *dtup;
     174             :         int         keyno;
     175             : 
     176        1336 :         CHECK_FOR_INTERRUPTS();
     177             : 
     178             :         /*
     179             :          * If auto-summarization is enabled and we just inserted the first
     180             :          * tuple into the first block of a new non-first page range, request a
     181             :          * summarization run of the previous range.
     182             :          */
     183        1336 :         if (autosummarize &&
     184         156 :             heapBlk > 0 &&
     185         156 :             heapBlk == origHeapBlk &&
     186         156 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     187             :         {
     188           8 :             BlockNumber lastPageRange = heapBlk - 1;
     189             :             BrinTuple  *lastPageTuple;
     190             : 
     191           8 :             lastPageTuple =
     192             :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     193             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     194           8 :             if (!lastPageTuple)
     195             :             {
     196             :                 bool        recorded;
     197             : 
     198           6 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     199             :                                                  RelationGetRelid(idxRel),
     200             :                                                  lastPageRange);
     201           6 :                 if (!recorded)
     202           0 :                     ereport(LOG,
     203             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     204             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     205             :                                     RelationGetRelationName(idxRel),
     206             :                                     lastPageRange)));
     207             :             }
     208             :             else
     209           2 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     210             :         }
     211             : 
     212        1336 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     213             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     214             : 
     215             :         /* if range is unsummarized, there's nothing to do */
     216        1336 :         if (!brtup)
     217        1524 :             break;
     218             : 
     219             :         /* First time through in this statement? */
     220        1148 :         if (bdesc == NULL)
     221             :         {
     222         274 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     223         274 :             bdesc = brin_build_desc(idxRel);
     224         274 :             indexInfo->ii_AmCache = (void *) bdesc;
     225         274 :             MemoryContextSwitchTo(oldcxt);
     226             :         }
     227             :         /* First time through in this brininsert call? */
     228        1148 :         if (tupcxt == NULL)
     229             :         {
     230        1148 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     231             :                                            "brininsert cxt",
     232             :                                            ALLOCSET_DEFAULT_SIZES);
     233        1148 :             MemoryContextSwitchTo(tupcxt);
     234             :         }
     235             : 
     236        1148 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     237             : 
     238             :         /*
     239             :          * Compare the key values of the new tuple to the stored index values;
     240             :          * our deformed tuple will get updated if the new tuple doesn't fit
     241             :          * the original range (note this means we can't break out of the loop
     242             :          * early). Make a note of whether this happens, so that we know to
     243             :          * insert the modified tuple later.
     244             :          */
     245       26656 :         for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
     246             :         {
     247             :             Datum       result;
     248             :             BrinValues *bval;
     249             :             FmgrInfo   *addValue;
     250             : 
     251       25508 :             bval = &dtup->bt_columns[keyno];
     252       25508 :             addValue = index_getprocinfo(idxRel, keyno + 1,
     253             :                                          BRIN_PROCNUM_ADDVALUE);
     254       76524 :             result = FunctionCall4Coll(addValue,
     255       25508 :                                        idxRel->rd_indcollation[keyno],
     256             :                                        PointerGetDatum(bdesc),
     257             :                                        PointerGetDatum(bval),
     258       25508 :                                        values[keyno],
     259       25508 :                                        nulls[keyno]);
     260             :             /* if that returned true, we need to insert the updated tuple */
     261       25508 :             need_insert |= DatumGetBool(result);
     262             :         }
     263             : 
     264        1148 :         if (!need_insert)
     265             :         {
     266             :             /*
     267             :              * The tuple is consistent with the new values, so there's nothing
     268             :              * to do.
     269             :              */
     270         256 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     271             :         }
     272             :         else
     273             :         {
     274         892 :             Page        page = BufferGetPage(buf);
     275         892 :             ItemId      lp = PageGetItemId(page, off);
     276             :             Size        origsz;
     277             :             BrinTuple  *origtup;
     278             :             Size        newsz;
     279             :             BrinTuple  *newtup;
     280             :             bool        samepage;
     281             : 
     282             :             /*
     283             :              * Make a copy of the old tuple, so that we can compare it after
     284             :              * re-acquiring the lock.
     285             :              */
     286         892 :             origsz = ItemIdGetLength(lp);
     287         892 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     288             : 
     289             :             /*
     290             :              * Before releasing the lock, check if we can attempt a same-page
     291             :              * update.  Another process could insert a tuple concurrently in
     292             :              * the same page though, so downstream we must be prepared to cope
     293             :              * if this turns out to not be possible after all.
     294             :              */
     295         892 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     296         892 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     297         892 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     298             : 
     299             :             /*
     300             :              * Try to update the tuple.  If this doesn't work for whatever
     301             :              * reason, we need to restart from the top; the revmap might be
     302             :              * pointing at a different tuple for this block now, so we need to
     303             :              * recompute to ensure both our new heap tuple and the other
     304             :              * inserter's are covered by the combined tuple.  It might be that
     305             :              * we don't need to update at all.
     306             :              */
     307         892 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     308             :                                buf, off, origtup, origsz, newtup, newsz,
     309             :                                samepage))
     310             :             {
     311             :                 /* no luck; start over */
     312           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     313           0 :                 continue;
     314             :             }
     315             :         }
     316             : 
     317             :         /* success! */
     318        1148 :         break;
     319             :     }
     320             :     /* cleanup shared by every way out of the loop */
     321        1336 :     brinRevmapTerminate(revmap);
     322        1336 :     if (BufferIsValid(buf))
     323        1150 :         ReleaseBuffer(buf);
     324        1336 :     MemoryContextSwitchTo(oldcxt);
     325        1336 :     if (tupcxt != NULL)
     326        1148 :         MemoryContextDelete(tupcxt);
     327             : 
     328        1336 :     return false;
     329             : }
     330             : 
     331             : /*
     332             :  * brinbeginscan -- set up a scan descriptor for BRIN index "r".
     333             :  *
     334             :  * nkeys and norderbys are handed to RelationGetIndexScan().  The revmap access
     335             :  * object, the pages-per-range number it reports, and a BRIN descriptor are kept
     336             :  * in a BrinOpaque hung off scan->opaque; the pages-per-range number cannot change while we hold lock on the index, so brinrescan need not recompute it.
     337             :  */
     338             : IndexScanDesc
     339         992 : brinbeginscan(Relation r, int nkeys, int norderbys)
     340             : {
     341             :     IndexScanDesc scan;
     342             :     BrinOpaque *opaque;
     343             : 
     344         992 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     345             :     /* private scan state: revmap access, pages per range, descriptor */
     346         992 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     347         992 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     348             :                                                scan->xs_snapshot);
     349         992 :     opaque->bo_bdesc = brin_build_desc(r);
     350         992 :     scan->opaque = opaque;
     351             : 
     352         992 :     return scan;
     353             : }
     354             : 
     355             : /*
     356             :  * Execute the index scan, adding the matching heap pages to the bitmap "tbm".
     357             :  *
     358             :  * We walk the heap one page range at a time, fetch each range's index tuple
     359             :  * through the revmap, and compare its summary values to the scan keys.  All
     360             :  * pages of a range consistent with every key are added to the bitmap (clamped
     361             :  * to the current heap size).  Ranges that have no index tuple (unsummarized)
     362             :  * and ranges whose tuple is a placeholder are added regardless of scan keys.
     363             :  *
     364             :  * Returns ten times the number of pages added, as a rough stand-in for the
     365             :  * number of heap tuples, which we do not know.
     366             :  */
     367             : int64
     368         992 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     369             : {
     370         992 :     Relation    idxRel = scan->indexRelation;
     371         992 :     Buffer      buf = InvalidBuffer;
     372             :     BrinDesc   *bdesc;
     373             :     Oid         heapOid;
     374             :     Relation    heapRel;
     375             :     BrinOpaque *opaque;
     376             :     BlockNumber nblocks;
     377             :     uint64      heapBlk;        /* 64 bits, so adding bo_pagesPerRange cannot wrap around */
     378         992 :     int64       totalpages = 0; /* 64 bits, so that totalpages * 10 cannot overflow */
     379             :     FmgrInfo   *consistentFn;
     380             :     MemoryContext oldcxt;
     381             :     MemoryContext perRangeCxt;
     382             :     BrinMemTuple *dtup;
     383         992 :     BrinTuple  *btup = NULL;
     384         992 :     Size        btupsz = 0;
     385             : 
     386         992 :     opaque = (BrinOpaque *) scan->opaque;
     387         992 :     bdesc = opaque->bo_bdesc;
     388         992 :     pgstat_count_index_scan(idxRel);
     389             : 
     390             :     /*
     391             :      * We need to know the size of the table so that we know how long to
     392             :      * iterate on the revmap.
     393             :      */
     394         992 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     395         992 :     heapRel = table_open(heapOid, AccessShareLock);
     396         992 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     397         992 :     table_close(heapRel, AccessShareLock);
     398             : 
     399             :     /*
     400             :      * Make room for the consistent support procedures of indexed columns.  We
     401             :      * don't look them up here; we do that lazily the first time we see a scan
     402             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     403             :      */
     404         992 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     405             : 
     406             :     /* allocate an initial in-memory tuple, outside the per-range memcxt so that it survives the resets */
     407         992 :     dtup = brin_new_memtuple(bdesc);
     408             : 
     409             :     /*
     410             :      * Setup and use a per-range memory context, which is reset every time we
     411             :      * loop below.  This avoids having to free the tuples within the loop.
     412             :      */
     413         992 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     414             :                                         "bringetbitmap cxt",
     415             :                                         ALLOCSET_DEFAULT_SIZES);
     416         992 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     417             : 
     418             :     /*
     419             :      * Now scan the revmap.  We start by querying for heap page 0,
     420             :      * incrementing by the number of pages per range; this gives us a full
     421             :      * view of the table.  heapBlk is 64 bits wide so the increment cannot wrap.
     422             :      */
     423      100192 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     424             :     {
     425             :         bool        addrange;
     426       99200 :         bool        gottuple = false;
     427             :         BrinTuple  *tup;
     428             :         OffsetNumber off;
     429             :         Size        size;
     430             : 
     431       99200 :         CHECK_FOR_INTERRUPTS();
     432             : 
     433       99200 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     434             : 
     435       99200 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, (BlockNumber) heapBlk, &buf,
     436             :                                        &off, &size, BUFFER_LOCK_SHARE,
     437             :                                        scan->xs_snapshot);
     438       99200 :         if (tup)
     439             :         {
     440       99200 :             gottuple = true;
     441       99200 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     442       99200 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     443             :         }
     444             : 
     445             :         /*
     446             :          * For page ranges with no indexed tuple, we must return the whole
     447             :          * range; otherwise, compare it to the scan keys.
     448             :          */
     449       99200 :         if (!gottuple)
     450             :         {
     451           0 :             addrange = true;
     452             :         }
     453             :         else
     454             :         {
     455       99200 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     456       99200 :             if (dtup->bt_placeholder)
     457             :             {
     458             :                 /*
     459             :                  * Placeholder tuples are always returned, regardless of the
     460             :                  * values stored in them.
     461             :                  */
     462           0 :                 addrange = true;
     463             :             }
     464             :             else
     465             :             {
     466             :                 int         keyno;
     467             : 
     468             :                 /*
     469             :                  * Compare scan keys with summary values stored for the range.
     470             :                  * If scan keys are matched, the page range must be added to
     471             :                  * the bitmap.  We initially assume the range needs to be
     472             :                  * added; in particular this serves the case where there are
     473             :                  * no keys.
     474             :                  */
     475       99200 :                 addrange = true;
     476      173108 :                 for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     477             :                 {
     478       99200 :                     ScanKey     key = &scan->keyData[keyno];
     479       99200 :                     AttrNumber  keyattno = key->sk_attno;
     480       99200 :                     BrinValues *bval = &dtup->bt_columns[keyattno - 1];
     481             :                     Datum       add;
     482             : 
     483             :                     /*
     484             :                      * The collation of the scan key must match the collation
     485             :                      * used in the index column (but only if the search is not
     486             :                      * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
     487             :                      * this index ...
     488             :                      */
     489             :                     Assert((key->sk_flags & SK_ISNULL) ||
     490             :                            (key->sk_collation ==
     491             :                             TupleDescAttr(bdesc->bd_tupdesc,
     492             :                                           keyattno - 1)->attcollation));
     493             : 
     494             :                     /* First time this column? look up consistent function; it is reused across ranges, so tie it to oldcxt, not the per-range context */
     495       99200 :                     if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     496             :                     {
     497             :                         FmgrInfo   *tmp;
     498             : 
     499         992 :                         tmp = index_getprocinfo(idxRel, keyattno,
     500             :                                                 BRIN_PROCNUM_CONSISTENT);
     501         992 :                         fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     502             :                                        oldcxt);
     503             :                     }
     504             : 
     505             :                     /*
     506             :                      * Check whether the scan key is consistent with the page
     507             :                      * range values; if so, have the pages in the range added
     508             :                      * to the output bitmap.
     509             :                      *
     510             :                      * When there are multiple scan keys, failure to meet the
     511             :                      * criteria for a single one of them is enough to discard
     512             :                      * the range as a whole, so break out of the loop as soon
     513             :                      * as a false return value is obtained.
     514             :                      */
     515       99200 :                     add = FunctionCall3Coll(&consistentFn[keyattno - 1],
     516             :                                             key->sk_collation,
     517             :                                             PointerGetDatum(bdesc),
     518             :                                             PointerGetDatum(bval),
     519             :                                             PointerGetDatum(key));
     520       99200 :                     addrange = DatumGetBool(add);
     521       99200 :                     if (!addrange)
     522       25292 :                         break;
     523             :                 }
     524             :             }
     525             :         }
     526             : 
     527             :         /* add the pages in the range to the output bitmap, if needed; never go past the end of the heap */
     528       99200 :         if (addrange)
     529             :         {
     530             :             BlockNumber pageno;
     531             : 
     532      221724 :             for (pageno = (BlockNumber) heapBlk;
     533      147816 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
     534       73908 :                  pageno++)
     535             :             {
     536       73908 :                 MemoryContextSwitchTo(oldcxt);
     537       73908 :                 tbm_add_page(tbm, pageno);
     538       73908 :                 totalpages++;
     539       73908 :                 MemoryContextSwitchTo(perRangeCxt);
     540             :             }
     541             :         }
     542             :     }
     543             : 
     544         992 :     MemoryContextSwitchTo(oldcxt);
     545         992 :     MemoryContextDelete(perRangeCxt);
     546             : 
     547         992 :     if (buf != InvalidBuffer)
     548         992 :         ReleaseBuffer(buf);
     549             : 
     550             :     /*
     551             :      * XXX We have an approximation of the number of *pages* that our scan
     552             :      * returns, but we don't have a precise idea of the number of heap tuples
     553             :      * involved.  The multiplication is done in 64 bits (totalpages is int64).
     554             :      */
     555         992 :     return totalpages * 10;
     556             : }
     557             : 
     558             : /*
     559             :  * Re-initialize state for a BRIN index scan
     560             :  */
     561             : void
     562         992 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     563             :            ScanKey orderbys, int norderbys)
     564             : {
     565             :     /*
     566             :      * Other index AMs preprocess the scan keys at this point, or sometime
     567             :      * early during the scan; this lets them optimize by removing redundant
     568             :      * keys, or doing early returns when they are impossible to satisfy; see
     569             :      * _bt_preprocess_keys for an example.  Something like that could be added
     570             :      * here someday, too.
     571             :      */
     572             : 
     573         992 :     if (scankey && scan->numberOfKeys > 0)
     574         992 :         memmove(scan->keyData, scankey,
     575         992 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     576         992 : }
     577             : 
     578             : /*
     579             :  * Close down a BRIN index scan
     580             :  */
     581             : void
     582         992 : brinendscan(IndexScanDesc scan)
     583             : {
     584         992 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     585             : 
     586         992 :     brinRevmapTerminate(opaque->bo_rmAccess);
     587         992 :     brin_free_desc(opaque->bo_bdesc);
     588         992 :     pfree(opaque);
     589         992 : }
     590             : 
     591             : /*
     592             :  * Per-heap-tuple callback for table_index_build_scan.
     593             :  *
     594             :  * Note we don't worry about the page range at the end of the table here; it is
     595             :  * present in the build state struct after we're called the last time, but not
     596             :  * inserted into the index.  Caller must ensure to do so, if appropriate.
     597             :  */
     598             : static void
     599      280714 : brinbuildCallback(Relation index,
     600             :                   HeapTuple htup,
     601             :                   Datum *values,
     602             :                   bool *isnull,
     603             :                   bool tupleIsAlive,
     604             :                   void *brstate)
     605             : {
     606      280714 :     BrinBuildState *state = (BrinBuildState *) brstate;
     607             :     BlockNumber thisblock;
     608             :     int         i;
     609             : 
     610      280714 :     thisblock = ItemPointerGetBlockNumber(&htup->t_self);
     611             : 
     612             :     /*
     613             :      * If we're in a block that belongs to a future range, summarize what
     614             :      * we've got and start afresh.  Note the scan might have skipped many
     615             :      * pages, if they were devoid of live tuples; make sure to insert index
     616             :      * tuples for those too.
     617             :      */
     618      562006 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     619             :     {
     620             : 
     621             :         BRIN_elog((DEBUG2,
     622             :                    "brinbuildCallback: completed a range: %u--%u",
     623             :                    state->bs_currRangeStart,
     624             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     625             : 
     626             :         /* create the index tuple and insert it */
     627         578 :         form_and_insert_tuple(state);
     628             : 
     629             :         /* set state to correspond to the next range */
     630         578 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     631             : 
     632             :         /* re-initialize state for it */
     633         578 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     634             :     }
     635             : 
     636             :     /* Accumulate the current tuple into the running state */
     637      577900 :     for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
     638             :     {
     639             :         FmgrInfo   *addValue;
     640             :         BrinValues *col;
     641      297186 :         Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
     642             : 
     643      297186 :         col = &state->bs_dtuple->bt_columns[i];
     644      297186 :         addValue = index_getprocinfo(index, i + 1,
     645             :                                      BRIN_PROCNUM_ADDVALUE);
     646             : 
     647             :         /*
     648             :          * Update dtuple state, if and as necessary.
     649             :          */
     650      891558 :         FunctionCall4Coll(addValue,
     651             :                           attr->attcollation,
     652      297186 :                           PointerGetDatum(state->bs_bdesc),
     653             :                           PointerGetDatum(col),
     654      594372 :                           values[i], isnull[i]);
     655             :     }
     656      280714 : }
     657             : 
     658             : /*
     659             :  * brinbuild() -- build a new BRIN index.
     660             :  */
     661             : IndexBuildResult *
     662          34 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     663             : {
     664             :     IndexBuildResult *result;
     665             :     double      reltuples;
     666             :     double      idxtuples;
     667             :     BrinRevmap *revmap;
     668             :     BrinBuildState *state;
     669             :     Buffer      meta;
     670             :     BlockNumber pagesPerRange;
     671             : 
     672             :     /*
     673             :      * We expect to be called exactly once for any index relation.
     674             :      */
     675          34 :     if (RelationGetNumberOfBlocks(index) != 0)
     676           0 :         elog(ERROR, "index \"%s\" already contains data",
     677             :              RelationGetRelationName(index));
     678             : 
     679             :     /*
     680             :      * Critical section not required, because on error the creation of the
     681             :      * whole relation will be rolled back.
     682             :      */
     683             : 
     684          34 :     meta = ReadBuffer(index, P_NEW);
     685             :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     686          34 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     687             : 
     688          34 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     689             :                        BRIN_CURRENT_VERSION);
     690          34 :     MarkBufferDirty(meta);
     691             : 
     692          34 :     if (RelationNeedsWAL(index))
     693             :     {
     694             :         xl_brin_createidx xlrec;
     695             :         XLogRecPtr  recptr;
     696             :         Page        page;
     697             : 
     698          34 :         xlrec.version = BRIN_CURRENT_VERSION;
     699          34 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     700             : 
     701          34 :         XLogBeginInsert();
     702          34 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     703          34 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     704             : 
     705          34 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     706             : 
     707          34 :         page = BufferGetPage(meta);
     708          34 :         PageSetLSN(page, recptr);
     709             :     }
     710             : 
     711          34 :     UnlockReleaseBuffer(meta);
     712             : 
     713             :     /*
     714             :      * Initialize our state, including the deformed tuple state.
     715             :      */
     716          34 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     717          34 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     718             : 
     719             :     /*
     720             :      * Now scan the relation.  No syncscan allowed here because we want the
     721             :      * heap blocks in physical order.
     722             :      */
     723          34 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     724             :                                        brinbuildCallback, (void *) state, NULL);
     725             : 
     726             :     /* process the final batch */
     727          34 :     form_and_insert_tuple(state);
     728             : 
     729             :     /* release resources */
     730          34 :     idxtuples = state->bs_numtuples;
     731          34 :     brinRevmapTerminate(state->bs_rmAccess);
     732          34 :     terminate_brin_buildstate(state);
     733             : 
     734             :     /*
     735             :      * Return statistics
     736             :      */
     737          34 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     738             : 
     739          34 :     result->heap_tuples = reltuples;
     740          34 :     result->index_tuples = idxtuples;
     741             : 
     742          34 :     return result;
     743             : }
     744             : 
     745             : void
     746           0 : brinbuildempty(Relation index)
     747             : {
     748             :     Buffer      metabuf;
     749             : 
     750             :     /* An empty BRIN index has a metapage only. */
     751           0 :     metabuf =
     752             :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     753           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     754             : 
     755             :     /* Initialize and xlog metabuffer. */
     756           0 :     START_CRIT_SECTION();
     757           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     758             :                        BRIN_CURRENT_VERSION);
     759           0 :     MarkBufferDirty(metabuf);
     760           0 :     log_newpage_buffer(metabuf, true);
     761           0 :     END_CRIT_SECTION();
     762             : 
     763           0 :     UnlockReleaseBuffer(metabuf);
     764           0 : }
     765             : 
     766             : /*
     767             :  * brinbulkdelete
     768             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     769             :  *      there's not a lot we can do here.
     770             :  *
     771             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     772             :  * tuple is deleted), meaning the need to re-run summarization on the affected
     773             :  * range.  Would need to add an extra flag in brintuples for that.
     774             :  */
     775             : IndexBulkDeleteResult *
     776           2 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     777             :                IndexBulkDeleteCallback callback, void *callback_state)
     778             : {
     779             :     /* allocate stats if first time through, else re-use existing struct */
     780           2 :     if (stats == NULL)
     781           2 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     782             : 
     783           2 :     return stats;
     784             : }
     785             : 
     786             : /*
     787             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     788             :  * ranges that are currently unsummarized.
     789             :  */
     790             : IndexBulkDeleteResult *
     791          54 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     792             : {
     793             :     Relation    heapRel;
     794             : 
     795             :     /* No-op in ANALYZE ONLY mode */
     796          54 :     if (info->analyze_only)
     797          34 :         return stats;
     798             : 
     799          20 :     if (!stats)
     800          18 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     801          20 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     802             :     /* rest of stats is initialized by zeroing */
     803             : 
     804          20 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     805             :                          AccessShareLock);
     806             : 
     807          20 :     brin_vacuum_scan(info->index, info->strategy);
     808             : 
     809          20 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     810             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     811             : 
     812          20 :     table_close(heapRel, AccessShareLock);
     813             : 
     814          20 :     return stats;
     815             : }
     816             : 
     817             : /*
     818             :  * reloptions processor for BRIN indexes
     819             :  */
     820             : bytea *
     821         194 : brinoptions(Datum reloptions, bool validate)
     822             : {
     823             :     relopt_value *options;
     824             :     BrinOptions *rdopts;
     825             :     int         numoptions;
     826             :     static const relopt_parse_elt tab[] = {
     827             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     828             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     829             :     };
     830             : 
     831         194 :     options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
     832             :                               &numoptions);
     833             : 
     834             :     /* if none set, we're done */
     835         194 :     if (numoptions == 0)
     836           0 :         return NULL;
     837             : 
     838         194 :     rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
     839             : 
     840         194 :     fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
     841             :                    validate, tab, lengthof(tab));
     842             : 
     843         194 :     pfree(options);
     844             : 
     845         194 :     return (bytea *) rdopts;
     846             : }
     847             : 
     848             : /*
     849             :  * SQL-callable function to scan through an index and summarize all ranges
     850             :  * that are not currently summarized.
     851             :  */
     852             : Datum
     853          14 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     854             : {
     855          14 :     Datum       relation = PG_GETARG_DATUM(0);
     856             : 
     857          14 :     return DirectFunctionCall2(brin_summarize_range,
     858             :                                relation,
     859             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     860             : }
     861             : 
     862             : /*
     863             :  * SQL-callable function to summarize the indicated page range, if not already
     864             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
     865             :  * unsummarized ranges are summarized.
     866             :  */
     867             : Datum
     868          44 : brin_summarize_range(PG_FUNCTION_ARGS)
     869             : {
     870          44 :     Oid         indexoid = PG_GETARG_OID(0);
     871          44 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     872             :     BlockNumber heapBlk;
     873             :     Oid         heapoid;
     874             :     Relation    indexRel;
     875             :     Relation    heapRel;
     876          44 :     double      numSummarized = 0;
     877             : 
     878          44 :     if (RecoveryInProgress())
     879           0 :         ereport(ERROR,
     880             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     881             :                  errmsg("recovery is in progress"),
     882             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     883             : 
     884          44 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
     885             :     {
     886           8 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     887             : 
     888           8 :         ereport(ERROR,
     889             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     890             :                  errmsg("block number out of range: %s", blk)));
     891             :     }
     892          36 :     heapBlk = (BlockNumber) heapBlk64;
     893             : 
     894             :     /*
     895             :      * We must lock table before index to avoid deadlocks.  However, if the
     896             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     897             :      * Rather than emitting a not-very-helpful error message, postpone
     898             :      * complaining, expecting that the is-it-an-index test below will fail.
     899             :      */
     900          36 :     heapoid = IndexGetRelation(indexoid, true);
     901          36 :     if (OidIsValid(heapoid))
     902          32 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     903             :     else
     904           4 :         heapRel = NULL;
     905             : 
     906          36 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     907             : 
     908             :     /* Must be a BRIN index */
     909          64 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     910          32 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     911           4 :         ereport(ERROR,
     912             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     913             :                  errmsg("\"%s\" is not a BRIN index",
     914             :                         RelationGetRelationName(indexRel))));
     915             : 
     916             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     917          28 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     918           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     919           0 :                        RelationGetRelationName(indexRel));
     920             : 
     921             :     /*
     922             :      * Since we did the IndexGetRelation call above without any lock, it's
     923             :      * barely possible that a race against an index drop/recreation could have
     924             :      * netted us the wrong table.  Recheck.
     925             :      */
     926          28 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     927           0 :         ereport(ERROR,
     928             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     929             :                  errmsg("could not open parent table of index %s",
     930             :                         RelationGetRelationName(indexRel))));
     931             : 
     932             :     /* OK, do it */
     933          28 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
     934             : 
     935          28 :     relation_close(indexRel, ShareUpdateExclusiveLock);
     936          28 :     relation_close(heapRel, ShareUpdateExclusiveLock);
     937             : 
     938          28 :     PG_RETURN_INT32((int32) numSummarized);
     939             : }
     940             : 
     941             : /*
     942             :  * SQL-callable interface to mark a range as no longer summarized
     943             :  */
     944             : Datum
     945          20 : brin_desummarize_range(PG_FUNCTION_ARGS)
     946             : {
     947          20 :     Oid         indexoid = PG_GETARG_OID(0);
     948          20 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     949             :     BlockNumber heapBlk;
     950             :     Oid         heapoid;
     951             :     Relation    heapRel;
     952             :     Relation    indexRel;
     953             :     bool        done;
     954             : 
     955          20 :     if (RecoveryInProgress())
     956           0 :         ereport(ERROR,
     957             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     958             :                  errmsg("recovery is in progress"),
     959             :                  errhint("BRIN control functions cannot be executed during recovery.")));
     960             : 
     961          20 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
     962             :     {
     963           4 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     964             : 
     965           4 :         ereport(ERROR,
     966             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     967             :                  errmsg("block number out of range: %s", blk)));
     968             :     }
     969          16 :     heapBlk = (BlockNumber) heapBlk64;
     970             : 
     971             :     /*
     972             :      * We must lock table before index to avoid deadlocks.  However, if the
     973             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     974             :      * Rather than emitting a not-very-helpful error message, postpone
     975             :      * complaining, expecting that the is-it-an-index test below will fail.
     976             :      */
     977          16 :     heapoid = IndexGetRelation(indexoid, true);
     978          16 :     if (OidIsValid(heapoid))
     979          16 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
     980             :     else
     981           0 :         heapRel = NULL;
     982             : 
     983          16 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     984             : 
     985             :     /* Must be a BRIN index */
     986          32 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     987          16 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     988           0 :         ereport(ERROR,
     989             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     990             :                  errmsg("\"%s\" is not a BRIN index",
     991             :                         RelationGetRelationName(indexRel))));
     992             : 
     993             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     994          16 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     995           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
     996           0 :                        RelationGetRelationName(indexRel));
     997             : 
     998             :     /*
     999             :      * Since we did the IndexGetRelation call above without any lock, it's
    1000             :      * barely possible that a race against an index drop/recreation could have
    1001             :      * netted us the wrong table.  Recheck.
    1002             :      */
    1003          16 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1004           0 :         ereport(ERROR,
    1005             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1006             :                  errmsg("could not open parent table of index %s",
    1007             :                         RelationGetRelationName(indexRel))));
    1008             : 
    1009             :     /* the revmap does the hard work */
    1010             :     do
    1011             :     {
    1012          16 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1013             :     }
    1014          16 :     while (!done);
    1015             : 
    1016          16 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1017          16 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1018             : 
    1019          16 :     PG_RETURN_VOID();
    1020             : }
    1021             : 
    1022             : /*
    1023             :  * Build a BrinDesc used to create or scan a BRIN index
    1024             :  */
    1025             : BrinDesc *
    1026        1346 : brin_build_desc(Relation rel)
    1027             : {
    1028             :     BrinOpcInfo **opcinfo;
    1029             :     BrinDesc   *bdesc;
    1030             :     TupleDesc   tupdesc;
    1031        1346 :     int         totalstored = 0;
    1032             :     int         keyno;
    1033             :     long        totalsize;
    1034             :     MemoryContext cxt;
    1035             :     MemoryContext oldcxt;
    1036             : 
    1037        1346 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1038             :                                 "brin desc cxt",
    1039             :                                 ALLOCSET_SMALL_SIZES);
    1040        1346 :     oldcxt = MemoryContextSwitchTo(cxt);
    1041        1346 :     tupdesc = RelationGetDescr(rel);
    1042             : 
    1043             :     /*
    1044             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1045             :      * the number of columns stored, since the number is opclass-defined.
    1046             :      */
    1047        1346 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1048       32098 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1049             :     {
    1050             :         FmgrInfo   *opcInfoFn;
    1051       30752 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1052             : 
    1053       30752 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1054             : 
    1055       61504 :         opcinfo[keyno] = (BrinOpcInfo *)
    1056       30752 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1057       30752 :         totalstored += opcinfo[keyno]->oi_nstored;
    1058             :     }
    1059             : 
    1060             :     /* Allocate our result struct and fill it in */
    1061        1346 :     totalsize = offsetof(BrinDesc, bd_info) +
    1062        1346 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1063             : 
    1064        1346 :     bdesc = palloc(totalsize);
    1065        1346 :     bdesc->bd_context = cxt;
    1066        1346 :     bdesc->bd_index = rel;
    1067        1346 :     bdesc->bd_tupdesc = tupdesc;
    1068        1346 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1069        1346 :     bdesc->bd_totalstored = totalstored;
    1070             : 
    1071       32098 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1072       30752 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1073        1346 :     pfree(opcinfo);
    1074             : 
    1075        1346 :     MemoryContextSwitchTo(oldcxt);
    1076             : 
    1077        1346 :     return bdesc;
    1078             : }
    1079             : 
    1080             : void
    1081        1072 : brin_free_desc(BrinDesc *bdesc)
    1082             : {
    1083             :     /* make sure the tupdesc is still valid */
    1084             :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1085             :     /* no need for retail pfree */
    1086        1072 :     MemoryContextDelete(bdesc->bd_context);
    1087        1072 : }
    1088             : 
    1089             : /*
    1090             :  * Fetch index's statistical data into *stats
    1091             :  */
    1092             : void
    1093        3980 : brinGetStats(Relation index, BrinStatsData *stats)
    1094             : {
    1095             :     Buffer      metabuffer;
    1096             :     Page        metapage;
    1097             :     BrinMetaPageData *metadata;
    1098             : 
    1099        3980 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1100        3980 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1101        3980 :     metapage = BufferGetPage(metabuffer);
    1102        3980 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1103             : 
    1104        3980 :     stats->pagesPerRange = metadata->pagesPerRange;
    1105        3980 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1106             : 
    1107        3980 :     UnlockReleaseBuffer(metabuffer);
    1108        3980 : }
    1109             : 
    1110             : /*
    1111             :  * Allocate and initialize a BrinBuildState for creating tuples on the given index.
    1112             :  */
    1113             : static BrinBuildState *
    1114          56 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1115             :                            BlockNumber pagesPerRange)
    1116             : {
    1117             :     BrinBuildState *state;
    1118             :     /* palloc'd here; terminate_brin_buildstate() is what frees it again */
    1119          56 :     state = palloc(sizeof(BrinBuildState));
    1120             :     /* Nothing inserted yet: zero tuples counted, no insertion buffer, range start 0. */
    1121          56 :     state->bs_irel = idxRel;
    1122          56 :     state->bs_numtuples = 0;
    1123          56 :     state->bs_currentInsertBuf = InvalidBuffer;
    1124          56 :     state->bs_pagesPerRange = pagesPerRange;
    1125          56 :     state->bs_currRangeStart = 0;
    1126          56 :     state->bs_rmAccess = revmap;
    1127          56 :     state->bs_bdesc = brin_build_desc(idxRel);
    1128          56 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1129             :     /* Same reset that brinsummarize applies between consecutive ranges. */
    1130          56 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1131             : 
    1132          56 :     return state;
    1133             : }
    1134             : 
    1135             : /*
    1136             :  * Release resources associated with a BrinBuildState, including the state itself.
    1137             :  */
    1138             : static void
    1139          56 : terminate_brin_buildstate(BrinBuildState *state)
    1140             : {
    1141             :     /*
    1142             :      * Release the last index buffer used.  We might as well ensure that
    1143             :      * whatever free space remains in that page is available in FSM, too.
    1144             :      */
    1145          56 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1146             :     {
    1147             :         Page        page;
    1148             :         Size        freespace;
    1149             :         BlockNumber blk;
    1150             :         /* Capture the free space and block number before the buffer is released. */
    1151          34 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1152          34 :         freespace = PageGetFreeSpace(page);
    1153          34 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
    1154          34 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1155          34 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1156          34 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
    1157             :     }
    1158             :     /* Free what the state points to first, then the state struct itself. */
    1159          56 :     brin_free_desc(state->bs_bdesc);
    1160          56 :     pfree(state->bs_dtuple);
    1161          56 :     pfree(state);
    1162          56 : }
    1163             : 
    1164             : /*
    1165             :  * On the given BRIN index, summarize the heap page range that corresponds
    1166             :  * to the heap block number given.
    1167             :  *
    1168             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1169             :  * missing those values from the summary tuple, we first insert a placeholder
    1170             :  * index tuple into the index, then execute the heap scan; transactions
    1171             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1172             :  * union the placeholder tuple with the one computed by this routine.  The
    1173             :  * update of the index value happens in a loop, so that if somebody updates
    1174             :  * the placeholder tuple after we read it, we detect the case and try again.
    1175             :  * This ensures that the concurrently inserted tuples are not lost.
    1176             :  *
    1177             :  * A further corner case is this routine being asked to summarize the partial
    1178             :  * range at the end of the table.  heapNumBlks is the (possibly outdated)
    1179             :  * table size; if we notice that the requested range lies beyond that size,
    1180             :  * we re-compute the table size after inserting the placeholder tuple, to
    1181             :  * avoid missing pages that were appended recently.
    1182             :  */
    1183             : static void
    1184          42 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1185             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1186             : {
    1187             :     Buffer      phbuf;
    1188             :     BrinTuple  *phtup;
    1189             :     Size        phsz;
    1190             :     OffsetNumber offset;
    1191             :     BlockNumber scanNumBlks;
    1192             : 
    1193             :     /*
    1194             :      * Insert the placeholder tuple
    1195             :      */
    1196          42 :     phbuf = InvalidBuffer;
    1197          42 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1198          42 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1199             :                            state->bs_rmAccess, &phbuf,
    1200             :                            heapBlk, phtup, phsz);
    1201             : 
    1202             :     /*
    1203             :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1204             :      * cannot shrink concurrently (but it can grow).
    1205             :      */
    1206             :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1207          42 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1208             :     {
    1209             :         /*
    1210             :          * If we're asked to scan what we believe to be the final range on the
    1211             :          * table (i.e. a range that might be partial) we need to recompute our
    1212             :          * idea of what the latest page is after inserting the placeholder
    1213             :          * tuple.  Anyone that grows the table later will update the
    1214             :          * placeholder tuple, so it doesn't matter that we won't scan these
    1215             :          * pages ourselves.  Careful: the table might have been extended
    1216             :          * beyond the current range, so clamp our result.
    1217             :          *
    1218             :          * Fortunately, this should occur infrequently.
    1219             :          */
    1220           4 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1221             :                           state->bs_pagesPerRange);
    1222             :     }
    1223             :     else
    1224             :     {
    1225             :         /* Easy case: range is known to be complete */
    1226          38 :         scanNumBlks = state->bs_pagesPerRange;
    1227             :     }
    1228             : 
    1229             :     /*
    1230             :      * Execute the partial heap scan covering the heap blocks in the specified
    1231             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1232             :      * short of brinbuildCallback creating the new index entry.
    1233             :      *
    1234             :      * Note that it is critical we use the "any visible" mode of
    1235             :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1236             :      * inserted by transactions that are still in progress, among other corner
    1237             :      * cases.  That mode is the second of the three boolean arguments below.
    1238             :      */
    1239          42 :     state->bs_currRangeStart = heapBlk;
    1240          42 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1241             :                                  heapBlk, scanNumBlks,
    1242             :                                  brinbuildCallback, (void *) state, NULL);
    1243             : 
    1244             :     /*
    1245             :      * Now we update the values obtained by the scan with the placeholder
    1246             :      * tuple.  We do this in a loop which only terminates if we're able to
    1247             :      * update the placeholder tuple successfully; if we are not, this means
    1248             :      * somebody else modified the placeholder tuple after we read it.
    1249             :      */
    1250             :     for (;;)
    1251           0 :     {
    1252             :         BrinTuple  *newtup;
    1253             :         Size        newsize;
    1254             :         bool        didupdate;
    1255             :         bool        samepage;
    1256             : 
    1257          42 :         CHECK_FOR_INTERRUPTS();
    1258             : 
    1259             :         /*
    1260             :          * Form the on-disk summary tuple and try to replace the placeholder with it.
    1261             :          */
    1262          42 :         newtup = brin_form_tuple(state->bs_bdesc,
    1263             :                                  heapBlk, state->bs_dtuple, &newsize);
    1264          42 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1265          42 :         didupdate =
    1266          42 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1267             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1268             :                           phtup, phsz, newtup, newsize, samepage);
    1269          42 :         brin_free_tuple(phtup);
    1270          42 :         brin_free_tuple(newtup);
    1271             : 
    1272             :         /* If the update succeeded, we're done. */
    1273          42 :         if (didupdate)
    1274          42 :             break;
    1275             : 
    1276             :         /*
    1277             :          * If the update didn't work, it might be because somebody updated the
    1278             :          * placeholder tuple concurrently.  Extract the new version, union it
    1279             :          * with the values we have from the scan, and start over.  (There are
    1280             :          * other reasons for the update to fail, but it's simple to treat them
    1281             :          * the same.)
    1282             :          */
    1283           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1284             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1285             :                                          NULL);
    1286             :         /* the placeholder tuple must exist */
    1287           0 :         if (phtup == NULL)
    1288           0 :             elog(ERROR, "missing placeholder tuple");
    1289           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
    1290           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1291             :         /* phtup now refers to the copy, which is what the next iteration frees */
    1292             :         /* merge it into the tuple from the heap scan */
    1293           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1294             :     }
    1295             : 
    1296          42 :     ReleaseBuffer(phbuf);
    1297          42 : }
    1298             : 
    1299             : /*
    1300             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1301             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1302             :  * page range containing the given heap page number is scanned.
    1303             :  * If include_partial is true, then the partial range at the end of the table
    1304             :  * is summarized, otherwise not.
    1305             :  *
    1306             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1307             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1308             :  * incremented.
    1309             :  */
    1310             : static void
    1311          48 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1312             :               bool include_partial, double *numSummarized, double *numExisting)
    1313             : {
    1314             :     BrinRevmap *revmap;
    1315          48 :     BrinBuildState *state = NULL;
    1316          48 :     IndexInfo  *indexInfo = NULL;
    1317             :     BlockNumber heapNumBlocks;
    1318             :     BlockNumber pagesPerRange;
    1319             :     Buffer      buf;
    1320             :     BlockNumber startBlk;
    1321             : 
    1322          48 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1323             : 
    1324             :     /* determine range of pages to process (a single range unless BRIN_ALL_BLOCKRANGES) */
    1325          48 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1326          48 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1327          30 :         startBlk = 0;
    1328             :     else
    1329             :     {
    1330          18 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1331          18 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
    1332             :     }
    1333          48 :     if (startBlk > heapNumBlocks)
    1334             :     {
    1335             :         /* Nothing to do if start point is beyond end of table */
    1336           0 :         brinRevmapTerminate(revmap);
    1337           0 :         return;
    1338             :     }
    1339             : 
    1340             :     /*
    1341             :      * Scan the revmap to find unsummarized items.
    1342             :      */
    1343          48 :     buf = InvalidBuffer;
    1344        1728 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1345             :     {
    1346             :         BrinTuple  *tup;
    1347             :         OffsetNumber off;
    1348             : 
    1349             :         /*
    1350             :          * Unless requested to summarize even a partial range, go away now if
    1351             :          * we think the next range is partial.  Caller would pass true when it
    1352             :          * is typically run once bulk data loading is done
    1353             :          * (brin_summarize_new_values), and false when it is typically the
    1354             :          * result of an arbitrarily-scheduled maintenance command (vacuuming).
    1355             :          */
    1356        2926 :         if (!include_partial &&
    1357        1238 :             (startBlk + pagesPerRange > heapNumBlocks))
    1358           8 :             break;
    1359             : 
    1360        1680 :         CHECK_FOR_INTERRUPTS();
    1361             : 
    1362        1680 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1363             :                                        BUFFER_LOCK_SHARE, NULL);
    1364        1680 :         if (tup == NULL)
    1365             :         {
    1366             :             /* no revmap entry for this heap range. Summarize it. */
    1367          42 :             if (state == NULL)
    1368             :             {
    1369             :                 /* first time through: build state and IndexInfo are created lazily */
    1370             :                 Assert(!indexInfo);
    1371          22 :                 state = initialize_brin_buildstate(index, revmap,
    1372             :                                                    pagesPerRange);
    1373          22 :                 indexInfo = BuildIndexInfo(index);
    1374             :             }
    1375          42 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1376             : 
    1377             :             /* and re-initialize state for the next range */
    1378          42 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1379             : 
    1380          42 :             if (numSummarized)
    1381          42 :                 *numSummarized += 1.0;
    1382             :         }
    1383             :         else
    1384             :         {
    1385        1638 :             if (numExisting)
    1386        1200 :                 *numExisting += 1.0;
    1387        1638 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    1388             :         }
    1389             :     }
    1390             : 
    1391          48 :     if (BufferIsValid(buf))
    1392          38 :         ReleaseBuffer(buf);
    1393             : 
    1394             :     /* free resources; state and indexInfo exist only if some range was summarized */
    1395          48 :     brinRevmapTerminate(revmap);
    1396          48 :     if (state)
    1397             :     {
    1398          22 :         terminate_brin_buildstate(state);
    1399          22 :         pfree(indexInfo);
    1400             :     }
    1401             : }
    1402             : 
    1403             : /*
    1404             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1405             :  * format and insert it into the index, making the revmap point to it.
    1406             :  */
    1407             : static void
    1408         612 : form_and_insert_tuple(BrinBuildState *state)
    1409             : {
    1410             :     BrinTuple  *tup;
    1411             :     Size        size;
    1412             :     /* The tuple is formed for, and inserted at, the range starting at bs_currRangeStart. */
    1413         612 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1414             :                           state->bs_dtuple, &size);
    1415         612 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1416             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1417             :                   tup, size);
    1418         612 :     state->bs_numtuples++;
    1419             :     /* The formed copy is not kept once it has been inserted. */
    1420         612 :     pfree(tup);
    1421         612 : }
    1422             : 
    1423             : /*
    1424             :  * Given a deformed tuple 'a' and an on-disk tuple 'b' (deformed here), adjust
    1425             :  * 'a' so that it's consistent with the summary values in both.
    1426             :  */
    1427             : static void
    1428           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1429             : {
    1430             :     int         keyno;
    1431             :     BrinMemTuple *db;
    1432             :     MemoryContext cxt;
    1433             :     MemoryContext oldcxt;
    1434             : 
    1435             :     /* Use our own memory context to avoid retail pfree */
    1436           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1437             :                                 "brin union",
    1438             :                                 ALLOCSET_DEFAULT_SIZES);
    1439           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1440           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1441           0 :     MemoryContextSwitchTo(oldcxt);
    1442             :     /* Back in the caller's context: call each attribute's BRIN_PROCNUM_UNION procedure. */
    1443           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1444             :     {
    1445             :         FmgrInfo   *unionFn;
    1446           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1447           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1448             :         /* support procedures are looked up by 1-based attribute number */
    1449           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1450             :                                     BRIN_PROCNUM_UNION);
    1451           0 :         FunctionCall3Coll(unionFn,
    1452           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1453             :                           PointerGetDatum(bdesc),
    1454             :                           PointerGetDatum(col_a),
    1455             :                           PointerGetDatum(col_b));
    1456             :     }
    1457             :     /* Deleting the context discards db, which was allocated while it was current. */
    1458           0 :     MemoryContextDelete(cxt);
    1459           0 : }
    1460             : 
    1461             : /*
    1462             :  * brin_vacuum_scan
    1463             :  *      Do a complete scan of the index during VACUUM.
    1464             :  *
    1465             :  * This routine scans the complete index looking for uncatalogued index pages,
    1466             :  * i.e. those that might have been lost due to a crash after index extension
    1467             :  * and such.
    1468             :  */
    1469             : static void
    1470          20 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1471             : {
    1472             :     BlockNumber nblocks;
    1473             :     BlockNumber blkno;
    1474             : 
    1475             :     /*
    1476             :      * Scan the index in physical order, and clean up any possible mess in
    1477             :      * each page.  The block count is sampled once, before the loop starts.
    1478             :      */
    1479          20 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1480         176 :     for (blkno = 0; blkno < nblocks; blkno++)
    1481             :     {
    1482             :         Buffer      buf;
    1483             : 
    1484         156 :         CHECK_FOR_INTERRUPTS();
    1485             :         /* read the main-fork block through the caller-supplied access strategy */
    1486         156 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1487             :                                  RBM_NORMAL, strategy);
    1488             : 
    1489         156 :         brin_page_cleanup(idxrel, buf);
    1490             : 
    1491         156 :         ReleaseBuffer(buf);
    1492             :     }
    1493             : 
    1494             :     /*
    1495             :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1496             :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1497             :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1498             :      */
    1499          20 :     FreeSpaceMapVacuum(idxrel);
    1500          20 : }

Generated by: LCOV version 1.13