Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "miscadmin.h"
32 : #include "pgstat.h"
33 : #include "postmaster/autovacuum.h"
34 : #include "storage/bufmgr.h"
35 : #include "storage/freespace.h"
36 : #include "storage/proc.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/acl.h"
39 : #include "utils/datum.h"
40 : #include "utils/fmgrprotos.h"
41 : #include "utils/guc.h"
42 : #include "utils/index_selfuncs.h"
43 : #include "utils/memutils.h"
44 : #include "utils/rel.h"
45 : #include "utils/tuplesort.h"
46 :
47 : /* Magic numbers for parallel state sharing */
 : /* Distinct 64-bit keys identifying each piece of shared state in the */
 : /* parallel build's shared-memory table of contents (cf. shm_toc). */
48 : #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
49 : #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
50 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
51 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
52 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
53 :
54 : /*
55 : * Status for index builds performed in parallel. This is allocated in a
56 : * dynamic shared memory segment.
57 : */
58 : typedef struct BrinShared
59 : {
60 : /*
61 : * These fields are not modified during the build. They primarily exist
62 : * for the benefit of worker processes that need to create state
63 : * corresponding to that used by the leader.
64 : */
 : /* heaprelid/indexrelid let each worker re-open the heap and index */
65 : Oid heaprelid;
66 : Oid indexrelid;
67 : bool isconcurrent;
68 : BlockNumber pagesPerRange;
69 : int scantuplesortstates;
70 :
71 : /* Query ID, for report in worker processes */
72 : int64 queryid;
73 :
74 : /*
75 : * workersdonecv is used to monitor the progress of workers. All parallel
76 : * participants must indicate that they are done before leader can use
77 : * results built by the workers (and before leader can write the data into
78 : * the index).
79 : */
80 : ConditionVariable workersdonecv;
81 :
82 : /*
83 : * mutex protects all fields before heapdesc.
84 : *
85 : * These fields contain status information of interest to BRIN index
86 : * builds that must work just the same when an index is built in parallel.
87 : */
88 : slock_t mutex;
89 :
90 : /*
91 : * Mutable state that is maintained by workers, and reported back to
92 : * leader at end of the scans.
93 : *
94 : * nparticipantsdone is number of worker processes finished.
95 : *
96 : * reltuples is the total number of input heap tuples.
97 : *
98 : * indtuples is the total number of tuples that made it into the index.
99 : */
100 : int nparticipantsdone;
101 : double reltuples;
102 : double indtuples;
103 :
104 : /*
105 : * ParallelTableScanDescData data follows. Can't directly embed here, as
106 : * implementations of the parallel table scan desc interface might need
107 : * stronger alignment.
108 : */
109 : } BrinShared;
110 :
111 : /*
112 : * Return pointer to a BrinShared's parallel table scan.
113 : *
 : * The scan descriptor is laid out immediately after the fixed-size struct,
 : * at the next buffer-aligned offset.
114 : * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
115 : * MAXALIGN.
116 : */
117 : #define ParallelTableScanFromBrinShared(shared) \
118 : (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
119 :
120 : /*
121 : * Status for leader in parallel index build.
122 : */
123 : typedef struct BrinLeader
124 : {
125 : /* parallel context itself */
126 : ParallelContext *pcxt;
127 :
128 : /*
129 : * nparticipanttuplesorts is the exact number of worker processes
130 : * successfully launched, plus one leader process if it participates as a
131 : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
132 : * participating as a worker).
133 : */
134 : int nparticipanttuplesorts;
135 :
136 : /*
137 : * Leader process convenience pointers to shared state (leader avoids TOC
138 : * lookups).
139 : *
140 : * brinshared is the shared state for entire build. sharedsort is the
141 : * shared, tuplesort-managed state passed to each process tuplesort.
142 : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
143 : * required.
144 : */
145 : BrinShared *brinshared;
146 : Sharedsort *sharedsort;
147 : Snapshot snapshot;
 : /* walusage/bufferusage: per-worker instrumentation buffers — */
 : /* presumably sized in _brin_begin_parallel; confirm there */
148 : WalUsage *walusage;
149 : BufferUsage *bufferusage;
150 : } BrinLeader;
151 :
152 : /*
153 : * We use a BrinBuildState during initial construction of a BRIN index.
154 : * The running state is kept in a BrinMemTuple.
155 : */
156 : typedef struct BrinBuildState
157 : {
158 : Relation bs_irel;
159 : double bs_numtuples;
160 : double bs_reltuples;
161 : Buffer bs_currentInsertBuf;
162 : BlockNumber bs_pagesPerRange;
163 : BlockNumber bs_currRangeStart;
164 : BlockNumber bs_maxRangeStart;
165 : BrinRevmap *bs_rmAccess;
166 : BrinDesc *bs_bdesc;
167 : BrinMemTuple *bs_dtuple;
168 :
 : /* cached "empty range" tuple — NOTE(review): looks like it is reused */
 : /* by brin_fill_empty_ranges for unsummarized ranges; confirm there */
169 : BrinTuple *bs_emptyTuple;
170 : Size bs_emptyTupleLen;
171 : MemoryContext bs_context;
172 :
173 : /*
174 : * bs_leader is only present when a parallel index build is performed, and
175 : * only in the leader process. (Actually, only the leader process has a
176 : * BrinBuildState.)
177 : */
178 : BrinLeader *bs_leader;
179 : int bs_worker_id;
180 :
181 : /*
182 : * The sortstate is used by workers (including the leader). It has to be
183 : * part of the build state, because that's the only thing passed to the
184 : * build callback etc.
185 : */
186 : Tuplesortstate *bs_sortstate;
187 : } BrinBuildState;
188 :
189 : /*
190 : * We use a BrinInsertState to capture running state spanning multiple
191 : * brininsert invocations, within the same command.
 : *
 : * Cached in indexInfo->ii_AmCache (see initialize_brin_insertstate) and
 : * released by brininsertcleanup.
192 : */
193 : typedef struct BrinInsertState
194 : {
195 : BrinRevmap *bis_rmAccess;
196 : BrinDesc *bis_desc;
197 : BlockNumber bis_pages_per_range;
198 : } BrinInsertState;
199 :
200 : /*
201 : * Struct used as "opaque" during index scans
 : *
 : * Created in brinbeginscan, freed in brinendscan.
202 : */
203 : typedef struct BrinOpaque
204 : {
205 : BlockNumber bo_pagesPerRange;
206 : BrinRevmap *bo_rmAccess;
207 : BrinDesc *bo_bdesc;
208 : } BrinOpaque;
209 :
 : /* Sentinel pageRange value — presumably means "summarize all block */
 : /* ranges" when passed to brinsummarize; confirm at call sites. */
210 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
211 :
212 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
213 : BrinRevmap *revmap,
214 : BlockNumber pagesPerRange,
215 : BlockNumber tablePages);
216 : static BrinInsertState *initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo);
217 : static void terminate_brin_buildstate(BrinBuildState *state);
218 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
219 : bool include_partial, double *numSummarized, double *numExisting);
220 : static void form_and_insert_tuple(BrinBuildState *state);
221 : static void form_and_spill_tuple(BrinBuildState *state);
222 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
223 : BrinTuple *b);
224 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
225 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
226 : BrinMemTuple *dtup, const Datum *values, const bool *nulls);
227 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
228 : static void brin_fill_empty_ranges(BrinBuildState *state,
229 : BlockNumber prevRange, BlockNumber nextRange);
230 :
231 : /* parallel index builds */
232 : static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
233 : bool isconcurrent, int request);
234 : static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
235 : static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
236 : static double _brin_parallel_heapscan(BrinBuildState *state);
237 : static double _brin_parallel_merge(BrinBuildState *state);
238 : static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
239 : Relation heap, Relation index);
240 : static void _brin_parallel_scan_and_build(BrinBuildState *state,
241 : BrinShared *brinshared,
242 : Sharedsort *sharedsort,
243 : Relation heap, Relation index,
244 : int sortmem, bool progress);
245 :
246 : /*
247 : * BRIN handler function: return IndexAmRoutine with access method parameters
248 : * and callbacks.
 : *
 : * The routine is static const, so the same pointer is returned on every
 : * call; callers must treat it as read-only.
249 : */
250 : Datum
251 2231 : brinhandler(PG_FUNCTION_ARGS)
252 : {
253 : static const IndexAmRoutine amroutine = {
254 : .type = T_IndexAmRoutine,
255 : .amstrategies = 0,
256 : .amsupport = BRIN_LAST_OPTIONAL_PROCNUM,
257 : .amoptsprocnum = BRIN_PROCNUM_OPTIONS,
258 : .amcanorder = false,
259 : .amcanorderbyop = false,
260 : .amcanhash = false,
261 : .amconsistentequality = false,
262 : .amconsistentordering = false,
263 : .amcanbackward = false,
264 : .amcanunique = false,
265 : .amcanmulticol = true,
266 : .amoptionalkey = true,
 : /* false for now — see TODO in the file header (ScalarArrayOpExpr) */
267 : .amsearcharray = false,
268 : .amsearchnulls = true,
269 : .amstorage = true,
270 : .amclusterable = false,
271 : .ampredlocks = false,
272 : .amcanparallel = false,
273 : .amcanbuildparallel = true,
274 : .amcaninclude = false,
275 : .amusemaintenanceworkmem = false,
276 : .amsummarizing = true,
277 : .amparallelvacuumoptions =
278 : VACUUM_OPTION_PARALLEL_CLEANUP,
279 : .amkeytype = InvalidOid,
280 :
281 : .ambuild = brinbuild,
282 : .ambuildempty = brinbuildempty,
283 : .aminsert = brininsert,
284 : .aminsertcleanup = brininsertcleanup,
285 : .ambulkdelete = brinbulkdelete,
286 : .amvacuumcleanup = brinvacuumcleanup,
287 : .amcanreturn = NULL,
288 : .amcostestimate = brincostestimate,
289 : .amgettreeheight = NULL,
290 : .amoptions = brinoptions,
291 : .amproperty = NULL,
292 : .ambuildphasename = NULL,
293 : .amvalidate = brinvalidate,
294 : .amadjustmembers = NULL,
295 : .ambeginscan = brinbeginscan,
296 : .amrescan = brinrescan,
 : /* BRIN is bitmap-scan only: no amgettuple, only amgetbitmap */
297 : .amgettuple = NULL,
298 : .amgetbitmap = bringetbitmap,
299 : .amendscan = brinendscan,
300 : .ammarkpos = NULL,
301 : .amrestrpos = NULL,
302 : .amestimateparallelscan = NULL,
303 : .aminitparallelscan = NULL,
304 : .amparallelrescan = NULL,
305 : .amtranslatestrategy = NULL,
306 : .amtranslatecmptype = NULL,
307 : };
308 :
309 2231 : PG_RETURN_POINTER(&amroutine);
310 : }
311 :
312 : /*
313 : * Initialize a BrinInsertState to maintain state to be used across multiple
314 : * tuple inserts, within the same command.
 : *
 : * The state is allocated in indexInfo->ii_Context so that it survives
 : * individual brininsert calls, and is cached in indexInfo->ii_AmCache.
315 : */
316 : static BrinInsertState *
317 561 : initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo)
318 : {
319 : BrinInsertState *bistate;
320 : MemoryContext oldcxt;
321 :
322 561 : oldcxt = MemoryContextSwitchTo(indexInfo->ii_Context);
323 561 : bistate = palloc0_object(BrinInsertState);
324 561 : bistate->bis_desc = brin_build_desc(idxRel);
 : /* also fills bis_pages_per_range from the index metapage */
325 561 : bistate->bis_rmAccess = brinRevmapInitialize(idxRel,
326 : &bistate->bis_pages_per_range);
327 561 : indexInfo->ii_AmCache = bistate;
328 561 : MemoryContextSwitchTo(oldcxt);
329 :
330 561 : return bistate;
331 : }
332 :
333 : /*
334 : * A tuple in the heap is being inserted. To keep a brin index up to date,
335 : * we need to obtain the relevant index tuple and compare its stored values
336 : * with those of the new tuple. If the tuple values are not consistent with
337 : * the summary tuple, we need to update the index tuple.
338 : *
339 : * If autosummarization is enabled, check if we need to summarize the previous
340 : * page range.
341 : *
342 : * If the range is not currently summarized (i.e. the revmap returns NULL for
343 : * it), there's nothing to do for this tuple.
 : *
 : * Always returns false: BRIN has no uniqueness checking (amcanunique is
 : * false), so there is never a "conflict" to report.
344 : */
345 : bool
346 63108 : brininsert(Relation idxRel, Datum *values, bool *nulls,
347 : ItemPointer heaptid, Relation heapRel,
348 : IndexUniqueCheck checkUnique,
349 : bool indexUnchanged,
350 : IndexInfo *indexInfo)
351 : {
352 : BlockNumber pagesPerRange;
353 : BlockNumber origHeapBlk;
354 : BlockNumber heapBlk;
355 63108 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
356 : BrinRevmap *revmap;
357 : BrinDesc *bdesc;
358 63108 : Buffer buf = InvalidBuffer;
359 63108 : MemoryContext tupcxt = NULL;
360 63108 : MemoryContext oldcxt = CurrentMemoryContext;
361 63108 : bool autosummarize = BrinGetAutoSummarize(idxRel);
362 :
363 : /*
364 : * If first time through in this statement, initialize the insert state
365 : * that we keep for all the inserts in the command.
366 : */
367 63108 : if (!bistate)
368 561 : bistate = initialize_brin_insertstate(idxRel, indexInfo);
369 :
370 63108 : revmap = bistate->bis_rmAccess;
371 63108 : bdesc = bistate->bis_desc;
372 63108 : pagesPerRange = bistate->bis_pages_per_range;
373 :
374 : /*
375 : * origHeapBlk is the block number where the insertion occurred. heapBlk
376 : * is the first block in the corresponding page range.
377 : */
378 63108 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
379 63108 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
380 :
 : /* retry loop: we restart from here if a concurrent update defeats us */
381 : for (;;)
382 0 : {
383 63108 : bool need_insert = false;
384 : OffsetNumber off;
385 : BrinTuple *brtup;
386 : BrinMemTuple *dtup;
387 :
388 63108 : CHECK_FOR_INTERRUPTS();
389 :
390 : /*
391 : * If auto-summarization is enabled and we just inserted the first
392 : * tuple into the first block of a new non-first page range, request a
393 : * summarization run of the previous range.
394 : */
395 63108 : if (autosummarize &&
396 145 : heapBlk > 0 &&
397 145 : heapBlk == origHeapBlk &&
398 145 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
399 : {
400 8 : BlockNumber lastPageRange = heapBlk - 1;
401 : BrinTuple *lastPageTuple;
402 :
403 : lastPageTuple =
404 8 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
405 : NULL, BUFFER_LOCK_SHARE);
406 8 : if (!lastPageTuple)
407 : {
408 : bool recorded;
409 :
 : /* previous range is unsummarized; ask autovacuum to do it */
410 6 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
411 : RelationGetRelid(idxRel),
412 : lastPageRange);
413 6 : if (!recorded)
414 0 : ereport(LOG,
415 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
416 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
417 : RelationGetRelationName(idxRel),
418 : lastPageRange)));
419 : }
420 : else
421 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
422 : }
423 :
424 63108 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
425 : NULL, BUFFER_LOCK_SHARE);
426 :
427 : /* if range is unsummarized, there's nothing to do */
428 63108 : if (!brtup)
429 39127 : break;
430 :
431 : /* First time through in this brininsert call? */
432 23981 : if (tupcxt == NULL)
433 : {
434 23981 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
435 : "brininsert cxt",
436 : ALLOCSET_DEFAULT_SIZES);
437 23981 : MemoryContextSwitchTo(tupcxt);
438 : }
439 :
440 23981 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
441 :
442 23981 : need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
443 :
444 23981 : if (!need_insert)
445 : {
446 : /*
447 : * The tuple is consistent with the new values, so there's nothing
448 : * to do.
449 : */
450 12017 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
451 : }
452 : else
453 : {
454 11964 : Page page = BufferGetPage(buf);
455 11964 : ItemId lp = PageGetItemId(page, off);
456 : Size origsz;
457 : BrinTuple *origtup;
458 : Size newsz;
459 : BrinTuple *newtup;
460 : bool samepage;
461 :
462 : /*
463 : * Make a copy of the old tuple, so that we can compare it after
464 : * re-acquiring the lock.
465 : */
466 11964 : origsz = ItemIdGetLength(lp);
467 11964 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
468 :
469 : /*
470 : * Before releasing the lock, check if we can attempt a same-page
471 : * update. Another process could insert a tuple concurrently in
472 : * the same page though, so downstream we must be prepared to cope
473 : * if this turns out to not be possible after all.
474 : */
475 11964 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
476 11964 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
477 11964 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
478 :
479 : /*
480 : * Try to update the tuple. If this doesn't work for whatever
481 : * reason, we need to restart from the top; the revmap might be
482 : * pointing at a different tuple for this block now, so we need to
483 : * recompute to ensure both our new heap tuple and the other
484 : * inserter's are covered by the combined tuple. It might be that
485 : * we don't need to update at all.
486 : */
487 11964 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
488 : buf, off, origtup, origsz, newtup, newsz,
489 : samepage))
490 : {
491 : /* no luck; start over */
492 0 : MemoryContextReset(tupcxt);
493 0 : continue;
494 : }
495 : }
496 :
497 : /* success! */
498 23981 : break;
499 : }
500 :
501 63108 : if (BufferIsValid(buf))
502 23983 : ReleaseBuffer(buf);
503 63108 : MemoryContextSwitchTo(oldcxt);
504 63108 : if (tupcxt != NULL)
505 23981 : MemoryContextDelete(tupcxt);
506 :
507 63108 : return false;
508 : }
509 :
510 : /*
511 : * Callback to clean up the BrinInsertState once all tuple inserts are done.
 : *
 : * 'index' is unused here; it is kept to match the aminsertcleanup signature.
512 : */
513 : void
514 578 : brininsertcleanup(Relation index, IndexInfo *indexInfo)
515 : {
516 578 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
517 :
518 : /* bail out if cache not initialized */
519 578 : if (bistate == NULL)
520 17 : return;
521 :
522 : /* do this first to avoid dangling pointer if we fail partway through */
523 561 : indexInfo->ii_AmCache = NULL;
524 :
525 : /*
526 : * Clean up the revmap. Note that the brinDesc has already been cleaned up
527 : * as part of its own memory context.
528 : */
529 561 : brinRevmapTerminate(bistate->bis_rmAccess);
530 561 : pfree(bistate);
531 : }
532 :
533 : /*
534 : * Initialize state for a BRIN index scan.
535 : *
536 : * We read the metapage here to determine the pages-per-range number that this
537 : * index was built with. Note that since this cannot be changed while we're
538 : * holding lock on index, it's not necessary to recompute it during brinrescan.
539 : */
540 : IndexScanDesc
541 1473 : brinbeginscan(Relation r, int nkeys, int norderbys)
542 : {
543 : IndexScanDesc scan;
544 : BrinOpaque *opaque;
545 :
546 1473 : scan = RelationGetIndexScan(r, nkeys, norderbys);
547 :
548 1473 : opaque = palloc_object(BrinOpaque);
 : /* metapage read happens inside brinRevmapInitialize */
549 1473 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
550 1473 : opaque->bo_bdesc = brin_build_desc(r);
551 1473 : scan->opaque = opaque;
552 :
553 1473 : return scan;
554 : }
555 :
556 : /*
557 : * Execute the index scan.
558 : *
559 : * This works by reading index TIDs from the revmap, and obtaining the index
560 : * tuples pointed to by them; the summary values in the index tuples are
561 : * compared to the scan keys. We return into the TID bitmap all the pages in
562 : * ranges corresponding to index tuples that match the scan keys.
563 : *
564 : * If a TID from the revmap is read as InvalidTID, we know that range is
565 : * unsummarized. Pages in those ranges need to be returned regardless of scan
566 : * keys.
 : *
 : * Returns an estimate of the number of TIDs added (pages * 10), since BRIN
 : * cannot know the precise tuple count — see the comment at the bottom.
567 : */
568 : int64
569 1473 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
570 : {
571 1473 : Relation idxRel = scan->indexRelation;
572 1473 : Buffer buf = InvalidBuffer;
573 : BrinDesc *bdesc;
574 : Oid heapOid;
575 : Relation heapRel;
576 : BrinOpaque *opaque;
577 : BlockNumber nblocks;
578 1473 : int64 totalpages = 0;
579 : FmgrInfo *consistentFn;
580 : MemoryContext oldcxt;
581 : MemoryContext perRangeCxt;
582 : BrinMemTuple *dtup;
583 1473 : BrinTuple *btup = NULL;
584 1473 : Size btupsz = 0;
585 : ScanKey **keys,
586 : **nullkeys;
587 : int *nkeys,
588 : *nnullkeys;
589 : char *ptr;
590 : Size len;
591 : char *tmp PG_USED_FOR_ASSERTS_ONLY;
592 :
593 1473 : opaque = (BrinOpaque *) scan->opaque;
594 1473 : bdesc = opaque->bo_bdesc;
595 1473 : pgstat_count_index_scan(idxRel);
596 1473 : if (scan->instrument)
597 1473 : scan->instrument->nsearches++;
598 :
599 : /*
600 : * We need to know the size of the table so that we know how long to
601 : * iterate on the revmap.
602 : */
603 1473 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
604 1473 : heapRel = table_open(heapOid, AccessShareLock);
605 1473 : nblocks = RelationGetNumberOfBlocks(heapRel);
606 1473 : table_close(heapRel, AccessShareLock);
607 :
608 : /*
609 : * Make room for the consistent support procedures of indexed columns. We
610 : * don't look them up here; we do that lazily the first time we see a scan
611 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
612 : */
613 1473 : consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
614 :
615 : /*
616 : * Make room for per-attribute lists of scan keys that we'll pass to the
617 : * consistent support procedure. We don't know which attributes have scan
618 : * keys, so we allocate space for all attributes. That may use more memory
619 : * but it's probably cheaper than determining which attributes are used.
620 : *
621 : * We keep null and regular keys separate, so that we can pass just the
622 : * regular keys to the consistent function easily.
623 : *
624 : * To reduce the allocation overhead, we allocate one big chunk and then
625 : * carve it into smaller arrays ourselves. All the pieces have exactly the
626 : * same lifetime, so that's OK.
627 : *
628 : * XXX The widest index can have 32 attributes, so the amount of wasted
629 : * memory is negligible. We could invent a more compact approach (with
630 : * just space for used attributes) but that would make the matching more
631 : * complex so it's not a good trade-off.
632 : */
633 1473 : len =
634 1473 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* regular keys */
635 1473 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
636 1473 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
637 1473 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* NULL keys */
638 1473 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
639 1473 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
640 :
641 1473 : ptr = palloc(len);
642 1473 : tmp = ptr;
643 :
 : /* carve the single allocation into the six arrays computed above */
644 1473 : keys = (ScanKey **) ptr;
645 1473 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
646 :
647 1473 : nullkeys = (ScanKey **) ptr;
648 1473 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
649 :
650 1473 : nkeys = (int *) ptr;
651 1473 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
652 :
653 1473 : nnullkeys = (int *) ptr;
654 1473 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
655 :
656 34989 : for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
657 : {
658 33516 : keys[i] = (ScanKey *) ptr;
659 33516 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
660 :
661 33516 : nullkeys[i] = (ScanKey *) ptr;
662 33516 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
663 : }
664 :
665 : Assert(tmp + len == ptr);
666 :
667 : /* zero the number of keys */
668 1473 : memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
669 1473 : memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
670 :
671 : /* Preprocess the scan keys - split them into per-attribute arrays. */
672 2946 : for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
673 : {
674 1473 : ScanKey key = &scan->keyData[keyno];
675 1473 : AttrNumber keyattno = key->sk_attno;
676 :
677 : /*
678 : * The collation of the scan key must match the collation used in the
679 : * index column (but only if the search is not IS NULL/ IS NOT NULL).
680 : * Otherwise we shouldn't be using this index ...
681 : */
682 : Assert((key->sk_flags & SK_ISNULL) ||
683 : (key->sk_collation ==
684 : TupleDescAttr(bdesc->bd_tupdesc,
685 : keyattno - 1)->attcollation));
686 :
687 : /*
688 : * First time we see this index attribute, so init as needed.
689 : *
690 : * This is a bit of an overkill - we don't know how many scan keys are
691 : * there for this attribute, so we simply allocate the largest number
692 : * possible (as if all keys were for this attribute). This may waste a
693 : * bit of memory, but we only expect small number of scan keys in
694 : * general, so this should be negligible, and repeated repalloc calls
695 : * are not free either.
696 : */
697 1473 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
698 : {
699 : FmgrInfo *tmp;
700 :
701 : /* First time we see this attribute, so no key/null keys. */
702 : Assert(nkeys[keyattno - 1] == 0);
703 : Assert(nnullkeys[keyattno - 1] == 0);
704 :
705 1473 : tmp = index_getprocinfo(idxRel, keyattno,
706 : BRIN_PROCNUM_CONSISTENT);
707 1473 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
708 : CurrentMemoryContext);
709 : }
710 :
711 : /* Add key to the proper per-attribute array. */
712 1473 : if (key->sk_flags & SK_ISNULL)
713 : {
714 18 : nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
715 18 : nnullkeys[keyattno - 1]++;
716 : }
717 : else
718 : {
719 1455 : keys[keyattno - 1][nkeys[keyattno - 1]] = key;
720 1455 : nkeys[keyattno - 1]++;
721 : }
722 : }
723 :
724 : /* allocate an initial in-memory tuple, out of the per-range memcxt */
725 1473 : dtup = brin_new_memtuple(bdesc);
726 :
727 : /*
728 : * Setup and use a per-range memory context, which is reset every time we
729 : * loop below. This avoids having to free the tuples within the loop.
730 : */
731 1473 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
732 : "bringetbitmap cxt",
733 : ALLOCSET_DEFAULT_SIZES);
734 1473 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
735 :
736 : /*
737 : * Now scan the revmap. We start by querying for heap page 0,
738 : * incrementing by the number of pages per range; this gives us a full
739 : * view of the table. We make use of uint64 for heapBlk as a BlockNumber
740 : * could wrap for tables with close to 2^32 pages.
741 : */
742 97299 : for (uint64 heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
743 : {
744 : bool addrange;
745 95826 : bool gottuple = false;
746 : BrinTuple *tup;
747 : OffsetNumber off;
748 : Size size;
749 :
750 95826 : CHECK_FOR_INTERRUPTS();
751 :
752 95826 : MemoryContextReset(perRangeCxt);
753 :
 : /* buf is retained across iterations to avoid repeated pin/unpin */
754 95826 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, (BlockNumber) heapBlk, &buf,
755 : &off, &size, BUFFER_LOCK_SHARE);
756 95826 : if (tup)
757 : {
758 94968 : gottuple = true;
759 94968 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
760 94968 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
761 : }
762 :
763 : /*
764 : * For page ranges with no indexed tuple, we must return the whole
765 : * range; otherwise, compare it to the scan keys.
766 : */
767 95826 : if (!gottuple)
768 : {
769 858 : addrange = true;
770 : }
771 : else
772 : {
773 94968 : dtup = brin_deform_tuple(bdesc, btup, dtup);
774 94968 : if (dtup->bt_placeholder)
775 : {
776 : /*
777 : * Placeholder tuples are always returned, regardless of the
778 : * values stored in them.
779 : */
780 0 : addrange = true;
781 : }
782 : else
783 : {
784 : int attno;
785 :
786 : /*
787 : * Compare scan keys with summary values stored for the range.
788 : * If scan keys are matched, the page range must be added to
789 : * the bitmap. We initially assume the range needs to be
790 : * added; in particular this serves the case where there are
791 : * no keys.
792 : */
793 94968 : addrange = true;
794 2352034 : for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
795 : {
796 : BrinValues *bval;
797 : Datum add;
798 : Oid collation;
799 :
800 : /*
801 : * skip attributes without any scan keys (both regular and
802 : * IS [NOT] NULL)
803 : */
804 2283867 : if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
805 2188899 : continue;
806 :
807 94968 : bval = &dtup->bt_columns[attno - 1];
808 :
809 : /*
810 : * If the BRIN tuple indicates that this range is empty,
811 : * we can skip it: there's nothing to match. We don't
812 : * need to examine the next columns.
813 : */
814 94968 : if (dtup->bt_empty_range)
815 : {
816 0 : addrange = false;
817 0 : break;
818 : }
819 :
820 : /*
821 : * First check if there are any IS [NOT] NULL scan keys,
822 : * and if we're violating them. In that case we can
823 : * terminate early, without invoking the support function.
824 : *
825 : * As there may be more keys, we can only determine
826 : * mismatch within this loop.
827 : */
828 94968 : if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
829 94968 : !check_null_keys(bval, nullkeys[attno - 1],
830 94968 : nnullkeys[attno - 1]))
831 : {
832 : /*
833 : * If any of the IS [NOT] NULL keys failed, the page
834 : * range as a whole can't pass. So terminate the loop.
835 : */
836 498 : addrange = false;
837 498 : break;
838 : }
839 :
840 : /*
841 : * So either there are no IS [NOT] NULL keys, or all
842 : * passed. If there are no regular scan keys, we're done -
843 : * the page range matches. If there are regular keys, but
844 : * the page range is marked as 'all nulls' it can't
845 : * possibly pass (we're assuming the operators are
846 : * strict).
847 : */
848 :
849 : /* No regular scan keys - page range as a whole passes. */
850 94470 : if (!nkeys[attno - 1])
851 618 : continue;
852 :
853 : Assert((nkeys[attno - 1] > 0) &&
854 : (nkeys[attno - 1] <= scan->numberOfKeys));
855 :
856 : /* If it is all nulls, it cannot possibly be consistent. */
857 93852 : if (bval->bv_allnulls)
858 : {
859 189 : addrange = false;
860 189 : break;
861 : }
862 :
863 : /*
864 : * Collation from the first key (has to be the same for
865 : * all keys for the same attribute).
866 : */
867 93663 : collation = keys[attno - 1][0]->sk_collation;
868 :
869 : /*
870 : * Check whether the scan key is consistent with the page
871 : * range values; if so, have the pages in the range added
872 : * to the output bitmap.
873 : *
874 : * The opclass may or may not support processing of
875 : * multiple scan keys. We can determine that based on the
876 : * number of arguments - functions with extra parameter
877 : * (number of scan keys) do support this, otherwise we
878 : * have to simply pass the scan keys one by one.
879 : */
880 93663 : if (consistentFn[attno - 1].fn_nargs >= 4)
881 : {
882 : /* Check all keys at once */
883 19797 : add = FunctionCall4Coll(&consistentFn[attno - 1],
884 : collation,
885 : PointerGetDatum(bdesc),
886 : PointerGetDatum(bval),
887 19797 : PointerGetDatum(keys[attno - 1]),
888 19797 : Int32GetDatum(nkeys[attno - 1]));
889 19797 : addrange = DatumGetBool(add);
890 : }
891 : else
892 : {
893 : /*
894 : * Check keys one by one
895 : *
896 : * When there are multiple scan keys, failure to meet
897 : * the criteria for a single one of them is enough to
898 : * discard the range as a whole, so break out of the
899 : * loop as soon as a false return value is obtained.
900 : */
901 : int keyno;
902 :
903 129039 : for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
904 : {
905 73866 : add = FunctionCall3Coll(&consistentFn[attno - 1],
906 73866 : keys[attno - 1][keyno]->sk_collation,
907 : PointerGetDatum(bdesc),
908 : PointerGetDatum(bval),
909 73866 : PointerGetDatum(keys[attno - 1][keyno]));
910 73866 : addrange = DatumGetBool(add);
911 73866 : if (!addrange)
912 18693 : break;
913 : }
914 : }
915 :
916 : /*
917 : * If we found a scan key eliminating the range, no need
918 : * to check additional ones.
919 : */
920 93663 : if (!addrange)
921 26114 : break;
922 : }
923 : }
924 : }
925 :
926 : /* add the pages in the range to the output bitmap, if needed */
927 95826 : if (addrange)
928 : {
929 : uint64 pageno;
930 :
 : /* clamp the last (possibly partial) range to the table's end */
931 69025 : for (pageno = heapBlk;
932 143010 : pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
933 73985 : pageno++)
934 : {
935 73985 : MemoryContextSwitchTo(oldcxt);
936 73985 : tbm_add_page(tbm, pageno);
937 73985 : totalpages++;
938 73985 : MemoryContextSwitchTo(perRangeCxt);
939 : }
940 : }
941 : }
942 :
943 1473 : MemoryContextSwitchTo(oldcxt);
944 1473 : MemoryContextDelete(perRangeCxt);
945 :
946 1473 : if (buf != InvalidBuffer)
947 1473 : ReleaseBuffer(buf);
948 :
949 : /*
950 : * XXX We have an approximation of the number of *pages* that our scan
951 : * returns, but we don't have a precise idea of the number of heap tuples
952 : * involved.
953 : */
954 1473 : return totalpages * 10;
955 : }
956 :
957 : /*
958 : * Re-initialize state for a BRIN index scan
959 : */
960 : void
961 1473 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
962 : ScanKey orderbys, int norderbys)
963 : {
964 : /*
965 : * Other index AMs preprocess the scan keys at this point, or sometime
966 : * early during the scan; this lets them optimize by removing redundant
967 : * keys, or doing early returns when they are impossible to satisfy; see
968 : * _bt_preprocess_keys for an example. Something like that could be added
969 : * here someday, too.
970 : */
971 :
972 1473 : if (scankey && scan->numberOfKeys > 0)
973 1473 : memcpy(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData));
974 1473 : }
975 :
976 : /*
977 : * Close down a BRIN index scan
978 : */
979 : void
980 1473 : brinendscan(IndexScanDesc scan)
981 : {
982 1473 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
983 :
984 1473 : brinRevmapTerminate(opaque->bo_rmAccess);
985 1473 : brin_free_desc(opaque->bo_bdesc);
986 1473 : pfree(opaque);
987 1473 : }
988 :
989 : /*
990 : * Per-heap-tuple callback for table_index_build_scan.
991 : *
992 : * Note we don't worry about the page range at the end of the table here; it is
993 : * present in the build state struct after we're called the last time, but not
994 : * inserted into the index. Caller must ensure to do so, if appropriate.
995 : */
996 : static void
997 364233 : brinbuildCallback(Relation index,
998 : ItemPointer tid,
999 : Datum *values,
1000 : bool *isnull,
1001 : bool tupleIsAlive,
1002 : void *brstate)
1003 : {
1004 364233 : BrinBuildState *state = (BrinBuildState *) brstate;
1005 : BlockNumber thisblock;
1006 :
1007 364233 : thisblock = ItemPointerGetBlockNumber(tid);
1008 :
1009 : /*
1010 : * If we're in a block that belongs to a future range, summarize what
1011 : * we've got and start afresh. Note the scan might have skipped many
1012 : * pages, if they were devoid of live tuples; make sure to insert index
1013 : * tuples for those too.
1014 : */
1015 365381 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
1016 : {
1017 :
1018 : BRIN_elog((DEBUG2,
1019 : "brinbuildCallback: completed a range: %u--%u",
1020 : state->bs_currRangeStart,
1021 : state->bs_currRangeStart + state->bs_pagesPerRange));
1022 :
1023 : /* create the index tuple and insert it */
1024 1148 : form_and_insert_tuple(state);
1025 :
1026 : /* set state to correspond to the next range */
1027 1148 : state->bs_currRangeStart += state->bs_pagesPerRange;
1028 :
1029 : /* re-initialize state for it */
1030 1148 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1031 : }
1032 :
1033 : /* Accumulate the current tuple into the running state */
1034 364233 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1035 : values, isnull);
1036 364233 : }
1037 :
1038 : /*
1039 : * Per-heap-tuple callback for table_index_build_scan with parallelism.
1040 : *
1041 : * A version of the callback used by parallel index builds. The main difference
1042 : * is that instead of writing the BRIN tuples into the index, we write them
1043 : * into a shared tuplesort, and leave the insertion up to the leader (which may
1044 : * reorder them a bit etc.). The callback also does not generate empty ranges,
1045 : * those will be added by the leader when merging results from workers.
1046 : */
1047 : static void
1048 3981 : brinbuildCallbackParallel(Relation index,
1049 : ItemPointer tid,
1050 : Datum *values,
1051 : bool *isnull,
1052 : bool tupleIsAlive,
1053 : void *brstate)
1054 : {
1055 3981 : BrinBuildState *state = (BrinBuildState *) brstate;
1056 : BlockNumber thisblock;
1057 :
1058 3981 : thisblock = ItemPointerGetBlockNumber(tid);
1059 :
1060 : /*
1061 : * If we're in a block that belongs to a different range, summarize what
1062 : * we've got and start afresh. Note the scan might have skipped many
1063 : * pages, if they were devoid of live tuples; we do not create empty BRIN
1064 : * ranges here - the leader is responsible for filling them in.
1065 : *
1066 : * Unlike serial builds, parallel index builds allow synchronized seqscans
1067 : * (because that's what parallel scans do). This means the block may wrap
1068 : * around to the beginning of the relation, so the condition needs to
1069 : * check for both future and past ranges.
1070 : */
1071 3981 : if ((thisblock < state->bs_currRangeStart) ||
1072 3981 : (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1))
1073 : {
1074 :
1075 : BRIN_elog((DEBUG2,
1076 : "brinbuildCallbackParallel: completed a range: %u--%u",
1077 : state->bs_currRangeStart,
1078 : state->bs_currRangeStart + state->bs_pagesPerRange));
1079 :
1080 : /* create the index tuple and write it into the tuplesort */
1081 19 : form_and_spill_tuple(state);
1082 :
1083 : /*
1084 : * Set state to correspond to the next range (for this block).
1085 : *
1086 : * This skips ranges that are either empty (and so we don't get any
1087 : * tuples to summarize), or processed by other workers. We can't
1088 : * differentiate those cases here easily, so we leave it up to the
1089 : * leader to fill empty ranges where needed.
1090 : */
1091 : state->bs_currRangeStart
1092 19 : = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
1093 :
1094 : /* re-initialize state for it */
1095 19 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1096 : }
1097 :
1098 : /* Accumulate the current tuple into the running state */
1099 3981 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1100 : values, isnull);
1101 3981 : }
1102 :
1103 : /*
1104 : * brinbuild() -- build a new BRIN index.
1105 : */
1106 : IndexBuildResult *
1107 184 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1108 : {
1109 : IndexBuildResult *result;
1110 : double reltuples;
1111 : double idxtuples;
1112 : BrinRevmap *revmap;
1113 : BrinBuildState *state;
1114 : Buffer meta;
1115 : BlockNumber pagesPerRange;
1116 :
1117 : /*
1118 : * We expect to be called exactly once for any index relation.
1119 : */
1120 184 : if (RelationGetNumberOfBlocks(index) != 0)
1121 0 : elog(ERROR, "index \"%s\" already contains data",
1122 : RelationGetRelationName(index));
1123 :
1124 : /*
1125 : * Critical section not required, because on error the creation of the
1126 : * whole relation will be rolled back.
1127 : */
1128 :
1129 184 : meta = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
1130 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1131 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
1132 :
1133 184 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
1134 : BRIN_CURRENT_VERSION);
1135 184 : MarkBufferDirty(meta);
1136 :
1137 184 : if (RelationNeedsWAL(index))
1138 : {
1139 : xl_brin_createidx xlrec;
1140 : XLogRecPtr recptr;
1141 : Page page;
1142 :
1143 127 : xlrec.version = BRIN_CURRENT_VERSION;
1144 127 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
1145 :
1146 127 : XLogBeginInsert();
1147 127 : XLogRegisterData(&xlrec, SizeOfBrinCreateIdx);
1148 127 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
1149 :
1150 127 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
1151 :
1152 127 : page = BufferGetPage(meta);
1153 127 : PageSetLSN(page, recptr);
1154 : }
1155 :
1156 184 : UnlockReleaseBuffer(meta);
1157 :
1158 : /*
1159 : * Initialize our state, including the deformed tuple state.
1160 : */
1161 184 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1162 184 : state = initialize_brin_buildstate(index, revmap, pagesPerRange,
1163 : RelationGetNumberOfBlocks(heap));
1164 :
1165 : /*
1166 : * Attempt to launch parallel worker scan when required
1167 : *
1168 : * XXX plan_create_index_workers makes the number of workers dependent on
1169 : * maintenance_work_mem, requiring 32MB for each worker. That makes sense
1170 : * for btree, but not for BRIN, which can do with much less memory. So
1171 : * maybe make that somehow less strict, optionally?
1172 : */
1173 184 : if (indexInfo->ii_ParallelWorkers > 0)
1174 5 : _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
1175 : indexInfo->ii_ParallelWorkers);
1176 :
1177 : /*
1178 : * If parallel build requested and at least one worker process was
1179 : * successfully launched, set up coordination state, wait for workers to
1180 : * complete. Then read all tuples from the shared tuplesort and insert
1181 : * them into the index.
1182 : *
1183 : * In serial mode, simply scan the table and build the index one index
1184 : * tuple at a time.
1185 : */
1186 184 : if (state->bs_leader)
1187 : {
1188 : SortCoordinate coordinate;
1189 :
1190 4 : coordinate = palloc0_object(SortCoordinateData);
1191 4 : coordinate->isWorker = false;
1192 4 : coordinate->nParticipants =
1193 4 : state->bs_leader->nparticipanttuplesorts;
1194 4 : coordinate->sharedsort = state->bs_leader->sharedsort;
1195 :
1196 : /*
1197 : * Begin leader tuplesort.
1198 : *
1199 : * In cases where parallelism is involved, the leader receives the
1200 : * same share of maintenance_work_mem as a serial sort (it is
1201 : * generally treated in the same way as a serial sort once we return).
1202 : * Parallel worker Tuplesortstates will have received only a fraction
1203 : * of maintenance_work_mem, though.
1204 : *
1205 : * We rely on the lifetime of the Leader Tuplesortstate almost not
1206 : * overlapping with any worker Tuplesortstate's lifetime. There may
1207 : * be some small overlap, but that's okay because we rely on leader
1208 : * Tuplesortstate only allocating a small, fixed amount of memory
1209 : * here. When its tuplesort_performsort() is called (by our caller),
1210 : * and significant amounts of memory are likely to be used, all
1211 : * workers must have already freed almost all memory held by their
1212 : * Tuplesortstates (they are about to go away completely, too). The
1213 : * overall effect is that maintenance_work_mem always represents an
1214 : * absolute high watermark on the amount of memory used by a CREATE
1215 : * INDEX operation, regardless of the use of parallelism or any other
1216 : * factor.
1217 : */
1218 4 : state->bs_sortstate =
1219 4 : tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
1220 : TUPLESORT_NONE);
1221 :
1222 : /* scan the relation and merge per-worker results */
1223 4 : reltuples = _brin_parallel_merge(state);
1224 :
1225 4 : _brin_end_parallel(state->bs_leader, state);
1226 : }
1227 : else /* no parallel index build */
1228 : {
1229 : /*
1230 : * Now scan the relation. No syncscan allowed here because we want
1231 : * the heap blocks in physical order (we want to produce the ranges
1232 : * starting from block 0, and the callback also relies on this to not
1233 : * generate summary for the same range twice).
1234 : */
1235 180 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
1236 : brinbuildCallback, state, NULL);
1237 :
1238 : /*
1239 : * process the final batch
1240 : *
1241 : * XXX Note this does not update state->bs_currRangeStart, i.e. it
1242 : * stays set to the last range added to the index. This is OK, because
1243 : * that's what brin_fill_empty_ranges expects.
1244 : */
1245 180 : form_and_insert_tuple(state);
1246 :
1247 : /*
1248 : * Backfill the final ranges with empty data.
1249 : *
1250 : * This saves us from doing what amounts to full table scans when the
1251 : * index with a predicate like WHERE (nonnull_column IS NULL), or
1252 : * other very selective predicates.
1253 : */
1254 180 : brin_fill_empty_ranges(state,
1255 : state->bs_currRangeStart,
1256 : state->bs_maxRangeStart);
1257 : }
1258 :
1259 : /* release resources */
1260 184 : idxtuples = state->bs_numtuples;
1261 184 : brinRevmapTerminate(state->bs_rmAccess);
1262 184 : terminate_brin_buildstate(state);
1263 :
1264 : /*
1265 : * Return statistics
1266 : */
1267 184 : result = palloc_object(IndexBuildResult);
1268 :
1269 184 : result->heap_tuples = reltuples;
1270 184 : result->index_tuples = idxtuples;
1271 :
1272 184 : return result;
1273 : }
1274 :
/*
 * brinbuildempty() -- build an empty BRIN index in the initialization fork.
 *
 * Only the metapage is created; it is written to the INIT fork and
 * unconditionally WAL-logged via log_newpage_buffer.
 */
