Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "miscadmin.h"
32 : #include "pgstat.h"
33 : #include "postmaster/autovacuum.h"
34 : #include "storage/bufmgr.h"
35 : #include "storage/freespace.h"
36 : #include "utils/acl.h"
37 : #include "utils/builtins.h"
38 : #include "utils/index_selfuncs.h"
39 : #include "utils/memutils.h"
40 : #include "utils/rel.h"
41 :
42 :
43 : /*
44 : * We use a BrinBuildState during initial construction of a BRIN index.
45 : * The running state is kept in a BrinMemTuple.
46 : */
47 : typedef struct BrinBuildState
48 : {
49 : Relation bs_irel;
50 : int bs_numtuples;
51 : Buffer bs_currentInsertBuf;
52 : BlockNumber bs_pagesPerRange;
53 : BlockNumber bs_currRangeStart;
54 : BrinRevmap *bs_rmAccess;
55 : BrinDesc *bs_bdesc;
56 : BrinMemTuple *bs_dtuple;
57 : } BrinBuildState;
58 :
59 : /*
60 : * Struct used as "opaque" during index scans
61 : */
62 : typedef struct BrinOpaque
63 : {
64 : BlockNumber bo_pagesPerRange;
65 : BrinRevmap *bo_rmAccess;
66 : BrinDesc *bo_bdesc;
67 : } BrinOpaque;
68 :
69 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
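/* Note (added): InvalidBlockNumber is 0xFFFFFFFF, so BRIN_ALL_BLOCKRANGES compares greater than any valid block number. */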
70 :
71 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
72 : BrinRevmap *revmap, BlockNumber pagesPerRange);
73 : static void terminate_brin_buildstate(BrinBuildState *state);
74 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
75 : bool include_partial, double *numSummarized, double *numExisting);
76 : static void form_and_insert_tuple(BrinBuildState *state);
77 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
78 : BrinTuple *b);
79 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
80 :
81 :
82 : /*
83 : * BRIN handler function: return IndexAmRoutine with access method parameters
84 : * and callbacks.
85 : */
86 : Datum
87 468 : brinhandler(PG_FUNCTION_ARGS)
88 : {
89 468 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
90 :
91 468 : amroutine->amstrategies = 0;
92 468 : amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
93 468 : amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
94 468 : amroutine->amcanorder = false;
95 468 : amroutine->amcanorderbyop = false;
96 468 : amroutine->amcanbackward = false;
97 468 : amroutine->amcanunique = false;
98 468 : amroutine->amcanmulticol = true;
99 468 : amroutine->amoptionalkey = true;
100 468 : amroutine->amsearcharray = false;
101 468 : amroutine->amsearchnulls = true;
102 468 : amroutine->amstorage = true;
103 468 : amroutine->amclusterable = false;
104 468 : amroutine->ampredlocks = false;
105 468 : amroutine->amcanparallel = false;
106 468 : amroutine->amcaninclude = false;
107 468 : amroutine->amusemaintenanceworkmem = false;
108 468 : amroutine->amparallelvacuumoptions =
109 : VACUUM_OPTION_PARALLEL_CLEANUP;
110 468 : amroutine->amkeytype = InvalidOid;
111 :
112 468 : amroutine->ambuild = brinbuild;
113 468 : amroutine->ambuildempty = brinbuildempty;
114 468 : amroutine->aminsert = brininsert;
115 468 : amroutine->ambulkdelete = brinbulkdelete;
116 468 : amroutine->amvacuumcleanup = brinvacuumcleanup;
117 468 : amroutine->amcanreturn = NULL;
118 468 : amroutine->amcostestimate = brincostestimate;
119 468 : amroutine->amoptions = brinoptions;
120 468 : amroutine->amproperty = NULL;
121 468 : amroutine->ambuildphasename = NULL;
122 468 : amroutine->amvalidate = brinvalidate;
123 468 : amroutine->amadjustmembers = NULL;
124 468 : amroutine->ambeginscan = brinbeginscan;
125 468 : amroutine->amrescan = brinrescan;
126 468 : amroutine->amgettuple = NULL;
127 468 : amroutine->amgetbitmap = bringetbitmap;
128 468 : amroutine->amendscan = brinendscan;
129 468 : amroutine->ammarkpos = NULL;
130 468 : amroutine->amrestrpos = NULL;
131 468 : amroutine->amestimateparallelscan = NULL;
132 468 : amroutine->aminitparallelscan = NULL;
133 468 : amroutine->amparallelrescan = NULL;
134 :
135 468 : PG_RETURN_POINTER(amroutine);
136 : }
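/*
 * Illustrative usage, not part of the original file: the handler above is
 * what lets DDL locate BRIN's callbacks, e.g. (hypothetical names):
 *
 *     CREATE INDEX brin_ts_idx ON events USING brin (ts)
 *         WITH (pages_per_range = 64);
 *
 * Note amsearcharray is false, matching the ScalarArrayOpExpr TODO at the
 * top of this file.
 */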
137 :
138 : /*
139 : * A tuple in the heap is being inserted. To keep a BRIN index up to date,
140 : * we need to obtain the relevant index tuple and compare its stored values
141 : * with those of the new tuple. If the tuple values are not consistent with
142 : * the summary tuple, we need to update the index tuple.
143 : *
144 : * If autosummarization is enabled, check if we need to summarize the previous
145 : * page range.
146 : *
147 : * If the range is not currently summarized (i.e. the revmap returns NULL for
148 : * it), there's nothing to do for this tuple.
149 : */
150 : bool
151 9156 : brininsert(Relation idxRel, Datum *values, bool *nulls,
152 : ItemPointer heaptid, Relation heapRel,
153 : IndexUniqueCheck checkUnique,
154 : bool indexUnchanged,
155 : IndexInfo *indexInfo)
156 : {
157 : BlockNumber pagesPerRange;
158 : BlockNumber origHeapBlk;
159 : BlockNumber heapBlk;
160 9156 : BrinDesc *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
161 : BrinRevmap *revmap;
162 9156 : Buffer buf = InvalidBuffer;
163 9156 : MemoryContext tupcxt = NULL;
164 9156 : MemoryContext oldcxt = CurrentMemoryContext;
165 9156 : bool autosummarize = BrinGetAutoSummarize(idxRel);
166 :
167 9156 : revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
168 :
169 : /*
170 : * origHeapBlk is the block number where the insertion occurred. heapBlk
171 : * is the first block in the corresponding page range.
172 : */
173 9156 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
174 9156 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
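/*
 * Worked example (added for illustration): with pagesPerRange = 128, an
 * insertion into heap block 300 gives heapBlk = (300 / 128) * 128 = 256,
 * i.e. the start of the range covering blocks 256..383.
 */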
175 :
176 : for (;;)
177 0 : {
178 9156 : bool need_insert = false;
179 : OffsetNumber off;
180 : BrinTuple *brtup;
181 : BrinMemTuple *dtup;
182 : int keyno;
183 :
184 9156 : CHECK_FOR_INTERRUPTS();
185 :
186 : /*
187 : * If auto-summarization is enabled and we just inserted the first
188 : * tuple into the first block of a new non-first page range, request a
189 : * summarization run of the previous range.
190 : */
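/*
 * Illustrative scenario (not in the original source): with
 * pagesPerRange = 128, inserting the very first tuple on heap block 256
 * satisfies all three tests (heapBlk > 0, heapBlk == origHeapBlk == 256,
 * offset == FirstOffsetNumber), so we probe the previous range via its
 * last block, lastPageRange = 255; if the revmap has no tuple for that
 * range, an autovacuum work item is queued below.
 */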
191 9156 : if (autosummarize &&
192 156 : heapBlk > 0 &&
193 156 : heapBlk == origHeapBlk &&
194 156 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
195 : {
196 8 : BlockNumber lastPageRange = heapBlk - 1;
197 : BrinTuple *lastPageTuple;
198 :
199 : lastPageTuple =
200 8 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
201 : NULL, BUFFER_LOCK_SHARE, NULL);
202 8 : if (!lastPageTuple)
203 : {
204 : bool recorded;
205 :
206 6 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
207 : RelationGetRelid(idxRel),
208 : lastPageRange);
209 6 : if (!recorded)
210 0 : ereport(LOG,
211 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
212 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
213 : RelationGetRelationName(idxRel),
214 : lastPageRange)));
215 : }
216 : else
217 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
218 : }
219 :
220 9156 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
221 : NULL, BUFFER_LOCK_SHARE, NULL);
222 :
223 : /* if range is unsummarized, there's nothing to do */
224 9156 : if (!brtup)
225 188 : break;
226 :
227 : /* First time through in this statement? */
228 8968 : if (bdesc == NULL)
229 : {
230 298 : MemoryContextSwitchTo(indexInfo->ii_Context);
231 298 : bdesc = brin_build_desc(idxRel);
232 298 : indexInfo->ii_AmCache = (void *) bdesc;
233 298 : MemoryContextSwitchTo(oldcxt);
234 : }
235 : /* First time through in this brininsert call? */
236 8968 : if (tupcxt == NULL)
237 : {
238 8968 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
239 : "brininsert cxt",
240 : ALLOCSET_DEFAULT_SIZES);
241 8968 : MemoryContextSwitchTo(tupcxt);
242 : }
243 :
244 8968 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
245 :
246 : /*
247 : * Compare the key values of the new tuple to the stored index values;
248 : * our deformed tuple will get updated if the new tuple doesn't fit
249 : * the original range (note this means we can't break out of the loop
250 : * early). Make a note of whether this happens, so that we know to
251 : * insert the modified tuple later.
252 : */
253 42300 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
254 : {
255 : Datum result;
256 : BrinValues *bval;
257 : FmgrInfo *addValue;
258 :
259 33332 : bval = &dtup->bt_columns[keyno];
260 33332 : addValue = index_getprocinfo(idxRel, keyno + 1,
261 : BRIN_PROCNUM_ADDVALUE);
262 99996 : result = FunctionCall4Coll(addValue,
263 33332 : idxRel->rd_indcollation[keyno],
264 : PointerGetDatum(bdesc),
265 : PointerGetDatum(bval),
266 33332 : values[keyno],
267 33332 : nulls[keyno]);
268 : /* if that returned true, we need to insert the updated tuple */
269 33332 : need_insert |= DatumGetBool(result);
270 : }
271 :
272 8968 : if (!need_insert)
273 : {
274 : /*
275 : * The tuple is consistent with the new values, so there's nothing
276 : * to do.
277 : */
278 8060 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
279 : }
280 : else
281 : {
282 908 : Page page = BufferGetPage(buf);
283 908 : ItemId lp = PageGetItemId(page, off);
284 : Size origsz;
285 : BrinTuple *origtup;
286 : Size newsz;
287 : BrinTuple *newtup;
288 : bool samepage;
289 :
290 : /*
291 : * Make a copy of the old tuple, so that we can compare it after
292 : * re-acquiring the lock.
293 : */
294 908 : origsz = ItemIdGetLength(lp);
295 908 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
296 :
297 : /*
298 : * Before releasing the lock, check if we can attempt a same-page
299 : * update. Another process could insert a tuple concurrently in
300 : * the same page though, so downstream we must be prepared to cope
301 : * if this turns out not to be possible after all.
302 : */
303 908 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
304 908 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
305 908 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
306 :
307 : /*
308 : * Try to update the tuple. If this doesn't work for whatever
309 : * reason, we need to restart from the top; the revmap might be
310 : * pointing at a different tuple for this block now, so we need to
311 : * recompute to ensure both our new heap tuple and the other
312 : * inserter's are covered by the combined tuple. It might be that
313 : * we don't need to update at all.
314 : */
315 908 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
316 : buf, off, origtup, origsz, newtup, newsz,
317 : samepage))
318 : {
319 : /* no luck; start over */
320 0 : MemoryContextResetAndDeleteChildren(tupcxt);
321 0 : continue;
322 : }
323 : }
324 :
325 : /* success! */
326 8968 : break;
327 : }
328 :
329 9156 : brinRevmapTerminate(revmap);
330 9156 : if (BufferIsValid(buf))
331 8970 : ReleaseBuffer(buf);
332 9156 : MemoryContextSwitchTo(oldcxt);
333 9156 : if (tupcxt != NULL)
334 8968 : MemoryContextDelete(tupcxt);
335 :
336 9156 : return false;
337 : }
338 :
339 : /*
340 : * Initialize state for a BRIN index scan.
341 : *
342 : * We read the metapage here to determine the pages-per-range number that this
343 : * index was built with. Note that since this cannot be changed while we're
344 : * holding a lock on the index, it's not necessary to recompute it during brinrescan.
345 : */
346 : IndexScanDesc
347 996 : brinbeginscan(Relation r, int nkeys, int norderbys)
348 : {
349 : IndexScanDesc scan;
350 : BrinOpaque *opaque;
351 :
352 996 : scan = RelationGetIndexScan(r, nkeys, norderbys);
353 :
354 996 : opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
355 996 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
356 : scan->xs_snapshot);
357 996 : opaque->bo_bdesc = brin_build_desc(r);
358 996 : scan->opaque = opaque;
359 :
360 996 : return scan;
361 : }
362 :
363 : /*
364 : * Execute the index scan.
365 : *
366 : * This works by reading index TIDs from the revmap, and obtaining the index
367 : * tuples pointed to by them; the summary values in the index tuples are
368 : * compared to the scan keys. We return into the TID bitmap all the pages in
369 : * ranges corresponding to index tuples that match the scan keys.
370 : *
371 : * If a TID from the revmap is read as InvalidTID, we know that range is
372 : * unsummarized. Pages in those ranges need to be returned regardless of scan
373 : * keys.
374 : */
375 : int64
376 996 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
377 : {
378 996 : Relation idxRel = scan->indexRelation;
379 996 : Buffer buf = InvalidBuffer;
380 : BrinDesc *bdesc;
381 : Oid heapOid;
382 : Relation heapRel;
383 : BrinOpaque *opaque;
384 : BlockNumber nblocks;
385 : BlockNumber heapBlk;
386 996 : int totalpages = 0;
387 : FmgrInfo *consistentFn;
388 : MemoryContext oldcxt;
389 : MemoryContext perRangeCxt;
390 : BrinMemTuple *dtup;
391 996 : BrinTuple *btup = NULL;
392 996 : Size btupsz = 0;
393 :
394 996 : opaque = (BrinOpaque *) scan->opaque;
395 996 : bdesc = opaque->bo_bdesc;
396 996 : pgstat_count_index_scan(idxRel);
397 :
398 : /*
399 : * We need to know the size of the table so that we know how long to
400 : * iterate on the revmap.
401 : */
402 996 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
403 996 : heapRel = table_open(heapOid, AccessShareLock);
404 996 : nblocks = RelationGetNumberOfBlocks(heapRel);
405 996 : table_close(heapRel, AccessShareLock);
406 :
407 : /*
408 : * Make room for the consistent support procedures of indexed columns. We
409 : * don't look them up here; we do that lazily the first time we see a scan
410 : * key reference each of them. We rely on palloc0 zeroing fn_oid to InvalidOid.
411 : */
412 996 : consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
413 :
414 : /* allocate an initial in-memory tuple, out of the per-range memcxt */
415 996 : dtup = brin_new_memtuple(bdesc);
416 :
417 : /*
418 : * Set up and use a per-range memory context, which is reset every time we
419 : * loop below. This avoids having to free the tuples within the loop.
420 : */
421 996 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
422 : "bringetbitmap cxt",
423 : ALLOCSET_DEFAULT_SIZES);
424 996 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
425 :
426 : /*
427 : * Now scan the revmap. We start by querying for heap page 0,
428 : * incrementing by the number of pages per range; this gives us a full
429 : * view of the table.
430 : */
431 100200 : for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
432 : {
433 : bool addrange;
434 99204 : bool gottuple = false;
435 : BrinTuple *tup;
436 : OffsetNumber off;
437 : Size size;
438 :
439 99204 : CHECK_FOR_INTERRUPTS();
440 :
441 99204 : MemoryContextResetAndDeleteChildren(perRangeCxt);
442 :
443 99204 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
444 : &off, &size, BUFFER_LOCK_SHARE,
445 : scan->xs_snapshot);
446 99204 : if (tup)
447 : {
448 99204 : gottuple = true;
449 99204 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
450 99204 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
451 : }
452 :
453 : /*
454 : * For page ranges with no indexed tuple, we must return the whole
455 : * range; otherwise, compare it to the scan keys.
456 : */
457 99204 : if (!gottuple)
458 : {
459 0 : addrange = true;
460 : }
461 : else
462 : {
463 99204 : dtup = brin_deform_tuple(bdesc, btup, dtup);
464 99204 : if (dtup->bt_placeholder)
465 : {
466 : /*
467 : * Placeholder tuples are always returned, regardless of the
468 : * values stored in them.
469 : */
470 0 : addrange = true;
471 : }
472 : else
473 : {
474 : int keyno;
475 :
476 : /*
477 : * Compare scan keys with summary values stored for the range.
478 : * If scan keys are matched, the page range must be added to
479 : * the bitmap. We initially assume the range needs to be
480 : * added; in particular this serves the case where there are
481 : * no keys.
482 : */
483 99204 : addrange = true;
484 173112 : for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
485 : {
486 99204 : ScanKey key = &scan->keyData[keyno];
487 99204 : AttrNumber keyattno = key->sk_attno;
488 99204 : BrinValues *bval = &dtup->bt_columns[keyattno - 1];
489 : Datum add;
490 :
491 : /*
492 : * The collation of the scan key must match the collation
493 : * used in the index column (but only if the search is not
494 : * IS NULL / IS NOT NULL). Otherwise we shouldn't be using
495 : * this index ...
496 : */
497 : Assert((key->sk_flags & SK_ISNULL) ||
498 : (key->sk_collation ==
499 : TupleDescAttr(bdesc->bd_tupdesc,
500 : keyattno - 1)->attcollation));
501 :
502 : /* First time this column? look up consistent function */
503 99204 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
504 : {
505 : FmgrInfo *tmp;
506 :
507 996 : tmp = index_getprocinfo(idxRel, keyattno,
508 : BRIN_PROCNUM_CONSISTENT);
509 996 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
510 : CurrentMemoryContext);
511 : }
512 :
513 : /*
514 : * Check whether the scan key is consistent with the page
515 : * range values; if so, have the pages in the range added
516 : * to the output bitmap.
517 : *
518 : * When there are multiple scan keys, failure to meet the
519 : * criteria for a single one of them is enough to discard
520 : * the range as a whole, so break out of the loop as soon
521 : * as a false return value is obtained.
522 : */
523 99204 : add = FunctionCall3Coll(&consistentFn[keyattno - 1],
524 : key->sk_collation,
525 : PointerGetDatum(bdesc),
526 : PointerGetDatum(bval),
527 : PointerGetDatum(key));
528 99204 : addrange = DatumGetBool(add);
529 99204 : if (!addrange)
530 25296 : break;
531 : }
532 : }
533 : }
534 :
535 : /* add the pages in the range to the output bitmap, if needed */
536 99204 : if (addrange)
537 : {
538 : BlockNumber pageno;
539 :
540 73908 : for (pageno = heapBlk;
541 147816 : pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
542 73908 : pageno++)
543 : {
544 73908 : MemoryContextSwitchTo(oldcxt);
545 73908 : tbm_add_page(tbm, pageno);
546 73908 : totalpages++;
547 73908 : MemoryContextSwitchTo(perRangeCxt);
548 : }
549 : }
550 : }
551 :
552 996 : MemoryContextSwitchTo(oldcxt);
553 996 : MemoryContextDelete(perRangeCxt);
554 :
555 996 : if (buf != InvalidBuffer)
556 996 : ReleaseBuffer(buf);
557 :
558 : /*
559 : * XXX We have an approximation of the number of *pages* that our scan
560 : * returns, but we don't have a precise idea of the number of heap tuples
561 : * involved.
562 : */
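/* For illustration: a scan that added 500 pages reports 5000 "tuples". */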
563 996 : return totalpages * 10;
564 : }
565 :
566 : /*
567 : * Re-initialize state for a BRIN index scan
568 : */
569 : void
570 996 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
571 : ScanKey orderbys, int norderbys)
572 : {
573 : /*
574 : * Other index AMs preprocess the scan keys at this point, or sometime
575 : * early during the scan; this lets them optimize by removing redundant
576 : * keys, or doing early returns when they are impossible to satisfy; see
577 : * _bt_preprocess_keys for an example. Something like that could be added
578 : * here someday, too.
579 : */
580 :
581 996 : if (scankey && scan->numberOfKeys > 0)
582 996 : memmove(scan->keyData, scankey,
583 996 : scan->numberOfKeys * sizeof(ScanKeyData));
584 996 : }
585 :
586 : /*
587 : * Close down a BRIN index scan
588 : */
589 : void
590 996 : brinendscan(IndexScanDesc scan)
591 : {
592 996 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
593 :
594 996 : brinRevmapTerminate(opaque->bo_rmAccess);
595 996 : brin_free_desc(opaque->bo_bdesc);
596 996 : pfree(opaque);
597 996 : }
598 :
599 : /*
600 : * Per-heap-tuple callback for table_index_build_scan.
601 : *
602 : * Note we don't worry about the page range at the end of the table here; it is
603 : * present in the build state struct after we're called the last time, but not
604 : * inserted into the index. The caller must ensure it is inserted, if appropriate.
605 : */
606 : static void
607 284718 : brinbuildCallback(Relation index,
608 : ItemPointer tid,
609 : Datum *values,
610 : bool *isnull,
611 : bool tupleIsAlive,
612 : void *brstate)
613 : {
614 284718 : BrinBuildState *state = (BrinBuildState *) brstate;
615 : BlockNumber thisblock;
616 : int i;
617 :
618 284718 : thisblock = ItemPointerGetBlockNumber(tid);
619 :
620 : /*
621 : * If we're in a block that belongs to a future range, summarize what
622 : * we've got and start afresh. Note the scan might have skipped many
623 : * pages, if they were devoid of live tuples; make sure to insert index
624 : * tuples for those too.
625 : */
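/*
 * Worked example (illustrative): with bs_currRangeStart = 0 and
 * bs_pagesPerRange = 128, a tuple on block 300 makes this loop run twice,
 * inserting tuples for ranges [0,127] and [128,255] and leaving
 * bs_currRangeStart = 256.
 */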
626 285296 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
627 : {
628 :
629 : BRIN_elog((DEBUG2,
630 : "brinbuildCallback: completed a range: %u--%u",
631 : state->bs_currRangeStart,
632 : state->bs_currRangeStart + state->bs_pagesPerRange));
633 :
634 : /* create the index tuple and insert it */
635 578 : form_and_insert_tuple(state);
636 :
637 : /* set state to correspond to the next range */
638 578 : state->bs_currRangeStart += state->bs_pagesPerRange;
639 :
640 : /* re-initialize state for it */
641 578 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
642 : }
643 :
644 : /* Accumulate the current tuple into the running state */
645 585912 : for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
646 : {
647 : FmgrInfo *addValue;
648 : BrinValues *col;
649 301194 : Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
650 :
651 301194 : col = &state->bs_dtuple->bt_columns[i];
652 301194 : addValue = index_getprocinfo(index, i + 1,
653 : BRIN_PROCNUM_ADDVALUE);
654 :
655 : /*
656 : * Update dtuple state, if and as necessary.
657 : */
658 903582 : FunctionCall4Coll(addValue,
659 : attr->attcollation,
660 301194 : PointerGetDatum(state->bs_bdesc),
661 : PointerGetDatum(col),
662 301194 : values[i], isnull[i]);
663 : }
664 284718 : }
665 :
666 : /*
667 : * brinbuild() -- build a new BRIN index.
668 : */
669 : IndexBuildResult *
670 46 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
671 : {
672 : IndexBuildResult *result;
673 : double reltuples;
674 : double idxtuples;
675 : BrinRevmap *revmap;
676 : BrinBuildState *state;
677 : Buffer meta;
678 : BlockNumber pagesPerRange;
679 :
680 : /*
681 : * We expect to be called exactly once for any index relation.
682 : */
683 46 : if (RelationGetNumberOfBlocks(index) != 0)
684 0 : elog(ERROR, "index \"%s\" already contains data",
685 : RelationGetRelationName(index));
686 :
687 : /*
688 : * Critical section not required, because on error the creation of the
689 : * whole relation will be rolled back.
690 : */
691 :
692 46 : meta = ReadBuffer(index, P_NEW);
693 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
694 46 : LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
695 :
696 46 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
697 : BRIN_CURRENT_VERSION);
698 46 : MarkBufferDirty(meta);
699 :
700 46 : if (RelationNeedsWAL(index))
701 : {
702 : xl_brin_createidx xlrec;
703 : XLogRecPtr recptr;
704 : Page page;
705 :
706 44 : xlrec.version = BRIN_CURRENT_VERSION;
707 44 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
708 :
709 44 : XLogBeginInsert();
710 44 : XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
711 44 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
712 :
713 44 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
714 :
715 44 : page = BufferGetPage(meta);
716 44 : PageSetLSN(page, recptr);
717 : }
718 :
719 46 : UnlockReleaseBuffer(meta);
720 :
721 : /*
722 : * Initialize our state, including the deformed tuple state.
723 : */
724 46 : revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
725 46 : state = initialize_brin_buildstate(index, revmap, pagesPerRange);
726 :
727 : /*
728 : * Now scan the relation. No syncscan allowed here because we want the
729 : * heap blocks in physical order.
730 : */
731 46 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
732 : brinbuildCallback, (void *) state, NULL);
733 :
734 : /* process the final batch */
735 46 : form_and_insert_tuple(state);
736 :
737 : /* release resources */
738 46 : idxtuples = state->bs_numtuples;
739 46 : brinRevmapTerminate(state->bs_rmAccess);
740 46 : terminate_brin_buildstate(state);
741 :
742 : /*
743 : * Return statistics
744 : */
745 46 : result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
746 :
747 46 : result->heap_tuples = reltuples;
748 46 : result->index_tuples = idxtuples;
749 :
750 46 : return result;
751 : }
752 :
753 : void
754 0 : brinbuildempty(Relation index)
755 : {
756 : Buffer metabuf;
757 :
758 : /* An empty BRIN index has a metapage only. */
759 : metabuf =
760 0 : ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
761 0 : LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
762 :
763 : /* Initialize and xlog metabuffer. */
764 0 : START_CRIT_SECTION();
765 0 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
766 : BRIN_CURRENT_VERSION);
767 0 : MarkBufferDirty(metabuf);
768 0 : log_newpage_buffer(metabuf, true);
769 0 : END_CRIT_SECTION();
770 :
771 0 : UnlockReleaseBuffer(metabuf);
772 0 : }
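/*
 * For context (added note): ambuildempty is invoked for unlogged indexes,
 * where this init-fork metapage (INIT_FORKNUM above) is what the index is
 * reset to after a crash. A hypothetical way to reach this path:
 *
 *     CREATE UNLOGGED TABLE t (x int);
 *     CREATE INDEX ON t USING brin (x);
 */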
773 :
774 : /*
775 : * brinbulkdelete
776 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
777 : * there's not a lot we can do here.
778 : *
779 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
780 : * tuple is deleted), signaling the need to re-run summarization on the
781 : * affected range. That would need an extra flag in brintuples.
782 : */
783 : IndexBulkDeleteResult *
784 8 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
785 : IndexBulkDeleteCallback callback, void *callback_state)
786 : {
787 : /* allocate stats if first time through, else re-use existing struct */
788 8 : if (stats == NULL)
789 8 : stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
790 :
791 8 : return stats;
792 : }
793 :
794 : /*
795 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
796 : * ranges that are currently unsummarized.
797 : */
798 : IndexBulkDeleteResult *
799 32 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
800 : {
801 : Relation heapRel;
802 :
803 : /* No-op in ANALYZE ONLY mode */
804 32 : if (info->analyze_only)
805 2 : return stats;
806 :
807 30 : if (!stats)
808 22 : stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
809 30 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
810 : /* rest of stats is initialized by zeroing */
811 :
812 30 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
813 : AccessShareLock);
814 :
815 30 : brin_vacuum_scan(info->index, info->strategy);
816 :
817 30 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
818 : &stats->num_index_tuples, &stats->num_index_tuples);
819 :
820 30 : table_close(heapRel, AccessShareLock);
821 :
822 30 : return stats;
823 : }
824 :
825 : /*
826 : * reloptions processor for BRIN indexes
827 : */
828 : bytea *
829 152 : brinoptions(Datum reloptions, bool validate)
830 : {
831 : static const relopt_parse_elt tab[] = {
832 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
833 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
834 : };
835 :
836 152 : return (bytea *) build_reloptions(reloptions, validate,
837 : RELOPT_KIND_BRIN,
838 : sizeof(BrinOptions),
839 : tab, lengthof(tab));
840 : }
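/*
 * The two reloptions above can be set or changed with ordinary DDL, e.g.
 * (hypothetical names):
 *
 *     CREATE INDEX brin_idx ON t USING brin (col)
 *         WITH (pages_per_range = 32, autosummarize = on);
 *     ALTER INDEX brin_idx SET (autosummarize = off);
 */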
841 :
842 : /*
843 : * SQL-callable function to scan through an index and summarize all ranges
844 : * that are not currently summarized.
845 : */
846 : Datum
847 14 : brin_summarize_new_values(PG_FUNCTION_ARGS)
848 : {
849 14 : Datum relation = PG_GETARG_DATUM(0);
850 :
851 14 : return DirectFunctionCall2(brin_summarize_range,
852 : relation,
853 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
854 : }
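/*
 * Example call (hypothetical index name):
 *
 *     SELECT brin_summarize_new_values('brin_idx'::regclass);
 *
 * which, per the above, is equivalent to calling brin_summarize_range
 * with BRIN_ALL_BLOCKRANGES.
 */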
855 :
856 : /*
857 : * SQL-callable function to summarize the indicated page range, if not already
858 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
859 : * unsummarized ranges are summarized.
860 : */
861 : Datum
862 48 : brin_summarize_range(PG_FUNCTION_ARGS)
863 : {
864 48 : Oid indexoid = PG_GETARG_OID(0);
865 48 : int64 heapBlk64 = PG_GETARG_INT64(1);
866 : BlockNumber heapBlk;
867 : Oid heapoid;
868 : Relation indexRel;
869 : Relation heapRel;
870 48 : double numSummarized = 0;
871 :
872 48 : if (RecoveryInProgress())
873 0 : ereport(ERROR,
874 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
875 : errmsg("recovery is in progress"),
876 : errhint("BRIN control functions cannot be executed during recovery.")));
877 :
878 48 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
879 : {
880 8 : char *blk = psprintf(INT64_FORMAT, heapBlk64);
881 :
882 8 : ereport(ERROR,
883 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
884 : errmsg("block number out of range: %s", blk)));
885 : }
886 40 : heapBlk = (BlockNumber) heapBlk64;
887 :
888 : /*
889 : * We must lock table before index to avoid deadlocks. However, if the
890 : * passed indexoid isn't an index then IndexGetRelation() will fail.
891 : * Rather than emitting a not-very-helpful error message, postpone
892 : * complaining, expecting that the is-it-an-index test below will fail.
893 : */
894 40 : heapoid = IndexGetRelation(indexoid, true);
895 40 : if (OidIsValid(heapoid))
896 36 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
897 : else
898 4 : heapRel = NULL;
899 :
900 40 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
901 :
902 : /* Must be a BRIN index */
903 36 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
904 36 : indexRel->rd_rel->relam != BRIN_AM_OID)
905 4 : ereport(ERROR,
906 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
907 : errmsg("\"%s\" is not a BRIN index",
908 : RelationGetRelationName(indexRel))));
909 :
910 : /* User must own the index (comparable to privileges needed for VACUUM) */
911 32 : if (!pg_class_ownercheck(indexoid, GetUserId()))
912 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
913 0 : RelationGetRelationName(indexRel));
914 :
915 : /*
916 : * Since we did the IndexGetRelation call above without any lock, it's
917 : * barely possible that a race against an index drop/recreation could have
918 : * netted us the wrong table. Recheck.
919 : */
920 32 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
921 0 : ereport(ERROR,
922 : (errcode(ERRCODE_UNDEFINED_TABLE),
923 : errmsg("could not open parent table of index %s",
924 : RelationGetRelationName(indexRel))));
925 :
926 : /* OK, do it */
927 32 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
928 :
929 32 : relation_close(indexRel, ShareUpdateExclusiveLock);
930 32 : relation_close(heapRel, ShareUpdateExclusiveLock);
931 :
932 32 : PG_RETURN_INT32((int32) numSummarized);
933 : }
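/*
 * Example calls (hypothetical index name):
 *
 *     SELECT brin_summarize_range('brin_idx', 0);   -- range holding block 0
 *     SELECT brin_summarize_range('brin_idx', -1);  -- ERROR: block number out of range
 *
 * The return value is the number of ranges newly summarized.
 */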
934 :
935 : /*
936 : * SQL-callable interface to mark a range as no longer summarized
937 : */
938 : Datum
939 24 : brin_desummarize_range(PG_FUNCTION_ARGS)
940 : {
941 24 : Oid indexoid = PG_GETARG_OID(0);
942 24 : int64 heapBlk64 = PG_GETARG_INT64(1);
943 : BlockNumber heapBlk;
944 : Oid heapoid;
945 : Relation heapRel;
946 : Relation indexRel;
947 : bool done;
948 :
949 24 : if (RecoveryInProgress())
950 0 : ereport(ERROR,
951 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
952 : errmsg("recovery is in progress"),
953 : errhint("BRIN control functions cannot be executed during recovery.")));
954 :
955 24 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
956 : {
957 4 : char *blk = psprintf(INT64_FORMAT, heapBlk64);
958 :
959 4 : ereport(ERROR,
960 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
961 : errmsg("block number out of range: %s", blk)));
962 : }
963 20 : heapBlk = (BlockNumber) heapBlk64;
964 :
965 : /*
966 : * We must lock table before index to avoid deadlocks. However, if the
967 : * passed indexoid isn't an index then IndexGetRelation() will fail.
968 : * Rather than emitting a not-very-helpful error message, postpone
969 : * complaining, expecting that the is-it-an-index test below will fail.
970 : */
971 20 : heapoid = IndexGetRelation(indexoid, true);
972 20 : if (OidIsValid(heapoid))
973 20 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
974 : else
975 0 : heapRel = NULL;
976 :
977 20 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
978 :
979 : /* Must be a BRIN index */
980 20 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
981 20 : indexRel->rd_rel->relam != BRIN_AM_OID)
982 0 : ereport(ERROR,
983 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
984 : errmsg("\"%s\" is not a BRIN index",
985 : RelationGetRelationName(indexRel))));
986 :
987 : /* User must own the index (comparable to privileges needed for VACUUM) */
988 20 : if (!pg_class_ownercheck(indexoid, GetUserId()))
989 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
990 0 : RelationGetRelationName(indexRel));
991 :
992 : /*
993 : * Since we did the IndexGetRelation call above without any lock, it's
994 : * barely possible that a race against an index drop/recreation could have
995 : * netted us the wrong table. Recheck.
996 : */
997 20 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
998 0 : ereport(ERROR,
999 : (errcode(ERRCODE_UNDEFINED_TABLE),
1000 : errmsg("could not open parent table of index %s",
1001 : RelationGetRelationName(indexRel))));
1002 :
1003 : /* the revmap does the hard work */
1004 : do
1005 : {
1006 20 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1007 : }
1008 20 : while (!done);
1009 :
1010 20 : relation_close(indexRel, ShareUpdateExclusiveLock);
1011 20 : relation_close(heapRel, ShareUpdateExclusiveLock);
1012 :
1013 20 : PG_RETURN_VOID();
1014 : }
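/*
 * Example call (hypothetical index name):
 *
 *     SELECT brin_desummarize_range('brin_idx', 0);
 */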
1015 :
1016 : /*
1017 : * Build a BrinDesc used to create or scan a BRIN index
1018 : */
1019 : BrinDesc *
1020 1386 : brin_build_desc(Relation rel)
1021 : {
1022 : BrinOpcInfo **opcinfo;
1023 : BrinDesc *bdesc;
1024 : TupleDesc tupdesc;
1025 1386 : int totalstored = 0;
1026 : int keyno;
1027 : long totalsize;
1028 : MemoryContext cxt;
1029 : MemoryContext oldcxt;
1030 :
1031 1386 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1032 : "brin desc cxt",
1033 : ALLOCSET_SMALL_SIZES);
1034 1386 : oldcxt = MemoryContextSwitchTo(cxt);
1035 1386 : tupdesc = RelationGetDescr(rel);
1036 :
1037 : /*
1038 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1039 : * the number of columns stored, since the number is opclass-defined.
1040 : */
1041 1386 : opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
1042 32190 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1043 : {
1044 : FmgrInfo *opcInfoFn;
1045 30804 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1046 :
1047 30804 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1048 :
1049 30804 : opcinfo[keyno] = (BrinOpcInfo *)
1050 30804 : DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1051 30804 : totalstored += opcinfo[keyno]->oi_nstored;
1052 : }
1053 :
1054 : /* Allocate our result struct and fill it in */
1055 1386 : totalsize = offsetof(BrinDesc, bd_info) +
1056 1386 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1057 :
1058 1386 : bdesc = palloc(totalsize);
1059 1386 : bdesc->bd_context = cxt;
1060 1386 : bdesc->bd_index = rel;
1061 1386 : bdesc->bd_tupdesc = tupdesc;
1062 1386 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1063 1386 : bdesc->bd_totalstored = totalstored;
1064 :
1065 32190 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1066 30804 : bdesc->bd_info[keyno] = opcinfo[keyno];
1067 1386 : pfree(opcinfo);
1068 :
1069 1386 : MemoryContextSwitchTo(oldcxt);
1070 :
1071 1386 : return bdesc;
1072 : }
1073 :
1074 : void
1075 1088 : brin_free_desc(BrinDesc *bdesc)
1076 : {
1077 : /* make sure the tupdesc is still valid */
1078 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1079 : /* no need for retail pfree */
1080 1088 : MemoryContextDelete(bdesc->bd_context);
1081 1088 : }
1082 :
1083 : /*
1084 : * Fetch index's statistical data into *stats
1085 : */
1086 : void
1087 3996 : brinGetStats(Relation index, BrinStatsData *stats)
1088 : {
1089 : Buffer metabuffer;
1090 : Page metapage;
1091 : BrinMetaPageData *metadata;
1092 :
1093 3996 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1094 3996 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1095 3996 : metapage = BufferGetPage(metabuffer);
1096 3996 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1097 :
1098 3996 : stats->pagesPerRange = metadata->pagesPerRange;
1099 3996 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1100 :
1101 3996 : UnlockReleaseBuffer(metabuffer);
1102 3996 : }
1103 :
1104 : /*
1105 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1106 : */
1107 : static BrinBuildState *
1108 68 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1109 : BlockNumber pagesPerRange)
1110 : {
1111 : BrinBuildState *state;
1112 :
1113 68 : state = palloc(sizeof(BrinBuildState));
1114 :
1115 68 : state->bs_irel = idxRel;
1116 68 : state->bs_numtuples = 0;
1117 68 : state->bs_currentInsertBuf = InvalidBuffer;
1118 68 : state->bs_pagesPerRange = pagesPerRange;
1119 68 : state->bs_currRangeStart = 0;
1120 68 : state->bs_rmAccess = revmap;
1121 68 : state->bs_bdesc = brin_build_desc(idxRel);
1122 68 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1123 :
1124 68 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1125 :
1126 68 : return state;
1127 : }
1128 :
1129 : /*
1130 : * Release resources associated with a BrinBuildState.
1131 : */
1132 : static void
1133 68 : terminate_brin_buildstate(BrinBuildState *state)
1134 : {
1135 : /*
1136 : * Release the last index buffer used. We might as well ensure that
1137 : * whatever free space remains in that page is available in FSM, too.
1138 : */
1139 68 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1140 : {
1141 : Page page;
1142 : Size freespace;
1143 : BlockNumber blk;
1144 :
1145 46 : page = BufferGetPage(state->bs_currentInsertBuf);
1146 46 : freespace = PageGetFreeSpace(page);
1147 46 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1148 46 : ReleaseBuffer(state->bs_currentInsertBuf);
1149 46 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1150 46 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1151 : }
1152 :
1153 68 : brin_free_desc(state->bs_bdesc);
1154 68 : pfree(state->bs_dtuple);
1155 68 : pfree(state);
1156 68 : }
1157 :
1158 : /*
1159 : * On the given BRIN index, summarize the heap page range that corresponds
1160 : * to the heap block number given.
1161 : *
1162 : * This routine can run in parallel with insertions into the heap. To avoid
1163 : * missing those values from the summary tuple, we first insert a placeholder
1164 : * index tuple into the index, then execute the heap scan; transactions
1165 : * concurrent with the scan update the placeholder tuple. After the scan, we
1166 : * union the placeholder tuple with the one computed by this routine. The
1167 : * update of the index value happens in a loop, so that if somebody updates
1168 : * the placeholder tuple after we read it, we detect the case and try again.
1169 : * This ensures that the concurrently inserted tuples are not lost.
1170 : *
1171 : * A further corner case is this routine being asked to summarize the partial
1172 : * range at the end of the table. heapNumBlocks is the (possibly outdated)
1173 : * table size; if we notice that the requested range lies beyond that size,
1174 : * we re-compute the table size after inserting the placeholder tuple, to
1175 : * avoid missing pages that were appended recently.
1176 : */
1177 : static void
1178 42 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1179 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1180 : {
1181 : Buffer phbuf;
1182 : BrinTuple *phtup;
1183 : Size phsz;
1184 : OffsetNumber offset;
1185 : BlockNumber scanNumBlks;
1186 :
1187 : /*
1188 : * Insert the placeholder tuple
1189 : */
1190 42 : phbuf = InvalidBuffer;
1191 42 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1192 42 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1193 : state->bs_rmAccess, &phbuf,
1194 : heapBlk, phtup, phsz);
1195 :
1196 : /*
1197 : * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1198 : * cannot shrink concurrently (but it can grow).
1199 : */
1200 : Assert(heapBlk % state->bs_pagesPerRange == 0);
1201 42 : if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1202 : {
1203 : /*
1204 : * If we're asked to scan what we believe to be the final range on the
1205 : * table (i.e. a range that might be partial) we need to recompute our
1206 : * idea of what the latest page is after inserting the placeholder
1207 : * tuple. Anyone that grows the table later will update the
1208 : * placeholder tuple, so it doesn't matter that we won't scan these
1209 : * pages ourselves. Careful: the table might have been extended
1210 : * beyond the current range, so clamp our result.
1211 : *
1212 : * Fortunately, this should occur infrequently.
1213 : */
1214 4 : scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1215 : state->bs_pagesPerRange);
1216 : }
1217 : else
1218 : {
1219 : /* Easy case: range is known to be complete */
1220 38 : scanNumBlks = state->bs_pagesPerRange;
1221 : }
1222 :
1223 : /*
1224 : * Execute the partial heap scan covering the heap blocks in the specified
1225 : * page range, summarizing the heap tuples in it. This scan stops just
1226 : * short of brinbuildCallback creating the new index entry.
1227 : *
1228 : * Note that it is critical we use the "any visible" mode of
1229 : * table_index_build_range_scan here: otherwise, we would miss tuples
1230 : * inserted by transactions that are still in progress, among other corner
1231 : * cases.
1232 : */
1233 42 : state->bs_currRangeStart = heapBlk;
1234 42 : table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1235 : heapBlk, scanNumBlks,
1236 : brinbuildCallback, (void *) state, NULL);
1237 :
1238 : /*
1239 : * Now we update the values obtained by the scan with the placeholder
1240 : * tuple. We do this in a loop which only terminates if we're able to
1241 : * update the placeholder tuple successfully; if we are not, this means
1242 : * somebody else modified the placeholder tuple after we read it.
1243 : */
1244 : for (;;)
1245 0 : {
1246 : BrinTuple *newtup;
1247 : Size newsize;
1248 : bool didupdate;
1249 : bool samepage;
1250 :
1251 42 : CHECK_FOR_INTERRUPTS();
1252 :
1253 : /*
1254 : * Form the updated summary tuple and try to replace the placeholder with it.
1255 : */
1256 42 : newtup = brin_form_tuple(state->bs_bdesc,
1257 : heapBlk, state->bs_dtuple, &newsize);
1258 42 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1259 : didupdate =
1260 42 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1261 : state->bs_rmAccess, heapBlk, phbuf, offset,
1262 : phtup, phsz, newtup, newsize, samepage);
1263 42 : brin_free_tuple(phtup);
1264 42 : brin_free_tuple(newtup);
1265 :
1266 : /* If the update succeeded, we're done. */
1267 42 : if (didupdate)
1268 42 : break;
1269 :
1270 : /*
1271 : * If the update didn't work, it might be because somebody updated the
1272 : * placeholder tuple concurrently. Extract the new version, union it
1273 : * with the values we have from the scan, and start over. (There are
1274 : * other reasons for the update to fail, but it's simple to treat them
1275 : * the same.)
1276 : */
1277 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1278 : &offset, &phsz, BUFFER_LOCK_SHARE,
1279 : NULL);
1280 : /* the placeholder tuple must exist */
1281 0 : if (phtup == NULL)
1282 0 : elog(ERROR, "missing placeholder tuple");
1283 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1284 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1285 :
1286 : /* merge it into the tuple from the heap scan */
1287 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1288 : }
1289 :
1290 42 : ReleaseBuffer(phbuf);
1291 42 : }
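/*
 * In outline (recap added for clarity): (1) insert the placeholder tuple,
 * (2) scan the range with the "any visible" range scan, (3) form the real
 * tuple and attempt brin_doupdate(); on failure, re-read the possibly
 * concurrently-updated placeholder, union_tuples() it into our result,
 * and retry.
 */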
1292 :
1293 : /*
1294 : * Summarize page ranges that are not already summarized. If pageRange is
1295 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1296 : * page range containing the given heap page number is scanned.
1297 : * If include_partial is true, then the partial range at the end of the table
1298 : * is summarized, otherwise not.
1299 : *
1300 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1301 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1302 : * incremented.
1303 : */
1304 : static void
1305 62 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1306 : bool include_partial, double *numSummarized, double *numExisting)
1307 : {
1308 : BrinRevmap *revmap;
1309 62 : BrinBuildState *state = NULL;
1310 62 : IndexInfo *indexInfo = NULL;
1311 : BlockNumber heapNumBlocks;
1312 : BlockNumber pagesPerRange;
1313 : Buffer buf;
1314 : BlockNumber startBlk;
1315 :
1316 62 : revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
1317 :
1318 : /* determine range of pages to process */
1319 62 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1320 62 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1321 40 : startBlk = 0;
1322 : else
1323 : {
1324 22 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1325 22 : heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
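/*
 * Worked example (illustrative): pageRange = 300 with
 * pagesPerRange = 128 yields startBlk = 256, and heapNumBlocks is
 * clamped to at most 384, so only the range 256..383 is considered.
 */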
1326 : }
1327 62 : if (startBlk > heapNumBlocks)
1328 : {
1329 : /* Nothing to do if start point is beyond end of table */
1330 0 : brinRevmapTerminate(revmap);
1331 0 : return;
1332 : }
1333 :
1334 : /*
1335 : * Scan the revmap to find unsummarized items.
1336 : */
1337 62 : buf = InvalidBuffer;
1338 1116 : for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1339 : {
1340 : BrinTuple *tup;
1341 : OffsetNumber off;
1342 :
1343 : /*
1344 : * Unless requested to summarize even a partial range, go away now if
1345 : * we think the next range is partial. Callers pass true when the run
1346 : * is explicitly requested, typically once bulk data loading is done
1347 : * (brin_summarize_new_values), and false when it is the result of an
1348 : * arbitrarily-scheduled maintenance command such as vacuuming.
1349 : */
1350 1078 : if (!include_partial &&
1351 624 : (startBlk + pagesPerRange > heapNumBlocks))
1352 24 : break;
1353 :
1354 1054 : CHECK_FOR_INTERRUPTS();
1355 :
1356 1054 : tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1357 : BUFFER_LOCK_SHARE, NULL);
1358 1054 : if (tup == NULL)
1359 : {
1360 : /* no revmap entry for this heap range. Summarize it. */
1361 42 : if (state == NULL)
1362 : {
1363 : /* first time through */
1364 : Assert(!indexInfo);
1365 22 : state = initialize_brin_buildstate(index, revmap,
1366 : pagesPerRange);
1367 22 : indexInfo = BuildIndexInfo(index);
1368 : }
1369 42 : summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1370 :
1371 : /* and re-initialize state for the next range */
1372 42 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1373 :
1374 42 : if (numSummarized)
1375 42 : *numSummarized += 1.0;
1376 : }
1377 : else
1378 : {
1379 1012 : if (numExisting)
1380 574 : *numExisting += 1.0;
1381 1012 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1382 : }
1383 : }
1384 :
1385 62 : if (BufferIsValid(buf))
1386 32 : ReleaseBuffer(buf);
1387 :
1388 : /* free resources */
1389 62 : brinRevmapTerminate(revmap);
1390 62 : if (state)
1391 : {
1392 22 : terminate_brin_buildstate(state);
1393 22 : pfree(indexInfo);
1394 : }
1395 : }
1396 :
1397 : /*
1398 : * Given a deformed tuple in the build state, convert it into the on-disk
1399 : * format and insert it into the index, making the revmap point to it.
1400 : */
1401 : static void
1402 624 : form_and_insert_tuple(BrinBuildState *state)
1403 : {
1404 : BrinTuple *tup;
1405 : Size size;
1406 :
1407 624 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1408 : state->bs_dtuple, &size);
1409 624 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1410 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1411 : tup, size);
1412 624 : state->bs_numtuples++;
1413 :
1414 624 : pfree(tup);
1415 624 : }
1416 :
1417 : /*
1418 : * Given two deformed tuples, adjust the first one so that it's consistent
1419 : * with the summary values in both.
1420 : */
1421 : static void
1422 0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1423 : {
1424 : int keyno;
1425 : BrinMemTuple *db;
1426 : MemoryContext cxt;
1427 : MemoryContext oldcxt;
1428 :
1429 : /* Use our own memory context to avoid retail pfree */
1430 0 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1431 : "brin union",
1432 : ALLOCSET_DEFAULT_SIZES);
1433 0 : oldcxt = MemoryContextSwitchTo(cxt);
1434 0 : db = brin_deform_tuple(bdesc, b, NULL);
1435 0 : MemoryContextSwitchTo(oldcxt);
1436 :
1437 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1438 : {
1439 : FmgrInfo *unionFn;
1440 0 : BrinValues *col_a = &a->bt_columns[keyno];
1441 0 : BrinValues *col_b = &db->bt_columns[keyno];
1442 :
1443 0 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1444 : BRIN_PROCNUM_UNION);
1445 0 : FunctionCall3Coll(unionFn,
1446 0 : bdesc->bd_index->rd_indcollation[keyno],
1447 : PointerGetDatum(bdesc),
1448 : PointerGetDatum(col_a),
1449 : PointerGetDatum(col_b));
1450 : }
1451 :
1452 0 : MemoryContextDelete(cxt);
1453 0 : }
1454 :
1455 : /*
1456 : * brin_vacuum_scan
1457 : * Do a complete scan of the index during VACUUM.
1458 : *
1459 : * This routine scans the complete index looking for uncatalogued index pages,
1460 : * i.e. those that might have been lost due to a crash after index extension
1461 : * and such.
1462 : */
1463 : static void
1464 30 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1465 : {
1466 : BlockNumber nblocks;
1467 : BlockNumber blkno;
1468 :
1469 : /*
1470 : * Scan the index in physical order, and clean up any possible mess in
1471 : * each page.
1472 : */
1473 30 : nblocks = RelationGetNumberOfBlocks(idxrel);
1474 156 : for (blkno = 0; blkno < nblocks; blkno++)
1475 : {
1476 : Buffer buf;
1477 :
1478 126 : CHECK_FOR_INTERRUPTS();
1479 :
1480 126 : buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1481 : RBM_NORMAL, strategy);
1482 :
1483 126 : brin_page_cleanup(idxrel, buf);
1484 :
1485 126 : ReleaseBuffer(buf);
1486 : }
1487 :
1488 : /*
1489 : * Update all upper pages in the index's FSM, as well. This ensures not
1490 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
1491 : * but also that any pre-existing damage or out-of-dateness is repaired.
1492 : */
1493 30 : FreeSpaceMapVacuum(idxrel);
1494 30 : }