Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "executor/instrument.h"
32 : #include "miscadmin.h"
33 : #include "pgstat.h"
34 : #include "postmaster/autovacuum.h"
35 : #include "storage/bufmgr.h"
36 : #include "storage/condition_variable.h"
37 : #include "storage/freespace.h"
38 : #include "storage/proc.h"
39 : #include "tcop/tcopprot.h"
40 : #include "utils/acl.h"
41 : #include "utils/datum.h"
42 : #include "utils/fmgrprotos.h"
43 : #include "utils/guc.h"
44 : #include "utils/index_selfuncs.h"
45 : #include "utils/memutils.h"
46 : #include "utils/rel.h"
47 : #include "utils/tuplesort.h"
48 : #include "utils/wait_event.h"
49 :
/*
 * Magic key values under which the individual pieces of parallel-build state
 * are shared between the participating processes.
 */
#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xB000000000000001)
#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xB000000000000002)
#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xB000000000000003)
#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xB000000000000005)
56 :
57 : /*
58 : * Status for index builds performed in parallel. This is allocated in a
59 : * dynamic shared memory segment.
60 : */
61 : typedef struct BrinShared
62 : {
63 : /*
64 : * These fields are not modified during the build. They primarily exist
65 : * for the benefit of worker processes that need to create state
66 : * corresponding to that used by the leader.
67 : */
68 : Oid heaprelid;
69 : Oid indexrelid;
70 : bool isconcurrent;
71 : BlockNumber pagesPerRange;
72 : int scantuplesortstates;
73 :
74 : /* Query ID, for report in worker processes */
75 : int64 queryid;
76 :
77 : /*
78 : * workersdonecv is used to monitor the progress of workers. All parallel
79 : * participants must indicate that they are done before leader can use
80 : * results built by the workers (and before leader can write the data into
81 : * the index).
82 : */
83 : ConditionVariable workersdonecv;
84 :
85 : /*
86 : * mutex protects all fields before heapdesc.
87 : *
88 : * These fields contain status information of interest to BRIN index
89 : * builds that must work just the same when an index is built in parallel.
90 : */
91 : slock_t mutex;
92 :
93 : /*
94 : * Mutable state that is maintained by workers, and reported back to
95 : * leader at end of the scans.
96 : *
97 : * nparticipantsdone is number of worker processes finished.
98 : *
99 : * reltuples is the total number of input heap tuples.
100 : *
101 : * indtuples is the total number of tuples that made it into the index.
102 : */
103 : int nparticipantsdone;
104 : double reltuples;
105 : double indtuples;
106 :
107 : /*
108 : * ParallelTableScanDescData data follows. Can't directly embed here, as
109 : * implementations of the parallel table scan desc interface might need
110 : * stronger alignment.
111 : */
112 : } BrinShared;
113 :
/*
 * Return pointer to a BrinShared's parallel table scan.
 *
 * The result is the address found BUFFERALIGN(sizeof(BrinShared)) bytes past
 * the start of the given BrinShared.  c.f. shm_toc_allocate as to why
 * BUFFERALIGN is used, rather than just MAXALIGN.
 *
 * The whole expansion is parenthesized, so that the macro behaves like a
 * primary expression and can be combined safely with postfix operators such
 * as "->" (which would otherwise bind to the pointer arithmetic rather than
 * to the result of the cast).
 */
#define ParallelTableScanFromBrinShared(shared) \
	((ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared))))