void
brinbuildempty(Relation index)
{
	Buffer		metabuf;

	/* An empty BRIN index has a metapage only. */
	metabuf = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
								EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);

	/* Initialize and xlog metabuffer. */
	START_CRIT_SECTION();
	brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
					   BRIN_CURRENT_VERSION);
	MarkBufferDirty(metabuf);
	log_newpage_buffer(metabuf, true);
	END_CRIT_SECTION();

	UnlockReleaseBuffer(metabuf);
}
1294 :
1295 : /*
1296 : * brinbulkdelete
1297 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
1298 : * there's not a lot we can do here.
1299 : *
1300 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
1301 : * tuple is deleted), meaning the need to re-run summarization on the affected
1302 : * range. Would need to add an extra flag in brintuples for that.
1303 : */
1304 : IndexBulkDeleteResult *
1305 10 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
1306 : IndexBulkDeleteCallback callback, void *callback_state)
1307 : {
1308 : /* allocate stats if first time through, else re-use existing struct */
1309 10 : if (stats == NULL)
1310 10 : stats = palloc0_object(IndexBulkDeleteResult);
1311 :
1312 10 : return stats;
1313 : }
1314 :
1315 : /*
1316 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
1317 : * ranges that are currently unsummarized.
1318 : */
1319 : IndexBulkDeleteResult *
1320 56 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
1321 : {
1322 : Relation heapRel;
1323 :
1324 : /* No-op in ANALYZE ONLY mode */
1325 56 : if (info->analyze_only)
1326 3 : return stats;
1327 :
1328 53 : if (!stats)
1329 46 : stats = palloc0_object(IndexBulkDeleteResult);
1330 53 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
1331 : /* rest of stats is initialized by zeroing */
1332 :
1333 53 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
1334 : AccessShareLock);
1335 :
1336 53 : brin_vacuum_scan(info->index, info->strategy);
1337 :
1338 53 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
1339 : &stats->num_index_tuples, &stats->num_index_tuples);
1340 :
1341 53 : table_close(heapRel, AccessShareLock);
1342 :
1343 53 : return stats;
1344 : }
1345 :
1346 : /*
1347 : * reloptions processor for BRIN indexes
1348 : */
1349 : bytea *
1350 602 : brinoptions(Datum reloptions, bool validate)
1351 : {
1352 : static const relopt_parse_elt tab[] = {
1353 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
1354 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
1355 : };
1356 :
1357 602 : return (bytea *) build_reloptions(reloptions, validate,
1358 : RELOPT_KIND_BRIN,
1359 : sizeof(BrinOptions),
1360 : tab, lengthof(tab));
1361 : }
1362 :
1363 : /*
1364 : * SQL-callable function to scan through an index and summarize all ranges
1365 : * that are not currently summarized.
1366 : */
1367 : Datum
1368 38 : brin_summarize_new_values(PG_FUNCTION_ARGS)
1369 : {
1370 38 : Datum relation = PG_GETARG_DATUM(0);
1371 :
1372 38 : return DirectFunctionCall2(brin_summarize_range,
1373 : relation,
1374 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
1375 : }
1376 :
1377 : /*
1378 : * SQL-callable function to summarize the indicated page range, if not already
1379 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
1380 : * unsummarized ranges are summarized.
1381 : */
1382 : Datum
1383 105 : brin_summarize_range(PG_FUNCTION_ARGS)
1384 : {
1385 105 : Oid indexoid = PG_GETARG_OID(0);
1386 105 : int64 heapBlk64 = PG_GETARG_INT64(1);
1387 : BlockNumber heapBlk;
1388 : Oid heapoid;
1389 : Relation indexRel;
1390 : Relation heapRel;
1391 : Oid save_userid;
1392 : int save_sec_context;
1393 : int save_nestlevel;
1394 105 : double numSummarized = 0;
1395 :
1396 105 : if (RecoveryInProgress())
1397 0 : ereport(ERROR,
1398 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1399 : errmsg("recovery is in progress"),
1400 : errhint("BRIN control functions cannot be executed during recovery.")));
1401 :
1402 105 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
1403 18 : ereport(ERROR,
1404 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1405 : errmsg("block number out of range: %" PRId64, heapBlk64)));
1406 87 : heapBlk = (BlockNumber) heapBlk64;
1407 :
1408 : /*
1409 : * We must lock table before index to avoid deadlocks. However, if the
1410 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1411 : * Rather than emitting a not-very-helpful error message, postpone
1412 : * complaining, expecting that the is-it-an-index test below will fail.
1413 : */
1414 87 : heapoid = IndexGetRelation(indexoid, true);
1415 87 : if (OidIsValid(heapoid))
1416 : {
1417 78 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1418 :
1419 : /*
1420 : * Autovacuum calls us. For its benefit, switch to the table owner's
1421 : * userid, so that any index functions are run as that user. Also
1422 : * lock down security-restricted operations and arrange to make GUC
1423 : * variable changes local to this command. This is harmless, albeit
1424 : * unnecessary, when called from SQL, because we fail shortly if the
1425 : * user does not own the index.
1426 : */
1427 78 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
1428 78 : SetUserIdAndSecContext(heapRel->rd_rel->relowner,
1429 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
1430 78 : save_nestlevel = NewGUCNestLevel();
1431 78 : RestrictSearchPath();
1432 : }
1433 : else
1434 : {
1435 9 : heapRel = NULL;
1436 : /* Set these just to suppress "uninitialized variable" warnings */
1437 9 : save_userid = InvalidOid;
1438 9 : save_sec_context = -1;
1439 9 : save_nestlevel = -1;
1440 : }
1441 :
1442 87 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1443 :
1444 : /* Must be a BRIN index */
1445 78 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1446 78 : indexRel->rd_rel->relam != BRIN_AM_OID)
1447 9 : ereport(ERROR,
1448 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1449 : errmsg("\"%s\" is not a BRIN index",
1450 : RelationGetRelationName(indexRel))));
1451 :
1452 : /* User must own the index (comparable to privileges needed for VACUUM) */
1453 69 : if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
1454 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1455 0 : RelationGetRelationName(indexRel));
1456 :
1457 : /*
1458 : * Since we did the IndexGetRelation call above without any lock, it's
1459 : * barely possible that a race against an index drop/recreation could have
1460 : * netted us the wrong table. Recheck.
1461 : */
1462 69 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1463 0 : ereport(ERROR,
1464 : (errcode(ERRCODE_UNDEFINED_TABLE),
1465 : errmsg("could not open parent table of index \"%s\"",
1466 : RelationGetRelationName(indexRel))));
1467 :
1468 : /* see gin_clean_pending_list() */
1469 69 : if (indexRel->rd_index->indisvalid)
1470 69 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
1471 : else
1472 0 : ereport(DEBUG1,
1473 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1474 : errmsg("index \"%s\" is not valid",
1475 : RelationGetRelationName(indexRel))));
1476 :
1477 : /* Roll back any GUC changes executed by index functions */
1478 69 : AtEOXact_GUC(false, save_nestlevel);
1479 :
1480 : /* Restore userid and security context */
1481 69 : SetUserIdAndSecContext(save_userid, save_sec_context);
1482 :
1483 69 : index_close(indexRel, ShareUpdateExclusiveLock);
1484 69 : table_close(heapRel, ShareUpdateExclusiveLock);
1485 :
1486 69 : PG_RETURN_INT32((int32) numSummarized);
1487 : }
1488 :
1489 : /*
1490 : * SQL-callable interface to mark a range as no longer summarized
1491 : */
1492 : Datum
1493 52 : brin_desummarize_range(PG_FUNCTION_ARGS)
1494 : {
1495 52 : Oid indexoid = PG_GETARG_OID(0);
1496 52 : int64 heapBlk64 = PG_GETARG_INT64(1);
1497 : BlockNumber heapBlk;
1498 : Oid heapoid;
1499 : Relation heapRel;
1500 : Relation indexRel;
1501 : bool done;
1502 :
1503 52 : if (RecoveryInProgress())
1504 0 : ereport(ERROR,
1505 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1506 : errmsg("recovery is in progress"),
1507 : errhint("BRIN control functions cannot be executed during recovery.")));
1508 :
1509 52 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
1510 9 : ereport(ERROR,
1511 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1512 : errmsg("block number out of range: %" PRId64,
1513 : heapBlk64)));
1514 43 : heapBlk = (BlockNumber) heapBlk64;
1515 :
1516 : /*
1517 : * We must lock table before index to avoid deadlocks. However, if the
1518 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1519 : * Rather than emitting a not-very-helpful error message, postpone
1520 : * complaining, expecting that the is-it-an-index test below will fail.
1521 : *
1522 : * Unlike brin_summarize_range(), autovacuum never calls this. Hence, we
1523 : * don't switch userid.
1524 : */
1525 43 : heapoid = IndexGetRelation(indexoid, true);
1526 43 : if (OidIsValid(heapoid))
1527 43 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1528 : else
1529 0 : heapRel = NULL;
1530 :
1531 43 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1532 :
1533 : /* Must be a BRIN index */
1534 43 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1535 43 : indexRel->rd_rel->relam != BRIN_AM_OID)
1536 0 : ereport(ERROR,
1537 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1538 : errmsg("\"%s\" is not a BRIN index",
1539 : RelationGetRelationName(indexRel))));
1540 :
1541 : /* User must own the index (comparable to privileges needed for VACUUM) */
1542 43 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1543 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1544 0 : RelationGetRelationName(indexRel));
1545 :
1546 : /*
1547 : * Since we did the IndexGetRelation call above without any lock, it's
1548 : * barely possible that a race against an index drop/recreation could have
1549 : * netted us the wrong table. Recheck.
1550 : */
1551 43 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1552 0 : ereport(ERROR,
1553 : (errcode(ERRCODE_UNDEFINED_TABLE),
1554 : errmsg("could not open parent table of index \"%s\"",
1555 : RelationGetRelationName(indexRel))));
1556 :
1557 : /* see gin_clean_pending_list() */
1558 43 : if (indexRel->rd_index->indisvalid)
1559 : {
1560 : /* the revmap does the hard work */
1561 : do
1562 : {
1563 43 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1564 : }
1565 43 : while (!done);
1566 : }
1567 : else
1568 0 : ereport(DEBUG1,
1569 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1570 : errmsg("index \"%s\" is not valid",
1571 : RelationGetRelationName(indexRel))));
1572 :
1573 43 : index_close(indexRel, ShareUpdateExclusiveLock);
1574 43 : table_close(heapRel, ShareUpdateExclusiveLock);
1575 :
1576 43 : PG_RETURN_VOID();
1577 : }
1578 :
1579 : /*
1580 : * Build a BrinDesc used to create or scan a BRIN index
1581 : */
1582 : BrinDesc *
1583 2293 : brin_build_desc(Relation rel)
1584 : {
1585 : BrinOpcInfo **opcinfo;
1586 : BrinDesc *bdesc;
1587 : TupleDesc tupdesc;
1588 2293 : int totalstored = 0;
1589 : int keyno;
1590 : long totalsize;
1591 : MemoryContext cxt;
1592 : MemoryContext oldcxt;
1593 :
1594 2293 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1595 : "brin desc cxt",
1596 : ALLOCSET_SMALL_SIZES);
1597 2293 : oldcxt = MemoryContextSwitchTo(cxt);
1598 2293 : tupdesc = RelationGetDescr(rel);
1599 :
1600 : /*
1601 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1602 : * the number of columns stored, since the number is opclass-defined.
1603 : */
1604 2293 : opcinfo = palloc_array(BrinOpcInfo *, tupdesc->natts);
1605 38086 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1606 : {
1607 : FmgrInfo *opcInfoFn;
1608 35793 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1609 :
1610 35793 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1611 :
1612 71586 : opcinfo[keyno] = (BrinOpcInfo *)
1613 35793 : DatumGetPointer(FunctionCall1(opcInfoFn, ObjectIdGetDatum(attr->atttypid)));
1614 35793 : totalstored += opcinfo[keyno]->oi_nstored;
1615 : }
1616 :
1617 : /* Allocate our result struct and fill it in */
1618 2293 : totalsize = offsetof(BrinDesc, bd_info) +
1619 2293 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1620 :
1621 2293 : bdesc = palloc(totalsize);
1622 2293 : bdesc->bd_context = cxt;
1623 2293 : bdesc->bd_index = rel;
1624 2293 : bdesc->bd_tupdesc = tupdesc;
1625 2293 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1626 2293 : bdesc->bd_totalstored = totalstored;
1627 :
1628 38086 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1629 35793 : bdesc->bd_info[keyno] = opcinfo[keyno];
1630 2293 : pfree(opcinfo);
1631 :
1632 2293 : MemoryContextSwitchTo(oldcxt);
1633 :
1634 2293 : return bdesc;
1635 : }
1636 :
/*
 * brin_free_desc() -- release a BrinDesc built by brin_build_desc.
 *
 * Everything reachable from the descriptor lives in bd_context, so deleting
 * that context frees the whole structure.
 */
