LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 475 540 88.0 %
Date: 2021-05-13 09:07:15 Functions: 25 27 92.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/relation.h"
      23             : #include "access/reloptions.h"
      24             : #include "access/relscan.h"
      25             : #include "access/table.h"
      26             : #include "access/tableam.h"
      27             : #include "access/xloginsert.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/pg_am.h"
      30             : #include "commands/vacuum.h"
      31             : #include "miscadmin.h"
      32             : #include "pgstat.h"
      33             : #include "postmaster/autovacuum.h"
      34             : #include "storage/bufmgr.h"
      35             : #include "storage/freespace.h"
      36             : #include "utils/acl.h"
      37             : #include "utils/builtins.h"
      38             : #include "utils/datum.h"
      39             : #include "utils/index_selfuncs.h"
      40             : #include "utils/memutils.h"
      41             : #include "utils/rel.h"
      42             : 
      43             : 
      44             : /*
      45             :  * We use a BrinBuildState during initial construction of a BRIN index.
      46             :  * The running state is kept in a BrinMemTuple.
      47             :  */
      48             : typedef struct BrinBuildState
      49             : {
      50             :     Relation    bs_irel;
      51             :     int         bs_numtuples;
      52             :     Buffer      bs_currentInsertBuf;
      53             :     BlockNumber bs_pagesPerRange;
      54             :     BlockNumber bs_currRangeStart;
      55             :     BrinRevmap *bs_rmAccess;
      56             :     BrinDesc   *bs_bdesc;
      57             :     BrinMemTuple *bs_dtuple;
      58             : } BrinBuildState;
      59             : 
      60             : /*
      61             :  * Struct used as "opaque" during index scans
      62             :  */
      63             : typedef struct BrinOpaque
      64             : {
      65             :     BlockNumber bo_pagesPerRange;
      66             :     BrinRevmap *bo_rmAccess;
      67             :     BrinDesc   *bo_bdesc;
      68             : } BrinOpaque;
      69             : 
      70             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber
      71             : 
      72             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      73             :                                                   BrinRevmap *revmap, BlockNumber pagesPerRange);
      74             : static void terminate_brin_buildstate(BrinBuildState *state);
      75             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      76             :                           bool include_partial, double *numSummarized, double *numExisting);
      77             : static void form_and_insert_tuple(BrinBuildState *state);
      78             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      79             :                          BrinTuple *b);
      80             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      81             : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
      82             :                                 BrinMemTuple *dtup, Datum *values, bool *nulls);
      83             : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
      84             : 
      85             : /*
      86             :  * BRIN handler function: return IndexAmRoutine with access method parameters
      87             :  * and callbacks.
      88             :  */
      89             : Datum
      90        1240 : brinhandler(PG_FUNCTION_ARGS)
      91             : {
      92        1240 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      93             : 
      94        1240 :     amroutine->amstrategies = 0;
      95        1240 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      96        1240 :     amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
      97        1240 :     amroutine->amcanorder = false;
      98        1240 :     amroutine->amcanorderbyop = false;
      99        1240 :     amroutine->amcanbackward = false;
     100        1240 :     amroutine->amcanunique = false;
     101        1240 :     amroutine->amcanmulticol = true;
     102        1240 :     amroutine->amoptionalkey = true;
     103        1240 :     amroutine->amsearcharray = false;
     104        1240 :     amroutine->amsearchnulls = true;
     105        1240 :     amroutine->amstorage = true;
     106        1240 :     amroutine->amclusterable = false;
     107        1240 :     amroutine->ampredlocks = false;
     108        1240 :     amroutine->amcanparallel = false;
     109        1240 :     amroutine->amcaninclude = false;
     110        1240 :     amroutine->amusemaintenanceworkmem = false;
     111        1240 :     amroutine->amparallelvacuumoptions =
     112             :         VACUUM_OPTION_PARALLEL_CLEANUP;
     113        1240 :     amroutine->amkeytype = InvalidOid;
     114             : 
     115        1240 :     amroutine->ambuild = brinbuild;
     116        1240 :     amroutine->ambuildempty = brinbuildempty;
     117        1240 :     amroutine->aminsert = brininsert;
     118        1240 :     amroutine->ambulkdelete = brinbulkdelete;
     119        1240 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     120        1240 :     amroutine->amcanreturn = NULL;
     121        1240 :     amroutine->amcostestimate = brincostestimate;
     122        1240 :     amroutine->amoptions = brinoptions;
     123        1240 :     amroutine->amproperty = NULL;
     124        1240 :     amroutine->ambuildphasename = NULL;
     125        1240 :     amroutine->amvalidate = brinvalidate;
     126        1240 :     amroutine->amadjustmembers = NULL;
     127        1240 :     amroutine->ambeginscan = brinbeginscan;
     128        1240 :     amroutine->amrescan = brinrescan;
     129        1240 :     amroutine->amgettuple = NULL;
     130        1240 :     amroutine->amgetbitmap = bringetbitmap;
     131        1240 :     amroutine->amendscan = brinendscan;
     132        1240 :     amroutine->ammarkpos = NULL;
     133        1240 :     amroutine->amrestrpos = NULL;
     134        1240 :     amroutine->amestimateparallelscan = NULL;
     135        1240 :     amroutine->aminitparallelscan = NULL;
     136        1240 :     amroutine->amparallelrescan = NULL;
     137             : 
     138        1240 :     PG_RETURN_POINTER(amroutine);
     139             : }
     140             : 
     141             : /*
     142             :  * A tuple in the heap is being inserted.  To keep a brin index up to date,
     143             :  * we need to obtain the relevant index tuple and compare its stored values
     144             :  * with those of the new tuple.  If the tuple values are not consistent with
     145             :  * the summary tuple, we need to update the index tuple.
     146             :  *
     147             :  * If autosummarization is enabled, check if we need to summarize the previous
     148             :  * page range.
     149             :  *
     150             :  * If the range is not currently summarized (i.e. the revmap returns NULL for
     151             :  * it), there's nothing to do for this tuple.
     152             :  */
     153             : bool
     154       10816 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     155             :            ItemPointer heaptid, Relation heapRel,
     156             :            IndexUniqueCheck checkUnique,
     157             :            bool indexUnchanged,
     158             :            IndexInfo *indexInfo)
     159             : {
     160             :     BlockNumber pagesPerRange;
     161             :     BlockNumber origHeapBlk;
     162             :     BlockNumber heapBlk;
     163       10816 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     164             :     BrinRevmap *revmap;
     165       10816 :     Buffer      buf = InvalidBuffer;
     166       10816 :     MemoryContext tupcxt = NULL;
     167       10816 :     MemoryContext oldcxt = CurrentMemoryContext;
     168       10816 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     169             : 
     170       10816 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     171             : 
     172             :     /*
     173             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     174             :      * is the first block in the corresponding page range.
     175             :      */
     176       10816 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     177       10816 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     178             : 
     179             :     for (;;)
     180           0 :     {
     181       10816 :         bool        need_insert = false;
     182             :         OffsetNumber off;
     183             :         BrinTuple  *brtup;
     184             :         BrinMemTuple *dtup;
     185             : 
     186       10816 :         CHECK_FOR_INTERRUPTS();
     187             : 
     188             :         /*
     189             :          * If auto-summarization is enabled and we just inserted the first
     190             :          * tuple into the first block of a new non-first page range, request a
     191             :          * summarization run of the previous range.
     192             :          */
     193       10816 :         if (autosummarize &&
     194         156 :             heapBlk > 0 &&
     195         156 :             heapBlk == origHeapBlk &&
     196         156 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     197             :         {
     198           8 :             BlockNumber lastPageRange = heapBlk - 1;
     199             :             BrinTuple  *lastPageTuple;
     200             : 
     201             :             lastPageTuple =
     202           8 :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     203             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     204           8 :             if (!lastPageTuple)
     205             :             {
     206             :                 bool        recorded;
     207             : 
     208           6 :                 recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     209             :                                                  RelationGetRelid(idxRel),
     210             :                                                  lastPageRange);
     211           6 :                 if (!recorded)
     212           0 :                     ereport(LOG,
     213             :                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     214             :                              errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
     215             :                                     RelationGetRelationName(idxRel),
     216             :                                     lastPageRange)));
     217             :             }
     218             :             else
     219           2 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     220             :         }
     221             : 
     222       10816 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     223             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     224             : 
     225             :         /* if range is unsummarized, there's nothing to do */
     226       10816 :         if (!brtup)
     227         236 :             break;
     228             : 
     229             :         /* First time through in this statement? */
     230       10580 :         if (bdesc == NULL)
     231             :         {
     232         662 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     233         662 :             bdesc = brin_build_desc(idxRel);
     234         662 :             indexInfo->ii_AmCache = (void *) bdesc;
     235         662 :             MemoryContextSwitchTo(oldcxt);
     236             :         }
     237             :         /* First time through in this brininsert call? */
     238       10580 :         if (tupcxt == NULL)
     239             :         {
     240       10580 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     241             :                                            "brininsert cxt",
     242             :                                            ALLOCSET_DEFAULT_SIZES);
     243       10580 :             MemoryContextSwitchTo(tupcxt);
     244             :         }
     245             : 
     246       10580 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     247             : 
     248       10580 :         need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
     249             : 
     250       10580 :         if (!need_insert)
     251             :         {
     252             :             /*
     253             :              * The tuple is consistent with the new values, so there's nothing
     254             :              * to do.
     255             :              */
     256        8604 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     257             :         }
     258             :         else
     259             :         {
     260        1976 :             Page        page = BufferGetPage(buf);
     261        1976 :             ItemId      lp = PageGetItemId(page, off);
     262             :             Size        origsz;
     263             :             BrinTuple  *origtup;
     264             :             Size        newsz;
     265             :             BrinTuple  *newtup;
     266             :             bool        samepage;
     267             : 
     268             :             /*
     269             :              * Make a copy of the old tuple, so that we can compare it after
     270             :              * re-acquiring the lock.
     271             :              */
     272        1976 :             origsz = ItemIdGetLength(lp);
     273        1976 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     274             : 
     275             :             /*
     276             :              * Before releasing the lock, check if we can attempt a same-page
     277             :              * update.  Another process could insert a tuple concurrently in
     278             :              * the same page though, so downstream we must be prepared to cope
     279             :              * if this turns out to not be possible after all.
     280             :              */
     281        1976 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     282        1976 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     283        1976 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     284             : 
     285             :             /*
     286             :              * Try to update the tuple.  If this doesn't work for whatever
     287             :              * reason, we need to restart from the top; the revmap might be
     288             :              * pointing at a different tuple for this block now, so we need to
     289             :              * recompute to ensure both our new heap tuple and the other
     290             :              * inserter's are covered by the combined tuple.  It might be that
     291             :              * we don't need to update at all.
     292             :              */
     293        1976 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     294             :                                buf, off, origtup, origsz, newtup, newsz,
     295             :                                samepage))
     296             :             {
     297             :                 /* no luck; start over */
     298           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     299           0 :                 continue;
     300             :             }
     301             :         }
     302             : 
     303             :         /* success! */
     304       10580 :         break;
     305             :     }
     306             : 
     307       10816 :     brinRevmapTerminate(revmap);
     308       10816 :     if (BufferIsValid(buf))
     309       10582 :         ReleaseBuffer(buf);
     310       10816 :     MemoryContextSwitchTo(oldcxt);
     311       10816 :     if (tupcxt != NULL)
     312       10580 :         MemoryContextDelete(tupcxt);
     313             : 
     314       10816 :     return false;
     315             : }
     316             : 
     317             : /*
     318             :  * Initialize state for a BRIN index scan.
     319             :  *
     320             :  * We read the metapage here to determine the pages-per-range number that this
     321             :  * index was built with.  Note that since this cannot be changed while we're
     322             :  * holding lock on index, it's not necessary to recompute it during brinrescan.
     323             :  */
     324             : IndexScanDesc
     325        1712 : brinbeginscan(Relation r, int nkeys, int norderbys)
     326             : {
     327             :     IndexScanDesc scan;
     328             :     BrinOpaque *opaque;
     329             : 
     330        1712 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     331             : 
     332        1712 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     333        1712 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     334             :                                                scan->xs_snapshot);
     335        1712 :     opaque->bo_bdesc = brin_build_desc(r);
     336        1712 :     scan->opaque = opaque;
     337             : 
     338        1712 :     return scan;
     339             : }
     340             : 
     341             : /*
     342             :  * Execute the index scan.
     343             :  *
     344             :  * This works by reading index TIDs from the revmap, and obtaining the index
     345             :  * tuples pointed to by them; the summary values in the index tuples are
     346             :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     347             :  * ranges corresponding to index tuples that match the scan keys.
     348             :  *
     349             :  * If a TID from the revmap is read as InvalidTID, we know that range is
     350             :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     351             :  * keys.
     352             :  */
     353             : int64
     354        1712 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     355             : {
     356        1712 :     Relation    idxRel = scan->indexRelation;
     357        1712 :     Buffer      buf = InvalidBuffer;
     358             :     BrinDesc   *bdesc;
     359             :     Oid         heapOid;
     360             :     Relation    heapRel;
     361             :     BrinOpaque *opaque;
     362             :     BlockNumber nblocks;
     363             :     BlockNumber heapBlk;
     364        1712 :     int         totalpages = 0;
     365             :     FmgrInfo   *consistentFn;
     366             :     MemoryContext oldcxt;
     367             :     MemoryContext perRangeCxt;
     368             :     BrinMemTuple *dtup;
     369        1712 :     BrinTuple  *btup = NULL;
     370        1712 :     Size        btupsz = 0;
     371             :     ScanKey   **keys,
     372             :               **nullkeys;
     373             :     int        *nkeys,
     374             :                *nnullkeys;
     375             :     int         keyno;
     376             :     char       *ptr;
     377             :     Size        len;
     378             :     char       *tmp PG_USED_FOR_ASSERTS_ONLY;
     379             : 
     380        1712 :     opaque = (BrinOpaque *) scan->opaque;
     381        1712 :     bdesc = opaque->bo_bdesc;
     382        1712 :     pgstat_count_index_scan(idxRel);
     383             : 
     384             :     /*
     385             :      * We need to know the size of the table so that we know how long to
     386             :      * iterate on the revmap.
     387             :      */
     388        1712 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     389        1712 :     heapRel = table_open(heapOid, AccessShareLock);
     390        1712 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     391        1712 :     table_close(heapRel, AccessShareLock);
     392             : 
     393             :     /*
     394             :      * Make room for the consistent support procedures of indexed columns.  We
     395             :      * don't look them up here; we do that lazily the first time we see a scan
     396             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     397             :      */
     398        1712 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     399             : 
     400             :     /*
     401             :      * Make room for per-attribute lists of scan keys that we'll pass to the
     402             :      * consistent support procedure. We don't know which attributes have scan
     403             :      * keys, so we allocate space for all attributes. That may use more memory
     404             :      * but it's probably cheaper than determining which attributes are used.
     405             :      *
     406             :      * We keep null and regular keys separate, so that we can pass just the
     407             :      * regular keys to the consistent function easily.
     408             :      *
     409             :      * To reduce the allocation overhead, we allocate one big chunk and then
     410             :      * carve it into smaller arrays ourselves. All the pieces have exactly the
     411             :      * same lifetime, so that's OK.
     412             :      *
     413             :      * XXX The widest index can have 32 attributes, so the amount of wasted
     414             :      * memory is negligible. We could invent a more compact approach (with
     415             :      * just space for used attributes) but that would make the matching more
     416             :      * complex so it's not a good trade-off.
     417             :      */
     418        1712 :     len =
     419        3424 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* regular keys */
     420        3424 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     421        3424 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
     422        3424 :         MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) +  /* NULL keys */
     423        1712 :         MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
     424        1712 :         MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     425             : 
     426        1712 :     ptr = palloc(len);
     427        1712 :     tmp = ptr;
     428             : 
     429        1712 :     keys = (ScanKey **) ptr;
     430        1712 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     431             : 
     432        1712 :     nullkeys = (ScanKey **) ptr;
     433        1712 :     ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
     434             : 
     435        1712 :     nkeys = (int *) ptr;
     436        1712 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     437             : 
     438        1712 :     nnullkeys = (int *) ptr;
     439        1712 :     ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
     440             : 
     441       46148 :     for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
     442             :     {
     443       44436 :         keys[i] = (ScanKey *) ptr;
     444       44436 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     445             : 
     446       44436 :         nullkeys[i] = (ScanKey *) ptr;
     447       44436 :         ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
     448             :     }
     449             : 
     450             :     Assert(tmp + len == ptr);
     451             : 
     452             :     /* zero the number of keys */
     453        1712 :     memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     454        1712 :     memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
     455             : 
     456             :     /* Preprocess the scan keys - split them into per-attribute arrays. */
     457        3424 :     for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     458             :     {
     459        1712 :         ScanKey     key = &scan->keyData[keyno];
     460        1712 :         AttrNumber  keyattno = key->sk_attno;
     461             : 
     462             :         /*
     463             :          * The collation of the scan key must match the collation used in the
     464             :          * index column (but only if the search is not IS NULL/ IS NOT NULL).
     465             :          * Otherwise we shouldn't be using this index ...
     466             :          */
     467             :         Assert((key->sk_flags & SK_ISNULL) ||
     468             :                (key->sk_collation ==
     469             :                 TupleDescAttr(bdesc->bd_tupdesc,
     470             :                               keyattno - 1)->attcollation));
     471             : 
     472             :         /*
     473             :          * First time we see this index attribute, so init as needed.
     474             :          *
     475             :          * This is a bit of an overkill - we don't know how many scan keys are
     476             :          * there for this attribute, so we simply allocate the largest number
     477             :          * possible (as if all keys were for this attribute). This may waste a
     478             :          * bit of memory, but we only expect small number of scan keys in
     479             :          * general, so this should be negligible, and repeated repalloc calls
     480             :          * are not free either.
     481             :          */
     482        1712 :         if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     483             :         {
     484             :             FmgrInfo   *tmp;
     485             : 
     486             :             /* First time we see this attribute, so no key/null keys. */
     487             :             Assert(nkeys[keyattno - 1] == 0);
     488             :             Assert(nnullkeys[keyattno - 1] == 0);
     489             : 
     490        1712 :             tmp = index_getprocinfo(idxRel, keyattno,
     491             :                                     BRIN_PROCNUM_CONSISTENT);
     492        1712 :             fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     493             :                            CurrentMemoryContext);
     494             :         }
     495             : 
     496             :         /* Add key to the proper per-attribute array. */
     497        1712 :         if (key->sk_flags & SK_ISNULL)
     498             :         {
     499          24 :             nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
     500          24 :             nnullkeys[keyattno - 1]++;
     501             :         }
     502             :         else
     503             :         {
     504        1688 :             keys[keyattno - 1][nkeys[keyattno - 1]] = key;
     505        1688 :             nkeys[keyattno - 1]++;
     506             :         }
     507             :     }
     508             : 
     509             :     /* allocate an initial in-memory tuple, out of the per-range memcxt */
     510        1712 :     dtup = brin_new_memtuple(bdesc);
     511             : 
     512             :     /*
     513             :      * Setup and use a per-range memory context, which is reset every time we
     514             :      * loop below.  This avoids having to free the tuples within the loop.
     515             :      */
     516        1712 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     517             :                                         "bringetbitmap cxt",
     518             :                                         ALLOCSET_DEFAULT_SIZES);
     519        1712 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     520             : 
     521             :     /*
     522             :      * Now scan the revmap.  We start by querying for heap page 0,
     523             :      * incrementing by the number of pages per range; this gives us a full
     524             :      * view of the table.
     525             :      */
     526      126864 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     527             :     {
     528             :         bool        addrange;
     529      125152 :         bool        gottuple = false;
     530             :         BrinTuple  *tup;
     531             :         OffsetNumber off;
     532             :         Size        size;
     533             : 
     534      125152 :         CHECK_FOR_INTERRUPTS();
     535             : 
     536      125152 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     537             : 
     538      125152 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     539             :                                        &off, &size, BUFFER_LOCK_SHARE,
     540             :                                        scan->xs_snapshot);
     541      125152 :         if (tup)
     542             :         {
     543      125152 :             gottuple = true;
     544      125152 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     545      125152 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     546             :         }
     547             : 
     548             :         /*
     549             :          * For page ranges with no indexed tuple, we must return the whole
     550             :          * range; otherwise, compare it to the scan keys.
     551             :          */
     552      125152 :         if (!gottuple)
     553             :         {
     554           0 :             addrange = true;
     555             :         }
     556             :         else
     557             :         {
     558      125152 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     559      125152 :             if (dtup->bt_placeholder)
     560             :             {
     561             :                 /*
     562             :                  * Placeholder tuples are always returned, regardless of the
     563             :                  * values stored in them.
     564             :                  */
     565           0 :                 addrange = true;
     566             :             }
     567             :             else
     568             :             {
     569             :                 int         attno;
     570             : 
     571             :                 /*
     572             :                  * Compare scan keys with summary values stored for the range.
     573             :                  * If scan keys are matched, the page range must be added to
     574             :                  * the bitmap.  We initially assume the range needs to be
     575             :                  * added; in particular this serves the case where there are
     576             :                  * no keys.
     577             :                  */
     578      125152 :                 addrange = true;
     579     3634492 :                 for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
     580             :                 {
     581             :                     BrinValues *bval;
     582             :                     Datum       add;
     583             :                     Oid         collation;
     584             : 
     585             :                     /*
     586             :                      * skip attributes without any scan keys (both regular and
     587             :                      * IS [NOT] NULL)
     588             :                      */
     589     3510256 :                     if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
     590     3385104 :                         continue;
     591             : 
     592      125152 :                     bval = &dtup->bt_columns[attno - 1];
     593             : 
     594             :                     /*
     595             :                      * First check if there are any IS [NOT] NULL scan keys,
     596             :                      * and if we're violating them. In that case we can
     597             :                      * terminate early, without invoking the support function.
     598             :                      *
     599             :                      * As there may be more keys, we can only determine
     600             :                      * mismatch within this loop.
     601             :                      */
     602      125152 :                     if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
     603      125152 :                         !check_null_keys(bval, nullkeys[attno - 1],
     604      125152 :                                          nnullkeys[attno - 1]))
     605             :                     {
     606             :                         /*
     607             :                          * If any of the IS [NOT] NULL keys failed, the page
     608             :                          * range as a whole can't pass. So terminate the loop.
     609             :                          */
     610         664 :                         addrange = false;
     611         664 :                         break;
     612             :                     }
     613             : 
     614             :                     /*
     615             :                      * So either there are no IS [NOT] NULL keys, or all
     616             :                      * passed. If there are no regular scan keys, we're done -
     617             :                      * the page range matches. If there are regular keys, but
     618             :                      * the page range is marked as 'all nulls' it can't
     619             :                      * possibly pass (we're assuming the operators are
     620             :                      * strict).
     621             :                      */
     622             : 
     623             :                     /* No regular scan keys - page range as a whole passes. */
     624      124488 :                     if (!nkeys[attno - 1])
     625         824 :                         continue;
     626             : 
     627             :                     Assert((nkeys[attno - 1] > 0) &&
     628             :                            (nkeys[attno - 1] <= scan->numberOfKeys));
     629             : 
     630             :                     /* If it is all nulls, it cannot possibly be consistent. */
     631      123664 :                     if (bval->bv_allnulls)
     632             :                     {
     633         252 :                         addrange = false;
     634         252 :                         break;
     635             :                     }
     636             : 
     637             :                     /*
     638             :                      * Collation from the first key (has to be the same for
     639             :                      * all keys for the same attribute).
     640             :                      */
     641      123412 :                     collation = keys[attno - 1][0]->sk_collation;
     642             : 
     643             :                     /*
     644             :                      * Check whether the scan key is consistent with the page
     645             :                      * range values; if so, have the pages in the range added
     646             :                      * to the output bitmap.
     647             :                      *
     648             :                      * The opclass may or may not support processing of
     649             :                      * multiple scan keys. We can determine that based on the
     650             :                      * number of arguments - functions with extra parameter
     651             :                      * (number of scan keys) do support this, otherwise we
     652             :                      * have to simply pass the scan keys one by one.
     653             :                      */
     654      123412 :                     if (consistentFn[attno - 1].fn_nargs >= 4)
     655             :                     {
     656             :                         /* Check all keys at once */
     657       50016 :                         add = FunctionCall4Coll(&consistentFn[attno - 1],
     658             :                                                 collation,
     659             :                                                 PointerGetDatum(bdesc),
     660             :                                                 PointerGetDatum(bval),
     661       25008 :                                                 PointerGetDatum(keys[attno - 1]),
     662       25008 :                                                 Int32GetDatum(nkeys[attno - 1]));
     663       25008 :                         addrange = DatumGetBool(add);
     664             :                     }
     665             :                     else
     666             :                     {
     667             :                         /*
     668             :                          * Check keys one by one
     669             :                          *
     670             :                          * When there are multiple scan keys, failure to meet
     671             :                          * the criteria for a single one of them is enough to
     672             :                          * discard the range as a whole, so break out of the
     673             :                          * loop as soon as a false return value is obtained.
     674             :                          */
     675             :                         int         keyno;
     676             : 
     677      171884 :                         for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
     678             :                         {
     679      196808 :                             add = FunctionCall3Coll(&consistentFn[attno - 1],
     680       98404 :                                                     keys[attno - 1][keyno]->sk_collation,
     681             :                                                     PointerGetDatum(bdesc),
     682             :                                                     PointerGetDatum(bval),
     683       98404 :                                                     PointerGetDatum(keys[attno - 1][keyno]));
     684       98404 :                             addrange = DatumGetBool(add);
     685       98404 :                             if (!addrange)
     686       24924 :                                 break;
     687             :                         }
     688             :                     }
     689             :                 }
     690             :             }
     691             :         }
     692             : 
     693             :         /* add the pages in the range to the output bitmap, if needed */
     694      125152 :         if (addrange)
     695             :         {
     696             :             BlockNumber pageno;
     697             : 
     698       90164 :             for (pageno = heapBlk;
     699      180328 :                  pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
     700       90164 :                  pageno++)
     701             :             {
     702       90164 :                 MemoryContextSwitchTo(oldcxt);
     703       90164 :                 tbm_add_page(tbm, pageno);
     704       90164 :                 totalpages++;
     705       90164 :                 MemoryContextSwitchTo(perRangeCxt);
     706             :             }
     707             :         }
     708             :     }
     709             : 
     710        1712 :     MemoryContextSwitchTo(oldcxt);
     711        1712 :     MemoryContextDelete(perRangeCxt);
     712             : 
     713        1712 :     if (buf != InvalidBuffer)
     714        1712 :         ReleaseBuffer(buf);
     715             : 
     716             :     /*
     717             :      * XXX We have an approximation of the number of *pages* that our scan
     718             :      * returns, but we don't have a precise idea of the number of heap tuples
     719             :      * involved.
     720             :      */
     721        1712 :     return totalpages * 10;
     722             : }
     723             : 
     724             : /*
     725             :  * Re-initialize state for a BRIN index scan
                     :  *
                     :  * When 'scankey' is non-NULL and the scan has at least one key,
                     :  * scan->numberOfKeys ScanKeyData entries are copied from 'scankey' into
                     :  * scan->keyData.  Note that the copy length is taken from
                     :  * scan->numberOfKeys; the 'nscankeys', 'orderbys' and 'norderbys'
                     :  * arguments are not used by this function.  Nothing else in the scan
                     :  * descriptor is touched here.
     726             :  */
     727             : void
     728        1712 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     729             :            ScanKey orderbys, int norderbys)
     730             : {
     731             :     /*
     732             :      * Other index AMs preprocess the scan keys at this point, or sometime
     733             :      * early during the scan; this lets them optimize by removing redundant
     734             :      * keys, or doing early returns when they are impossible to satisfy; see
     735             :      * _bt_preprocess_keys for an example.  Something like that could be added
     736             :      * here someday, too.
                     :      *
                     :      * For now the keys are just copied verbatim.  memmove() is used, so
                     :      * the copy is well-defined even if 'scankey' overlaps scan->keyData.
     737             :      */
     738             : 
     739        1712 :     if (scankey && scan->numberOfKeys > 0)
     740        1712 :         memmove(scan->keyData, scankey,
     741        1712 :                 scan->numberOfKeys * sizeof(ScanKeyData));
     742        1712 : }
     743             : 
     744             : /*
     745             :  * Close down a BRIN index scan
                     :  *
                     :  * Tears down what hangs off scan->opaque, in this order: the revmap
                     :  * access object (bo_rmAccess) is handed to brinRevmapTerminate(), the
                     :  * BRIN descriptor (bo_bdesc) to brin_free_desc(), and finally the
                     :  * BrinOpaque struct itself is pfree'd.  scan->opaque is not reset, so it
                     :  * must not be dereferenced after this call.
     746             :  */
     747             : void
     748        1712 : brinendscan(IndexScanDesc scan)
     749             : {
     750        1712 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     751             : 
     752        1712 :     brinRevmapTerminate(opaque->bo_rmAccess);
     753        1712 :     brin_free_desc(opaque->bo_bdesc);
     754        1712 :     pfree(opaque);
     755        1712 : }
     756             : 
     757             : /*
     758             :  * Per-heap-tuple callback for table_index_build_scan.
     759             :  *
     760             :  * Note we don't worry about the page range at the end of the table here; it is
     761             :  * present in the build state struct after we're called the last time, but not
     762             :  * inserted into the index.  Caller must ensure to do so, if appropriate.
                     :  *
                     :  * 'brstate' is the BrinBuildState for this build.  'tid' is only used to
                     :  * learn which heap block the tuple lives in; 'values' and 'isnull' are
                     :  * passed straight through to add_values_to_range().  'tupleIsAlive' is
                     :  * not looked at.
     763             :  */
     764             : static void
     765      486562 : brinbuildCallback(Relation index,
     766             :                   ItemPointer tid,
     767             :                   Datum *values,
     768             :                   bool *isnull,
     769             :                   bool tupleIsAlive,
     770             :                   void *brstate)
     771             : {
     772      486562 :     BrinBuildState *state = (BrinBuildState *) brstate;
     773             :     BlockNumber thisblock;
     774             : 
     775      486562 :     thisblock = ItemPointerGetBlockNumber(tid);
     776             : 
     777             :     /*
     778             :      * If we're in a block that belongs to a future range, summarize what
     779             :      * we've got and start afresh.  Note the scan might have skipped many
     780             :      * pages, if they were devoid of live tuples; make sure to insert index
     781             :      * tuples for those too.
                     :      *
                     :      * The current range covers blocks bs_currRangeStart through
                     :      * bs_currRangeStart + bs_pagesPerRange - 1, so the loop advances one
                     :      * range at a time until 'thisblock' falls inside it.  The debug
                     :      * message prints bs_currRangeStart + bs_pagesPerRange as the end,
                     :      * i.e. one past the last block of the completed range.
     782             :      */
     783      487828 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     784             :     {
     785             : 
     786             :         BRIN_elog((DEBUG2,
     787             :                    "brinbuildCallback: completed a range: %u--%u",
     788             :                    state->bs_currRangeStart,
     789             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     790             : 
     791             :         /* create the index tuple and insert it */
     792        1266 :         form_and_insert_tuple(state);
     793             : 
     794             :         /* set state to correspond to the next range */
     795        1266 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     796             : 
     797             :         /* re-initialize state for it */
     798        1266 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     799             :     }
     800             : 
     801             :     /* Accumulate the current tuple into the running state (result ignored) */
     802      486562 :     (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
     803             :                                values, isnull);
     804      486562 : }
     805             : 
     806             : /*
     807             :  * brinbuild() -- build a new BRIN index.
                     :  *
                     :  * Steps, in order:
                     :  *  1. error out if the index relation already has any blocks;
                     :  *  2. allocate the first block, initialize it as the metapage and, if
                     :  *     the relation needs WAL, log an XLOG_BRIN_CREATE_INDEX record;
                     :  *  3. set up the revmap and the build state;
                     :  *  4. scan the heap with brinbuildCallback as the per-tuple callback;
                     :  *  5. insert the tuple for the range still pending in the build state.
                     :  *
                     :  * Returns a palloc'd IndexBuildResult in which heap_tuples is the value
                     :  * returned by table_index_build_scan() and index_tuples is the build
                     :  * state's bs_numtuples counter.
     808             :  */
     809             : IndexBuildResult *
     810         162 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     811             : {
     812             :     IndexBuildResult *result;
     813             :     double      reltuples;
     814             :     double      idxtuples;
     815             :     BrinRevmap *revmap;
     816             :     BrinBuildState *state;
     817             :     Buffer      meta;
     818             :     BlockNumber pagesPerRange;
     819             : 
     820             :     /*
     821             :      * We expect to be called exactly once for any index relation.
     822             :      */
     823         162 :     if (RelationGetNumberOfBlocks(index) != 0)
     824           0 :         elog(ERROR, "index \"%s\" already contains data",
     825             :              RelationGetRelationName(index));
     826             : 
     827             :     /*
     828             :      * Critical section not required, because on error the creation of the
     829             :      * whole relation will be rolled back.
                     :      *
                     :      * The relation was just verified to be empty, so the new block is
                     :      * expected to be the metapage block (asserted below).
     830             :      */
     831             : 
     832         162 :     meta = ReadBuffer(index, P_NEW);
     833             :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     834         162 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     835             : 
     836         162 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     837             :                        BRIN_CURRENT_VERSION);
     838         162 :     MarkBufferDirty(meta);
     839             : 
     840         162 :     if (RelationNeedsWAL(index))
     841             :     {
     842             :         xl_brin_createidx xlrec;
     843             :         XLogRecPtr  recptr;
     844             :         Page        page;
     845             : 
     846         100 :         xlrec.version = BRIN_CURRENT_VERSION;
     847         100 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     848             : 
     849         100 :         XLogBeginInsert();
     850         100 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     851         100 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
     852             : 
     853         100 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     854             : 
     855         100 :         page = BufferGetPage(meta);
     856         100 :         PageSetLSN(page, recptr);
     857             :     }
     858             : 
     859         162 :     UnlockReleaseBuffer(meta);
     860             : 
     861             :     /*
     862             :      * Initialize our state, including the deformed tuple state.
                     :      * pagesPerRange is filled in by brinRevmapInitialize().
     863             :      */
     864         162 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     865         162 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     866             : 
     867             :     /*
     868             :      * Now scan the relation.  No syncscan allowed here because we want the
     869             :      * heap blocks in physical order.
                     :      *
                     :      * NOTE(review): the literal 'false' below is presumably the
                     :      * allow_sync argument -- confirm against access/tableam.h.
     870             :      */
     871         162 :     reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
     872             :                                        brinbuildCallback, (void *) state, NULL);
     873             : 
     874             :     /* process the final batch; the callback never inserts the last range */
     875         162 :     form_and_insert_tuple(state);
     876             : 
     877             :     /* release resources; read bs_numtuples before the state is terminated */
     878         162 :     idxtuples = state->bs_numtuples;
     879         162 :     brinRevmapTerminate(state->bs_rmAccess);
     880         162 :     terminate_brin_buildstate(state);
     881             : 
     882             :     /*
     883             :      * Return statistics
     884             :      */
     885         162 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     886             : 
     887         162 :     result->heap_tuples = reltuples;
     888         162 :     result->index_tuples = idxtuples;
     889             : 
     890         162 :     return result;
     891             : }
     892             : 
     893             : void
     894           0 : brinbuildempty(Relation index)
     895             : {
     896             :     Buffer      metabuf;
     897             : 
     898             :     /*
                     :      * Build the init-fork image of an empty BRIN index.
                     :      *
                     :      * An empty BRIN index has a metapage only.  Allocate that page as a
                     :      * new block in the index's INIT_FORKNUM fork and take an exclusive
                     :      * lock on it before writing to it.
                     :      */
     899             :     metabuf =
     900           0 :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     901           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     902             : 
     903             :     /*
                     :      * Initialize and xlog metabuffer.  Page initialization, dirtying
                     :      * the buffer and logging it via log_newpage_buffer() all happen
                     :      * inside a single critical section; here the page is logged
                     :      * unconditionally, whereas brinbuild() checks RelationNeedsWAL().
                     :      */
     904           0 :     START_CRIT_SECTION();
     905           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     906             :                        BRIN_CURRENT_VERSION);
     907           0 :     MarkBufferDirty(metabuf);
     908           0 :     log_newpage_buffer(metabuf, true);
     909           0 :     END_CRIT_SECTION();
     910             : 
     911           0 :     UnlockReleaseBuffer(metabuf);
     912           0 : }
     913             : 
     914             : /*
     915             :  * brinbulkdelete
     916             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     917             :  *      there's not a lot we can do here.
     918             :  *
     919             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     920             :  * tuple is deleted), meaning the need to re-run summarization on the affected
     921             :  * range.  Would need to add an extra flag in brintuples for that.
                     :  *
                     :  * As written, 'info', 'callback' and 'callback_state' are ignored and the
                     :  * index is not touched.  The function only guarantees that a result
                     :  * struct exists: if 'stats' is NULL a zero-filled one is palloc0'd,
                     :  * otherwise the caller's struct is returned unmodified.
     922             :  */
     923             : IndexBulkDeleteResult *
     924          16 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     925             :                IndexBulkDeleteCallback callback, void *callback_state)
     926             : {
     927             :     /* allocate stats if first time through, else re-use existing struct */
     928          16 :     if (stats == NULL)
     929          16 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     930             : 
     931          16 :     return stats;
     932             : }
     933             : 
     934             : /*
     935             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     936             :  * ranges that are currently unsummarized.
                     :  *
                     :  * In ANALYZE-only mode nothing is done and 'stats' is returned exactly as
                     :  * passed in (which may be NULL).  Otherwise a zeroed stats struct is
                     :  * allocated if needed, num_pages is set to the index's current block
                     :  * count, and the parent table is opened with AccessShareLock for the
                     :  * duration of brin_vacuum_scan() and brinsummarize().
                     :  *
                     :  * Both counter outputs of brinsummarize() are pointed at
                     :  * stats->num_index_tuples, so whatever it adds through either pointer
                     :  * accumulates into that single field.  NOTE(review): presumably that
                     :  * yields newly-summarized plus already-existing tuples -- confirm
                     :  * against brinsummarize().
     937             :  */
     938             : IndexBulkDeleteResult *
     939          88 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     940             : {
     941             :     Relation    heapRel;
     942             : 
     943             :     /* No-op in ANALYZE ONLY mode */
     944          88 :     if (info->analyze_only)
     945           2 :         return stats;
     946             : 
     947          86 :     if (!stats)
     948          70 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     949          86 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     950             :     /* rest of stats is initialized by zeroing */
     951             : 
     952          86 :     heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
     953             :                          AccessShareLock);
     954             : 
     955          86 :     brin_vacuum_scan(info->index, info->strategy);
     956             : 
     957          86 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
     958             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     959             : 
     960          86 :     table_close(heapRel, AccessShareLock);
     961             : 
     962          86 :     return stats;
     963             : }
     964             : 
     965             : /*
     966             :  * reloptions processor for BRIN indexes
                     :  *
                     :  * Two options are recognized: "pages_per_range" (integer, stored in
                     :  * BrinOptions.pagesPerRange) and "autosummarize" (boolean, stored in
                     :  * BrinOptions.autosummarize).  Parsing is delegated to
                     :  * build_reloptions() with RELOPT_KIND_BRIN, passing 'reloptions' and
                     :  * 'validate' through unchanged; its result is returned cast to bytea *.
     967             :  */
     968             : bytea *
     969         432 : brinoptions(Datum reloptions, bool validate)
     970             : {
     971             :     static const relopt_parse_elt tab[] = {
     972             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     973             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     974             :     };
     975             : 
     976         432 :     return (bytea *) build_reloptions(reloptions, validate,
     977             :                                       RELOPT_KIND_BRIN,
     978             :                                       sizeof(BrinOptions),
     979             :                                       tab, lengthof(tab));
     980             : }
     981             : 
     982             : /*
     983             :  * SQL-callable function to scan through an index and summarize all ranges
     984             :  * that are not currently summarized.
                     :  *
                     :  * This is a thin wrapper: argument 0 is forwarded as-is to
                     :  * brin_summarize_range() together with BRIN_ALL_BLOCKRANGES (widened to
                     :  * int64) as the block number, and that function's result Datum is
                     :  * returned unchanged.  All validation and locking happens there.
     985             :  */
     986             : Datum
     987          38 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     988             : {
     989          38 :     Datum       relation = PG_GETARG_DATUM(0);
     990             : 
     991          38 :     return DirectFunctionCall2(brin_summarize_range,
     992             :                                relation,
     993             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     994             : }
     995             : 
     996             : /*
     997             :  * SQL-callable function to summarize the indicated page range, if not already
     998             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
     999             :  * unsummarized ranges are summarized.
                     :  *
                     :  * Arguments: (0) OID of the index, (1) heap block number as int64.
                     :  * Returns, as int32, the count that brinsummarize() reports through
                     :  * its numSummarized output.
                     :  *
                     :  * Errors are raised, in this order, when: recovery is in progress; the
                     :  * block number is negative or greater than BRIN_ALL_BLOCKRANGES; the
                     :  * relation is not a BRIN index; the current user does not own the index;
                     :  * the parent table could not be identified or no longer matches.
                     :  *
                     :  * Both relations are opened with ShareUpdateExclusiveLock, table first,
                     :  * and closed with the same lock mode before returning.
    1000             :  */
    1001             : Datum
    1002         120 : brin_summarize_range(PG_FUNCTION_ARGS)
    1003             : {
    1004         120 :     Oid         indexoid = PG_GETARG_OID(0);
    1005         120 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1006             :     BlockNumber heapBlk;
    1007             :     Oid         heapoid;
    1008             :     Relation    indexRel;
    1009             :     Relation    heapRel;
    1010         120 :     double      numSummarized = 0;
    1011             : 
    1012         120 :     if (RecoveryInProgress())
    1013           0 :         ereport(ERROR,
    1014             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1015             :                  errmsg("recovery is in progress"),
    1016             :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1017             : 
    1018         120 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
    1019             :     {
    1020          24 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
    1021             : 
    1022          24 :         ereport(ERROR,
    1023             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1024             :                  errmsg("block number out of range: %s", blk)));
    1025             :     }
    1026          96 :     heapBlk = (BlockNumber) heapBlk64;
    1027             : 
    1028             :     /*
    1029             :      * We must lock table before index to avoid deadlocks.  However, if the
    1030             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1031             :      * Rather than emitting a not-very-helpful error message, postpone
    1032             :      * complaining, expecting that the is-it-an-index test below will fail.
                     :      *
                     :      * Hence IndexGetRelation() is called in its no-error mode here, and
                     :      * heapRel is left NULL when no parent table OID comes back.
    1033             :      */
    1034          96 :     heapoid = IndexGetRelation(indexoid, true);
    1035          96 :     if (OidIsValid(heapoid))
    1036          84 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1037             :     else
    1038          12 :         heapRel = NULL;
    1039             : 
    1040          96 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1041             : 
    1042             :     /* Must be a BRIN index */
    1043          84 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1044          84 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1045          12 :         ereport(ERROR,
    1046             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1047             :                  errmsg("\"%s\" is not a BRIN index",
    1048             :                         RelationGetRelationName(indexRel))));
    1049             : 
    1050             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1051          72 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1052           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1053           0 :                        RelationGetRelationName(indexRel));
    1054             : 
    1055             :     /*
    1056             :      * Since we did the IndexGetRelation call above without any lock, it's
    1057             :      * barely possible that a race against an index drop/recreation could have
    1058             :      * netted us the wrong table.  Recheck.
                     :      *
                     :      * This also catches the heapRel == NULL case left over from above.
    1059             :      */
    1060          72 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1061           0 :         ereport(ERROR,
    1062             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1063             :                  errmsg("could not open parent table of index \"%s\"",
    1064             :                         RelationGetRelationName(indexRel))));
    1065             : 
    1066             :     /* OK, do it; only the summarized count is requested (last arg NULL) */
    1067          72 :     brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
    1068             : 
    1069          72 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1070          72 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1071             : 
    1072          72 :     PG_RETURN_INT32((int32) numSummarized);
    1073             : }
    1074             : 
    1075             : /*
    1076             :  * SQL-callable interface to mark a range as no longer summarized
    1077             :  */
    1078             : Datum
    1079          64 : brin_desummarize_range(PG_FUNCTION_ARGS)
    1080             : {
    1081          64 :     Oid         indexoid = PG_GETARG_OID(0);
    1082          64 :     int64       heapBlk64 = PG_GETARG_INT64(1);
    1083             :     BlockNumber heapBlk;
    1084             :     Oid         heapoid;
    1085             :     Relation    heapRel;
    1086             :     Relation    indexRel;
    1087             :     bool        done;
    1088             : 
    1089          64 :     if (RecoveryInProgress())
    1090           0 :         ereport(ERROR,
    1091             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1092             :                  errmsg("recovery is in progress"),
    1093             :                  errhint("BRIN control functions cannot be executed during recovery.")));
    1094             : 
    1095          64 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
    1096             :     {
    1097          12 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
    1098             : 
    1099          12 :         ereport(ERROR,
    1100             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
    1101             :                  errmsg("block number out of range: %s", blk)));
    1102             :     }
    1103          52 :     heapBlk = (BlockNumber) heapBlk64;
    1104             : 
    1105             :     /*
    1106             :      * We must lock table before index to avoid deadlocks.  However, if the
    1107             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
    1108             :      * Rather than emitting a not-very-helpful error message, postpone
    1109             :      * complaining, expecting that the is-it-an-index test below will fail.
    1110             :      */
    1111          52 :     heapoid = IndexGetRelation(indexoid, true);
    1112          52 :     if (OidIsValid(heapoid))
    1113          52 :         heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
    1114             :     else
    1115           0 :         heapRel = NULL;
    1116             : 
    1117          52 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
    1118             : 
    1119             :     /* Must be a BRIN index */
    1120          52 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1121          52 :         indexRel->rd_rel->relam != BRIN_AM_OID)
    1122           0 :         ereport(ERROR,
    1123             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1124             :                  errmsg("\"%s\" is not a BRIN index",
    1125             :                         RelationGetRelationName(indexRel))));
    1126             : 
    1127             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1128          52 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1129           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1130           0 :                        RelationGetRelationName(indexRel));
    1131             : 
    1132             :     /*
    1133             :      * Since we did the IndexGetRelation call above without any lock, it's
    1134             :      * barely possible that a race against an index drop/recreation could have
    1135             :      * netted us the wrong table.  Recheck.
    1136             :      */
    1137          52 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
    1138           0 :         ereport(ERROR,
    1139             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
    1140             :                  errmsg("could not open parent table of index \"%s\"",
    1141             :                         RelationGetRelationName(indexRel))));
    1142             : 
    1143             :     /* the revmap does the hard work */
    1144             :     do
    1145             :     {
    1146          52 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
    1147             :     }
    1148          52 :     while (!done);
    1149             : 
    1150          52 :     relation_close(indexRel, ShareUpdateExclusiveLock);
    1151          52 :     relation_close(heapRel, ShareUpdateExclusiveLock);
    1152             : 
    1153          52 :     PG_RETURN_VOID();
    1154             : }
    1155             : 
    1156             : /*
    1157             :  * Build a BrinDesc used to create or scan a BRIN index
    1158             :  */
    1159             : BrinDesc *
    1160        2602 : brin_build_desc(Relation rel)
    1161             : {
    1162             :     BrinOpcInfo **opcinfo;
    1163             :     BrinDesc   *bdesc;
    1164             :     TupleDesc   tupdesc;
    1165        2602 :     int         totalstored = 0;
    1166             :     int         keyno;
    1167             :     long        totalsize;
    1168             :     MemoryContext cxt;
    1169             :     MemoryContext oldcxt;
    1170             : 
    1171        2602 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1172             :                                 "brin desc cxt",
    1173             :                                 ALLOCSET_SMALL_SIZES);
    1174        2602 :     oldcxt = MemoryContextSwitchTo(cxt);
    1175        2602 :     tupdesc = RelationGetDescr(rel);
    1176             : 
    1177             :     /*
    1178             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1179             :      * the number of columns stored, since the number is opclass-defined.
    1180             :      */
    1181        2602 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1182       49476 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1183             :     {
    1184             :         FmgrInfo   *opcInfoFn;
    1185       46874 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1186             : 
    1187       46874 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1188             : 
    1189       46874 :         opcinfo[keyno] = (BrinOpcInfo *)
    1190       46874 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1191       46874 :         totalstored += opcinfo[keyno]->oi_nstored;
    1192             :     }
    1193             : 
    1194             :     /* Allocate our result struct and fill it in */
    1195        2602 :     totalsize = offsetof(BrinDesc, bd_info) +
    1196        2602 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1197             : 
    1198        2602 :     bdesc = palloc(totalsize);
    1199        2602 :     bdesc->bd_context = cxt;
    1200        2602 :     bdesc->bd_index = rel;
    1201        2602 :     bdesc->bd_tupdesc = tupdesc;
    1202        2602 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1203        2602 :     bdesc->bd_totalstored = totalstored;
    1204             : 
    1205       49476 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1206       46874 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1207        2602 :     pfree(opcinfo);
    1208             : 
    1209        2602 :     MemoryContextSwitchTo(oldcxt);
    1210             : 
    1211        2602 :     return bdesc;
    1212             : }
    1213             : 
    1214             : void
    1215        1940 : brin_free_desc(BrinDesc *bdesc)
    1216             : {
    1217             :     /* make sure the tupdesc is still valid */
    1218             :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1219             :     /* no need for retail pfree */
    1220        1940 :     MemoryContextDelete(bdesc->bd_context);
    1221        1940 : }
    1222             : 
    1223             : /*
    1224             :  * Fetch index's statistical data into *stats
    1225             :  */
    1226             : void
    1227        6880 : brinGetStats(Relation index, BrinStatsData *stats)
    1228             : {
    1229             :     Buffer      metabuffer;
    1230             :     Page        metapage;
    1231             :     BrinMetaPageData *metadata;
    1232             : 
    1233        6880 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1234        6880 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1235        6880 :     metapage = BufferGetPage(metabuffer);
    1236        6880 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1237             : 
    1238        6880 :     stats->pagesPerRange = metadata->pagesPerRange;
    1239        6880 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1240             : 
    1241        6880 :     UnlockReleaseBuffer(metabuffer);
    1242        6880 : }
    1243             : 
    1244             : /*
    1245             :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
    1246             :  */
    1247             : static BrinBuildState *
    1248         208 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1249             :                            BlockNumber pagesPerRange)
    1250             : {
    1251             :     BrinBuildState *state;
    1252             : 
    1253         208 :     state = palloc(sizeof(BrinBuildState));
    1254             : 
    1255         208 :     state->bs_irel = idxRel;
    1256         208 :     state->bs_numtuples = 0;
    1257         208 :     state->bs_currentInsertBuf = InvalidBuffer;
    1258         208 :     state->bs_pagesPerRange = pagesPerRange;
    1259         208 :     state->bs_currRangeStart = 0;
    1260         208 :     state->bs_rmAccess = revmap;
    1261         208 :     state->bs_bdesc = brin_build_desc(idxRel);
    1262         208 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1263             : 
    1264         208 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1265             : 
    1266         208 :     return state;
    1267             : }
    1268             : 
    1269             : /*
    1270             :  * Release resources associated with a BrinBuildState.
    1271             :  */
    1272             : static void
    1273         208 : terminate_brin_buildstate(BrinBuildState *state)
    1274             : {
    1275             :     /*
    1276             :      * Release the last index buffer used.  We might as well ensure that
    1277             :      * whatever free space remains in that page is available in FSM, too.
    1278             :      */
    1279         208 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1280             :     {
    1281             :         Page        page;
    1282             :         Size        freespace;
    1283             :         BlockNumber blk;
    1284             : 
    1285         162 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1286         162 :         freespace = PageGetFreeSpace(page);
    1287         162 :         blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
    1288         162 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1289         162 :         RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
    1290         162 :         FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
    1291             :     }
    1292             : 
    1293         208 :     brin_free_desc(state->bs_bdesc);
    1294         208 :     pfree(state->bs_dtuple);
    1295         208 :     pfree(state);
    1296         208 : }
    1297             : 
    1298             : /*
    1299             :  * On the given BRIN index, summarize the heap page range that corresponds
    1300             :  * to the heap block number given.
    1301             :  *
    1302             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1303             :  * missing those values from the summary tuple, we first insert a placeholder
    1304             :  * index tuple into the index, then execute the heap scan; transactions
    1305             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1306             :  * union the placeholder tuple with the one computed by this routine.  The
    1307             :  * update of the index value happens in a loop, so that if somebody updates
    1308             :  * the placeholder tuple after we read it, we detect the case and try again.
    1309             :  * This ensures that the concurrently inserted tuples are not lost.
    1310             :  *
    1311             :  * A further corner case is this routine being asked to summarize the partial
    1312             :  * range at the end of the table.  heapNumBlocks is the (possibly outdated)
    1313             :  * table size; if we notice that the requested range lies beyond that size,
    1314             :  * we re-compute the table size after inserting the placeholder tuple, to
    1315             :  * avoid missing pages that were appended recently.
    1316             :  */
    1317             : static void
    1318          94 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1319             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1320             : {
    1321             :     Buffer      phbuf;
    1322             :     BrinTuple  *phtup;
    1323             :     Size        phsz;
    1324             :     OffsetNumber offset;
    1325             :     BlockNumber scanNumBlks;
    1326             : 
    1327             :     /*
    1328             :      * Insert the placeholder tuple
    1329             :      */
    1330          94 :     phbuf = InvalidBuffer;
    1331          94 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1332          94 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1333             :                            state->bs_rmAccess, &phbuf,
    1334             :                            heapBlk, phtup, phsz);
    1335             : 
    1336             :     /*
    1337             :      * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
    1338             :      * cannot shrink concurrently (but it can grow).
    1339             :      */
    1340             :     Assert(heapBlk % state->bs_pagesPerRange == 0);
    1341          94 :     if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
    1342             :     {
    1343             :         /*
    1344             :          * If we're asked to scan what we believe to be the final range on the
    1345             :          * table (i.e. a range that might be partial) we need to recompute our
    1346             :          * idea of what the latest page is after inserting the placeholder
    1347             :          * tuple.  Anyone that grows the table later will update the
    1348             :          * placeholder tuple, so it doesn't matter that we won't scan these
    1349             :          * pages ourselves.  Careful: the table might have been extended
    1350             :          * beyond the current range, so clamp our result.
    1351             :          *
    1352             :          * Fortunately, this should occur infrequently.
    1353             :          */
    1354          12 :         scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
    1355             :                           state->bs_pagesPerRange);
    1356             :     }
    1357             :     else
    1358             :     {
    1359             :         /* Easy case: range is known to be complete */
    1360          82 :         scanNumBlks = state->bs_pagesPerRange;
    1361             :     }
    1362             : 
    1363             :     /*
    1364             :      * Execute the partial heap scan covering the heap blocks in the specified
    1365             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1366             :      * short of brinbuildCallback creating the new index entry.
    1367             :      *
    1368             :      * Note that it is critical we use the "any visible" mode of
    1369             :      * table_index_build_range_scan here: otherwise, we would miss tuples
    1370             :      * inserted by transactions that are still in progress, among other corner
    1371             :      * cases.
    1372             :      */
    1373          94 :     state->bs_currRangeStart = heapBlk;
    1374          94 :     table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
    1375             :                                  heapBlk, scanNumBlks,
    1376             :                                  brinbuildCallback, (void *) state, NULL);
    1377             : 
    1378             :     /*
    1379             :      * Now we update the values obtained by the scan with the placeholder
    1380             :      * tuple.  We do this in a loop which only terminates if we're able to
    1381             :      * update the placeholder tuple successfully; if we are not, this means
    1382             :      * somebody else modified the placeholder tuple after we read it.
    1383             :      */
    1384             :     for (;;)
    1385           0 :     {
    1386             :         BrinTuple  *newtup;
    1387             :         Size        newsize;
    1388             :         bool        didupdate;
    1389             :         bool        samepage;
    1390             : 
    1391          94 :         CHECK_FOR_INTERRUPTS();
    1392             : 
    1393             :         /*
    1394             :          * Update the summary tuple and try to update.
    1395             :          */
    1396          94 :         newtup = brin_form_tuple(state->bs_bdesc,
    1397             :                                  heapBlk, state->bs_dtuple, &newsize);
    1398          94 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1399             :         didupdate =
    1400          94 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1401             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1402             :                           phtup, phsz, newtup, newsize, samepage);
    1403          94 :         brin_free_tuple(phtup);
    1404          94 :         brin_free_tuple(newtup);
    1405             : 
    1406             :         /* If the update succeeded, we're done. */
    1407          94 :         if (didupdate)
    1408          94 :             break;
    1409             : 
    1410             :         /*
    1411             :          * If the update didn't work, it might be because somebody updated the
    1412             :          * placeholder tuple concurrently.  Extract the new version, union it
    1413             :          * with the values we have from the scan, and start over.  (There are
    1414             :          * other reasons for the update to fail, but it's simple to treat them
    1415             :          * the same.)
    1416             :          */
    1417           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1418             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1419             :                                          NULL);
    1420             :         /* the placeholder tuple must exist */
    1421           0 :         if (phtup == NULL)
    1422           0 :             elog(ERROR, "missing placeholder tuple");
    1423           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
    1424           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1425             : 
    1426             :         /* merge it into the tuple from the heap scan */
    1427           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1428             :     }
    1429             : 
    1430          94 :     ReleaseBuffer(phbuf);
    1431          94 : }
    1432             : 
    1433             : /*
    1434             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1435             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1436             :  * page range containing the given heap page number is scanned.
    1437             :  * If include_partial is true, then the partial range at the end of the table
    1438             :  * is summarized, otherwise not.
    1439             :  *
    1440             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1441             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1442             :  * incremented.
    1443             :  */
    1444             : static void
    1445         158 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1446             :               bool include_partial, double *numSummarized, double *numExisting)
    1447             : {
    1448             :     BrinRevmap *revmap;
    1449         158 :     BrinBuildState *state = NULL;
    1450         158 :     IndexInfo  *indexInfo = NULL;
    1451             :     BlockNumber heapNumBlocks;
    1452             :     BlockNumber pagesPerRange;
    1453             :     Buffer      buf;
    1454             :     BlockNumber startBlk;
    1455             : 
    1456         158 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1457             : 
    1458             :     /* determine range of pages to process */
    1459         158 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1460         158 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1461         112 :         startBlk = 0;
    1462             :     else
    1463             :     {
    1464          46 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1465          46 :         heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
    1466             :     }
    1467         158 :     if (startBlk > heapNumBlocks)
    1468             :     {
    1469             :         /* Nothing to do if start point is beyond end of table */
    1470           0 :         brinRevmapTerminate(revmap);
    1471           0 :         return;
    1472             :     }
    1473             : 
    1474             :     /*
    1475             :      * Scan the revmap to find unsummarized items.
    1476             :      */
    1477         158 :     buf = InvalidBuffer;
    1478        3248 :     for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
    1479             :     {
    1480             :         BrinTuple  *tup;
    1481             :         OffsetNumber off;
    1482             : 
    1483             :         /*
    1484             :          * Unless requested to summarize even a partial range, go away now if
    1485             :          * we think the next range is partial.  Caller would pass true when it
    1486             :          * is typically run once bulk data loading is done
    1487             :          * (brin_summarize_new_values), and false when it is typically the
    1488             :          * result of arbitrarily-scheduled maintenance command (vacuuming).
    1489             :          */
    1490        3154 :         if (!include_partial &&
    1491        2288 :             (startBlk + pagesPerRange > heapNumBlocks))
    1492          64 :             break;
    1493             : 
    1494        3090 :         CHECK_FOR_INTERRUPTS();
    1495             : 
    1496        3090 :         tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
    1497             :                                        BUFFER_LOCK_SHARE, NULL);
    1498        3090 :         if (tup == NULL)
    1499             :         {
    1500             :             /* no revmap entry for this heap range. Summarize it. */
    1501          94 :             if (state == NULL)
    1502             :             {
    1503             :                 /* first time through */
    1504             :                 Assert(!indexInfo);
    1505          46 :                 state = initialize_brin_buildstate(index, revmap,
    1506             :                                                    pagesPerRange);
    1507          46 :                 indexInfo = BuildIndexInfo(index);
    1508             :             }
    1509          94 :             summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
    1510             : 
    1511             :             /* and re-initialize state for the next range */
    1512          94 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1513             : 
    1514          94 :             if (numSummarized)
    1515          94 :                 *numSummarized += 1.0;
    1516             :         }
    1517             :         else
    1518             :         {
    1519        2996 :             if (numExisting)
    1520        2154 :                 *numExisting += 1.0;
    1521        2996 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    1522             :         }
    1523             :     }
    1524             : 
    1525         158 :     if (BufferIsValid(buf))
    1526         120 :         ReleaseBuffer(buf);
    1527             : 
    1528             :     /* free resources */
    1529         158 :     brinRevmapTerminate(revmap);
    1530         158 :     if (state)
    1531             :     {
    1532          46 :         terminate_brin_buildstate(state);
    1533          46 :         pfree(indexInfo);
    1534             :     }
    1535             : }
    1536             : 
    1537             : /*
    1538             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1539             :  * format and insert it into the index, making the revmap point to it.
    1540             :  */
    1541             : static void
    1542        1428 : form_and_insert_tuple(BrinBuildState *state)
    1543             : {
    1544             :     BrinTuple  *tup;
    1545             :     Size        size;
    1546             : 
    1547        1428 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1548             :                           state->bs_dtuple, &size);
    1549        1428 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1550             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1551             :                   tup, size);
    1552        1428 :     state->bs_numtuples++;
    1553             : 
    1554        1428 :     pfree(tup);
    1555        1428 : }
    1556             : 
    1557             : /*
    1558             :  * Given two deformed tuples, adjust the first one so that it's consistent
    1559             :  * with the summary values in both.
    1560             :  */
    1561             : static void
    1562           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1563             : {
    1564             :     int         keyno;
    1565             :     BrinMemTuple *db;
    1566             :     MemoryContext cxt;
    1567             :     MemoryContext oldcxt;
    1568             : 
    1569             :     /* Use our own memory context to avoid retail pfree */
    1570           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1571             :                                 "brin union",
    1572             :                                 ALLOCSET_DEFAULT_SIZES);
    1573           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1574           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1575           0 :     MemoryContextSwitchTo(oldcxt);
    1576             : 
    1577           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1578             :     {
    1579             :         FmgrInfo   *unionFn;
    1580           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1581           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1582           0 :         BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
    1583             : 
    1584           0 :         if (opcinfo->oi_regular_nulls)
    1585             :         {
    1586             :             /* Adjust "hasnulls". */
    1587           0 :             if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
    1588           0 :                 col_a->bv_hasnulls = true;
    1589             : 
    1590             :             /* If there are no values in B, there's nothing left to do. */
    1591           0 :             if (col_b->bv_allnulls)
    1592           0 :                 continue;
    1593             : 
    1594             :             /*
    1595             :              * Adjust "allnulls".  If A doesn't have values, just copy the
    1596             :              * values from B into A, and we're done.  We cannot run the
    1597             :              * operators in this case, because values in A might contain
    1598             :              * garbage.  Note we already established that B contains values.
    1599             :              */
    1600           0 :             if (col_a->bv_allnulls)
    1601             :             {
    1602             :                 int         i;
    1603             : 
    1604           0 :                 col_a->bv_allnulls = false;
    1605             : 
    1606           0 :                 for (i = 0; i < opcinfo->oi_nstored; i++)
    1607           0 :                     col_a->bv_values[i] =
    1608           0 :                         datumCopy(col_b->bv_values[i],
    1609           0 :                                   opcinfo->oi_typcache[i]->typbyval,
    1610           0 :                                   opcinfo->oi_typcache[i]->typlen);
    1611             : 
    1612           0 :                 continue;
    1613             :             }
    1614             :         }
    1615             : 
    1616           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1617             :                                     BRIN_PROCNUM_UNION);
    1618           0 :         FunctionCall3Coll(unionFn,
    1619           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1620             :                           PointerGetDatum(bdesc),
    1621             :                           PointerGetDatum(col_a),
    1622             :                           PointerGetDatum(col_b));
    1623             :     }
    1624             : 
    1625           0 :     MemoryContextDelete(cxt);
    1626           0 : }
    1627             : 
    1628             : /*
    1629             :  * brin_vacuum_scan
    1630             :  *      Do a complete scan of the index during VACUUM.
    1631             :  *
    1632             :  * This routine scans the complete index looking for uncatalogued index pages,
    1633             :  * i.e. those that might have been lost due to a crash after index extension
    1634             :  * and such.
    1635             :  */
    1636             : static void
    1637          86 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1638             : {
    1639             :     BlockNumber nblocks;
    1640             :     BlockNumber blkno;
    1641             : 
    1642             :     /*
    1643             :      * Scan the index in physical order, and clean up any possible mess in
    1644             :      * each page.
    1645             :      */
    1646          86 :     nblocks = RelationGetNumberOfBlocks(idxrel);
    1647         464 :     for (blkno = 0; blkno < nblocks; blkno++)
    1648             :     {
    1649             :         Buffer      buf;
    1650             : 
    1651         378 :         CHECK_FOR_INTERRUPTS();
    1652             : 
    1653         378 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1654             :                                  RBM_NORMAL, strategy);
    1655             : 
    1656         378 :         brin_page_cleanup(idxrel, buf);
    1657             : 
    1658         378 :         ReleaseBuffer(buf);
    1659             :     }
    1660             : 
    1661             :     /*
    1662             :      * Update all upper pages in the index's FSM, as well.  This ensures not
    1663             :      * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
    1664             :      * but also that any pre-existing damage or out-of-dateness is repaired.
    1665             :      */
    1666          86 :     FreeSpaceMapVacuum(idxrel);
    1667          86 : }
    1668             : 
    1669             : static bool
    1670      497142 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
    1671             :                     Datum *values, bool *nulls)
    1672             : {
    1673             :     int         keyno;
    1674      497142 :     bool        modified = false;
    1675             : 
    1676             :     /*
    1677             :      * Compare the key values of the new tuple to the stored index values; our
    1678             :      * deformed tuple will get updated if the new tuple doesn't fit the
    1679             :      * original range (note this means we can't break out of the loop early).
    1680             :      * Make a note of whether this happens, so that we know to insert the
    1681             :      * modified tuple later.
    1682             :      */
    1683     1093516 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1684             :     {
    1685             :         Datum       result;
    1686             :         BrinValues *bval;
    1687             :         FmgrInfo   *addValue;
    1688             : 
    1689      596374 :         bval = &dtup->bt_columns[keyno];
    1690             : 
    1691      596374 :         if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
    1692             :         {
    1693             :             /*
    1694             :              * If the new value is null, we record that we saw it if it's the
    1695             :              * first one; otherwise, there's nothing to do.
    1696             :              */
    1697        9288 :             if (!bval->bv_hasnulls)
    1698             :             {
    1699        2304 :                 bval->bv_hasnulls = true;
    1700        2304 :                 modified = true;
    1701             :             }
    1702             : 
    1703        9288 :             continue;
    1704             :         }
    1705             : 
    1706      587086 :         addValue = index_getprocinfo(idxRel, keyno + 1,
    1707             :                                      BRIN_PROCNUM_ADDVALUE);
    1708     1761258 :         result = FunctionCall4Coll(addValue,
    1709      587086 :                                    idxRel->rd_indcollation[keyno],
    1710             :                                    PointerGetDatum(bdesc),
    1711             :                                    PointerGetDatum(bval),
    1712      587086 :                                    values[keyno],
    1713      587086 :                                    nulls[keyno]);
    1714             :         /* if that returned true, we need to insert the updated tuple */
    1715      587086 :         modified |= DatumGetBool(result);
    1716             :     }
    1717             : 
    1718      497142 :     return modified;
    1719             : }
    1720             : 
    1721             : static bool
    1722      125152 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
    1723             : {
    1724             :     int         keyno;
    1725             : 
    1726             :     /*
    1727             :      * First check if there are any IS [NOT] NULL scan keys, and if we're
    1728             :      * violating them.
    1729             :      */
    1730      125976 :     for (keyno = 0; keyno < nnullkeys; keyno++)
    1731             :     {
    1732        1488 :         ScanKey     key = nullkeys[keyno];
    1733             : 
    1734             :         Assert(key->sk_attno == bval->bv_attno);
    1735             : 
    1736             :         /* Handle only IS NULL/IS NOT NULL tests */
    1737        1488 :         if (!(key->sk_flags & SK_ISNULL))
    1738           0 :             continue;
    1739             : 
    1740        1488 :         if (key->sk_flags & SK_SEARCHNULL)
    1741             :         {
    1742             :             /* IS NULL scan key, but range has no NULLs */
    1743         744 :             if (!bval->bv_allnulls && !bval->bv_hasnulls)
    1744         652 :                 return false;
    1745             :         }
    1746         744 :         else if (key->sk_flags & SK_SEARCHNOTNULL)
    1747             :         {
    1748             :             /*
    1749             :              * For IS NOT NULL, we can only skip ranges that are known to have
    1750             :              * only nulls.
    1751             :              */
    1752         744 :             if (bval->bv_allnulls)
    1753          12 :                 return false;
    1754             :         }
    1755             :         else
    1756             :         {
    1757             :             /*
    1758             :              * Neither IS NULL nor IS NOT NULL was used; assume all indexable
    1759             :              * operators are strict and thus return false with NULL value in
    1760             :              * the scan key.
    1761             :              */
    1762           0 :             return false;
    1763             :         }
    1764             :     }
    1765             : 
    1766      124488 :     return true;
    1767             : }

Generated by: LCOV version 1.13