122 :
123 : /*
124 : * Status for leader in parallel index build.
125 : */
126 : typedef struct BrinLeader
127 : {
128 : /* parallel context itself */
129 : ParallelContext *pcxt;
130 :
131 : /*
132 : * nparticipanttuplesorts is the exact number of worker processes
133 : * successfully launched, plus one leader process if it participates as a
134 : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
135 : * participating as a worker).
136 : */
137 : int nparticipanttuplesorts;
138 :
139 : /*
140 : * Leader process convenience pointers to shared state (leader avoids TOC
141 : * lookups).
142 : *
143 : * brinshared is the shared state for entire build. sharedsort is the
144 : * shared, tuplesort-managed state passed to each process tuplesort.
145 : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
146 : * required.
147 : */
148 : BrinShared *brinshared;
149 : Sharedsort *sharedsort;
150 : Snapshot snapshot;
151 : WalUsage *walusage;
152 : BufferUsage *bufferusage;
153 : } BrinLeader;
154 :
155 : /*
156 : * We use a BrinBuildState during initial construction of a BRIN index.
157 : * The running state is kept in a BrinMemTuple.
158 : */
159 : typedef struct BrinBuildState
160 : {
161 : Relation bs_irel;
162 : double bs_numtuples;
163 : double bs_reltuples;
164 : Buffer bs_currentInsertBuf;
165 : BlockNumber bs_pagesPerRange;
166 : BlockNumber bs_currRangeStart;
167 : BlockNumber bs_maxRangeStart;
168 : BrinRevmap *bs_rmAccess;
169 : BrinDesc *bs_bdesc;
170 : BrinMemTuple *bs_dtuple;
171 :
172 : BrinTuple *bs_emptyTuple;
173 : Size bs_emptyTupleLen;
174 : MemoryContext bs_context;
175 :
176 : /*
177 : * bs_leader is only present when a parallel index build is performed, and
178 : * only in the leader process. (Actually, only the leader process has a
179 : * BrinBuildState.)
180 : */
181 : BrinLeader *bs_leader;
182 : int bs_worker_id;
183 :
184 : /*
185 : * The sortstate is used by workers (including the leader). It has to be
186 : * part of the build state, because that's the only thing passed to the
187 : * build callback etc.
188 : */
189 : Tuplesortstate *bs_sortstate;
190 : } BrinBuildState;
191 :
192 : /*
193 : * We use a BrinInsertState to capture running state spanning multiple
194 : * brininsert invocations, within the same command.
195 : */
196 : typedef struct BrinInsertState
197 : {
198 : BrinRevmap *bis_rmAccess;
199 : BrinDesc *bis_desc;
200 : BlockNumber bis_pages_per_range;
201 : } BrinInsertState;
202 :
203 : /*
204 : * Struct used as "opaque" during index scans
205 : */
206 : typedef struct BrinOpaque
207 : {
208 : BlockNumber bo_pagesPerRange;
209 : BrinRevmap *bo_rmAccess;
210 : BrinDesc *bo_bdesc;
211 : } BrinOpaque;
212 :
213 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
214 :
215 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
216 : BrinRevmap *revmap,
217 : BlockNumber pagesPerRange,
218 : BlockNumber tablePages);
219 : static BrinInsertState *initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo);
220 : static void terminate_brin_buildstate(BrinBuildState *state);
221 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
222 : bool include_partial, double *numSummarized, double *numExisting);
223 : static void form_and_insert_tuple(BrinBuildState *state);
224 : static void form_and_spill_tuple(BrinBuildState *state);
225 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
226 : BrinTuple *b);
227 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
228 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
229 : BrinMemTuple *dtup, const Datum *values, const bool *nulls);
230 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
231 : static void brin_fill_empty_ranges(BrinBuildState *state,
232 : BlockNumber prevRange, BlockNumber nextRange);
233 :
234 : /* parallel index builds */
235 : static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
236 : bool isconcurrent, int request);
237 : static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
238 : static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
239 : static double _brin_parallel_heapscan(BrinBuildState *state);
240 : static double _brin_parallel_merge(BrinBuildState *state);
241 : static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
242 : Relation heap, Relation index);
243 : static void _brin_parallel_scan_and_build(BrinBuildState *state,
244 : BrinShared *brinshared,
245 : Sharedsort *sharedsort,
246 : Relation heap, Relation index,
247 : int sortmem, bool progress);
248 :
249 : /*
250 : * BRIN handler function: return IndexAmRoutine with access method parameters
251 : * and callbacks.
252 : */
253 : Datum
254 2789 : brinhandler(PG_FUNCTION_ARGS)
255 : {
256 : static const IndexAmRoutine amroutine = {
257 : .type = T_IndexAmRoutine,
258 : .amstrategies = 0,
259 : .amsupport = BRIN_LAST_OPTIONAL_PROCNUM,
260 : .amoptsprocnum = BRIN_PROCNUM_OPTIONS,
261 : .amcanorder = false,
262 : .amcanorderbyop = false,
263 : .amcanhash = false,
264 : .amconsistentequality = false,
265 : .amconsistentordering = false,
266 : .amcanbackward = false,
267 : .amcanunique = false,
268 : .amcanmulticol = true,
269 : .amoptionalkey = true,
270 : .amsearcharray = false,
271 : .amsearchnulls = true,
272 : .amstorage = true,
273 : .amclusterable = false,
274 : .ampredlocks = false,
275 : .amcanparallel = false,
276 : .amcanbuildparallel = true,
277 : .amcaninclude = false,
278 : .amusemaintenanceworkmem = false,
279 : .amsummarizing = true,
280 : .amparallelvacuumoptions =
281 : VACUUM_OPTION_PARALLEL_CLEANUP,
282 : .amkeytype = InvalidOid,
283 :
284 : .ambuild = brinbuild,
285 : .ambuildempty = brinbuildempty,
286 : .aminsert = brininsert,
287 : .aminsertcleanup = brininsertcleanup,
288 : .ambulkdelete = brinbulkdelete,
289 : .amvacuumcleanup = brinvacuumcleanup,
290 : .amcanreturn = NULL,
291 : .amcostestimate = brincostestimate,
292 : .amgettreeheight = NULL,
293 : .amoptions = brinoptions,
294 : .amproperty = NULL,
295 : .ambuildphasename = NULL,
296 : .amvalidate = brinvalidate,
297 : .amadjustmembers = NULL,
298 : .ambeginscan = brinbeginscan,
299 : .amrescan = brinrescan,
300 : .amgettuple = NULL,
301 : .amgetbitmap = bringetbitmap,
302 : .amendscan = brinendscan,
303 : .ammarkpos = NULL,
304 : .amrestrpos = NULL,
305 : .amestimateparallelscan = NULL,
306 : .aminitparallelscan = NULL,
307 : .amparallelrescan = NULL,
308 : .amtranslatestrategy = NULL,
309 : .amtranslatecmptype = NULL,
310 : };
311 :
312 2789 : PG_RETURN_POINTER(&amroutine);
313 : }
314 :
315 : /*
316 : * Initialize a BrinInsertState to maintain state to be used across multiple
317 : * tuple inserts, within the same command.
318 : */
319 : static BrinInsertState *
320 722 : initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo)
321 : {
322 : BrinInsertState *bistate;
323 : MemoryContext oldcxt;
324 :
325 722 : oldcxt = MemoryContextSwitchTo(indexInfo->ii_Context);
326 722 : bistate = palloc0_object(BrinInsertState);
327 722 : bistate->bis_desc = brin_build_desc(idxRel);
328 722 : bistate->bis_rmAccess = brinRevmapInitialize(idxRel,
329 : &bistate->bis_pages_per_range);
330 722 : indexInfo->ii_AmCache = bistate;
331 722 : MemoryContextSwitchTo(oldcxt);
332 :
333 722 : return bistate;
334 : }
335 :
336 : /*
337 : * A tuple in the heap is being inserted. To keep a brin index up to date,
338 : * we need to obtain the relevant index tuple and compare its stored values
339 : * with those of the new tuple. If the tuple values are not consistent with
340 : * the summary tuple, we need to update the index tuple.
341 : *
342 : * If autosummarization is enabled, check if we need to summarize the previous
343 : * page range.
344 : *
345 : * If the range is not currently summarized (i.e. the revmap returns NULL for
346 : * it), there's nothing to do for this tuple.
347 : */
348 : bool
349 68746 : brininsert(Relation idxRel, Datum *values, bool *nulls,
350 : ItemPointer heaptid, Relation heapRel,
351 : IndexUniqueCheck checkUnique,
352 : bool indexUnchanged,
353 : IndexInfo *indexInfo)
354 : {
355 : BlockNumber pagesPerRange;
356 : BlockNumber origHeapBlk;
357 : BlockNumber heapBlk;
358 68746 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
359 : BrinRevmap *revmap;
360 : BrinDesc *bdesc;
361 68746 : Buffer buf = InvalidBuffer;
362 68746 : MemoryContext tupcxt = NULL;
363 68746 : MemoryContext oldcxt = CurrentMemoryContext;
364 68746 : bool autosummarize = BrinGetAutoSummarize(idxRel);
365 :
366 : /*
367 : * If first time through in this statement, initialize the insert state
368 : * that we keep for all the inserts in the command.
369 : */
370 68746 : if (!bistate)
371 722 : bistate = initialize_brin_insertstate(idxRel, indexInfo);
372 :
373 68746 : revmap = bistate->bis_rmAccess;
374 68746 : bdesc = bistate->bis_desc;
375 68746 : pagesPerRange = bistate->bis_pages_per_range;
376 :
377 : /*
378 : * origHeapBlk is the block number where the insertion occurred. heapBlk
379 : * is the first block in the corresponding page range.
380 : */
381 68746 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
382 68746 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
383 :
384 : for (;;)
385 0 : {
386 68746 : bool need_insert = false;
387 : OffsetNumber off;
388 : BrinTuple *brtup;
389 : BrinMemTuple *dtup;
390 :
391 68746 : CHECK_FOR_INTERRUPTS();
392 :
393 : /*
394 : * If auto-summarization is enabled and we just inserted the first
395 : * tuple into the first block of a new non-first page range, request a
396 : * summarization run of the previous range.
397 : */
398 68746 : if (autosummarize &&
399 145 : heapBlk > 0 &&
400 145 : heapBlk == origHeapBlk &&
401 145 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
402 : {
403 8 : BlockNumber lastPageRange = heapBlk - 1;
404 : BrinTuple *lastPageTuple;
405 :
406 : lastPageTuple =
407 8 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
408 : NULL, BUFFER_LOCK_SHARE);
409 8 : if (!lastPageTuple)
410 : {
411 : bool recorded;
412 :
413 6 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
414 : RelationGetRelid(idxRel),
415 : lastPageRange);
416 6 : if (!recorded)
417 0 : ereport(LOG,
418 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
419 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
420 : RelationGetRelationName(idxRel),
421 : lastPageRange)));
422 : }
423 : else
424 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
425 : }
426 :
427 68746 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
428 : NULL, BUFFER_LOCK_SHARE);
429 :
430 : /* if range is unsummarized, there's nothing to do */
431 68746 : if (!brtup)
432 41875 : break;
433 :
434 : /* First time through in this brininsert call? */
435 26871 : if (tupcxt == NULL)
436 : {
437 26871 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
438 : "brininsert cxt",
439 : ALLOCSET_DEFAULT_SIZES);
440 26871 : MemoryContextSwitchTo(tupcxt);
441 : }
442 :
443 26871 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
444 :
445 26871 : need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
446 :
447 26871 : if (!need_insert)
448 : {
449 : /*
450 : * The tuple is consistent with the new values, so there's nothing
451 : * to do.
452 : */
453 14307 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
454 : }
455 : else
456 : {
457 12564 : Page page = BufferGetPage(buf);
458 12564 : ItemId lp = PageGetItemId(page, off);
459 : Size origsz;
460 : BrinTuple *origtup;
461 : Size newsz;
462 : BrinTuple *newtup;
463 : bool samepage;
464 :
465 : /*
466 : * Make a copy of the old tuple, so that we can compare it after
467 : * re-acquiring the lock.
468 : */
469 12564 : origsz = ItemIdGetLength(lp);
470 12564 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
471 :
472 : /*
473 : * Before releasing the lock, check if we can attempt a same-page
474 : * update. Another process could insert a tuple concurrently in
475 : * the same page though, so downstream we must be prepared to cope
476 : * if this turns out to not be possible after all.
477 : */
478 12564 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
479 12564 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
480 12564 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
481 :
482 : /*
483 : * Try to update the tuple. If this doesn't work for whatever
484 : * reason, we need to restart from the top; the revmap might be
485 : * pointing at a different tuple for this block now, so we need to
486 : * recompute to ensure both our new heap tuple and the other
487 : * inserter's are covered by the combined tuple. It might be that
488 : * we don't need to update at all.
489 : */
490 12564 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
491 : buf, off, origtup, origsz, newtup, newsz,
492 : samepage))
493 : {
494 : /* no luck; start over */
495 0 : MemoryContextReset(tupcxt);
496 0 : continue;
497 : }
498 : }
499 :
500 : /* success! */
501 26871 : break;
502 : }
503 :
504 68746 : if (BufferIsValid(buf))
505 26873 : ReleaseBuffer(buf);
506 68746 : MemoryContextSwitchTo(oldcxt);
507 68746 : if (tupcxt != NULL)
508 26871 : MemoryContextDelete(tupcxt);
509 :
510 68746 : return false;
511 : }
512 :
513 : /*
514 : * Callback to clean up the BrinInsertState once all tuple inserts are done.
515 : */
516 : void
517 740 : brininsertcleanup(Relation index, IndexInfo *indexInfo)
518 : {
519 740 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
520 :
521 : /* bail out if cache not initialized */
522 740 : if (bistate == NULL)
523 18 : return;
524 :
525 : /* do this first to avoid dangling pointer if we fail partway through */
526 722 : indexInfo->ii_AmCache = NULL;
527 :
528 : /*
529 : * Clean up the revmap. Note that the brinDesc has already been cleaned up
530 : * as part of its own memory context.
531 : */
532 722 : brinRevmapTerminate(bistate->bis_rmAccess);
533 722 : pfree(bistate);
534 : }
535 :
536 : /*
537 : * Initialize state for a BRIN index scan.
538 : *
539 : * We read the metapage here to determine the pages-per-range number that this
540 : * index was built with. Note that since this cannot be changed while we're
541 : * holding lock on index, it's not necessary to recompute it during brinrescan.
542 : */
543 : IndexScanDesc
544 1964 : brinbeginscan(Relation r, int nkeys, int norderbys)
545 : {
546 : IndexScanDesc scan;
547 : BrinOpaque *opaque;
548 :
549 1964 : scan = RelationGetIndexScan(r, nkeys, norderbys);
550 :
551 1964 : opaque = palloc_object(BrinOpaque);
552 1964 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
553 1964 : opaque->bo_bdesc = brin_build_desc(r);
554 1964 : scan->opaque = opaque;
555 :
556 1964 : return scan;
557 : }
558 :
559 : /*
560 : * Execute the index scan.
561 : *
562 : * This works by reading index TIDs from the revmap, and obtaining the index
563 : * tuples pointed to by them; the summary values in the index tuples are
564 : * compared to the scan keys. We return into the TID bitmap all the pages in
565 : * ranges corresponding to index tuples that match the scan keys.
566 : *
567 : * If a TID from the revmap is read as InvalidTID, we know that range is
568 : * unsummarized. Pages in those ranges need to be returned regardless of scan
569 : * keys.
570 : */
571 : int64
572 1964 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
573 : {
574 1964 : Relation idxRel = scan->indexRelation;
575 1964 : Buffer buf = InvalidBuffer;
576 : BrinDesc *bdesc;
577 : Oid heapOid;
578 : Relation heapRel;
579 : BrinOpaque *opaque;
580 : BlockNumber nblocks;
581 1964 : int64 totalpages = 0;
582 : FmgrInfo *consistentFn;
583 : MemoryContext oldcxt;
584 : MemoryContext perRangeCxt;
585 : BrinMemTuple *dtup;
586 1964 : BrinTuple *btup = NULL;
587 1964 : Size btupsz = 0;
588 : ScanKey **keys,
589 : **nullkeys;
590 : int *nkeys,
591 : *nnullkeys;
592 : char *ptr;
593 : Size len;
594 : char *tmp PG_USED_FOR_ASSERTS_ONLY;
595 :
596 1964 : opaque = (BrinOpaque *) scan->opaque;
597 1964 : bdesc = opaque->bo_bdesc;
598 1964 : pgstat_count_index_scan(idxRel);
599 1964 : if (scan->instrument)
600 36 : scan->instrument->nsearches++;
601 :
602 : /*
603 : * We need to know the size of the table so that we know how long to
604 : * iterate on the revmap.
605 : */
606 1964 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
607 1964 : heapRel = table_open(heapOid, AccessShareLock);
608 1964 : nblocks = RelationGetNumberOfBlocks(heapRel);
609 1964 : table_close(heapRel, AccessShareLock);
610 :
611 : /*
612 : * Make room for the consistent support procedures of indexed columns. We
613 : * don't look them up here; we do that lazily the first time we see a scan
614 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
615 : */
616 1964 : consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
617 :
618 : /*
619 : * Make room for per-attribute lists of scan keys that we'll pass to the
620 : * consistent support procedure. We don't know which attributes have scan
621 : * keys, so we allocate space for all attributes. That may use more memory
622 : * but it's probably cheaper than determining which attributes are used.
623 : *
624 : * We keep null and regular keys separate, so that we can pass just the
625 : * regular keys to the consistent function easily.
626 : *
627 : * To reduce the allocation overhead, we allocate one big chunk and then
628 : * carve it into smaller arrays ourselves. All the pieces have exactly the
629 : * same lifetime, so that's OK.
630 : *
631 : * XXX The widest index can have 32 attributes, so the amount of wasted
632 : * memory is negligible. We could invent a more compact approach (with
633 : * just space for used attributes) but that would make the matching more
634 : * complex so it's not a good trade-off.
635 : */
636 1964 : len =
637 1964 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* regular keys */
638 1964 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
639 1964 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
640 1964 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* NULL keys */
641 1964 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
642 1964 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
643 :
644 1964 : ptr = palloc(len);
645 1964 : tmp = ptr;
646 :
647 1964 : keys = (ScanKey **) ptr;
648 1964 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
649 :
650 1964 : nullkeys = (ScanKey **) ptr;
651 1964 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
652 :
653 1964 : nkeys = (int *) ptr;
654 1964 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
655 :
656 1964 : nnullkeys = (int *) ptr;
657 1964 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
658 :
659 46652 : for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
660 : {
661 44688 : keys[i] = (ScanKey *) ptr;
662 44688 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
663 :
664 44688 : nullkeys[i] = (ScanKey *) ptr;
665 44688 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
666 : }
667 :
668 : Assert(tmp + len == ptr);
669 :
670 : /* zero the number of keys */
671 1964 : memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
672 1964 : memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
673 :
674 : /* Preprocess the scan keys - split them into per-attribute arrays. */
675 3928 : for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
676 : {
677 1964 : ScanKey key = &scan->keyData[keyno];
678 1964 : AttrNumber keyattno = key->sk_attno;
679 :
680 : /*
681 : * The collation of the scan key must match the collation used in the
682 : * index column (but only if the search is not IS NULL/ IS NOT NULL).
683 : * Otherwise we shouldn't be using this index ...
684 : */
685 : Assert((key->sk_flags & SK_ISNULL) ||
686 : (key->sk_collation ==
687 : TupleDescAttr(bdesc->bd_tupdesc,
688 : keyattno - 1)->attcollation));
689 :
690 : /*
691 : * First time we see this index attribute, so init as needed.
692 : *
693 : * This is a bit of an overkill - we don't know how many scan keys are
694 : * there for this attribute, so we simply allocate the largest number
695 : * possible (as if all keys were for this attribute). This may waste a
696 : * bit of memory, but we only expect small number of scan keys in
697 : * general, so this should be negligible, and repeated repalloc calls
698 : * are not free either.
699 : */
700 1964 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
701 : {
702 : FmgrInfo *tmp;
703 :
704 : /* First time we see this attribute, so no key/null keys. */
705 : Assert(nkeys[keyattno - 1] == 0);
706 : Assert(nnullkeys[keyattno - 1] == 0);
707 :
708 1964 : tmp = index_getprocinfo(idxRel, keyattno,
709 : BRIN_PROCNUM_CONSISTENT);
710 1964 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
711 : CurrentMemoryContext);
712 : }
713 :
714 : /* Add key to the proper per-attribute array. */
715 1964 : if (key->sk_flags & SK_ISNULL)
716 : {
717 24 : nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
718 24 : nnullkeys[keyattno - 1]++;
719 : }
720 : else
721 : {
722 1940 : keys[keyattno - 1][nkeys[keyattno - 1]] = key;
723 1940 : nkeys[keyattno - 1]++;
724 : }
725 : }
726 :
727 : /* allocate an initial in-memory tuple, out of the per-range memcxt */
728 1964 : dtup = brin_new_memtuple(bdesc);
729 :
730 : /*
731 : * Setup and use a per-range memory context, which is reset every time we
732 : * loop below. This avoids having to free the tuples within the loop.
733 : */
734 1964 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
735 : "bringetbitmap cxt",
736 : ALLOCSET_DEFAULT_SIZES);
737 1964 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
738 :
739 : /*
740 : * Now scan the revmap. We start by querying for heap page 0,
741 : * incrementing by the number of pages per range; this gives us a full
742 : * view of the table. We make use of uint64 for heapBlk as a BlockNumber
743 : * could wrap for tables with close to 2^32 pages.
744 : */
745 129732 : for (uint64 heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
746 : {
747 : bool addrange;
748 127768 : bool gottuple = false;
749 : BrinTuple *tup;
750 : OffsetNumber off;
751 : Size size;
752 :
753 127768 : CHECK_FOR_INTERRUPTS();
754 :
755 127768 : MemoryContextReset(perRangeCxt);
756 :
757 127768 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, (BlockNumber) heapBlk, &buf,
758 : &off, &size, BUFFER_LOCK_SHARE);
759 127768 : if (tup)
760 : {
761 126624 : gottuple = true;
762 126624 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
763 126624 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
764 : }
765 :
766 : /*
767 : * For page ranges with no indexed tuple, we must return the whole
768 : * range; otherwise, compare it to the scan keys.
769 : */
770 127768 : if (!gottuple)
771 : {
772 1144 : addrange = true;
773 : }
774 : else
775 : {
776 126624 : dtup = brin_deform_tuple(bdesc, btup, dtup);
777 126624 : if (dtup->bt_placeholder)
778 : {
779 : /*
780 : * Placeholder tuples are always returned, regardless of the
781 : * values stored in them.
782 : */
783 0 : addrange = true;
784 : }
785 : else
786 : {
787 : int attno;
788 :
789 : /*
790 : * Compare scan keys with summary values stored for the range.
791 : * If scan keys are matched, the page range must be added to
792 : * the bitmap. We initially assume the range needs to be
793 : * added; in particular this serves the case where there are
794 : * no keys.
795 : */
796 126624 : addrange = true;
797 3136045 : for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
798 : {
799 : BrinValues *bval;
800 : Datum add;
801 : Oid collation;
802 :
803 : /*
804 : * skip attributes without any scan keys (both regular and
805 : * IS [NOT] NULL)
806 : */
807 3045156 : if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
808 2918532 : continue;
809 :
810 126624 : bval = &dtup->bt_columns[attno - 1];
811 :
812 : /*
813 : * If the BRIN tuple indicates that this range is empty,
814 : * we can skip it: there's nothing to match. We don't
815 : * need to examine the next columns.
816 : */
817 126624 : if (dtup->bt_empty_range)
818 : {
819 0 : addrange = false;
820 0 : break;
821 : }
822 :
823 : /*
824 : * First check if there are any IS [NOT] NULL scan keys,
825 : * and if we're violating them. In that case we can
826 : * terminate early, without invoking the support function.
827 : *
828 : * As there may be more keys, we can only determine
829 : * mismatch within this loop.
830 : */
831 126624 : if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
832 126624 : !check_null_keys(bval, nullkeys[attno - 1],
833 126624 : nnullkeys[attno - 1]))
834 : {
835 : /*
836 : * If any of the IS [NOT] NULL keys failed, the page
837 : * range as a whole can't pass. So terminate the loop.
838 : */
839 664 : addrange = false;
840 664 : break;
841 : }
842 :
843 : /*
844 : * So either there are no IS [NOT] NULL keys, or all
845 : * passed. If there are no regular scan keys, we're done -
846 : * the page range matches. If there are regular keys, but
847 : * the page range is marked as 'all nulls' it can't
848 : * possibly pass (we're assuming the operators are
849 : * strict).
850 : */
851 :
852 : /* No regular scan keys - page range as a whole passes. */
853 125960 : if (!nkeys[attno - 1])
854 824 : continue;
855 :
856 : Assert((nkeys[attno - 1] > 0) &&
857 : (nkeys[attno - 1] <= scan->numberOfKeys));
858 :
859 : /* If it is all nulls, it cannot possibly be consistent. */
860 125136 : if (bval->bv_allnulls)
861 : {
862 252 : addrange = false;
863 252 : break;
864 : }
865 :
866 : /*
867 : * Collation from the first key (has to be the same for
868 : * all keys for the same attribute).
869 : */
870 124884 : collation = keys[attno - 1][0]->sk_collation;
871 :
872 : /*
873 : * Check whether the scan key is consistent with the page
874 : * range values; if so, have the pages in the range added
875 : * to the output bitmap.
876 : *
877 : * The opclass may or may not support processing of
878 : * multiple scan keys. We can determine that based on the
879 : * number of arguments - functions with extra parameter
880 : * (number of scan keys) do support this, otherwise we
881 : * have to simply pass the scan keys one by one.
882 : */
883 124884 : if (consistentFn[attno - 1].fn_nargs >= 4)
884 : {
885 : /* Check all keys at once */
886 26396 : add = FunctionCall4Coll(&consistentFn[attno - 1],
887 : collation,
888 : PointerGetDatum(bdesc),
889 : PointerGetDatum(bval),
890 26396 : PointerGetDatum(keys[attno - 1]),
891 26396 : Int32GetDatum(nkeys[attno - 1]));
892 26396 : addrange = DatumGetBool(add);
893 : }
894 : else
895 : {
896 : /*
897 : * Check keys one by one
898 : *
899 : * When there are multiple scan keys, failure to meet
900 : * the criteria for a single one of them is enough to
901 : * discard the range as a whole, so break out of the
902 : * loop as soon as a false return value is obtained.
903 : */
904 : int keyno;
905 :
906 172052 : for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
907 : {
908 98488 : add = FunctionCall3Coll(&consistentFn[attno - 1],
909 98488 : keys[attno - 1][keyno]->sk_collation,
910 : PointerGetDatum(bdesc),
911 : PointerGetDatum(bval),
912 98488 : PointerGetDatum(keys[attno - 1][keyno]));
913 98488 : addrange = DatumGetBool(add);
914 98488 : if (!addrange)
915 24924 : break;
916 : }
917 : }
918 :
919 : /*
920 : * If we found a scan key eliminating the range, no need
921 : * to check additional ones.
922 : */
923 124884 : if (!addrange)
924 34819 : break;
925 : }
926 : }
927 : }
928 :
929 : /* add the pages in the range to the output bitmap, if needed */
930 127768 : if (addrange)
931 : {
932 : uint64 pageno;
933 :
934 92033 : for (pageno = heapBlk;
935 190678 : pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
936 98645 : pageno++)
937 : {
938 98645 : MemoryContextSwitchTo(oldcxt);
939 98645 : tbm_add_page(tbm, pageno);
940 98645 : totalpages++;
941 98645 : MemoryContextSwitchTo(perRangeCxt);
942 : }
943 : }
944 : }
945 :
946 1964 : MemoryContextSwitchTo(oldcxt);
947 1964 : MemoryContextDelete(perRangeCxt);
948 :
949 1964 : if (buf != InvalidBuffer)
950 1964 : ReleaseBuffer(buf);
951 :
952 : /*
953 : * XXX We have an approximation of the number of *pages* that our scan
954 : * returns, but we don't have a precise idea of the number of heap tuples
955 : * involved.
956 : */
957 1964 : return totalpages * 10;
958 : }
959 :
960 : /*
961 : * Re-initialize state for a BRIN index scan
962 : */
963 : void
964 1964 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
965 : ScanKey orderbys, int norderbys)
966 : {
967 : /*
968 : * Other index AMs preprocess the scan keys at this point, or sometime
969 : * early during the scan; this lets them optimize by removing redundant
970 : * keys, or doing early returns when they are impossible to satisfy; see
971 : * _bt_preprocess_keys for an example. Something like that could be added
972 : * here someday, too.
973 : */
974 :
975 1964 : if (scankey && scan->numberOfKeys > 0)
976 1964 : memcpy(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData));
977 1964 : }
978 :
979 : /*
980 : * Close down a BRIN index scan
981 : */
982 : void
983 1964 : brinendscan(IndexScanDesc scan)
984 : {
985 1964 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
986 :
987 1964 : brinRevmapTerminate(opaque->bo_rmAccess);
988 1964 : brin_free_desc(opaque->bo_bdesc);
989 1964 : pfree(opaque);
990 1964 : }
991 :
992 : /*
993 : * Per-heap-tuple callback for table_index_build_scan.
994 : *
995 : * Note we don't worry about the page range at the end of the table here; it is
996 : * present in the build state struct after we're called the last time, but not
997 : * inserted into the index. Caller must ensure to do so, if appropriate.
998 : */
999 : static void
1000 539321 : brinbuildCallback(Relation index,
1001 : ItemPointer tid,
1002 : Datum *values,
1003 : bool *isnull,
1004 : bool tupleIsAlive,
1005 : void *brstate)
1006 : {
1007 539321 : BrinBuildState *state = (BrinBuildState *) brstate;
1008 : BlockNumber thisblock;
1009 :
1010 539321 : thisblock = ItemPointerGetBlockNumber(tid);
1011 :
1012 : /*
1013 : * If we're in a block that belongs to a future range, summarize what
1014 : * we've got and start afresh. Note the scan might have skipped many
1015 : * pages, if they were devoid of live tuples; make sure to insert index
1016 : * tuples for those too.
1017 : */
1018 540840 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
1019 : {
1020 :
1021 : BRIN_elog((DEBUG2,
1022 : "brinbuildCallback: completed a range: %u--%u",
1023 : state->bs_currRangeStart,
1024 : state->bs_currRangeStart + state->bs_pagesPerRange));
1025 :
1026 : /* create the index tuple and insert it */
1027 1519 : form_and_insert_tuple(state);
1028 :
1029 : /* set state to correspond to the next range */
1030 1519 : state->bs_currRangeStart += state->bs_pagesPerRange;
1031 :
1032 : /* re-initialize state for it */
1033 1519 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1034 : }
1035 :
1036 : /* Accumulate the current tuple into the running state */
1037 539321 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1038 : values, isnull);
1039 539321 : }
1040 :
1041 : /*
1042 : * Per-heap-tuple callback for table_index_build_scan with parallelism.
1043 : *
1044 : * A version of the callback used by parallel index builds. The main difference
1045 : * is that instead of writing the BRIN tuples into the index, we write them
1046 : * into a shared tuplesort, and leave the insertion up to the leader (which may
1047 : * reorder them a bit etc.). The callback also does not generate empty ranges,
1048 : * those will be added by the leader when merging results from workers.
1049 : */
1050 : static void
1051 3981 : brinbuildCallbackParallel(Relation index,
1052 : ItemPointer tid,
1053 : Datum *values,
1054 : bool *isnull,
1055 : bool tupleIsAlive,
1056 : void *brstate)
1057 : {
1058 3981 : BrinBuildState *state = (BrinBuildState *) brstate;
1059 : BlockNumber thisblock;
1060 :
1061 3981 : thisblock = ItemPointerGetBlockNumber(tid);
1062 :
1063 : /*
1064 : * If we're in a block that belongs to a different range, summarize what
1065 : * we've got and start afresh. Note the scan might have skipped many
1066 : * pages, if they were devoid of live tuples; we do not create empty BRIN
1067 : * ranges here - the leader is responsible for filling them in.
1068 : *
1069 : * Unlike serial builds, parallel index builds allow synchronized seqscans
1070 : * (because that's what parallel scans do). This means the block may wrap
1071 : * around to the beginning of the relation, so the condition needs to
1072 : * check for both future and past ranges.
1073 : */
1074 3981 : if ((thisblock < state->bs_currRangeStart) ||
1075 3981 : (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1))
1076 : {
1077 :
1078 : BRIN_elog((DEBUG2,
1079 : "brinbuildCallbackParallel: completed a range: %u--%u",
1080 : state->bs_currRangeStart,
1081 : state->bs_currRangeStart + state->bs_pagesPerRange));
1082 :
1083 : /* create the index tuple and write it into the tuplesort */
1084 19 : form_and_spill_tuple(state);
1085 :
1086 : /*
1087 : * Set state to correspond to the next range (for this block).
1088 : *
1089 : * This skips ranges that are either empty (and so we don't get any
1090 : * tuples to summarize), or processed by other workers. We can't
1091 : * differentiate those cases here easily, so we leave it up to the
1092 : * leader to fill empty ranges where needed.
1093 : */
1094 : state->bs_currRangeStart
1095 19 : = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
1096 :
1097 : /* re-initialize state for it */
1098 19 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1099 : }
1100 :
1101 : /* Accumulate the current tuple into the running state */
1102 3981 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1103 : values, isnull);
1104 3981 : }
1105 :
1106 : /*
1107 : * brinbuild() -- build a new BRIN index.
1108 : */
1109 : IndexBuildResult *
1110 224 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1111 : {
1112 : IndexBuildResult *result;
1113 : double reltuples;
1114 : double idxtuples;
1115 : BrinRevmap *revmap;
1116 : BrinBuildState *state;
1117 : Buffer meta;
1118 : BlockNumber pagesPerRange;
1119 :
1120 : /*
1121 : * We expect to be called exactly once for any index relation.
1122 : */
1123 224 : if (RelationGetNumberOfBlocks(index) != 0)
1124 0 : elog(ERROR, "index \"%s\" already contains data",
1125 : RelationGetRelationName(index));
1126 :
1127 : /*
1128 : * Critical section not required, because on error the creation of the
1129 : * whole relation will be rolled back.
1130 : */
1131 :
1132 224 : meta = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
1133 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1134 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
1135 :
1136 224 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
1137 : BRIN_CURRENT_VERSION);
1138 224 : MarkBufferDirty(meta);
1139 :
1140 224 : if (RelationNeedsWAL(index))
1141 : {
1142 : xl_brin_createidx xlrec;
1143 : XLogRecPtr recptr;
1144 : Page page;
1145 :
1146 129 : xlrec.version = BRIN_CURRENT_VERSION;
1147 129 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
1148 :
1149 129 : XLogBeginInsert();
1150 129 : XLogRegisterData(&xlrec, SizeOfBrinCreateIdx);
1151 129 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
1152 :
1153 129 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
1154 :
1155 129 : page = BufferGetPage(meta);
1156 129 : PageSetLSN(page, recptr);
1157 : }
1158 :
1159 224 : UnlockReleaseBuffer(meta);
1160 :
1161 : /*
1162 : * Initialize our state, including the deformed tuple state.
1163 : */
1164 224 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1165 224 : state = initialize_brin_buildstate(index, revmap, pagesPerRange,
1166 : RelationGetNumberOfBlocks(heap));
1167 :
1168 : /*
1169 : * Attempt to launch parallel worker scan when required
1170 : *
1171 : * XXX plan_create_index_workers makes the number of workers dependent on
1172 : * maintenance_work_mem, requiring 32MB for each worker. That makes sense
1173 : * for btree, but not for BRIN, which can do with much less memory. So
1174 : * maybe make that somehow less strict, optionally?
1175 : */
1176 224 : if (indexInfo->ii_ParallelWorkers > 0)
1177 6 : _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
1178 : indexInfo->ii_ParallelWorkers);
1179 :
1180 : /*
1181 : * If parallel build requested and at least one worker process was
1182 : * successfully launched, set up coordination state, wait for workers to
1183 : * complete. Then read all tuples from the shared tuplesort and insert
1184 : * them into the index.
1185 : *
1186 : * In serial mode, simply scan the table and build the index one index
1187 : * tuple at a time.
1188 : */
1189 224 : if (state->bs_leader)
1190 : {
1191 : SortCoordinate coordinate;
1192 :
1193 5 : coordinate = palloc0_object(SortCoordinateData);
1194 5 : coordinate->isWorker = false;
1195 5 : coordinate->nParticipants =
1196 5 : state->bs_leader->nparticipanttuplesorts;
1197 5 : coordinate->sharedsort = state->bs_leader->sharedsort;
1198 :
1199 : /*
1200 : * Begin leader tuplesort.
1201 : *
1202 : * In cases where parallelism is involved, the leader receives the
1203 : * same share of maintenance_work_mem as a serial sort (it is
1204 : * generally treated in the same way as a serial sort once we return).
1205 : * Parallel worker Tuplesortstates will have received only a fraction
1206 : * of maintenance_work_mem, though.
1207 : *
1208 : * We rely on the lifetime of the Leader Tuplesortstate almost not
1209 : * overlapping with any worker Tuplesortstate's lifetime. There may
1210 : * be some small overlap, but that's okay because we rely on leader
1211 : * Tuplesortstate only allocating a small, fixed amount of memory
1212 : * here. When its tuplesort_performsort() is called (by our caller),
1213 : * and significant amounts of memory are likely to be used, all
1214 : * workers must have already freed almost all memory held by their
1215 : * Tuplesortstates (they are about to go away completely, too). The
1216 : * overall effect is that maintenance_work_mem always represents an
1217 : * absolute high watermark on the amount of memory used by a CREATE
1218 : * INDEX operation, regardless of the use of parallelism or any other
1219 : * factor.
1220 : */
1221 5 : state->bs_sortstate =
1222 5 : tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
1223 : TUPLESORT_NONE);
1224 :
1225 : /* scan the relation and merge per-worker results */
1226 5 : reltuples = _brin_parallel_merge(state);
1227 :
1228 5 : _brin_end_parallel(state->bs_leader, state);
1229 : }
1230 : else /* no parallel index build */
1231 : {
1232 : /*
1233 : * Now scan the relation. No syncscan allowed here because we want
1234 : * the heap blocks in physical order (we want to produce the ranges
1235 : * starting from block 0, and the callback also relies on this to not
1236 : * generate summary for the same range twice).
1237 : */
1238 219 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
1239 : brinbuildCallback, state, NULL);
1240 :
1241 : /*
1242 : * process the final batch
1243 : *
1244 : * XXX Note this does not update state->bs_currRangeStart, i.e. it
1245 : * stays set to the last range added to the index. This is OK, because
1246 : * that's what brin_fill_empty_ranges expects.
1247 : */
1248 219 : form_and_insert_tuple(state);
1249 :
1250 : /*
1251 : * Backfill the final ranges with empty data.
1252 : *
1253 : * This saves us from doing what amounts to full table scans when the
1254 : * index with a predicate like WHERE (nonnull_column IS NULL), or
1255 : * other very selective predicates.
1256 : */
1257 219 : brin_fill_empty_ranges(state,
1258 : state->bs_currRangeStart,
1259 : state->bs_maxRangeStart);
1260 : }
1261 :
1262 : /* release resources */
1263 224 : idxtuples = state->bs_numtuples;
1264 224 : brinRevmapTerminate(state->bs_rmAccess);
1265 224 : terminate_brin_buildstate(state);
1266 :
1267 : /*
1268 : * Return statistics
1269 : */
1270 224 : result = palloc_object(IndexBuildResult);
1271 :
1272 224 : result->heap_tuples = reltuples;
1273 224 : result->index_tuples = idxtuples;
1274 :
1275 224 : return result;
1276 : }
1277 :
1278 : void
1279 4 : brinbuildempty(Relation index)
1280 : {
1281 : Buffer metabuf;
1282 :
1283 : /* An empty BRIN index has a metapage only. */
1284 4 : metabuf = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
1285 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1286 :
1287 : /* Initialize and xlog metabuffer. */
1288 4 : START_CRIT_SECTION();
1289 4 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
1290 : BRIN_CURRENT_VERSION);
1291 4 : MarkBufferDirty(metabuf);
1292 4 : log_newpage_buffer(metabuf, true);
1293 4 : END_CRIT_SECTION();
1294 :
1295 4 : UnlockReleaseBuffer(metabuf);
1296 4 : }
1297 :
1298 : /*
1299 : * brinbulkdelete
1300 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
1301 : * there's not a lot we can do here.
1302 : *
1303 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
1304 : * tuple is deleted), meaning the need to re-run summarization on the affected
1305 : * range. Would need to add an extra flag in brintuples for that.
1306 : */
1307 : IndexBulkDeleteResult *
1308 13 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
1309 : IndexBulkDeleteCallback callback, void *callback_state)
1310 : {
1311 : /* allocate stats if first time through, else re-use existing struct */
1312 13 : if (stats == NULL)
1313 13 : stats = palloc0_object(IndexBulkDeleteResult);
1314 :
1315 13 : return stats;
1316 : }
1317 :
1318 : /*
1319 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
1320 : * ranges that are currently unsummarized.
1321 : */
1322 : IndexBulkDeleteResult *
1323 72 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
1324 : {
1325 : Relation heapRel;
1326 :
1327 : /* No-op in ANALYZE ONLY mode */
1328 72 : if (info->analyze_only)
1329 3 : return stats;
1330 :
1331 69 : if (!stats)
1332 60 : stats = palloc0_object(IndexBulkDeleteResult);
1333 69 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
1334 : /* rest of stats is initialized by zeroing */
1335 :
1336 69 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
1337 : AccessShareLock);
1338 :
1339 69 : brin_vacuum_scan(info->index, info->strategy);
1340 :
1341 69 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
1342 : &stats->num_index_tuples, &stats->num_index_tuples);
1343 :
1344 69 : table_close(heapRel, AccessShareLock);
1345 :
1346 69 : return stats;
1347 : }
1348 :
1349 : /*
1350 : * reloptions processor for BRIN indexes
1351 : */
1352 : bytea *
1353 762 : brinoptions(Datum reloptions, bool validate)
1354 : {
1355 : static const relopt_parse_elt tab[] = {
1356 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
1357 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
1358 : };
1359 :
1360 762 : return (bytea *) build_reloptions(reloptions, validate,
1361 : RELOPT_KIND_BRIN,
1362 : sizeof(BrinOptions),
1363 : tab, lengthof(tab));
1364 : }
1365 :
1366 : /*
1367 : * SQL-callable function to scan through an index and summarize all ranges
1368 : * that are not currently summarized.
1369 : */
1370 : Datum
1371 47 : brin_summarize_new_values(PG_FUNCTION_ARGS)
1372 : {
1373 47 : Datum relation = PG_GETARG_DATUM(0);
1374 :
1375 47 : return DirectFunctionCall2(brin_summarize_range,
1376 : relation,
1377 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
1378 : }
1379 :
1380 : /*
1381 : * SQL-callable function to summarize the indicated page range, if not already
1382 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
1383 : * unsummarized ranges are summarized.
1384 : */
1385 : Datum
1386 134 : brin_summarize_range(PG_FUNCTION_ARGS)
1387 : {
1388 134 : Oid indexoid = PG_GETARG_OID(0);
1389 134 : int64 heapBlk64 = PG_GETARG_INT64(1);
1390 : BlockNumber heapBlk;
1391 : Oid heapoid;
1392 : Relation indexRel;
1393 : Relation heapRel;
1394 : Oid save_userid;
1395 : int save_sec_context;
1396 : int save_nestlevel;
1397 134 : double numSummarized = 0;
1398 :
1399 134 : if (RecoveryInProgress())
1400 0 : ereport(ERROR,
1401 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1402 : errmsg("recovery is in progress"),
1403 : errhint("BRIN control functions cannot be executed during recovery.")));
1404 :
1405 134 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
1406 24 : ereport(ERROR,
1407 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1408 : errmsg("block number out of range: %" PRId64, heapBlk64)));
1409 110 : heapBlk = (BlockNumber) heapBlk64;
1410 :
1411 : /*
1412 : * We must lock table before index to avoid deadlocks. However, if the
1413 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1414 : * Rather than emitting a not-very-helpful error message, postpone
1415 : * complaining, expecting that the is-it-an-index test below will fail.
1416 : */
1417 110 : heapoid = IndexGetRelation(indexoid, true);
1418 110 : if (OidIsValid(heapoid))
1419 : {
1420 98 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1421 :
1422 : /*
1423 : * Autovacuum calls us. For its benefit, switch to the table owner's
1424 : * userid, so that any index functions are run as that user. Also
1425 : * lock down security-restricted operations and arrange to make GUC
1426 : * variable changes local to this command. This is harmless, albeit
1427 : * unnecessary, when called from SQL, because we fail shortly if the
1428 : * user does not own the index.
1429 : */
1430 98 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
1431 98 : SetUserIdAndSecContext(heapRel->rd_rel->relowner,
1432 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
1433 98 : save_nestlevel = NewGUCNestLevel();
1434 98 : RestrictSearchPath();
1435 : }
1436 : else
1437 : {
1438 12 : heapRel = NULL;
1439 : /* Set these just to suppress "uninitialized variable" warnings */
1440 12 : save_userid = InvalidOid;
1441 12 : save_sec_context = -1;
1442 12 : save_nestlevel = -1;
1443 : }
1444 :
1445 110 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1446 :
1447 : /* Must be a BRIN index */
1448 98 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1449 98 : indexRel->rd_rel->relam != BRIN_AM_OID)
1450 12 : ereport(ERROR,
1451 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1452 : errmsg("\"%s\" is not a BRIN index",
1453 : RelationGetRelationName(indexRel))));
1454 :
1455 : /* User must own the index (comparable to privileges needed for VACUUM) */
1456 86 : if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
1457 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1458 0 : RelationGetRelationName(indexRel));
1459 :
1460 : /*
1461 : * Since we did the IndexGetRelation call above without any lock, it's
1462 : * barely possible that a race against an index drop/recreation could have
1463 : * netted us the wrong table. Recheck.
1464 : */
1465 86 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1466 0 : ereport(ERROR,
1467 : (errcode(ERRCODE_UNDEFINED_TABLE),
1468 : errmsg("could not open parent table of index \"%s\"",
1469 : RelationGetRelationName(indexRel))));
1470 :
1471 : /* see gin_clean_pending_list() */
1472 86 : if (indexRel->rd_index->indisvalid)
1473 86 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
1474 : else
1475 0 : ereport(DEBUG1,
1476 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1477 : errmsg("index \"%s\" is not valid",
1478 : RelationGetRelationName(indexRel))));
1479 :
1480 : /* Roll back any GUC changes executed by index functions */
1481 86 : AtEOXact_GUC(false, save_nestlevel);
1482 :
1483 : /* Restore userid and security context */
1484 86 : SetUserIdAndSecContext(save_userid, save_sec_context);
1485 :
1486 86 : index_close(indexRel, ShareUpdateExclusiveLock);
1487 86 : table_close(heapRel, ShareUpdateExclusiveLock);
1488 :
1489 86 : PG_RETURN_INT32((int32) numSummarized);
1490 : }
1491 :
1492 : /*
1493 : * SQL-callable interface to mark a range as no longer summarized
1494 : */
1495 : Datum
1496 69 : brin_desummarize_range(PG_FUNCTION_ARGS)
1497 : {
1498 69 : Oid indexoid = PG_GETARG_OID(0);
1499 69 : int64 heapBlk64 = PG_GETARG_INT64(1);
1500 : BlockNumber heapBlk;
1501 : Oid heapoid;
1502 : Relation heapRel;
1503 : Relation indexRel;
1504 : bool done;
1505 :
1506 69 : if (RecoveryInProgress())
1507 0 : ereport(ERROR,
1508 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1509 : errmsg("recovery is in progress"),
1510 : errhint("BRIN control functions cannot be executed during recovery.")));
1511 :
1512 69 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
1513 12 : ereport(ERROR,
1514 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1515 : errmsg("block number out of range: %" PRId64,
1516 : heapBlk64)));
1517 57 : heapBlk = (BlockNumber) heapBlk64;
1518 :
1519 : /*
1520 : * We must lock table before index to avoid deadlocks. However, if the
1521 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1522 : * Rather than emitting a not-very-helpful error message, postpone
1523 : * complaining, expecting that the is-it-an-index test below will fail.
1524 : *
1525 : * Unlike brin_summarize_range(), autovacuum never calls this. Hence, we
1526 : * don't switch userid.
1527 : */
1528 57 : heapoid = IndexGetRelation(indexoid, true);
1529 57 : if (OidIsValid(heapoid))
1530 57 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1531 : else
1532 0 : heapRel = NULL;
1533 :
1534 57 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1535 :
1536 : /* Must be a BRIN index */
1537 57 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1538 57 : indexRel->rd_rel->relam != BRIN_AM_OID)
1539 0 : ereport(ERROR,
1540 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1541 : errmsg("\"%s\" is not a BRIN index",
1542 : RelationGetRelationName(indexRel))));
1543 :
1544 : /* User must own the index (comparable to privileges needed for VACUUM) */
1545 57 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1546 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1547 0 : RelationGetRelationName(indexRel));
1548 :
1549 : /*
1550 : * Since we did the IndexGetRelation call above without any lock, it's
1551 : * barely possible that a race against an index drop/recreation could have
1552 : * netted us the wrong table. Recheck.
1553 : */
1554 57 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1555 0 : ereport(ERROR,
1556 : (errcode(ERRCODE_UNDEFINED_TABLE),
1557 : errmsg("could not open parent table of index \"%s\"",
1558 : RelationGetRelationName(indexRel))));
1559 :
1560 : /* see gin_clean_pending_list() */
1561 57 : if (indexRel->rd_index->indisvalid)
1562 : {
1563 : /* the revmap does the hard work */
1564 : do
1565 : {
1566 57 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1567 : }
1568 57 : while (!done);
1569 : }
1570 : else
1571 0 : ereport(DEBUG1,
1572 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1573 : errmsg("index \"%s\" is not valid",
1574 : RelationGetRelationName(indexRel))));
1575 :
1576 57 : index_close(indexRel, ShareUpdateExclusiveLock);
1577 57 : table_close(heapRel, ShareUpdateExclusiveLock);
1578 :
1579 57 : PG_RETURN_VOID();
1580 : }
1581 :
1582 : /*
1583 : * Build a BrinDesc used to create or scan a BRIN index
1584 : */
1585 : BrinDesc *
1586 2994 : brin_build_desc(Relation rel)
1587 : {
1588 : BrinOpcInfo **opcinfo;
1589 : BrinDesc *bdesc;
1590 : TupleDesc tupdesc;
1591 2994 : int totalstored = 0;
1592 : int keyno;
1593 : long totalsize;
1594 : MemoryContext cxt;
1595 : MemoryContext oldcxt;
1596 :
1597 2994 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1598 : "brin desc cxt",
1599 : ALLOCSET_SMALL_SIZES);
1600 2994 : oldcxt = MemoryContextSwitchTo(cxt);
1601 2994 : tupdesc = RelationGetDescr(rel);
1602 :
1603 : /*
1604 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1605 : * the number of columns stored, since the number is opclass-defined.
1606 : */
1607 2994 : opcinfo = palloc_array(BrinOpcInfo *, tupdesc->natts);
1608 50588 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1609 : {
1610 : FmgrInfo *opcInfoFn;
1611 47594 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1612 :
1613 47594 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1614 :
1615 95188 : opcinfo[keyno] = (BrinOpcInfo *)
1616 47594 : DatumGetPointer(FunctionCall1(opcInfoFn, ObjectIdGetDatum(attr->atttypid)));
1617 47594 : totalstored += opcinfo[keyno]->oi_nstored;
1618 : }
1619 :
1620 : /* Allocate our result struct and fill it in */
1621 2994 : totalsize = offsetof(BrinDesc, bd_info) +
1622 2994 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1623 :
1624 2994 : bdesc = palloc(totalsize);
1625 2994 : bdesc->bd_context = cxt;
1626 2994 : bdesc->bd_index = rel;
1627 2994 : bdesc->bd_tupdesc = tupdesc;
1628 2994 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1629 2994 : bdesc->bd_totalstored = totalstored;
1630 :
1631 50588 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1632 47594 : bdesc->bd_info[keyno] = opcinfo[keyno];
1633 2994 : pfree(opcinfo);
1634 :
1635 2994 : MemoryContextSwitchTo(oldcxt);
1636 :
1637 2994 : return bdesc;
1638 : }
1639 :
1640 : void
1641 2264 : brin_free_desc(BrinDesc *bdesc)
1642 : {
1643 : /* make sure the tupdesc is still valid */
1644 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1645 : /* no need for retail pfree */
1646 2264 : MemoryContextDelete(bdesc->bd_context);
1647 2264 : }
1648 :
1649 : /*
1650 : * Fetch index's statistical data into *stats
1651 : */
1652 : void
1653 8944 : brinGetStats(Relation index, BrinStatsData *stats)
1654 : {
1655 : Buffer metabuffer;
1656 : Page metapage;
1657 : BrinMetaPageData *metadata;
1658 :
1659 8944 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1660 8944 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1661 8944 : metapage = BufferGetPage(metabuffer);
1662 8944 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1663 :
1664 8944 : stats->pagesPerRange = metadata->pagesPerRange;
1665 8944 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1666 :
1667 8944 : UnlockReleaseBuffer(metabuffer);
1668 8944 : }
1669 :
1670 : /*
1671 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1672 : */
1673 : static BrinBuildState *
1674 285 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1675 : BlockNumber pagesPerRange, BlockNumber tablePages)
1676 : {
1677 : BrinBuildState *state;
1678 285 : BlockNumber lastRange = 0;
1679 :
1680 285 : state = palloc_object(BrinBuildState);
1681 :
1682 285 : state->bs_irel = idxRel;
1683 285 : state->bs_numtuples = 0;
1684 285 : state->bs_reltuples = 0;
1685 285 : state->bs_currentInsertBuf = InvalidBuffer;
1686 285 : state->bs_pagesPerRange = pagesPerRange;
1687 285 : state->bs_currRangeStart = 0;
1688 285 : state->bs_rmAccess = revmap;
1689 285 : state->bs_bdesc = brin_build_desc(idxRel);
1690 285 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1691 285 : state->bs_leader = NULL;
1692 285 : state->bs_worker_id = 0;
1693 285 : state->bs_sortstate = NULL;
1694 :
1695 : /* Remember the memory context to use for an empty tuple, if needed. */
1696 285 : state->bs_context = CurrentMemoryContext;
1697 285 : state->bs_emptyTuple = NULL;
1698 285 : state->bs_emptyTupleLen = 0;
1699 :
1700 : /*
1701 : * Calculate the start of the last page range. Page numbers are 0-based,
1702 : * so to calculate the index we need to subtract one. The integer division
1703 : * gives us the index of the page range.
1704 : */
1705 285 : if (tablePages > 0)
1706 212 : lastRange = ((tablePages - 1) / pagesPerRange) * pagesPerRange;
1707 :
1708 : /* Now calculate the start of the next range. */
1709 285 : state->bs_maxRangeStart = lastRange + state->bs_pagesPerRange;
1710 :
1711 285 : return state;
1712 : }
1713 :
1714 : /*
1715 : * Release resources associated with a BrinBuildState.
1716 : */
1717 : static void
1718 278 : terminate_brin_buildstate(BrinBuildState *state)
1719 : {
1720 : /*
1721 : * Release the last index buffer used. We might as well ensure that
1722 : * whatever free space remains in that page is available in FSM, too.
1723 : */
1724 278 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1725 : {
1726 : Page page;
1727 : Size freespace;
1728 : BlockNumber blk;
1729 :
1730 224 : page = BufferGetPage(state->bs_currentInsertBuf);
1731 224 : freespace = PageGetFreeSpace(page);
1732 224 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1733 224 : ReleaseBuffer(state->bs_currentInsertBuf);
1734 224 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1735 224 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1736 : }
1737 :
1738 278 : brin_free_desc(state->bs_bdesc);
1739 278 : pfree(state->bs_dtuple);
1740 278 : pfree(state);
1741 278 : }
1742 :
1743 : /*
1744 : * On the given BRIN index, summarize the heap page range that corresponds
1745 : * to the heap block number given.
1746 : *
1747 : * This routine can run in parallel with insertions into the heap. To avoid
1748 : * missing those values from the summary tuple, we first insert a placeholder
1749 : * index tuple into the index, then execute the heap scan; transactions
1750 : * concurrent with the scan update the placeholder tuple. After the scan, we
1751 : * union the placeholder tuple with the one computed by this routine. The
1752 : * update of the index value happens in a loop, so that if somebody updates
1753 : * the placeholder tuple after we read it, we detect the case and try again.
1754 : * This ensures that the concurrently inserted tuples are not lost.
1755 : *
1756 : * A further corner case is this routine being asked to summarize the partial
1757 : * range at the end of the table. heapNumBlocks is the (possibly outdated)
1758 : * table size; if we notice that the requested range lies beyond that size,
1759 : * we re-compute the table size after inserting the placeholder tuple, to
1760 : * avoid missing pages that were appended recently.
1761 : */
1762 : static void
1763 1494 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1764 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1765 : {
1766 : Buffer phbuf;
1767 : BrinTuple *phtup;
1768 : Size phsz;
1769 : OffsetNumber offset;
1770 : BlockNumber scanNumBlks;
1771 :
1772 : /*
1773 : * Insert the placeholder tuple
1774 : */
1775 1494 : phbuf = InvalidBuffer;
1776 1494 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1777 1494 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1778 : state->bs_rmAccess, &phbuf,
1779 : heapBlk, phtup, phsz);
1780 :
1781 : /*
1782 : * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1783 : * cannot shrink concurrently (but it can grow).
1784 : */
1785 : Assert(heapBlk % state->bs_pagesPerRange == 0);
1786 1494 : if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1787 : {
1788 : /*
1789 : * If we're asked to scan what we believe to be the final range on the
1790 : * table (i.e. a range that might be partial) we need to recompute our
1791 : * idea of what the latest page is after inserting the placeholder
1792 : * tuple. Anyone that grows the table later will update the
1793 : * placeholder tuple, so it doesn't matter that we won't scan these
1794 : * pages ourselves. Careful: the table might have been extended
1795 : * beyond the current range, so clamp our result.
1796 : *
1797 : * Fortunately, this should occur infrequently.
1798 : */
1799 16 : scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1800 : state->bs_pagesPerRange);
1801 : }
1802 : else
1803 : {
1804 : /* Easy case: range is known to be complete */
1805 1478 : scanNumBlks = state->bs_pagesPerRange;
1806 : }
1807 :
1808 : /*
1809 : * Execute the partial heap scan covering the heap blocks in the specified
1810 : * page range, summarizing the heap tuples in it. This scan stops just
1811 : * short of brinbuildCallback creating the new index entry.
1812 : *
1813 : * Note that it is critical we use the "any visible" mode of
1814 : * table_index_build_range_scan here: otherwise, we would miss tuples
1815 : * inserted by transactions that are still in progress, among other corner
1816 : * cases.
1817 : */
1818 1494 : state->bs_currRangeStart = heapBlk;
1819 1494 : table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1820 : heapBlk, scanNumBlks,
1821 : brinbuildCallback, state, NULL);
1822 :
1823 : /*
1824 : * Now we update the values obtained by the scan with the placeholder
1825 : * tuple. We do this in a loop which only terminates if we're able to
1826 : * update the placeholder tuple successfully; if we are not, this means
1827 : * somebody else modified the placeholder tuple after we read it.
1828 : */
1829 : for (;;)
1830 0 : {
1831 : BrinTuple *newtup;
1832 : Size newsize;
1833 : bool didupdate;
1834 : bool samepage;
1835 :
1836 1494 : CHECK_FOR_INTERRUPTS();
1837 :
1838 : /*
1839 : * Update the summary tuple and try to update.
1840 : */
1841 1494 : newtup = brin_form_tuple(state->bs_bdesc,
1842 : heapBlk, state->bs_dtuple, &newsize);
1843 1494 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1844 : didupdate =
1845 1494 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1846 : state->bs_rmAccess, heapBlk, phbuf, offset,
1847 : phtup, phsz, newtup, newsize, samepage);
1848 1494 : brin_free_tuple(phtup);
1849 1494 : brin_free_tuple(newtup);
1850 :
1851 : /* If the update succeeded, we're done. */
1852 1494 : if (didupdate)
1853 1494 : break;
1854 :
1855 : /*
1856 : * If the update didn't work, it might be because somebody updated the
1857 : * placeholder tuple concurrently. Extract the new version, union it
1858 : * with the values we have from the scan, and start over. (There are
1859 : * other reasons for the update to fail, but it's simple to treat them
1860 : * the same.)
1861 : */
1862 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1863 : &offset, &phsz, BUFFER_LOCK_SHARE);
1864 : /* the placeholder tuple must exist */
1865 0 : if (phtup == NULL)
1866 0 : elog(ERROR, "missing placeholder tuple");
1867 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1868 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1869 :
1870 : /* merge it into the tuple from the heap scan */
1871 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1872 : }
1873 :
1874 1494 : ReleaseBuffer(phbuf);
1875 1494 : }
1876 :
1877 : /*
1878 : * Summarize page ranges that are not already summarized. If pageRange is
1879 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1880 : * page range containing the given heap page number is scanned.
1881 : * If include_partial is true, then the partial range at the end of the table
1882 : * is summarized, otherwise not.
1883 : *
1884 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1885 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1886 : * incremented.
1887 : */
1888 : static void
1889 155 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1890 : bool include_partial, double *numSummarized, double *numExisting)
1891 : {
1892 : BrinRevmap *revmap;
1893 155 : BrinBuildState *state = NULL;
1894 155 : IndexInfo *indexInfo = NULL;
1895 : BlockNumber heapNumBlocks;
1896 : BlockNumber pagesPerRange;
1897 : Buffer buf;
1898 : BlockNumber startBlk;
1899 :
1900 155 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1901 :
1902 : /* determine range of pages to process */
1903 155 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1904 155 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1905 104 : startBlk = 0;
1906 : else
1907 : {
1908 51 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1909 51 : heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
1910 : }
1911 155 : if (startBlk > heapNumBlocks)
1912 : {
1913 : /* Nothing to do if start point is beyond end of table */
1914 0 : brinRevmapTerminate(revmap);
1915 0 : return;
1916 : }
1917 :
1918 : /*
1919 : * Scan the revmap to find unsummarized items.
1920 : */
1921 155 : buf = InvalidBuffer;
1922 10432 : for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1923 : {
1924 : BrinTuple *tup;
1925 : OffsetNumber off;
1926 :
1927 : /*
1928 : * Unless requested to summarize even a partial range, go away now if
1929 : * we think the next range is partial. Caller would pass true when it
1930 : * is typically run once bulk data loading is done
1931 : * (brin_summarize_new_values), and false when it is typically the
1932 : * result of arbitrarily-scheduled maintenance command (vacuuming).
1933 : */
1934 10329 : if (!include_partial &&
1935 1740 : (startBlk + pagesPerRange > heapNumBlocks))
1936 52 : break;
1937 :
1938 10277 : CHECK_FOR_INTERRUPTS();
1939 :
1940 10277 : tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1941 : BUFFER_LOCK_SHARE);
1942 10277 : if (tup == NULL)
1943 : {
1944 : /* no revmap entry for this heap range. Summarize it. */
1945 1494 : if (state == NULL)
1946 : {
1947 : /* first time through */
1948 : Assert(!indexInfo);
1949 54 : state = initialize_brin_buildstate(index, revmap,
1950 : pagesPerRange,
1951 : InvalidBlockNumber);
1952 54 : indexInfo = BuildIndexInfo(index);
1953 : }
1954 1494 : summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1955 :
1956 : /* and re-initialize state for the next range */
1957 1494 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1958 :
1959 1494 : if (numSummarized)
1960 1494 : *numSummarized += 1.0;
1961 : }
1962 : else
1963 : {
1964 8783 : if (numExisting)
1965 1623 : *numExisting += 1.0;
1966 8783 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1967 : }
1968 : }
1969 :
1970 155 : if (BufferIsValid(buf))
1971 110 : ReleaseBuffer(buf);
1972 :
1973 : /* free resources */
1974 155 : brinRevmapTerminate(revmap);
1975 155 : if (state)
1976 : {
1977 54 : terminate_brin_buildstate(state);
1978 54 : pfree(indexInfo);
1979 : }
1980 : }
1981 :
1982 : /*
1983 : * Given a deformed tuple in the build state, convert it into the on-disk
1984 : * format and insert it into the index, making the revmap point to it.
1985 : */
1986 : static void
1987 1738 : form_and_insert_tuple(BrinBuildState *state)
1988 : {
1989 : BrinTuple *tup;
1990 : Size size;
1991 :
1992 1738 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1993 : state->bs_dtuple, &size);
1994 1738 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1995 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1996 : tup, size);
1997 1738 : state->bs_numtuples++;
1998 :
1999 1738 : pfree(tup);
2000 1738 : }
2001 :
2002 : /*
2003 : * Given a deformed tuple in the build state, convert it into the on-disk
2004 : * format and write it to a (shared) tuplesort (the leader will insert it
2005 : * into the index later).
2006 : */
2007 : static void
2008 31 : form_and_spill_tuple(BrinBuildState *state)
2009 : {
2010 : BrinTuple *tup;
2011 : Size size;
2012 :
2013 : /* don't insert empty tuples in parallel build */
2014 31 : if (state->bs_dtuple->bt_empty_range)
2015 11 : return;
2016 :
2017 20 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
2018 : state->bs_dtuple, &size);
2019 :
2020 : /* write the BRIN tuple to the tuplesort */
2021 20 : tuplesort_putbrintuple(state->bs_sortstate, tup, size);
2022 :
2023 20 : state->bs_numtuples++;
2024 :
2025 20 : pfree(tup);
2026 : }
2027 :
2028 : /*
2029 : * Given two deformed tuples, adjust the first one so that it's consistent
2030 : * with the summary values in both.
2031 : */
2032 : static void
2033 0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
2034 : {
2035 : int keyno;
2036 : BrinMemTuple *db;
2037 : MemoryContext cxt;
2038 : MemoryContext oldcxt;
2039 :
2040 : /* Use our own memory context to avoid retail pfree */
2041 0 : cxt = AllocSetContextCreate(CurrentMemoryContext,
2042 : "brin union",
2043 : ALLOCSET_DEFAULT_SIZES);
2044 0 : oldcxt = MemoryContextSwitchTo(cxt);
2045 0 : db = brin_deform_tuple(bdesc, b, NULL);
2046 0 : MemoryContextSwitchTo(oldcxt);
2047 :
2048 : /*
2049 : * Check if the ranges are empty.
2050 : *
2051 : * If at least one of them is empty, we don't need to call per-key union
2052 : * functions at all. If "b" is empty, we just use "a" as the result (it
2053 : * might be empty fine, but that's fine). If "a" is empty but "b" is not,
2054 : * we use "b" as the result (but we have to copy the data into "a" first).
2055 : *
2056 : * Only when both ranges are non-empty, we actually do the per-key merge.
2057 : */
2058 :
2059 : /* If "b" is empty - ignore it and just use "a" (even if it's empty etc.). */
2060 0 : if (db->bt_empty_range)
2061 : {
2062 : /* skip the per-key merge */
2063 0 : MemoryContextDelete(cxt);
2064 0 : return;
2065 : }
2066 :
2067 : /*
2068 : * Now we know "b" is not empty. If "a" is empty, then "b" is the result.
2069 : * But we need to copy the data from "b" to "a" first, because that's how
2070 : * we pass result out.
2071 : *
2072 : * We have to copy all the global/per-key flags etc. too.
2073 : */
2074 0 : if (a->bt_empty_range)
2075 : {
2076 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2077 : {
2078 : int i;
2079 0 : BrinValues *col_a = &a->bt_columns[keyno];
2080 0 : BrinValues *col_b = &db->bt_columns[keyno];
2081 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2082 :
2083 0 : col_a->bv_allnulls = col_b->bv_allnulls;
2084 0 : col_a->bv_hasnulls = col_b->bv_hasnulls;
2085 :
2086 : /* If "b" has no data, we're done. */
2087 0 : if (col_b->bv_allnulls)
2088 0 : continue;
2089 :
2090 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2091 0 : col_a->bv_values[i] =
2092 0 : datumCopy(col_b->bv_values[i],
2093 0 : opcinfo->oi_typcache[i]->typbyval,
2094 0 : opcinfo->oi_typcache[i]->typlen);
2095 : }
2096 :
2097 : /* "a" started empty, but "b" was not empty, so remember that */
2098 0 : a->bt_empty_range = false;
2099 :
2100 : /* skip the per-key merge */
2101 0 : MemoryContextDelete(cxt);
2102 0 : return;
2103 : }
2104 :
2105 : /* Now we know neither range is empty. */
2106 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2107 : {
2108 : FmgrInfo *unionFn;
2109 0 : BrinValues *col_a = &a->bt_columns[keyno];
2110 0 : BrinValues *col_b = &db->bt_columns[keyno];
2111 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2112 :
2113 0 : if (opcinfo->oi_regular_nulls)
2114 : {
2115 : /* Does the "b" summary represent any NULL values? */
2116 0 : bool b_has_nulls = (col_b->bv_hasnulls || col_b->bv_allnulls);
2117 :
2118 : /* Adjust "hasnulls". */
2119 0 : if (!col_a->bv_allnulls && b_has_nulls)
2120 0 : col_a->bv_hasnulls = true;
2121 :
2122 : /* If there are no values in B, there's nothing left to do. */
2123 0 : if (col_b->bv_allnulls)
2124 0 : continue;
2125 :
2126 : /*
2127 : * Adjust "allnulls". If A doesn't have values, just copy the
2128 : * values from B into A, and we're done. We cannot run the
2129 : * operators in this case, because values in A might contain
2130 : * garbage. Note we already established that B contains values.
2131 : *
2132 : * Also adjust "hasnulls" in order not to forget the summary
2133 : * represents NULL values. This is not redundant with the earlier
2134 : * update, because that only happens when allnulls=false.
2135 : */
2136 0 : if (col_a->bv_allnulls)
2137 0 : {
2138 : int i;
2139 :
2140 0 : col_a->bv_allnulls = false;
2141 0 : col_a->bv_hasnulls = true;
2142 :
2143 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2144 0 : col_a->bv_values[i] =
2145 0 : datumCopy(col_b->bv_values[i],
2146 0 : opcinfo->oi_typcache[i]->typbyval,
2147 0 : opcinfo->oi_typcache[i]->typlen);
2148 :
2149 0 : continue;
2150 : }
2151 : }
2152 :
2153 0 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
2154 : BRIN_PROCNUM_UNION);
2155 0 : FunctionCall3Coll(unionFn,
2156 0 : bdesc->bd_index->rd_indcollation[keyno],
2157 : PointerGetDatum(bdesc),
2158 : PointerGetDatum(col_a),
2159 : PointerGetDatum(col_b));
2160 : }
2161 :
2162 0 : MemoryContextDelete(cxt);
2163 : }
2164 :
2165 : /*
2166 : * brin_vacuum_scan
2167 : * Do a complete scan of the index during VACUUM.
2168 : *
2169 : * This routine scans the complete index looking for uncataloged index pages,
2170 : * i.e. those that might have been lost due to a crash after index extension
2171 : * and such.
2172 : */
2173 : static void
2174 69 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
2175 : {
2176 : BlockRangeReadStreamPrivate p;
2177 : ReadStream *stream;
2178 : Buffer buf;
2179 :
2180 69 : p.current_blocknum = 0;
2181 69 : p.last_exclusive = RelationGetNumberOfBlocks(idxrel);
2182 :
2183 : /*
2184 : * It is safe to use batchmode as block_range_read_stream_cb takes no
2185 : * locks.
2186 : */
2187 69 : stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
2188 : READ_STREAM_FULL |
2189 : READ_STREAM_USE_BATCHING,
2190 : strategy,
2191 : idxrel,
2192 : MAIN_FORKNUM,
2193 : block_range_read_stream_cb,
2194 : &p,
2195 : 0);
2196 :
2197 : /*
2198 : * Scan the index in physical order, and clean up any possible mess in
2199 : * each page.
2200 : */
2201 374 : while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
2202 : {
2203 305 : CHECK_FOR_INTERRUPTS();
2204 :
2205 305 : brin_page_cleanup(idxrel, buf);
2206 :
2207 305 : ReleaseBuffer(buf);
2208 : }
2209 :
2210 69 : read_stream_end(stream);
2211 :
2212 : /*
2213 : * Update all upper pages in the index's FSM, as well. This ensures not
2214 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
2215 : * but also that any pre-existing damage or out-of-dateness is repaired.
2216 : */
2217 69 : FreeSpaceMapVacuum(idxrel);
2218 69 : }
2219 :
2220 : static bool
2221 570173 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
2222 : const Datum *values, const bool *nulls)
2223 : {
2224 : int keyno;
2225 :
2226 : /* If the range starts empty, we're certainly going to modify it. */
2227 570173 : bool modified = dtup->bt_empty_range;
2228 :
2229 : /*
2230 : * Compare the key values of the new tuple to the stored index values; our
2231 : * deformed tuple will get updated if the new tuple doesn't fit the
2232 : * original range (note this means we can't break out of the loop early).
2233 : * Make a note of whether this happens, so that we know to insert the
2234 : * modified tuple later.
2235 : */
2236 1305049 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2237 : {
2238 : Datum result;
2239 : BrinValues *bval;
2240 : FmgrInfo *addValue;
2241 : bool has_nulls;
2242 :
2243 734876 : bval = &dtup->bt_columns[keyno];
2244 :
2245 : /*
2246 : * Does the range have actual NULL values? Either of the flags can be
2247 : * set, but we ignore the state before adding first row.
2248 : *
2249 : * We have to remember this, because we'll modify the flags and we
2250 : * need to know if the range started as empty.
2251 : */
2252 1445842 : has_nulls = ((!dtup->bt_empty_range) &&
2253 710966 : (bval->bv_hasnulls || bval->bv_allnulls));
2254 :
2255 : /*
2256 : * If the value we're adding is NULL, handle it locally. Otherwise
2257 : * call the BRIN_PROCNUM_ADDVALUE procedure.
2258 : */
2259 734876 : if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
2260 : {
2261 : /*
2262 : * If the new value is null, we record that we saw it if it's the
2263 : * first one; otherwise, there's nothing to do.
2264 : */
2265 11707 : if (!bval->bv_hasnulls)
2266 : {
2267 2382 : bval->bv_hasnulls = true;
2268 2382 : modified = true;
2269 : }
2270 :
2271 11707 : continue;
2272 : }
2273 :
2274 723169 : addValue = index_getprocinfo(idxRel, keyno + 1,
2275 : BRIN_PROCNUM_ADDVALUE);
2276 723169 : result = FunctionCall4Coll(addValue,
2277 723169 : idxRel->rd_indcollation[keyno],
2278 : PointerGetDatum(bdesc),
2279 : PointerGetDatum(bval),
2280 723169 : values[keyno],
2281 723169 : BoolGetDatum(nulls[keyno]));
2282 : /* if that returned true, we need to insert the updated tuple */
2283 723169 : modified |= DatumGetBool(result);
2284 :
2285 : /*
2286 : * If the range was had actual NULL values (i.e. did not start empty),
2287 : * make sure we don't forget about the NULL values. Either the
2288 : * allnulls flag is still set to true, or (if the opclass cleared it)
2289 : * we need to set hasnulls=true.
2290 : *
2291 : * XXX This can only happen when the opclass modified the tuple, so
2292 : * the modified flag should be set.
2293 : */
2294 723169 : if (has_nulls && !(bval->bv_hasnulls || bval->bv_allnulls))
2295 : {
2296 : Assert(modified);
2297 2 : bval->bv_hasnulls = true;
2298 : }
2299 : }
2300 :
2301 : /*
2302 : * After updating summaries for all the keys, mark it as not empty.
2303 : *
2304 : * If we're actually changing the flag value (i.e. tuple started as
2305 : * empty), we should have modified the tuple. So we should not see empty
2306 : * range that was not modified.
2307 : */
2308 : Assert(!dtup->bt_empty_range || modified);
2309 570173 : dtup->bt_empty_range = false;
2310 :
2311 570173 : return modified;
2312 : }
2313 :
2314 : static bool
2315 126624 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
2316 : {
2317 : int keyno;
2318 :
2319 : /*
2320 : * First check if there are any IS [NOT] NULL scan keys, and if we're
2321 : * violating them.
2322 : */
2323 127448 : for (keyno = 0; keyno < nnullkeys; keyno++)
2324 : {
2325 1488 : ScanKey key = nullkeys[keyno];
2326 :
2327 : Assert(key->sk_attno == bval->bv_attno);
2328 :
2329 : /* Handle only IS NULL/IS NOT NULL tests */
2330 1488 : if (!(key->sk_flags & SK_ISNULL))
2331 0 : continue;
2332 :
2333 1488 : if (key->sk_flags & SK_SEARCHNULL)
2334 : {
2335 : /* IS NULL scan key, but range has no NULLs */
2336 744 : if (!bval->bv_allnulls && !bval->bv_hasnulls)
2337 652 : return false;
2338 : }
2339 744 : else if (key->sk_flags & SK_SEARCHNOTNULL)
2340 : {
2341 : /*
2342 : * For IS NOT NULL, we can only skip ranges that are known to have
2343 : * only nulls.
2344 : */
2345 744 : if (bval->bv_allnulls)
2346 12 : return false;
2347 : }
2348 : else
2349 : {
2350 : /*
2351 : * Neither IS NULL nor IS NOT NULL was used; assume all indexable
2352 : * operators are strict and thus return false with NULL value in
2353 : * the scan key.
2354 : */
2355 0 : return false;
2356 : }
2357 : }
2358 :
2359 125960 : return true;
2360 : }
2361 :
2362 : /*
2363 : * Create parallel context, and launch workers for leader.
2364 : *
2365 : * buildstate argument should be initialized (with the exception of the
2366 : * tuplesort states, which may later be created based on shared
2367 : * state initially set up here).
2368 : *
2369 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
2370 : *
2371 : * request is the target number of parallel worker processes to launch.
2372 : *
2373 : * Sets buildstate's BrinLeader, which caller must use to shut down parallel
2374 : * mode by passing it to _brin_end_parallel() at the very end of its index
2375 : * build. If not even a single worker process can be launched, this is
2376 : * never set, and caller should proceed with a serial index build.
2377 : */
2378 : static void
2379 6 : _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2380 : bool isconcurrent, int request)
2381 : {
2382 : ParallelContext *pcxt;
2383 : int scantuplesortstates;
2384 : Snapshot snapshot;
2385 : Size estbrinshared;
2386 : Size estsort;
2387 : BrinShared *brinshared;
2388 : Sharedsort *sharedsort;
2389 6 : BrinLeader *brinleader = palloc0_object(BrinLeader);
2390 : WalUsage *walusage;
2391 : BufferUsage *bufferusage;
2392 6 : bool leaderparticipates = true;
2393 : int querylen;
2394 :
2395 : #ifdef DISABLE_LEADER_PARTICIPATION
2396 : leaderparticipates = false;
2397 : #endif
2398 :
2399 : /*
2400 : * Enter parallel mode, and create context for parallel build of brin
2401 : * index
2402 : */
2403 6 : EnterParallelMode();
2404 : Assert(request > 0);
2405 6 : pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
2406 : request);
2407 :
2408 6 : scantuplesortstates = leaderparticipates ? request + 1 : request;
2409 :
2410 : /*
2411 : * Prepare for scan of the base relation. In a normal index build, we use
2412 : * SnapshotAny because we must retrieve all tuples and do our own time
2413 : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
2414 : * concurrent build, we take a regular MVCC snapshot and index whatever's
2415 : * live according to that.
2416 : */
2417 6 : if (!isconcurrent)
2418 6 : snapshot = SnapshotAny;
2419 : else
2420 0 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
2421 :
2422 : /*
2423 : * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
2424 : */
2425 6 : estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
2426 6 : shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
2427 6 : estsort = tuplesort_estimate_shared(scantuplesortstates);
2428 6 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
2429 :
2430 6 : shm_toc_estimate_keys(&pcxt->estimator, 2);
2431 :
2432 : /*
2433 : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
2434 : * and PARALLEL_KEY_BUFFER_USAGE.
2435 : *
2436 : * If there are no extensions loaded that care, we could skip this. We
2437 : * have no way of knowing whether anyone's looking at pgWalUsage or
2438 : * pgBufferUsage, so do it unconditionally.
2439 : */
2440 6 : shm_toc_estimate_chunk(&pcxt->estimator,
2441 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2442 6 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2443 6 : shm_toc_estimate_chunk(&pcxt->estimator,
2444 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2445 6 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2446 :
2447 : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
2448 6 : if (debug_query_string)
2449 : {
2450 6 : querylen = strlen(debug_query_string);
2451 6 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
2452 6 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2453 : }
2454 : else
2455 0 : querylen = 0; /* keep compiler quiet */
2456 :
2457 : /* Everyone's had a chance to ask for space, so now create the DSM */
2458 6 : InitializeParallelDSM(pcxt);
2459 :
2460 : /* If no DSM segment was available, back out (do serial build) */
2461 6 : if (pcxt->seg == NULL)
2462 : {
2463 0 : if (IsMVCCSnapshot(snapshot))
2464 0 : UnregisterSnapshot(snapshot);
2465 0 : DestroyParallelContext(pcxt);
2466 0 : ExitParallelMode();
2467 0 : return;
2468 : }
2469 :
2470 : /* Store shared build state, for which we reserved space */
2471 6 : brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
2472 : /* Initialize immutable state */
2473 6 : brinshared->heaprelid = RelationGetRelid(heap);
2474 6 : brinshared->indexrelid = RelationGetRelid(index);
2475 6 : brinshared->isconcurrent = isconcurrent;
2476 6 : brinshared->scantuplesortstates = scantuplesortstates;
2477 6 : brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
2478 6 : brinshared->queryid = pgstat_get_my_query_id();
2479 6 : ConditionVariableInit(&brinshared->workersdonecv);
2480 6 : SpinLockInit(&brinshared->mutex);
2481 :
2482 : /* Initialize mutable state */
2483 6 : brinshared->nparticipantsdone = 0;
2484 6 : brinshared->reltuples = 0.0;
2485 6 : brinshared->indtuples = 0.0;
2486 :
2487 6 : table_parallelscan_initialize(heap,
2488 : ParallelTableScanFromBrinShared(brinshared),
2489 : snapshot);
2490 :
2491 : /*
2492 : * Store shared tuplesort-private state, for which we reserved space.
2493 : * Then, initialize opaque state using tuplesort routine.
2494 : */
2495 6 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
2496 6 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
2497 : pcxt->seg);
2498 :
2499 : /*
2500 : * Store shared tuplesort-private state, for which we reserved space.
2501 : * Then, initialize opaque state using tuplesort routine.
2502 : */
2503 6 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
2504 6 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
2505 :
2506 : /* Store query string for workers */
2507 6 : if (debug_query_string)
2508 : {
2509 : char *sharedquery;
2510 :
2511 6 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
2512 6 : memcpy(sharedquery, debug_query_string, querylen + 1);
2513 6 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
2514 : }
2515 :
2516 : /*
2517 : * Allocate space for each worker's WalUsage and BufferUsage; no need to
2518 : * initialize.
2519 : */
2520 6 : walusage = shm_toc_allocate(pcxt->toc,
2521 6 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2522 6 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
2523 6 : bufferusage = shm_toc_allocate(pcxt->toc,
2524 6 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2525 6 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
2526 :
2527 : /* Launch workers, saving status for leader/caller */
2528 6 : LaunchParallelWorkers(pcxt);
2529 6 : brinleader->pcxt = pcxt;
2530 6 : brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
2531 6 : if (leaderparticipates)
2532 6 : brinleader->nparticipanttuplesorts++;
2533 6 : brinleader->brinshared = brinshared;
2534 6 : brinleader->sharedsort = sharedsort;
2535 6 : brinleader->snapshot = snapshot;
2536 6 : brinleader->walusage = walusage;
2537 6 : brinleader->bufferusage = bufferusage;
2538 :
2539 : /* If no workers were successfully launched, back out (do serial build) */
2540 6 : if (pcxt->nworkers_launched == 0)
2541 : {
2542 1 : _brin_end_parallel(brinleader, NULL);
2543 1 : return;
2544 : }
2545 :
2546 : /* Save leader state now that it's clear build will be parallel */
2547 5 : buildstate->bs_leader = brinleader;
2548 :
2549 : /* Join heap scan ourselves */
2550 5 : if (leaderparticipates)
2551 5 : _brin_leader_participate_as_worker(buildstate, heap, index);
2552 :
2553 : /*
2554 : * Caller needs to wait for all launched workers when we return. Make
2555 : * sure that the failure-to-start case will not hang forever.
2556 : */
2557 5 : WaitForParallelWorkersToAttach(pcxt);
2558 : }
2559 :
2560 : /*
2561 : * Shut down workers, destroy parallel context, and end parallel mode.
2562 : */
2563 : static void
2564 6 : _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2565 : {
2566 : int i;
2567 :
2568 : /* Shutdown worker processes */
2569 6 : WaitForParallelWorkersToFinish(brinleader->pcxt);
2570 :
2571 : /*
2572 : * Next, accumulate WAL usage. (This must wait for the workers to finish,
2573 : * or we might get incomplete data.)
2574 : */
2575 13 : for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
2576 7 : InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
2577 :
2578 : /* Free last reference to MVCC snapshot, if one was used */
2579 6 : if (IsMVCCSnapshot(brinleader->snapshot))
2580 0 : UnregisterSnapshot(brinleader->snapshot);
2581 6 : DestroyParallelContext(brinleader->pcxt);
2582 6 : ExitParallelMode();
2583 6 : }
2584 :
2585 : /*
2586 : * Within leader, wait for end of heap scan.
2587 : *
2588 : * When called, parallel heap scan started by _brin_begin_parallel() will
2589 : * already be underway within worker processes (when leader participates
2590 : * as a worker, we should end up here just as workers are finishing).
2591 : *
2592 : * Returns the total number of heap tuples scanned.
2593 : */
2594 : static double
2595 5 : _brin_parallel_heapscan(BrinBuildState *state)
2596 : {
2597 5 : BrinShared *brinshared = state->bs_leader->brinshared;
2598 : int nparticipanttuplesorts;
2599 :
2600 5 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
2601 : for (;;)
2602 : {
2603 14 : SpinLockAcquire(&brinshared->mutex);
2604 14 : if (brinshared->nparticipantsdone == nparticipanttuplesorts)
2605 : {
2606 : /* copy the data into leader state */
2607 5 : state->bs_reltuples = brinshared->reltuples;
2608 5 : state->bs_numtuples = brinshared->indtuples;
2609 :
2610 5 : SpinLockRelease(&brinshared->mutex);
2611 5 : break;
2612 : }
2613 9 : SpinLockRelease(&brinshared->mutex);
2614 :
2615 9 : ConditionVariableSleep(&brinshared->workersdonecv,
2616 : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
2617 : }
2618 :
2619 5 : ConditionVariableCancelSleep();
2620 :
2621 5 : return state->bs_reltuples;
2622 : }
2623 :
2624 : /*
2625 : * Within leader, wait for end of heap scan and merge per-worker results.
2626 : *
2627 : * After waiting for all workers to finish, merge the per-worker results into
2628 : * the complete index. The results from each worker are sorted by block number
2629 : * (start of the page range). While combining the per-worker results we merge
2630 : * summaries for the same page range, and also fill-in empty summaries for
2631 : * ranges without any tuples.
2632 : *
2633 : * Returns the total number of heap tuples scanned.
2634 : */
2635 : static double
2636 5 : _brin_parallel_merge(BrinBuildState *state)
2637 : {
2638 : BrinTuple *btup;
2639 5 : BrinMemTuple *memtuple = NULL;
2640 : Size tuplen;
2641 5 : BlockNumber prevblkno = InvalidBlockNumber;
2642 : MemoryContext rangeCxt,
2643 : oldCxt;
2644 : double reltuples;
2645 :
2646 : /* wait for workers to scan table and produce partial results */
2647 5 : reltuples = _brin_parallel_heapscan(state);
2648 :
2649 : /* do the actual sort in the leader */
2650 5 : tuplesort_performsort(state->bs_sortstate);
2651 :
2652 : /*
2653 : * Initialize BrinMemTuple we'll use to union summaries from workers (in
2654 : * case they happened to produce parts of the same page range).
2655 : */
2656 5 : memtuple = brin_new_memtuple(state->bs_bdesc);
2657 :
2658 : /*
2659 : * Create a memory context we'll reset to combine results for a single
2660 : * page range (received from the workers). We don't expect huge number of
2661 : * overlaps under regular circumstances, because for large tables the
2662 : * chunk size is likely larger than the BRIN page range), but it can
2663 : * happen, and the union functions may do all kinds of stuff. So we better
2664 : * reset the context once in a while.
2665 : */
2666 5 : rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
2667 : "brin union",
2668 : ALLOCSET_DEFAULT_SIZES);
2669 5 : oldCxt = MemoryContextSwitchTo(rangeCxt);
2670 :
2671 : /*
2672 : * Read the BRIN tuples from the shared tuplesort, sorted by block number.
2673 : * That probably gives us an index that is cheaper to scan, thanks to
2674 : * mostly getting data from the same index page as before.
2675 : */
2676 25 : while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
2677 : {
2678 : /* Ranges should be multiples of pages_per_range for the index. */
2679 : Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
2680 :
2681 : /*
2682 : * Do we need to union summaries for the same page range?
2683 : *
2684 : * If this is the first brin tuple we read, then just deform it into
2685 : * the memtuple, and continue with the next one from tuplesort. We
2686 : * however may need to insert empty summaries into the index.
2687 : *
2688 : * If it's the same block as the last we saw, we simply union the brin
2689 : * tuple into it, and we're done - we don't even need to insert empty
2690 : * ranges, because that was done earlier when we saw the first brin
2691 : * tuple (for this range).
2692 : *
2693 : * Finally, if it's not the first brin tuple, and it's not the same
2694 : * page range, we need to do the insert and then deform the tuple into
2695 : * the memtuple. Then we'll insert empty ranges before the new brin
2696 : * tuple, if needed.
2697 : */
2698 20 : if (prevblkno == InvalidBlockNumber)
2699 : {
2700 : /* First brin tuples, just deform into memtuple. */
2701 1 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2702 :
2703 : /* continue to insert empty pages before thisblock */
2704 : }
2705 19 : else if (memtuple->bt_blkno == btup->bt_blkno)
2706 : {
2707 : /*
2708 : * Not the first brin tuple, but same page range as the previous
2709 : * one, so we can merge it into the memtuple.
2710 : */
2711 0 : union_tuples(state->bs_bdesc, memtuple, btup);
2712 0 : continue;
2713 : }
2714 : else
2715 : {
2716 : BrinTuple *tmp;
2717 : Size len;
2718 :
2719 : /*
2720 : * We got brin tuple for a different page range, so form a brin
2721 : * tuple from the memtuple, insert it, and re-init the memtuple
2722 : * from the new brin tuple.
2723 : */
2724 19 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2725 : memtuple, &len);
2726 :
2727 19 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2728 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2729 :
2730 : /*
2731 : * Reset the per-output-range context. This frees all the memory
2732 : * possibly allocated by the union functions, and also the BRIN
2733 : * tuple we just formed and inserted.
2734 : */
2735 19 : MemoryContextReset(rangeCxt);
2736 :
2737 19 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2738 :
2739 : /* continue to insert empty pages before thisblock */
2740 : }
2741 :
2742 : /* Fill empty ranges for all ranges missing in the tuplesort. */
2743 20 : brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
2744 :
2745 20 : prevblkno = btup->bt_blkno;
2746 : }
2747 :
2748 5 : tuplesort_end(state->bs_sortstate);
2749 :
2750 : /* Fill the BRIN tuple for the last page range with data. */
2751 5 : if (prevblkno != InvalidBlockNumber)
2752 : {
2753 : BrinTuple *tmp;
2754 : Size len;
2755 :
2756 1 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2757 : memtuple, &len);
2758 :
2759 1 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2760 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2761 :
2762 1 : pfree(tmp);
2763 : }
2764 :
2765 : /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
2766 5 : brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
2767 :
2768 : /*
2769 : * Switch back to the original memory context, and destroy the one we
2770 : * created to isolate the union_tuple calls.
2771 : */
2772 5 : MemoryContextSwitchTo(oldCxt);
2773 5 : MemoryContextDelete(rangeCxt);
2774 :
2775 5 : return reltuples;
2776 : }
2777 :
2778 : /*
2779 : * Returns size of shared memory required to store state for a parallel
2780 : * brin index build based on the snapshot its parallel scan will use.
2781 : */
2782 : static Size
2783 6 : _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
2784 : {
2785 : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
2786 6 : return add_size(BUFFERALIGN(sizeof(BrinShared)),
2787 : table_parallelscan_estimate(heap, snapshot));
2788 : }
2789 :
2790 : /*
2791 : * Within leader, participate as a parallel worker.
2792 : */
2793 : static void
2794 5 : _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
2795 : {
2796 5 : BrinLeader *brinleader = buildstate->bs_leader;
2797 : int sortmem;
2798 :
2799 : /*
2800 : * Might as well use reliable figure when doling out maintenance_work_mem
2801 : * (when requested number of workers were not launched, this will be
2802 : * somewhat higher than it is for other workers).
2803 : */
2804 5 : sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
2805 :
2806 : /* Perform work common to all participants */
2807 5 : _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
2808 : brinleader->sharedsort, heap, index, sortmem, true);
2809 5 : }
2810 :
2811 : /*
2812 : * Perform a worker's portion of a parallel sort.
2813 : *
2814 : * This generates a tuplesort for the worker portion of the table.
2815 : *
2816 : * sortmem is the amount of working memory to use within each worker,
2817 : * expressed in KBs.
2818 : *
2819 : * When this returns, workers are done, and need only release resources.
2820 : */
2821 : static void
2822 12 : _brin_parallel_scan_and_build(BrinBuildState *state,
2823 : BrinShared *brinshared, Sharedsort *sharedsort,
2824 : Relation heap, Relation index,
2825 : int sortmem, bool progress)
2826 : {
2827 : SortCoordinate coordinate;
2828 : TableScanDesc scan;
2829 : double reltuples;
2830 : IndexInfo *indexInfo;
2831 :
2832 : /* Initialize local tuplesort coordination state */
2833 12 : coordinate = palloc0_object(SortCoordinateData);
2834 12 : coordinate->isWorker = true;
2835 12 : coordinate->nParticipants = -1;
2836 12 : coordinate->sharedsort = sharedsort;
2837 :
2838 : /* Begin "partial" tuplesort */
2839 12 : state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
2840 : TUPLESORT_NONE);
2841 :
2842 : /* Join parallel scan */
2843 12 : indexInfo = BuildIndexInfo(index);
2844 12 : indexInfo->ii_Concurrent = brinshared->isconcurrent;
2845 :
2846 12 : scan = table_beginscan_parallel(heap,
2847 : ParallelTableScanFromBrinShared(brinshared));
2848 :
2849 12 : reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
2850 : brinbuildCallbackParallel, state, scan);
2851 :
2852 : /* insert the last item */
2853 12 : form_and_spill_tuple(state);
2854 :
2855 : /* sort the BRIN ranges built by this worker */
2856 12 : tuplesort_performsort(state->bs_sortstate);
2857 :
2858 12 : state->bs_reltuples += reltuples;
2859 :
2860 : /*
2861 : * Done. Record ambuild statistics.
2862 : */
2863 12 : SpinLockAcquire(&brinshared->mutex);
2864 12 : brinshared->nparticipantsdone++;
2865 12 : brinshared->reltuples += state->bs_reltuples;
2866 12 : brinshared->indtuples += state->bs_numtuples;
2867 12 : SpinLockRelease(&brinshared->mutex);
2868 :
2869 : /* Notify leader */
2870 12 : ConditionVariableSignal(&brinshared->workersdonecv);
2871 :
2872 12 : tuplesort_end(state->bs_sortstate);
2873 12 : }
2874 :
2875 : /*
2876 : * Perform work within a launched parallel process.
2877 : */
2878 : void
2879 7 : _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2880 : {
2881 : char *sharedquery;
2882 : BrinShared *brinshared;
2883 : Sharedsort *sharedsort;
2884 : BrinBuildState *buildstate;
2885 : Relation heapRel;
2886 : Relation indexRel;
2887 : LOCKMODE heapLockmode;
2888 : LOCKMODE indexLockmode;
2889 : WalUsage *walusage;
2890 : BufferUsage *bufferusage;
2891 : int sortmem;
2892 :
2893 : /*
2894 : * The only possible status flag that can be set to the parallel worker is
2895 : * PROC_IN_SAFE_IC.
2896 : */
2897 : Assert((MyProc->statusFlags == 0) ||
2898 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2899 :
2900 : /* Set debug_query_string for individual workers first */
2901 7 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2902 7 : debug_query_string = sharedquery;
2903 :
2904 : /* Report the query string from leader */
2905 7 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2906 :
2907 : /* Look up brin shared state */
2908 7 : brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
2909 :
2910 : /* Open relations using lock modes known to be obtained by index.c */
2911 7 : if (!brinshared->isconcurrent)
2912 : {
2913 7 : heapLockmode = ShareLock;
2914 7 : indexLockmode = AccessExclusiveLock;
2915 : }
2916 : else
2917 : {
2918 0 : heapLockmode = ShareUpdateExclusiveLock;
2919 0 : indexLockmode = RowExclusiveLock;
2920 : }
2921 :
2922 : /* Track query ID */
2923 7 : pgstat_report_query_id(brinshared->queryid, false);
2924 :
2925 : /* Open relations within worker */
2926 7 : heapRel = table_open(brinshared->heaprelid, heapLockmode);
2927 7 : indexRel = index_open(brinshared->indexrelid, indexLockmode);
2928 :
2929 7 : buildstate = initialize_brin_buildstate(indexRel, NULL,
2930 : brinshared->pagesPerRange,
2931 : InvalidBlockNumber);
2932 :
2933 : /* Look up shared state private to tuplesort.c */
2934 7 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2935 7 : tuplesort_attach_shared(sharedsort, seg);
2936 :
2937 : /* Prepare to track buffer usage during parallel execution */
2938 7 : InstrStartParallelQuery();
2939 :
2940 : /*
2941 : * Might as well use reliable figure when doling out maintenance_work_mem
2942 : * (when requested number of workers were not launched, this will be
2943 : * somewhat higher than it is for other workers).
2944 : */
2945 7 : sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
2946 :
2947 7 : _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
2948 : heapRel, indexRel, sortmem, false);
2949 :
2950 : /* Report WAL/buffer usage during parallel execution */
2951 7 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2952 7 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2953 7 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2954 7 : &walusage[ParallelWorkerNumber]);
2955 :
2956 7 : index_close(indexRel, indexLockmode);
2957 7 : table_close(heapRel, heapLockmode);
2958 7 : }
2959 :
2960 : /*
2961 : * brin_build_empty_tuple
2962 : * Maybe initialize a BRIN tuple representing empty range.
2963 : *
2964 : * Returns a BRIN tuple representing an empty page range starting at the
2965 : * specified block number. The empty tuple is initialized only once, when it's
2966 : * needed for the first time, stored in the memory context bs_context to ensure
2967 : * proper life span, and reused on following calls. All empty tuples are
2968 : * exactly the same except for the bt_blkno field, which is set to the value
2969 : * in blkno parameter.
2970 : */
2971 : static void
2972 11 : brin_build_empty_tuple(BrinBuildState *state, BlockNumber blkno)
2973 : {
2974 : /* First time an empty tuple is requested? If yes, initialize it. */
2975 11 : if (state->bs_emptyTuple == NULL)
2976 : {
2977 : MemoryContext oldcxt;
2978 6 : BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
2979 :
2980 : /* Allocate the tuple in context for the whole index build. */
2981 6 : oldcxt = MemoryContextSwitchTo(state->bs_context);
2982 :
2983 6 : state->bs_emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple,
2984 : &state->bs_emptyTupleLen);
2985 :
2986 6 : MemoryContextSwitchTo(oldcxt);
2987 : }
2988 : else
2989 : {
2990 : /* If we already have an empty tuple, just update the block. */
2991 5 : state->bs_emptyTuple->bt_blkno = blkno;
2992 : }
2993 11 : }
2994 :
2995 : /*
2996 : * brin_fill_empty_ranges
2997 : * Add BRIN index tuples representing empty page ranges.
2998 : *
2999 : * prevRange/nextRange determine for which page ranges to add empty summaries.
3000 : * Both boundaries are exclusive, i.e. only ranges starting at blkno for which
3001 : * (prevRange < blkno < nextRange) will be added to the index.
3002 : *
3003 : * If prevRange is InvalidBlockNumber, this means there was no previous page
3004 : * range (i.e. the first empty range to add is for blkno=0).
3005 : *
3006 : * The empty tuple is built only once, and then reused for all future calls.
3007 : */
3008 : static void
3009 244 : brin_fill_empty_ranges(BrinBuildState *state,
3010 : BlockNumber prevRange, BlockNumber nextRange)
3011 : {
3012 : BlockNumber blkno;
3013 :
3014 : /*
3015 : * If we already summarized some ranges, we need to start with the next
3016 : * one. Otherwise start from the first range of the table.
3017 : */
3018 244 : blkno = (prevRange == InvalidBlockNumber) ? 0 : (prevRange + state->bs_pagesPerRange);
3019 :
3020 : /* Generate empty ranges until we hit the next non-empty range. */
3021 255 : while (blkno < nextRange)
3022 : {
3023 : /* Did we already build the empty tuple? If not, do it now. */
3024 11 : brin_build_empty_tuple(state, blkno);
3025 :
3026 11 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
3027 : &state->bs_currentInsertBuf,
3028 11 : blkno, state->bs_emptyTuple, state->bs_emptyTupleLen);
3029 :
3030 : /* try next page range */
3031 11 : blkno += state->bs_pagesPerRange;
3032 : }
3033 244 : }
|