void
brin_free_desc(BrinDesc *bdesc)
{
	/* make sure the tupdesc is still valid */
	Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
	/* no need for retail pfree */
	MemoryContextDelete(bdesc->bd_context);
}
1645 :
1646 : /*
1647 : * Fetch index's statistical data into *stats
1648 : */
1649 : void
1650 5365 : brinGetStats(Relation index, BrinStatsData *stats)
1651 : {
1652 : Buffer metabuffer;
1653 : Page metapage;
1654 : BrinMetaPageData *metadata;
1655 :
1656 5365 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1657 5365 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1658 5365 : metapage = BufferGetPage(metabuffer);
1659 5365 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1660 :
1661 5365 : stats->pagesPerRange = metadata->pagesPerRange;
1662 5365 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1663 :
1664 5365 : UnlockReleaseBuffer(metabuffer);
1665 5365 : }
1666 :
1667 : /*
1668 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1669 : */
1670 : static BrinBuildState *
1671 236 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1672 : BlockNumber pagesPerRange, BlockNumber tablePages)
1673 : {
1674 : BrinBuildState *state;
1675 236 : BlockNumber lastRange = 0;
1676 :
1677 236 : state = palloc_object(BrinBuildState);
1678 :
1679 236 : state->bs_irel = idxRel;
1680 236 : state->bs_numtuples = 0;
1681 236 : state->bs_reltuples = 0;
1682 236 : state->bs_currentInsertBuf = InvalidBuffer;
1683 236 : state->bs_pagesPerRange = pagesPerRange;
1684 236 : state->bs_currRangeStart = 0;
1685 236 : state->bs_rmAccess = revmap;
1686 236 : state->bs_bdesc = brin_build_desc(idxRel);
1687 236 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1688 236 : state->bs_leader = NULL;
1689 236 : state->bs_worker_id = 0;
1690 236 : state->bs_sortstate = NULL;
1691 236 : state->bs_context = CurrentMemoryContext;
1692 236 : state->bs_emptyTuple = NULL;
1693 236 : state->bs_emptyTupleLen = 0;
1694 :
1695 : /* Remember the memory context to use for an empty tuple, if needed. */
1696 236 : state->bs_context = CurrentMemoryContext;
1697 236 : state->bs_emptyTuple = NULL;
1698 236 : state->bs_emptyTupleLen = 0;
1699 :
1700 : /*
1701 : * Calculate the start of the last page range. Page numbers are 0-based,
1702 : * so to calculate the index we need to subtract one. The integer division
1703 : * gives us the index of the page range.
1704 : */
1705 236 : if (tablePages > 0)
1706 174 : lastRange = ((tablePages - 1) / pagesPerRange) * pagesPerRange;
1707 :
1708 : /* Now calculate the start of the next range. */
1709 236 : state->bs_maxRangeStart = lastRange + state->bs_pagesPerRange;
1710 :
1711 236 : return state;
1712 : }
1713 :
1714 : /*
1715 : * Release resources associated with a BrinBuildState.
1716 : */
1717 : static void
1718 230 : terminate_brin_buildstate(BrinBuildState *state)
1719 : {
1720 : /*
1721 : * Release the last index buffer used. We might as well ensure that
1722 : * whatever free space remains in that page is available in FSM, too.
1723 : */
1724 230 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1725 : {
1726 : Page page;
1727 : Size freespace;
1728 : BlockNumber blk;
1729 :
1730 184 : page = BufferGetPage(state->bs_currentInsertBuf);
1731 184 : freespace = PageGetFreeSpace(page);
1732 184 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1733 184 : ReleaseBuffer(state->bs_currentInsertBuf);
1734 184 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1735 184 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1736 : }
1737 :
1738 230 : brin_free_desc(state->bs_bdesc);
1739 230 : pfree(state->bs_dtuple);
1740 230 : pfree(state);
1741 230 : }
1742 :
1743 : /*
1744 : * On the given BRIN index, summarize the heap page range that corresponds
1745 : * to the heap block number given.
1746 : *
1747 : * This routine can run in parallel with insertions into the heap. To avoid
1748 : * missing those values from the summary tuple, we first insert a placeholder
1749 : * index tuple into the index, then execute the heap scan; transactions
1750 : * concurrent with the scan update the placeholder tuple. After the scan, we
1751 : * union the placeholder tuple with the one computed by this routine. The
1752 : * update of the index value happens in a loop, so that if somebody updates
1753 : * the placeholder tuple after we read it, we detect the case and try again.
1754 : * This ensures that the concurrently inserted tuples are not lost.
1755 : *
1756 : * A further corner case is this routine being asked to summarize the partial
1757 : * range at the end of the table. heapNumBlocks is the (possibly outdated)
1758 : * table size; if we notice that the requested range lies beyond that size,
1759 : * we re-compute the table size after inserting the placeholder tuple, to
1760 : * avoid missing pages that were appended recently.
1761 : */
static void
summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
				BlockNumber heapBlk, BlockNumber heapNumBlks)
{
	Buffer		phbuf;
	BrinTuple  *phtup;
	Size		phsz;
	OffsetNumber offset;
	BlockNumber scanNumBlks;

	/*
	 * Insert the placeholder tuple.  Concurrent inserters that touch this
	 * range will update the placeholder, so their tuples are accounted for
	 * even though our heap scan below may not see them.
	 */
	phbuf = InvalidBuffer;
	phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
	offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
						   state->bs_rmAccess, &phbuf,
						   heapBlk, phtup, phsz);

	/*
	 * Compute range end.  We hold ShareUpdateExclusive lock on table, so it
	 * cannot shrink concurrently (but it can grow).
	 */
	Assert(heapBlk % state->bs_pagesPerRange == 0);
	if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
	{
		/*
		 * If we're asked to scan what we believe to be the final range on the
		 * table (i.e. a range that might be partial) we need to recompute our
		 * idea of what the latest page is after inserting the placeholder
		 * tuple.  Anyone that grows the table later will update the
		 * placeholder tuple, so it doesn't matter that we won't scan these
		 * pages ourselves.  Careful: the table might have been extended
		 * beyond the current range, so clamp our result.
		 *
		 * Fortunately, this should occur infrequently.
		 */
		scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
						  state->bs_pagesPerRange);
	}
	else
	{
		/* Easy case: range is known to be complete */
		scanNumBlks = state->bs_pagesPerRange;
	}

	/*
	 * Execute the partial heap scan covering the heap blocks in the specified
	 * page range, summarizing the heap tuples in it.  This scan stops just
	 * short of brinbuildCallback creating the new index entry.
	 *
	 * Note that it is critical we use the "any visible" mode of
	 * table_index_build_range_scan here: otherwise, we would miss tuples
	 * inserted by transactions that are still in progress, among other corner
	 * cases.
	 */
	state->bs_currRangeStart = heapBlk;
	table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
								 heapBlk, scanNumBlks,
								 brinbuildCallback, state, NULL);

	/*
	 * Now we update the values obtained by the scan with the placeholder
	 * tuple.  We do this in a loop which only terminates if we're able to
	 * update the placeholder tuple successfully; if we are not, this means
	 * somebody else modified the placeholder tuple after we read it.
	 */
	for (;;)
	{
		BrinTuple  *newtup;
		Size		newsize;
		bool		didupdate;
		bool		samepage;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Form the final summary tuple from the accumulated deformed tuple,
		 * and try to replace the placeholder with it.
		 */
		newtup = brin_form_tuple(state->bs_bdesc,
								 heapBlk, state->bs_dtuple, &newsize);
		samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
		didupdate =
			brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
						  state->bs_rmAccess, heapBlk, phbuf, offset,
						  phtup, phsz, newtup, newsize, samepage);
		brin_free_tuple(phtup);
		brin_free_tuple(newtup);

		/* If the update succeeded, we're done. */
		if (didupdate)
			break;

		/*
		 * If the update didn't work, it might be because somebody updated the
		 * placeholder tuple concurrently.  Extract the new version, union it
		 * with the values we have from the scan, and start over.  (There are
		 * other reasons for the update to fail, but it's simple to treat them
		 * the same.)
		 */
		phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
										 &offset, &phsz, BUFFER_LOCK_SHARE);
		/* the placeholder tuple must exist */
		if (phtup == NULL)
			elog(ERROR, "missing placeholder tuple");
		/* copy it before releasing the buffer lock, so it stays valid */
		phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
		LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);

		/* merge it into the tuple from the heap scan */
		union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
	}

	ReleaseBuffer(phbuf);
}
1876 :
1877 : /*
1878 : * Summarize page ranges that are not already summarized. If pageRange is
1879 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1880 : * page range containing the given heap page number is scanned.
1881 : * If include_partial is true, then the partial range at the end of the table
1882 : * is summarized, otherwise not.
1883 : *
1884 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1885 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1886 : * incremented.
1887 : */
static void
brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
			  bool include_partial, double *numSummarized, double *numExisting)
{
	BrinRevmap *revmap;
	BrinBuildState *state = NULL;	/* created lazily, on first unsummarized range */
	IndexInfo  *indexInfo = NULL;	/* ditto */
	BlockNumber heapNumBlocks;
	BlockNumber pagesPerRange;
	Buffer		buf;
	BlockNumber startBlk;

	revmap = brinRevmapInitialize(index, &pagesPerRange);

	/* determine range of pages to process */
	heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
	if (pageRange == BRIN_ALL_BLOCKRANGES)
		startBlk = 0;
	else
	{
		/* round down to the start of the range containing pageRange */
		startBlk = (pageRange / pagesPerRange) * pagesPerRange;
		heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
	}
	if (startBlk > heapNumBlocks)
	{
		/* Nothing to do if start point is beyond end of table */
		brinRevmapTerminate(revmap);
		return;
	}

	/*
	 * Scan the revmap to find unsummarized items.
	 */
	buf = InvalidBuffer;
	for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
	{
		BrinTuple  *tup;
		OffsetNumber off;

		/*
		 * Unless requested to summarize even a partial range, stop now if we
		 * think the next range is partial.  Callers pass include_partial=true
		 * when summarization is typically run once bulk data loading is done
		 * (brin_summarize_new_values), and false when it is typically the
		 * result of an arbitrarily-scheduled maintenance command (vacuuming).
		 */
		if (!include_partial &&
			(startBlk + pagesPerRange > heapNumBlocks))
			break;

		CHECK_FOR_INTERRUPTS();

		tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
									   BUFFER_LOCK_SHARE);
		if (tup == NULL)
		{
			/* no revmap entry for this heap range. Summarize it. */
			if (state == NULL)
			{
				/* first time through */
				Assert(!indexInfo);
				state = initialize_brin_buildstate(index, revmap,
												   pagesPerRange,
												   InvalidBlockNumber);
				indexInfo = BuildIndexInfo(index);
			}
			summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);

			/* and re-initialize state for the next range */
			brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);

			if (numSummarized)
				*numSummarized += 1.0;
		}
		else
		{
			/* range is already summarized; just count it */
			if (numExisting)
				*numExisting += 1.0;
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
	}

	/* brinGetTupleForHeapBlock may have left us holding a pin */
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);

	/* free resources */
	brinRevmapTerminate(revmap);
	if (state)
	{
		terminate_brin_buildstate(state);
		pfree(indexInfo);
	}
}
1981 :
1982 : /*
1983 : * Given a deformed tuple in the build state, convert it into the on-disk
1984 : * format and insert it into the index, making the revmap point to it.
1985 : */
1986 : static void
1987 1328 : form_and_insert_tuple(BrinBuildState *state)
1988 : {
1989 : BrinTuple *tup;
1990 : Size size;
1991 :
1992 1328 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1993 : state->bs_dtuple, &size);
1994 1328 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1995 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1996 : tup, size);
1997 1328 : state->bs_numtuples++;
1998 :
1999 1328 : pfree(tup);
2000 1328 : }
2001 :
2002 : /*
2003 : * Given a deformed tuple in the build state, convert it into the on-disk
2004 : * format and write it to a (shared) tuplesort (the leader will insert it
2005 : * into the index later).
2006 : */
2007 : static void
2008 29 : form_and_spill_tuple(BrinBuildState *state)
2009 : {
2010 : BrinTuple *tup;
2011 : Size size;
2012 :
2013 : /* don't insert empty tuples in parallel build */
2014 29 : if (state->bs_dtuple->bt_empty_range)
2015 9 : return;
2016 :
2017 20 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
2018 : state->bs_dtuple, &size);
2019 :
2020 : /* write the BRIN tuple to the tuplesort */
2021 20 : tuplesort_putbrintuple(state->bs_sortstate, tup, size);
2022 :
2023 20 : state->bs_numtuples++;
2024 :
2025 20 : pfree(tup);
2026 : }
2027 :
2028 : /*
2029 : * Given two deformed tuples, adjust the first one so that it's consistent
2030 : * with the summary values in both.
2031 : */
static void
union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
{
	int			keyno;
	BrinMemTuple *db;
	MemoryContext cxt;
	MemoryContext oldcxt;

	/* Use our own memory context to avoid retail pfree */
	cxt = AllocSetContextCreate(CurrentMemoryContext,
								"brin union",
								ALLOCSET_DEFAULT_SIZES);
	oldcxt = MemoryContextSwitchTo(cxt);
	db = brin_deform_tuple(bdesc, b, NULL);
	MemoryContextSwitchTo(oldcxt);

	/*
	 * Check if the ranges are empty.
	 *
	 * If at least one of them is empty, we don't need to call the per-key
	 * union functions at all.  If "b" is empty, we just use "a" as the result
	 * (which might itself be empty, but that's fine).  If "a" is empty but
	 * "b" is not, we use "b" as the result (but we have to copy the data
	 * into "a" first).
	 *
	 * Only when both ranges are non-empty do we actually do the per-key
	 * merge.
	 */

	/* If "b" is empty - ignore it and just use "a" (even if it's empty etc.). */
	if (db->bt_empty_range)
	{
		/* skip the per-key merge */
		MemoryContextDelete(cxt);
		return;
	}

	/*
	 * Now we know "b" is not empty.  If "a" is empty, then "b" is the result.
	 * But we need to copy the data from "b" to "a" first, because that's how
	 * we pass the result out.
	 *
	 * We have to copy all the global/per-key flags etc. too.
	 */
	if (a->bt_empty_range)
	{
		for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
		{
			int			i;
			BrinValues *col_a = &a->bt_columns[keyno];
			BrinValues *col_b = &db->bt_columns[keyno];
			BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];

			col_a->bv_allnulls = col_b->bv_allnulls;
			col_a->bv_hasnulls = col_b->bv_hasnulls;

			/* If "b" has no data, we're done. */
			if (col_b->bv_allnulls)
				continue;

			/* deep-copy the stored datums, since "db" lives in our temp context */
			for (i = 0; i < opcinfo->oi_nstored; i++)
				col_a->bv_values[i] =
					datumCopy(col_b->bv_values[i],
							  opcinfo->oi_typcache[i]->typbyval,
							  opcinfo->oi_typcache[i]->typlen);
		}

		/* "a" started empty, but "b" was not empty, so remember that */
		a->bt_empty_range = false;

		/* skip the per-key merge */
		MemoryContextDelete(cxt);
		return;
	}

	/* Now we know neither range is empty. */
	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		FmgrInfo   *unionFn;
		BrinValues *col_a = &a->bt_columns[keyno];
		BrinValues *col_b = &db->bt_columns[keyno];
		BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];

		if (opcinfo->oi_regular_nulls)
		{
			/* Does the "b" summary represent any NULL values? */
			bool		b_has_nulls = (col_b->bv_hasnulls || col_b->bv_allnulls);

			/* Adjust "hasnulls". */
			if (!col_a->bv_allnulls && b_has_nulls)
				col_a->bv_hasnulls = true;

			/* If there are no values in B, there's nothing left to do. */
			if (col_b->bv_allnulls)
				continue;

			/*
			 * Adjust "allnulls".  If A doesn't have values, just copy the
			 * values from B into A, and we're done.  We cannot run the
			 * operators in this case, because values in A might contain
			 * garbage.  Note we already established that B contains values.
			 *
			 * Also adjust "hasnulls" in order not to forget the summary
			 * represents NULL values.  This is not redundant with the earlier
			 * update, because that only happens when allnulls=false.
			 */
			if (col_a->bv_allnulls)
			{
				int			i;

				col_a->bv_allnulls = false;
				col_a->bv_hasnulls = true;

				for (i = 0; i < opcinfo->oi_nstored; i++)
					col_a->bv_values[i] =
						datumCopy(col_b->bv_values[i],
								  opcinfo->oi_typcache[i]->typbyval,
								  opcinfo->oi_typcache[i]->typlen);

				continue;
			}
		}

		/* Both sides have values: let the opclass union function merge them. */
		unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
									BRIN_PROCNUM_UNION);
		FunctionCall3Coll(unionFn,
						  bdesc->bd_index->rd_indcollation[keyno],
						  PointerGetDatum(bdesc),
						  PointerGetDatum(col_a),
						  PointerGetDatum(col_b));
	}

	MemoryContextDelete(cxt);
}
2164 :
2165 : /*
2166 : * brin_vacuum_scan
2167 : * Do a complete scan of the index during VACUUM.
2168 : *
2169 : * This routine scans the complete index looking for uncataloged index pages,
2170 : * i.e. those that might have been lost due to a crash after index extension
2171 : * and such.
2172 : */
2173 : static void
2174 53 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
2175 : {
2176 : BlockRangeReadStreamPrivate p;
2177 : ReadStream *stream;
2178 : Buffer buf;
2179 :
2180 53 : p.current_blocknum = 0;
2181 53 : p.last_exclusive = RelationGetNumberOfBlocks(idxrel);
2182 :
2183 : /*
2184 : * It is safe to use batchmode as block_range_read_stream_cb takes no
2185 : * locks.
2186 : */
2187 53 : stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
2188 : READ_STREAM_FULL |
2189 : READ_STREAM_USE_BATCHING,
2190 : strategy,
2191 : idxrel,
2192 : MAIN_FORKNUM,
2193 : block_range_read_stream_cb,
2194 : &p,
2195 : 0);
2196 :
2197 : /*
2198 : * Scan the index in physical order, and clean up any possible mess in
2199 : * each page.
2200 : */
2201 291 : while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
2202 : {
2203 238 : CHECK_FOR_INTERRUPTS();
2204 :
2205 238 : brin_page_cleanup(idxrel, buf);
2206 :
2207 238 : ReleaseBuffer(buf);
2208 : }
2209 :
2210 53 : read_stream_end(stream);
2211 :
2212 : /*
2213 : * Update all upper pages in the index's FSM, as well. This ensures not
2214 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
2215 : * but also that any pre-existing damage or out-of-dateness is repaired.
2216 : */
2217 53 : FreeSpaceMapVacuum(idxrel);
2218 53 : }
2219 :
/*
 * Merge one heap tuple's key values into the deformed summary tuple "dtup".
 *
 * Returns true if the summary was modified (the caller must then write the
 * updated tuple back to the index), false if the tuple already fit within
 * the existing summary.
 */
static bool
add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
					const Datum *values, const bool *nulls)
{
	int			keyno;

	/* If the range starts empty, we're certainly going to modify it. */
	bool		modified = dtup->bt_empty_range;

	/*
	 * Compare the key values of the new tuple to the stored index values; our
	 * deformed tuple will get updated if the new tuple doesn't fit the
	 * original range (note this means we can't break out of the loop early).
	 * Make a note of whether this happens, so that we know to insert the
	 * modified tuple later.
	 */
	for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
	{
		Datum		result;
		BrinValues *bval;
		FmgrInfo   *addValue;
		bool		has_nulls;

		bval = &dtup->bt_columns[keyno];

		/*
		 * Does the range have actual NULL values?  Either of the flags can be
		 * set, but we ignore the state before adding the first row.
		 *
		 * We have to remember this, because we'll modify the flags and we
		 * need to know if the range started as empty.
		 */
		has_nulls = ((!dtup->bt_empty_range) &&
					 (bval->bv_hasnulls || bval->bv_allnulls));

		/*
		 * If the value we're adding is NULL, handle it locally.  Otherwise
		 * call the BRIN_PROCNUM_ADDVALUE procedure.
		 */
		if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
		{
			/*
			 * If the new value is null, we record that we saw it if it's the
			 * first one; otherwise, there's nothing to do.
			 */
			if (!bval->bv_hasnulls)
			{
				bval->bv_hasnulls = true;
				modified = true;
			}

			continue;
		}

		addValue = index_getprocinfo(idxRel, keyno + 1,
									 BRIN_PROCNUM_ADDVALUE);
		result = FunctionCall4Coll(addValue,
								   idxRel->rd_indcollation[keyno],
								   PointerGetDatum(bdesc),
								   PointerGetDatum(bval),
								   values[keyno],
								   BoolGetDatum(nulls[keyno]));
		/* if that returned true, we need to insert the updated tuple */
		modified |= DatumGetBool(result);

		/*
		 * If the range had actual NULL values (i.e. did not start empty),
		 * make sure we don't forget about the NULL values.  Either the
		 * allnulls flag is still set to true, or (if the opclass cleared it)
		 * we need to set hasnulls=true.
		 *
		 * XXX This can only happen when the opclass modified the tuple, so
		 * the modified flag should be set.
		 */
		if (has_nulls && !(bval->bv_hasnulls || bval->bv_allnulls))
		{
			Assert(modified);
			bval->bv_hasnulls = true;
		}
	}

	/*
	 * After updating summaries for all the keys, mark it as not empty.
	 *
	 * If we're actually changing the flag value (i.e. tuple started as
	 * empty), we should have modified the tuple.  So we should not see an
	 * empty range that was not modified.
	 */
	Assert(!dtup->bt_empty_range || modified);
	dtup->bt_empty_range = false;

	return modified;
}
2313 :
2314 : static bool
2315 94968 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
2316 : {
2317 : int keyno;
2318 :
2319 : /*
2320 : * First check if there are any IS [NOT] NULL scan keys, and if we're
2321 : * violating them.
2322 : */
2323 95586 : for (keyno = 0; keyno < nnullkeys; keyno++)
2324 : {
2325 1116 : ScanKey key = nullkeys[keyno];
2326 :
2327 : Assert(key->sk_attno == bval->bv_attno);
2328 :
2329 : /* Handle only IS NULL/IS NOT NULL tests */
2330 1116 : if (!(key->sk_flags & SK_ISNULL))
2331 0 : continue;
2332 :
2333 1116 : if (key->sk_flags & SK_SEARCHNULL)
2334 : {
2335 : /* IS NULL scan key, but range has no NULLs */
2336 558 : if (!bval->bv_allnulls && !bval->bv_hasnulls)
2337 489 : return false;
2338 : }
2339 558 : else if (key->sk_flags & SK_SEARCHNOTNULL)
2340 : {
2341 : /*
2342 : * For IS NOT NULL, we can only skip ranges that are known to have
2343 : * only nulls.
2344 : */
2345 558 : if (bval->bv_allnulls)
2346 9 : return false;
2347 : }
2348 : else
2349 : {
2350 : /*
2351 : * Neither IS NULL nor IS NOT NULL was used; assume all indexable
2352 : * operators are strict and thus return false with NULL value in
2353 : * the scan key.
2354 : */
2355 0 : return false;
2356 : }
2357 : }
2358 :
2359 94470 : return true;
2360 : }
2361 :
2362 : /*
2363 : * Create parallel context, and launch workers for leader.
2364 : *
2365 : * buildstate argument should be initialized (with the exception of the
2366 : * tuplesort states, which may later be created based on shared
2367 : * state initially set up here).
2368 : *
2369 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
2370 : *
2371 : * request is the target number of parallel worker processes to launch.
2372 : *
2373 : * Sets buildstate's BrinLeader, which caller must use to shut down parallel
2374 : * mode by passing it to _brin_end_parallel() at the very end of its index
2375 : * build. If not even a single worker process can be launched, this is
2376 : * never set, and caller should proceed with a serial index build.
2377 : */
static void
_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
					 bool isconcurrent, int request)
{
	ParallelContext *pcxt;
	int			scantuplesortstates;
	Snapshot	snapshot;
	Size		estbrinshared;
	Size		estsort;
	BrinShared *brinshared;
	Sharedsort *sharedsort;
	BrinLeader *brinleader = palloc0_object(BrinLeader);
	WalUsage   *walusage;
	BufferUsage *bufferusage;
	bool		leaderparticipates = true;
	int			querylen;

#ifdef DISABLE_LEADER_PARTICIPATION
	leaderparticipates = false;
#endif

	/*
	 * Enter parallel mode, and create context for parallel build of brin
	 * index
	 */
	EnterParallelMode();
	Assert(request > 0);
	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
								 request);

	/* the leader counts as a participant unless disabled at compile time */
	scantuplesortstates = leaderparticipates ? request + 1 : request;

	/*
	 * Prepare for scan of the base relation.  In a normal index build, we use
	 * SnapshotAny because we must retrieve all tuples and do our own time
	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
	 * concurrent build, we take a regular MVCC snapshot and index whatever's
	 * live according to that.
	 */
	if (!isconcurrent)
		snapshot = SnapshotAny;
	else
		snapshot = RegisterSnapshot(GetTransactionSnapshot());

	/*
	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
	 */
	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
	estsort = tuplesort_estimate_shared(scantuplesortstates);
	shm_toc_estimate_chunk(&pcxt->estimator, estsort);

	shm_toc_estimate_keys(&pcxt->estimator, 2);

	/*
	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
	 * and PARALLEL_KEY_BUFFER_USAGE.
	 *
	 * If there are no extensions loaded that care, we could skip this.  We
	 * have no way of knowing whether anyone's looking at pgWalUsage or
	 * pgBufferUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
	if (debug_query_string)
	{
		querylen = strlen(debug_query_string);
		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}
	else
		querylen = 0;			/* keep compiler quiet */

	/* Everyone's had a chance to ask for space, so now create the DSM */
	InitializeParallelDSM(pcxt);

	/* If no DSM segment was available, back out (do serial build) */
	if (pcxt->seg == NULL)
	{
		if (IsMVCCSnapshot(snapshot))
			UnregisterSnapshot(snapshot);
		DestroyParallelContext(pcxt);
		ExitParallelMode();
		return;
	}

	/* Store shared build state, for which we reserved space */
	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
	/* Initialize immutable state */
	brinshared->heaprelid = RelationGetRelid(heap);
	brinshared->indexrelid = RelationGetRelid(index);
	brinshared->isconcurrent = isconcurrent;
	brinshared->scantuplesortstates = scantuplesortstates;
	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
	brinshared->queryid = pgstat_get_my_query_id();
	ConditionVariableInit(&brinshared->workersdonecv);
	SpinLockInit(&brinshared->mutex);

	/* Initialize mutable state */
	brinshared->nparticipantsdone = 0;
	brinshared->reltuples = 0.0;
	brinshared->indtuples = 0.0;

	table_parallelscan_initialize(heap,
								  ParallelTableScanFromBrinShared(brinshared),
								  snapshot);

	/*
	 * Store shared tuplesort-private state, for which we reserved space.
	 * Then, initialize opaque state using tuplesort routine.
	 */
	sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
	tuplesort_initialize_shared(sharedsort, scantuplesortstates,
								pcxt->seg);

	/*
	 * Insert both shared state structures into the DSM segment's table of
	 * contents, so workers can look them up.
	 */
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

	/* Store query string for workers */
	if (debug_query_string)
	{
		char	   *sharedquery;

		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
		memcpy(sharedquery, debug_query_string, querylen + 1);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
	}

	/*
	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
	 * initialize.
	 */
	walusage = shm_toc_allocate(pcxt->toc,
								mul_size(sizeof(WalUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
	bufferusage = shm_toc_allocate(pcxt->toc,
								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);

	/* Launch workers, saving status for leader/caller */
	LaunchParallelWorkers(pcxt);
	brinleader->pcxt = pcxt;
	brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
	if (leaderparticipates)
		brinleader->nparticipanttuplesorts++;
	brinleader->brinshared = brinshared;
	brinleader->sharedsort = sharedsort;
	brinleader->snapshot = snapshot;
	brinleader->walusage = walusage;
	brinleader->bufferusage = bufferusage;

	/* If no workers were successfully launched, back out (do serial build) */
	if (pcxt->nworkers_launched == 0)
	{
		_brin_end_parallel(brinleader, NULL);
		return;
	}

	/* Save leader state now that it's clear build will be parallel */
	buildstate->bs_leader = brinleader;

	/* Join heap scan ourselves */
	if (leaderparticipates)
		_brin_leader_participate_as_worker(buildstate, heap, index);

	/*
	 * Caller needs to wait for all launched workers when we return.  Make
	 * sure that the failure-to-start case will not hang forever.
	 */
	WaitForParallelWorkersToAttach(pcxt);
}
2559 :
2560 : /*
2561 : * Shut down workers, destroy parallel context, and end parallel mode.
2562 : */
2563 : static void
2564 5 : _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2565 : {
2566 : int i;
2567 :
2568 : /* Shutdown worker processes */
2569 5 : WaitForParallelWorkersToFinish(brinleader->pcxt);
2570 :
2571 : /*
2572 : * Next, accumulate WAL usage. (This must wait for the workers to finish,
2573 : * or we might get incomplete data.)
2574 : */
2575 11 : for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
2576 6 : InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
2577 :
2578 : /* Free last reference to MVCC snapshot, if one was used */
2579 5 : if (IsMVCCSnapshot(brinleader->snapshot))
2580 0 : UnregisterSnapshot(brinleader->snapshot);
2581 5 : DestroyParallelContext(brinleader->pcxt);
2582 5 : ExitParallelMode();
2583 5 : }
2584 :
2585 : /*
2586 : * Within leader, wait for end of heap scan.
2587 : *
2588 : * When called, parallel heap scan started by _brin_begin_parallel() will
2589 : * already be underway within worker processes (when leader participates
2590 : * as a worker, we should end up here just as workers are finishing).
2591 : *
2592 : * Returns the total number of heap tuples scanned.
2593 : */
static double
_brin_parallel_heapscan(BrinBuildState *state)
{
	BrinShared *brinshared = state->bs_leader->brinshared;
	int			nparticipanttuplesorts;

	nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
	for (;;)
	{
		/* check the shared done-counter under the spinlock */
		SpinLockAcquire(&brinshared->mutex);
		if (brinshared->nparticipantsdone == nparticipanttuplesorts)
		{
			/* all participants done; copy the data into leader state */
			state->bs_reltuples = brinshared->reltuples;
			state->bs_numtuples = brinshared->indtuples;

			SpinLockRelease(&brinshared->mutex);
			break;
		}
		SpinLockRelease(&brinshared->mutex);

		/* sleep until some worker signals progress, then re-check */
		ConditionVariableSleep(&brinshared->workersdonecv,
							   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
	}

	ConditionVariableCancelSleep();

	return state->bs_reltuples;
}
2623 :
2624 : /*
2625 : * Within leader, wait for end of heap scan and merge per-worker results.
2626 : *
2627 : * After waiting for all workers to finish, merge the per-worker results into
2628 : * the complete index. The results from each worker are sorted by block number
2629 : * (start of the page range). While combining the per-worker results we merge
2630 : * summaries for the same page range, and also fill-in empty summaries for
2631 : * ranges without any tuples.
2632 : *
2633 : * Returns the total number of heap tuples scanned.
2634 : */
2635 : static double
2636 4 : _brin_parallel_merge(BrinBuildState *state)
2637 : {
2638 : BrinTuple *btup;
2639 4 : BrinMemTuple *memtuple = NULL;
2640 : Size tuplen;
2641 4 : BlockNumber prevblkno = InvalidBlockNumber;
2642 : MemoryContext rangeCxt,
2643 : oldCxt;
2644 : double reltuples;
2645 :
2646 : /* wait for workers to scan table and produce partial results */
2647 4 : reltuples = _brin_parallel_heapscan(state);
2648 :
2649 : /* do the actual sort in the leader */
2650 4 : tuplesort_performsort(state->bs_sortstate);
2651 :
2652 : /*
2653 : * Initialize BrinMemTuple we'll use to union summaries from workers (in
2654 : * case they happened to produce parts of the same page range).
2655 : */
2656 4 : memtuple = brin_new_memtuple(state->bs_bdesc);
2657 :
2658 : /*
2659 : * Create a memory context we'll reset to combine results for a single
2660 : * page range (received from the workers). We don't expect huge number of
2661 : * overlaps under regular circumstances, because for large tables the
2662 : * chunk size is likely larger than the BRIN page range), but it can
2663 : * happen, and the union functions may do all kinds of stuff. So we better
2664 : * reset the context once in a while.
2665 : */
2666 4 : rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
2667 : "brin union",
2668 : ALLOCSET_DEFAULT_SIZES);
2669 4 : oldCxt = MemoryContextSwitchTo(rangeCxt);
2670 :
2671 : /*
2672 : * Read the BRIN tuples from the shared tuplesort, sorted by block number.
2673 : * That probably gives us an index that is cheaper to scan, thanks to
2674 : * mostly getting data from the same index page as before.
2675 : */
2676 24 : while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
2677 : {
2678 : /* Ranges should be multiples of pages_per_range for the index. */
2679 : Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
2680 :
2681 : /*
2682 : * Do we need to union summaries for the same page range?
2683 : *
2684 : * If this is the first brin tuple we read, then just deform it into
2685 : * the memtuple, and continue with the next one from tuplesort. We
2686 : * however may need to insert empty summaries into the index.
2687 : *
2688 : * If it's the same block as the last we saw, we simply union the brin
2689 : * tuple into it, and we're done - we don't even need to insert empty
2690 : * ranges, because that was done earlier when we saw the first brin
2691 : * tuple (for this range).
2692 : *
2693 : * Finally, if it's not the first brin tuple, and it's not the same
2694 : * page range, we need to do the insert and then deform the tuple into
2695 : * the memtuple. Then we'll insert empty ranges before the new brin
2696 : * tuple, if needed.
2697 : */
2698 20 : if (prevblkno == InvalidBlockNumber)
2699 : {
			/* First brin tuple, just deform into memtuple. */
2701 1 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2702 :
2703 : /* continue to insert empty pages before thisblock */
2704 : }
2705 19 : else if (memtuple->bt_blkno == btup->bt_blkno)
2706 : {
2707 : /*
2708 : * Not the first brin tuple, but same page range as the previous
2709 : * one, so we can merge it into the memtuple.
2710 : */
2711 0 : union_tuples(state->bs_bdesc, memtuple, btup);
2712 0 : continue;
2713 : }
2714 : else
2715 : {
2716 : BrinTuple *tmp;
2717 : Size len;
2718 :
2719 : /*
2720 : * We got brin tuple for a different page range, so form a brin
2721 : * tuple from the memtuple, insert it, and re-init the memtuple
2722 : * from the new brin tuple.
2723 : */
2724 19 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2725 : memtuple, &len);
2726 :
2727 19 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2728 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2729 :
2730 : /*
2731 : * Reset the per-output-range context. This frees all the memory
2732 : * possibly allocated by the union functions, and also the BRIN
2733 : * tuple we just formed and inserted.
2734 : */
2735 19 : MemoryContextReset(rangeCxt);
2736 :
2737 19 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2738 :
2739 : /* continue to insert empty pages before thisblock */
2740 : }
2741 :
2742 : /* Fill empty ranges for all ranges missing in the tuplesort. */
2743 20 : brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
2744 :
2745 20 : prevblkno = btup->bt_blkno;
2746 : }
2747 :
2748 4 : tuplesort_end(state->bs_sortstate);
2749 :
2750 : /* Fill the BRIN tuple for the last page range with data. */
2751 4 : if (prevblkno != InvalidBlockNumber)
2752 : {
2753 : BrinTuple *tmp;
2754 : Size len;
2755 :
2756 1 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2757 : memtuple, &len);
2758 :
2759 1 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2760 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2761 :
2762 1 : pfree(tmp);
2763 : }
2764 :
2765 : /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
2766 4 : brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
2767 :
2768 : /*
2769 : * Switch back to the original memory context, and destroy the one we
2770 : * created to isolate the union_tuple calls.
2771 : */
2772 4 : MemoryContextSwitchTo(oldCxt);
2773 4 : MemoryContextDelete(rangeCxt);
2774 :
2775 4 : return reltuples;
2776 : }
2777 :
2778 : /*
2779 : * Returns size of shared memory required to store state for a parallel
2780 : * brin index build based on the snapshot its parallel scan will use.
2781 : */
2782 : static Size
2783 5 : _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
2784 : {
2785 : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
2786 5 : return add_size(BUFFERALIGN(sizeof(BrinShared)),
2787 : table_parallelscan_estimate(heap, snapshot));
2788 : }
2789 :
2790 : /*
2791 : * Within leader, participate as a parallel worker.
2792 : */
2793 : static void
2794 4 : _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
2795 : {
2796 4 : BrinLeader *brinleader = buildstate->bs_leader;
2797 : int sortmem;
2798 :
2799 : /*
2800 : * Might as well use reliable figure when doling out maintenance_work_mem
2801 : * (when requested number of workers were not launched, this will be
2802 : * somewhat higher than it is for other workers).
2803 : */
2804 4 : sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
2805 :
2806 : /* Perform work common to all participants */
2807 4 : _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
2808 : brinleader->sharedsort, heap, index, sortmem, true);
2809 4 : }
2810 :
2811 : /*
2812 : * Perform a worker's portion of a parallel sort.
2813 : *
2814 : * This generates a tuplesort for the worker portion of the table.
2815 : *
2816 : * sortmem is the amount of working memory to use within each worker,
2817 : * expressed in KBs.
2818 : *
2819 : * When this returns, workers are done, and need only release resources.
2820 : */
2821 : static void
2822 10 : _brin_parallel_scan_and_build(BrinBuildState *state,
2823 : BrinShared *brinshared, Sharedsort *sharedsort,
2824 : Relation heap, Relation index,
2825 : int sortmem, bool progress)
2826 : {
2827 : SortCoordinate coordinate;
2828 : TableScanDesc scan;
2829 : double reltuples;
2830 : IndexInfo *indexInfo;
2831 :
2832 : /* Initialize local tuplesort coordination state */
2833 10 : coordinate = palloc0_object(SortCoordinateData);
2834 10 : coordinate->isWorker = true;
2835 10 : coordinate->nParticipants = -1;
2836 10 : coordinate->sharedsort = sharedsort;
2837 :
2838 : /* Begin "partial" tuplesort */
2839 10 : state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
2840 : TUPLESORT_NONE);
2841 :
2842 : /* Join parallel scan */
2843 10 : indexInfo = BuildIndexInfo(index);
2844 10 : indexInfo->ii_Concurrent = brinshared->isconcurrent;
2845 :
2846 10 : scan = table_beginscan_parallel(heap,
2847 : ParallelTableScanFromBrinShared(brinshared));
2848 :
2849 10 : reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
2850 : brinbuildCallbackParallel, state, scan);
2851 :
2852 : /* insert the last item */
2853 10 : form_and_spill_tuple(state);
2854 :
2855 : /* sort the BRIN ranges built by this worker */
2856 10 : tuplesort_performsort(state->bs_sortstate);
2857 :
2858 10 : state->bs_reltuples += reltuples;
2859 :
2860 : /*
2861 : * Done. Record ambuild statistics.
2862 : */
2863 10 : SpinLockAcquire(&brinshared->mutex);
2864 10 : brinshared->nparticipantsdone++;
2865 10 : brinshared->reltuples += state->bs_reltuples;
2866 10 : brinshared->indtuples += state->bs_numtuples;
2867 10 : SpinLockRelease(&brinshared->mutex);
2868 :
2869 : /* Notify leader */
2870 10 : ConditionVariableSignal(&brinshared->workersdonecv);
2871 :
2872 10 : tuplesort_end(state->bs_sortstate);
2873 10 : }
2874 :
2875 : /*
2876 : * Perform work within a launched parallel process.
2877 : */
2878 : void
2879 6 : _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2880 : {
2881 : char *sharedquery;
2882 : BrinShared *brinshared;
2883 : Sharedsort *sharedsort;
2884 : BrinBuildState *buildstate;
2885 : Relation heapRel;
2886 : Relation indexRel;
2887 : LOCKMODE heapLockmode;
2888 : LOCKMODE indexLockmode;
2889 : WalUsage *walusage;
2890 : BufferUsage *bufferusage;
2891 : int sortmem;
2892 :
2893 : /*
2894 : * The only possible status flag that can be set to the parallel worker is
2895 : * PROC_IN_SAFE_IC.
2896 : */
2897 : Assert((MyProc->statusFlags == 0) ||
2898 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2899 :
2900 : /* Set debug_query_string for individual workers first */
2901 6 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2902 6 : debug_query_string = sharedquery;
2903 :
2904 : /* Report the query string from leader */
2905 6 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2906 :
2907 : /* Look up brin shared state */
2908 6 : brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
2909 :
2910 : /* Open relations using lock modes known to be obtained by index.c */
2911 6 : if (!brinshared->isconcurrent)
2912 : {
2913 6 : heapLockmode = ShareLock;
2914 6 : indexLockmode = AccessExclusiveLock;
2915 : }
2916 : else
2917 : {
2918 0 : heapLockmode = ShareUpdateExclusiveLock;
2919 0 : indexLockmode = RowExclusiveLock;
2920 : }
2921 :
2922 : /* Track query ID */
2923 6 : pgstat_report_query_id(brinshared->queryid, false);
2924 :
2925 : /* Open relations within worker */
2926 6 : heapRel = table_open(brinshared->heaprelid, heapLockmode);
2927 6 : indexRel = index_open(brinshared->indexrelid, indexLockmode);
2928 :
2929 6 : buildstate = initialize_brin_buildstate(indexRel, NULL,
2930 : brinshared->pagesPerRange,
2931 : InvalidBlockNumber);
2932 :
2933 : /* Look up shared state private to tuplesort.c */
2934 6 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2935 6 : tuplesort_attach_shared(sharedsort, seg);
2936 :
2937 : /* Prepare to track buffer usage during parallel execution */
2938 6 : InstrStartParallelQuery();
2939 :
2940 : /*
2941 : * Might as well use reliable figure when doling out maintenance_work_mem
2942 : * (when requested number of workers were not launched, this will be
2943 : * somewhat higher than it is for other workers).
2944 : */
2945 6 : sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
2946 :
2947 6 : _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
2948 : heapRel, indexRel, sortmem, false);
2949 :
2950 : /* Report WAL/buffer usage during parallel execution */
2951 6 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2952 6 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2953 6 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2954 6 : &walusage[ParallelWorkerNumber]);
2955 :
2956 6 : index_close(indexRel, indexLockmode);
2957 6 : table_close(heapRel, heapLockmode);
2958 6 : }
2959 :
2960 : /*
2961 : * brin_build_empty_tuple
2962 : * Maybe initialize a BRIN tuple representing empty range.
2963 : *
2964 : * Returns a BRIN tuple representing an empty page range starting at the
2965 : * specified block number. The empty tuple is initialized only once, when it's
2966 : * needed for the first time, stored in the memory context bs_context to ensure
2967 : * proper life span, and reused on following calls. All empty tuples are
2968 : * exactly the same except for the bt_blkno field, which is set to the value
2969 : * in blkno parameter.
2970 : */
2971 : static void
2972 10 : brin_build_empty_tuple(BrinBuildState *state, BlockNumber blkno)
2973 : {
2974 : /* First time an empty tuple is requested? If yes, initialize it. */
2975 10 : if (state->bs_emptyTuple == NULL)
2976 : {
2977 : MemoryContext oldcxt;
2978 5 : BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
2979 :
2980 : /* Allocate the tuple in context for the whole index build. */
2981 5 : oldcxt = MemoryContextSwitchTo(state->bs_context);
2982 :
2983 5 : state->bs_emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple,
2984 : &state->bs_emptyTupleLen);
2985 :
2986 5 : MemoryContextSwitchTo(oldcxt);
2987 : }
2988 : else
2989 : {
2990 : /* If we already have an empty tuple, just update the block. */
2991 5 : state->bs_emptyTuple->bt_blkno = blkno;
2992 : }
2993 10 : }
2994 :
2995 : /*
2996 : * brin_fill_empty_ranges
2997 : * Add BRIN index tuples representing empty page ranges.
2998 : *
2999 : * prevRange/nextRange determine for which page ranges to add empty summaries.
3000 : * Both boundaries are exclusive, i.e. only ranges starting at blkno for which
3001 : * (prevRange < blkno < nextRange) will be added to the index.
3002 : *
3003 : * If prevRange is InvalidBlockNumber, this means there was no previous page
3004 : * range (i.e. the first empty range to add is for blkno=0).
3005 : *
3006 : * The empty tuple is built only once, and then reused for all future calls.
3007 : */
3008 : static void
3009 204 : brin_fill_empty_ranges(BrinBuildState *state,
3010 : BlockNumber prevRange, BlockNumber nextRange)
3011 : {
3012 : BlockNumber blkno;
3013 :
3014 : /*
3015 : * If we already summarized some ranges, we need to start with the next
3016 : * one. Otherwise start from the first range of the table.
3017 : */
3018 204 : blkno = (prevRange == InvalidBlockNumber) ? 0 : (prevRange + state->bs_pagesPerRange);
3019 :
3020 : /* Generate empty ranges until we hit the next non-empty range. */
3021 214 : while (blkno < nextRange)
3022 : {
3023 : /* Did we already build the empty tuple? If not, do it now. */
3024 10 : brin_build_empty_tuple(state, blkno);
3025 :
3026 10 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
3027 : &state->bs_currentInsertBuf,
3028 10 : blkno, state->bs_emptyTuple, state->bs_emptyTupleLen);
3029 :
3030 : /* try next page range */
3031 10 : blkno += state->bs_pagesPerRange;
3032 : }
3033 204 : }
|