Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "miscadmin.h"
32 : #include "pgstat.h"
33 : #include "postmaster/autovacuum.h"
34 : #include "storage/bufmgr.h"
35 : #include "storage/freespace.h"
36 : #include "tcop/tcopprot.h"
37 : #include "utils/acl.h"
38 : #include "utils/datum.h"
39 : #include "utils/fmgrprotos.h"
40 : #include "utils/guc.h"
41 : #include "utils/index_selfuncs.h"
42 : #include "utils/memutils.h"
43 : #include "utils/rel.h"
44 : #include "utils/tuplesort.h"
45 :
46 : /* Magic numbers for parallel state sharing (each key is a distinct 64-bit constant; presumably used as shared-memory TOC keys -- confirm at the use sites) */
47 : #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
48 : #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
49 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
50 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
51 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
52 :
53 : /*
54 : * Status for index builds performed in parallel. This is allocated in a
55 : * dynamic shared memory segment.
56 : */
57 : typedef struct BrinShared
58 : {
59 : /*
60 : * These fields are not modified during the build. They primarily exist
61 : * for the benefit of worker processes that need to create state
62 : * corresponding to that used by the leader.
63 : */
64 : Oid heaprelid;
65 : Oid indexrelid;
66 : bool isconcurrent;
67 : BlockNumber pagesPerRange;
68 : int scantuplesortstates;
69 :
70 : /* Query ID, for report in worker processes */
71 : int64 queryid;
72 :
73 : /*
74 : * workersdonecv is used to monitor the progress of workers. All parallel
75 : * participants must indicate that they are done before leader can use
76 : * results built by the workers (and before leader can write the data into
77 : * the index).
78 : */
79 : ConditionVariable workersdonecv;
80 :
81 : /*
82 : * mutex protects all fields before heapdesc. NOTE(review): this struct has no member named heapdesc; presumably this means the fields between mutex and the ParallelTableScanDescData that follows the struct -- confirm.
83 : *
84 : * These fields contain status information of interest to BRIN index
85 : * builds that must work just the same when an index is built in parallel.
86 : */
87 : slock_t mutex;
88 :
89 : /*
90 : * Mutable state that is maintained by workers, and reported back to
91 : * leader at end of the scans.
92 : *
93 : * nparticipantsdone is number of worker processes finished.
94 : *
95 : * reltuples is the total number of input heap tuples.
96 : *
97 : * indtuples is the total number of tuples that made it into the index.
98 : */
99 : int nparticipantsdone;
100 : double reltuples;
101 : double indtuples;
102 :
103 : /*
104 : * ParallelTableScanDescData data follows. Can't directly embed here, as
105 : * implementations of the parallel table scan desc interface might need
106 : * stronger alignment. Use ParallelTableScanFromBrinShared() to locate it.
107 : */
108 : } BrinShared;
109 :
110 : /*
111 : * Return pointer to a BrinShared's parallel table scan.
112 : *
113 : * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
114 : * MAXALIGN. The result is the address BUFFERALIGN(sizeof(BrinShared)) bytes past the start of 'shared', cast to ParallelTableScanDesc.
115 : */
116 : #define ParallelTableScanFromBrinShared(shared) \
117 : (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
118 :
119 : /*
120 : * Status for leader in parallel index build.
121 : */
122 : typedef struct BrinLeader
123 : {
124 : /* parallel context itself */
125 : ParallelContext *pcxt;
126 :
127 : /*
128 : * nparticipanttuplesorts is the exact number of worker processes
129 : * successfully launched, plus one leader process if it participates as a
130 : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
131 : * participating as a worker).
132 : */
133 : int nparticipanttuplesorts;
134 :
135 : /*
136 : * Leader process convenience pointers to shared state (leader avoids TOC
137 : * lookups).
138 : *
139 : * brinshared is the shared state for entire build. sharedsort is the
140 : * shared, tuplesort-managed state passed to each process tuplesort.
141 : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
142 : * required.
143 : */
144 : BrinShared *brinshared;
145 : Sharedsort *sharedsort;
146 : Snapshot snapshot;
147 : WalUsage *walusage; /* NOTE(review): presumably the shared WAL usage area keyed by PARALLEL_KEY_WAL_USAGE -- confirm */
148 : BufferUsage *bufferusage; /* NOTE(review): presumably the shared buffer usage area keyed by PARALLEL_KEY_BUFFER_USAGE -- confirm */
149 : } BrinLeader;
150 :
151 : /*
152 : * We use a BrinBuildState during initial construction of a BRIN index.
153 : * The running state is kept in a BrinMemTuple.
154 : */
155 : typedef struct BrinBuildState
156 : {
157 : Relation bs_irel;
158 : double bs_numtuples;
159 : double bs_reltuples;
160 : Buffer bs_currentInsertBuf;
161 : BlockNumber bs_pagesPerRange;
162 : BlockNumber bs_currRangeStart;
163 : BlockNumber bs_maxRangeStart;
164 : BrinRevmap *bs_rmAccess;
165 : BrinDesc *bs_bdesc;
166 : BrinMemTuple *bs_dtuple; /* presumably the "running state" BrinMemTuple mentioned in the struct's header comment -- confirm */
167 :
168 : BrinTuple *bs_emptyTuple;
169 : Size bs_emptyTupleLen;
170 : MemoryContext bs_context;
171 :
172 : /*
173 : * bs_leader is only present when a parallel index build is performed, and
174 : * only in the leader process. (Actually, only the leader process has a
175 : * BrinBuildState.) NOTE(review): the parenthetical looks at odds with the bs_sortstate comment below, which says workers use the build state too -- confirm which is current.
176 : */
177 : BrinLeader *bs_leader;
178 : int bs_worker_id;
179 :
180 : /*
181 : * The sortstate is used by workers (including the leader). It has to be
182 : * part of the build state, because that's the only thing passed to the
183 : * build callback etc.
184 : */
185 : Tuplesortstate *bs_sortstate;
186 : } BrinBuildState;
187 :
188 : /*
189 : * We use a BrinInsertState to capture running state spanning multiple
190 : * brininsert invocations, within the same command.
191 : */
192 : typedef struct BrinInsertState
193 : {
194 : BrinRevmap *bis_rmAccess; /* result of brinRevmapInitialize(); terminated in brininsertcleanup() */
195 : BrinDesc *bis_desc; /* result of brin_build_desc() for the index */
196 : BlockNumber bis_pages_per_range; /* filled in by brinRevmapInitialize() through its output argument */
197 : } BrinInsertState;
198 :
199 : /*
200 : * Struct used as "opaque" during index scans
201 : */
202 : typedef struct BrinOpaque
203 : {
204 : BlockNumber bo_pagesPerRange; /* filled in by brinRevmapInitialize() in brinbeginscan() */
205 : BrinRevmap *bo_rmAccess; /* result of brinRevmapInitialize(); terminated in brinendscan() */
206 : BrinDesc *bo_bdesc; /* result of brin_build_desc(); freed with brin_free_desc() in brinendscan() */
207 : } BrinOpaque;
208 :
209 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber /* NOTE(review): presumably a sentinel page-range value meaning "every range" -- confirm at the use sites */
210 :
211 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel, /* forward declarations of file-local (static) helpers start here */
212 : BrinRevmap *revmap,
213 : BlockNumber pagesPerRange,
214 : BlockNumber tablePages);
215 : static BrinInsertState *initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo);
216 : static void terminate_brin_buildstate(BrinBuildState *state);
217 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
218 : bool include_partial, double *numSummarized, double *numExisting);
219 : static void form_and_insert_tuple(BrinBuildState *state);
220 : static void form_and_spill_tuple(BrinBuildState *state);
221 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
222 : BrinTuple *b);
223 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
224 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
225 : BrinMemTuple *dtup, const Datum *values, const bool *nulls);
226 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
227 : static void brin_fill_empty_ranges(BrinBuildState *state,
228 : BlockNumber prevRange, BlockNumber nextRange);
229 :
230 : /* parallel index builds: forward declarations of the file-local helpers */
231 : static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
232 : bool isconcurrent, int request);
233 : static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
234 : static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
235 : static double _brin_parallel_heapscan(BrinBuildState *state);
236 : static double _brin_parallel_merge(BrinBuildState *state);
237 : static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
238 : Relation heap, Relation index);
239 : static void _brin_parallel_scan_and_build(BrinBuildState *state,
240 : BrinShared *brinshared,
241 : Sharedsort *sharedsort,
242 : Relation heap, Relation index,
243 : int sortmem, bool progress);
244 :
245 : /*
246 : * BRIN handler function: return IndexAmRoutine with access method parameters
247 : * and callbacks. The routine is a function-local static const object, so the returned pointer refers to static storage.
248 : */
249 : Datum
250 4458 : brinhandler(PG_FUNCTION_ARGS)
251 : {
252 : static const IndexAmRoutine amroutine = {
253 : .type = T_IndexAmRoutine,
254 : .amstrategies = 0,
255 : .amsupport = BRIN_LAST_OPTIONAL_PROCNUM,
256 : .amoptsprocnum = BRIN_PROCNUM_OPTIONS,
257 : .amcanorder = false,
258 : .amcanorderbyop = false,
259 : .amcanhash = false,
260 : .amconsistentequality = false,
261 : .amconsistentordering = false,
262 : .amcanbackward = false,
263 : .amcanunique = false,
264 : .amcanmulticol = true,
265 : .amoptionalkey = true,
266 : .amsearcharray = false, /* see the TODO about ScalarArrayOpExpr in the file header */
267 : .amsearchnulls = true,
268 : .amstorage = true,
269 : .amclusterable = false,
270 : .ampredlocks = false,
271 : .amcanparallel = false,
272 : .amcanbuildparallel = true, /* parallel builds are enabled even though amcanparallel is false */
273 : .amcaninclude = false,
274 : .amusemaintenanceworkmem = false,
275 : .amsummarizing = true,
276 : .amparallelvacuumoptions =
277 : VACUUM_OPTION_PARALLEL_CLEANUP,
278 : .amkeytype = InvalidOid,
279 :
280 : .ambuild = brinbuild,
281 : .ambuildempty = brinbuildempty,
282 : .aminsert = brininsert,
283 : .aminsertcleanup = brininsertcleanup,
284 : .ambulkdelete = brinbulkdelete,
285 : .amvacuumcleanup = brinvacuumcleanup,
286 : .amcanreturn = NULL,
287 : .amcostestimate = brincostestimate,
288 : .amgettreeheight = NULL,
289 : .amoptions = brinoptions,
290 : .amproperty = NULL,
291 : .ambuildphasename = NULL,
292 : .amvalidate = brinvalidate,
293 : .amadjustmembers = NULL,
294 : .ambeginscan = brinbeginscan,
295 : .amrescan = brinrescan,
296 : .amgettuple = NULL, /* no tuple-at-a-time callback; amgetbitmap below is the only scan callback set */
297 : .amgetbitmap = bringetbitmap,
298 : .amendscan = brinendscan,
299 : .ammarkpos = NULL,
300 : .amrestrpos = NULL,
301 : .amestimateparallelscan = NULL,
302 : .aminitparallelscan = NULL,
303 : .amparallelrescan = NULL,
304 : .amtranslatestrategy = NULL,
305 : .amtranslatecmptype = NULL,
306 : };
307 :
308 4458 : PG_RETURN_POINTER(&amroutine);
309 : }
310 :
311 : /*
312 : * Initialize a BrinInsertState to maintain state to be used across multiple
313 : * tuple inserts, within the same command. The state is allocated while indexInfo->ii_Context is current, stored in indexInfo->ii_AmCache, and also returned.
314 : */
315 : static BrinInsertState *
316 1122 : initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo)
317 : {
318 : BrinInsertState *bistate;
319 : MemoryContext oldcxt;
320 :
321 1122 : oldcxt = MemoryContextSwitchTo(indexInfo->ii_Context); /* everything below is allocated with ii_Context current */
322 1122 : bistate = palloc0_object(BrinInsertState);
323 1122 : bistate->bis_desc = brin_build_desc(idxRel);
324 1122 : bistate->bis_rmAccess = brinRevmapInitialize(idxRel,
325 : &bistate->bis_pages_per_range); /* also fills in bis_pages_per_range */
326 1122 : indexInfo->ii_AmCache = bistate; /* brininsert() checks this to skip re-initialization */
327 1122 : MemoryContextSwitchTo(oldcxt);
328 :
329 1122 : return bistate;
330 : }
331 :
332 : /*
333 : * A tuple in the heap is being inserted. To keep a brin index up to date,
334 : * we need to obtain the relevant index tuple and compare its stored values
335 : * with those of the new tuple. If the tuple values are not consistent with
336 : * the summary tuple, we need to update the index tuple.
337 : *
338 : * If autosummarization is enabled, check if we need to summarize the previous
339 : * page range.
340 : *
341 : * If the range is not currently summarized (i.e. the revmap returns NULL for
342 : * it), there's nothing to do for this tuple.
343 : * Always returns false. heapRel, checkUnique and indexUnchanged are not referenced in the body. Per-command state is cached in indexInfo->ii_AmCache.
344 : */
345 : bool
346 126216 : brininsert(Relation idxRel, Datum *values, bool *nulls,
347 : ItemPointer heaptid, Relation heapRel,
348 : IndexUniqueCheck checkUnique,
349 : bool indexUnchanged,
350 : IndexInfo *indexInfo)
351 : {
352 : BlockNumber pagesPerRange;
353 : BlockNumber origHeapBlk;
354 : BlockNumber heapBlk;
355 126216 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
356 : BrinRevmap *revmap;
357 : BrinDesc *bdesc;
358 126216 : Buffer buf = InvalidBuffer;
359 126216 : MemoryContext tupcxt = NULL; /* created lazily, only if a summary tuple is found */
360 126216 : MemoryContext oldcxt = CurrentMemoryContext; /* restored before returning */
361 126216 : bool autosummarize = BrinGetAutoSummarize(idxRel);
362 :
363 : /*
364 : * If first time through in this statement, initialize the insert state
365 : * that we keep for all the inserts in the command.
366 : */
367 126216 : if (!bistate)
368 1122 : bistate = initialize_brin_insertstate(idxRel, indexInfo);
369 :
370 126216 : revmap = bistate->bis_rmAccess;
371 126216 : bdesc = bistate->bis_desc;
372 126216 : pagesPerRange = bistate->bis_pages_per_range;
373 :
374 : /*
375 : * origHeapBlk is the block number where the insertion occurred. heapBlk
376 : * is the first block in the corresponding page range.
377 : */
378 126216 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
379 126216 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange; /* integer division rounds down to the start of the range */
380 :
381 : for (;;)
382 0 : {
383 126216 : bool need_insert = false;
384 : OffsetNumber off;
385 : BrinTuple *brtup;
386 : BrinMemTuple *dtup;
387 :
388 126216 : CHECK_FOR_INTERRUPTS();
389 :
390 : /*
391 : * If auto-summarization is enabled and we just inserted the first
392 : * tuple into the first block of a new non-first page range, request a
393 : * summarization run of the previous range.
394 : */
395 126216 : if (autosummarize &&
396 290 : heapBlk > 0 &&
397 290 : heapBlk == origHeapBlk &&
398 290 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
399 : {
400 16 : BlockNumber lastPageRange = heapBlk - 1; /* last block of the preceding range; safe because heapBlk > 0 */
401 : BrinTuple *lastPageTuple;
402 :
403 : lastPageTuple =
404 16 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
405 : NULL, BUFFER_LOCK_SHARE);
406 16 : if (!lastPageTuple)
407 : {
408 : bool recorded;
409 :
410 12 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
411 : RelationGetRelid(idxRel),
412 : lastPageRange);
413 12 : if (!recorded)
414 0 : ereport(LOG,
415 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
416 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
417 : RelationGetRelationName(idxRel),
418 : lastPageRange)));
419 : }
420 : else
421 4 : LockBuffer(buf, BUFFER_LOCK_UNLOCK); /* previous range already summarized; only drop the lock here */
422 : }
423 :
424 126216 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
425 : NULL, BUFFER_LOCK_SHARE);
426 :
427 : /* if range is unsummarized, there's nothing to do */
428 126216 : if (!brtup)
429 78254 : break;
430 :
431 : /* First time through in this brininsert call? */
432 47962 : if (tupcxt == NULL)
433 : {
434 47962 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
435 : "brininsert cxt",
436 : ALLOCSET_DEFAULT_SIZES);
437 47962 : MemoryContextSwitchTo(tupcxt); /* stays current across retries; oldcxt is restored after the loop */
438 : }
439 :
440 47962 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
441 :
442 47962 : need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
443 :
444 47962 : if (!need_insert)
445 : {
446 : /*
447 : * The tuple is consistent with the new values, so there's nothing
448 : * to do.
449 : */
450 24034 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
451 : }
452 : else
453 : {
454 23928 : Page page = BufferGetPage(buf);
455 23928 : ItemId lp = PageGetItemId(page, off);
456 : Size origsz;
457 : BrinTuple *origtup;
458 : Size newsz;
459 : BrinTuple *newtup;
460 : bool samepage;
461 :
462 : /*
463 : * Make a copy of the old tuple, so that we can compare it after
464 : * re-acquiring the lock.
465 : */
466 23928 : origsz = ItemIdGetLength(lp);
467 23928 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
468 :
469 : /*
470 : * Before releasing the lock, check if we can attempt a same-page
471 : * update. Another process could insert a tuple concurrently in
472 : * the same page though, so downstream we must be prepared to cope
473 : * if this turns out to not be possible after all.
474 : */
475 23928 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
476 23928 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
477 23928 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
478 :
479 : /*
480 : * Try to update the tuple. If this doesn't work for whatever
481 : * reason, we need to restart from the top; the revmap might be
482 : * pointing at a different tuple for this block now, so we need to
483 : * recompute to ensure both our new heap tuple and the other
484 : * inserter's are covered by the combined tuple. It might be that
485 : * we don't need to update at all.
486 : */
487 23928 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
488 : buf, off, origtup, origsz, newtup, newsz,
489 : samepage))
490 : {
491 : /* no luck; start over */
492 0 : MemoryContextReset(tupcxt); /* discard dtup, origtup and newtup from this attempt */
493 0 : continue;
494 : }
495 : }
496 :
497 : /* success! */
498 47962 : break;
499 : }
500 :
501 126216 : if (BufferIsValid(buf)) /* buf may have been set by either brinGetTupleForHeapBlock call above */
502 47966 : ReleaseBuffer(buf);
503 126216 : MemoryContextSwitchTo(oldcxt);
504 126216 : if (tupcxt != NULL)
505 47962 : MemoryContextDelete(tupcxt);
506 :
507 126216 : return false;
508 : }
508 :
509 : /*
510 : * Callback to clean up the BrinInsertState once all tuple inserts are done. Does nothing if indexInfo->ii_AmCache was never set; the 'index' argument is not referenced in the body.
511 : */
512 : void
513 1156 : brininsertcleanup(Relation index, IndexInfo *indexInfo)
514 : {
515 1156 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
516 :
517 : /* bail out if cache not initialized */
518 1156 : if (bistate == NULL)
519 34 : return;
520 :
521 : /* do this first to avoid dangling pointer if we fail partway through */
522 1122 : indexInfo->ii_AmCache = NULL;
523 :
524 : /*
525 : * Clean up the revmap. Note that the brinDesc has already been cleaned up
526 : * as part of its own memory context.
527 : */
528 1122 : brinRevmapTerminate(bistate->bis_rmAccess);
529 1122 : pfree(bistate); /* bis_desc is deliberately not freed here; see the comment above */
530 : }
531 :
532 : /*
533 : * Initialize state for a BRIN index scan.
534 : *
535 : * We read the metapage here to determine the pages-per-range number that this
536 : * index was built with. Note that since this cannot be changed while we're
537 : * holding lock on index, it's not necessary to recompute it during brinrescan.
538 : * Returns the scan descriptor from RelationGetIndexScan() with scan->opaque set to a freshly allocated BrinOpaque; brinendscan() releases it.
539 : */
540 : IndexScanDesc
541 2946 : brinbeginscan(Relation r, int nkeys, int norderbys)
542 : {
543 : IndexScanDesc scan;
544 : BrinOpaque *opaque;
545 :
546 2946 : scan = RelationGetIndexScan(r, nkeys, norderbys);
547 :
548 2946 : opaque = palloc_object(BrinOpaque); /* not zeroed; all three members are assigned below */
549 2946 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
550 2946 : opaque->bo_bdesc = brin_build_desc(r);
551 2946 : scan->opaque = opaque;
552 :
553 2946 : return scan;
554 : }
554 :
555 : /*
556 : * Execute the index scan.
557 : *
558 : * This works by reading index TIDs from the revmap, and obtaining the index
559 : * tuples pointed to by them; the summary values in the index tuples are
560 : * compared to the scan keys. We return into the TID bitmap all the pages in
561 : * ranges corresponding to index tuples that match the scan keys.
562 : *
563 : * If a TID from the revmap is read as InvalidTID, we know that range is
564 : * unsummarized. Pages in those ranges need to be returned regardless of scan
565 : * keys.
566 : * The return value is the number of pages added to the bitmap multiplied by 10, i.e. an estimate rather than a tuple count (see the XXX note before the return).
567 : */
568 : int64
569 2946 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
570 : {
571 2946 : Relation idxRel = scan->indexRelation;
572 2946 : Buffer buf = InvalidBuffer;
573 : BrinDesc *bdesc;
574 : Oid heapOid;
575 : Relation heapRel;
576 : BrinOpaque *opaque;
577 : BlockNumber nblocks;
578 2946 : int64 totalpages = 0;
579 : FmgrInfo *consistentFn;
580 : MemoryContext oldcxt;
581 : MemoryContext perRangeCxt;
582 : BrinMemTuple *dtup;
583 2946 : BrinTuple *btup = NULL;
584 2946 : Size btupsz = 0;
585 : ScanKey **keys,
586 : **nullkeys;
587 : int *nkeys,
588 : *nnullkeys;
589 : char *ptr;
590 : Size len;
591 : char *tmp PG_USED_FOR_ASSERTS_ONLY; /* start of the carved chunk; only read by the Assert after carving */
592 :
593 2946 : opaque = (BrinOpaque *) scan->opaque;
594 2946 : bdesc = opaque->bo_bdesc;
595 2946 : pgstat_count_index_scan(idxRel);
596 2946 : if (scan->instrument) /* instrumentation is optional; count the search only when it is present */
597 2946 : scan->instrument->nsearches++;
598 :
599 : /*
600 : * We need to know the size of the table so that we know how long to
601 : * iterate on the revmap.
602 : */
603 2946 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
604 2946 : heapRel = table_open(heapOid, AccessShareLock);
605 2946 : nblocks = RelationGetNumberOfBlocks(heapRel);
606 2946 : table_close(heapRel, AccessShareLock);
607 :
608 : /*
609 : * Make room for the consistent support procedures of indexed columns. We
610 : * don't look them up here; we do that lazily the first time we see a scan
611 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
612 : */
613 2946 : consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
614 :
615 : /*
616 : * Make room for per-attribute lists of scan keys that we'll pass to the
617 : * consistent support procedure. We don't know which attributes have scan
618 : * keys, so we allocate space for all attributes. That may use more memory
619 : * but it's probably cheaper than determining which attributes are used.
620 : *
621 : * We keep null and regular keys separate, so that we can pass just the
622 : * regular keys to the consistent function easily.
623 : *
624 : * To reduce the allocation overhead, we allocate one big chunk and then
625 : * carve it into smaller arrays ourselves. All the pieces have exactly the
626 : * same lifetime, so that's OK.
627 : *
628 : * XXX The widest index can have 32 attributes, so the amount of wasted
629 : * memory is negligible. We could invent a more compact approach (with
630 : * just space for used attributes) but that would make the matching more
631 : * complex so it's not a good trade-off.
632 : */
633 2946 : len =
634 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* regular keys */
635 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
636 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
637 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* NULL keys */
638 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
639 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
640 :
641 2946 : ptr = palloc(len);
642 2946 : tmp = ptr;
643 :
644 2946 : keys = (ScanKey **) ptr; /* carving order differs from the order of terms in len, but the same pieces are consumed (checked by the Assert below) */
645 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
646 :
647 2946 : nullkeys = (ScanKey **) ptr;
648 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
649 :
650 2946 : nkeys = (int *) ptr;
651 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
652 :
653 2946 : nnullkeys = (int *) ptr;
654 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
655 :
656 69978 : for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
657 : {
658 67032 : keys[i] = (ScanKey *) ptr;
659 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
660 :
661 67032 : nullkeys[i] = (ScanKey *) ptr;
662 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
663 : }
664 :
665 : Assert(tmp + len == ptr);
666 :
667 : /* zero the number of keys */
668 2946 : memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
669 2946 : memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
670 :
671 : /* Preprocess the scan keys - split them into per-attribute arrays. */
672 5892 : for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
673 : {
674 2946 : ScanKey key = &scan->keyData[keyno];
675 2946 : AttrNumber keyattno = key->sk_attno; /* 1-based; every array access below uses keyattno - 1 */
676 :
677 : /*
678 : * The collation of the scan key must match the collation used in the
679 : * index column (but only if the search is not IS NULL/ IS NOT NULL).
680 : * Otherwise we shouldn't be using this index ...
681 : */
682 : Assert((key->sk_flags & SK_ISNULL) ||
683 : (key->sk_collation ==
684 : TupleDescAttr(bdesc->bd_tupdesc,
685 : keyattno - 1)->attcollation));
686 :
687 : /*
688 : * First time we see this index attribute, so init as needed.
689 : *
690 : * This is a bit of an overkill - we don't know how many scan keys are
691 : * there for this attribute, so we simply allocate the largest number
692 : * possible (as if all keys were for this attribute). This may waste a
693 : * bit of memory, but we only expect small number of scan keys in
694 : * general, so this should be negligible, and repeated repalloc calls
695 : * are not free either.
696 : */
697 2946 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
698 : {
699 : FmgrInfo *tmp; /* NOTE: shadows the function-level char *tmp inside this block */
700 :
701 : /* First time we see this attribute, so no key/null keys. */
702 : Assert(nkeys[keyattno - 1] == 0);
703 : Assert(nnullkeys[keyattno - 1] == 0);
704 :
705 2946 : tmp = index_getprocinfo(idxRel, keyattno,
706 : BRIN_PROCNUM_CONSISTENT);
707 2946 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
708 : CurrentMemoryContext);
709 : }
710 :
711 : /* Add key to the proper per-attribute array. */
712 2946 : if (key->sk_flags & SK_ISNULL)
713 : {
714 36 : nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
715 36 : nnullkeys[keyattno - 1]++;
716 : }
717 : else
718 : {
719 2910 : keys[keyattno - 1][nkeys[keyattno - 1]] = key;
720 2910 : nkeys[keyattno - 1]++;
721 : }
722 : }
723 :
724 : /* allocate an initial in-memory tuple, outside of the per-range memcxt (i.e. before perRangeCxt is created and entered below) */
725 2946 : dtup = brin_new_memtuple(bdesc);
726 :
727 : /*
728 : * Setup and use a per-range memory context, which is reset every time we
729 : * loop below. This avoids having to free the tuples within the loop.
730 : */
731 2946 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
732 : "bringetbitmap cxt",
733 : ALLOCSET_DEFAULT_SIZES);
734 2946 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
735 :
736 : /*
737 : * Now scan the revmap. We start by querying for heap page 0,
738 : * incrementing by the number of pages per range; this gives us a full
739 : * view of the table. We make use of uint64 for heapBlk as a BlockNumber
740 : * could wrap for tables with close to 2^32 pages.
741 : */
742 194598 : for (uint64 heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
743 : {
744 : bool addrange;
745 191652 : bool gottuple = false;
746 : BrinTuple *tup;
747 : OffsetNumber off;
748 : Size size;
749 :
750 191652 : CHECK_FOR_INTERRUPTS();
751 :
752 191652 : MemoryContextReset(perRangeCxt);
753 :
754 191652 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, (BlockNumber) heapBlk, &buf,
755 : &off, &size, BUFFER_LOCK_SHARE);
756 191652 : if (tup)
757 : {
758 189936 : gottuple = true;
759 189936 : btup = brin_copy_tuple(tup, size, btup, &btupsz); /* work on a copy so the lock can be dropped; btup and btupsz are passed back in on each iteration */
760 189936 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
761 : }
762 :
763 : /*
764 : * For page ranges with no indexed tuple, we must return the whole
765 : * range; otherwise, compare it to the scan keys.
766 : */
767 191652 : if (!gottuple)
768 : {
769 1716 : addrange = true;
770 : }
771 : else
772 : {
773 189936 : dtup = brin_deform_tuple(bdesc, btup, dtup);
774 189936 : if (dtup->bt_placeholder)
775 : {
776 : /*
777 : * Placeholder tuples are always returned, regardless of the
778 : * values stored in them.
779 : */
780 0 : addrange = true;
781 : }
782 : else
783 : {
784 : int attno;
785 :
786 : /*
787 : * Compare scan keys with summary values stored for the range.
788 : * If scan keys are matched, the page range must be added to
789 : * the bitmap. We initially assume the range needs to be
790 : * added; in particular this serves the case where there are
791 : * no keys.
792 : */
793 189936 : addrange = true;
794 4704066 : for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
795 : {
796 : BrinValues *bval;
797 : Datum add;
798 : Oid collation;
799 :
800 : /*
801 : * skip attributes without any scan keys (both regular and
802 : * IS [NOT] NULL)
803 : */
804 4567734 : if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
805 4377798 : continue;
806 :
807 189936 : bval = &dtup->bt_columns[attno - 1];
808 :
809 : /*
810 : * If the BRIN tuple indicates that this range is empty,
811 : * we can skip it: there's nothing to match. We don't
812 : * need to examine the next columns.
813 : */
814 189936 : if (dtup->bt_empty_range)
815 : {
816 0 : addrange = false;
817 0 : break;
818 : }
819 :
820 : /*
821 : * First check if there are any IS [NOT] NULL scan keys,
822 : * and if we're violating them. In that case we can
823 : * terminate early, without invoking the support function.
824 : *
825 : * As there may be more keys, we can only determine
826 : * mismatch within this loop.
827 : */
828 189936 : if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
829 189936 : !check_null_keys(bval, nullkeys[attno - 1],
830 189936 : nnullkeys[attno - 1]))
831 : {
832 : /*
833 : * If any of the IS [NOT] NULL keys failed, the page
834 : * range as a whole can't pass. So terminate the loop.
835 : */
836 996 : addrange = false;
837 996 : break;
838 : }
839 :
840 : /*
841 : * So either there are no IS [NOT] NULL keys, or all
842 : * passed. If there are no regular scan keys, we're done -
843 : * the page range matches. If there are regular keys, but
844 : * the page range is marked as 'all nulls' it can't
845 : * possibly pass (we're assuming the operators are
846 : * strict).
847 : */
848 :
849 : /* No regular scan keys - page range as a whole passes. */
850 188940 : if (!nkeys[attno - 1])
851 1236 : continue;
852 :
853 : Assert((nkeys[attno - 1] > 0) &&
854 : (nkeys[attno - 1] <= scan->numberOfKeys));
855 :
856 : /* If it is all nulls, it cannot possibly be consistent. */
857 187704 : if (bval->bv_allnulls)
858 : {
859 378 : addrange = false;
860 378 : break;
861 : }
862 :
863 : /*
864 : * Collation from the first key (has to be the same for
865 : * all keys for the same attribute).
866 : */
867 187326 : collation = keys[attno - 1][0]->sk_collation;
868 :
869 : /*
870 : * Check whether the scan key is consistent with the page
871 : * range values; if so, have the pages in the range added
872 : * to the output bitmap.
873 : *
874 : * The opclass may or may not support processing of
875 : * multiple scan keys. We can determine that based on the
876 : * number of arguments - functions with extra parameter
877 : * (number of scan keys) do support this, otherwise we
878 : * have to simply pass the scan keys one by one.
879 : */
880 187326 : if (consistentFn[attno - 1].fn_nargs >= 4)
881 : {
882 : /* Check all keys at once */
883 39594 : add = FunctionCall4Coll(&consistentFn[attno - 1],
884 : collation,
885 : PointerGetDatum(bdesc),
886 : PointerGetDatum(bval),
887 39594 : PointerGetDatum(keys[attno - 1]),
888 39594 : Int32GetDatum(nkeys[attno - 1]));
889 39594 : addrange = DatumGetBool(add);
890 : }
891 : else
892 : {
893 : /*
894 : * Check keys one by one
895 : *
896 : * When there are multiple scan keys, failure to meet
897 : * the criteria for a single one of them is enough to
898 : * discard the range as a whole, so break out of the
899 : * loop as soon as a false return value is obtained.
900 : */
901 : int keyno;
902 :
903 258078 : for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
904 : {
905 147732 : add = FunctionCall3Coll(&consistentFn[attno - 1],
906 147732 : keys[attno - 1][keyno]->sk_collation,
907 : PointerGetDatum(bdesc),
908 : PointerGetDatum(bval),
909 147732 : PointerGetDatum(keys[attno - 1][keyno]));
910 147732 : addrange = DatumGetBool(add);
911 147732 : if (!addrange)
912 37386 : break;
913 : }
914 : }
915 :
916 : /*
917 : * If we found a scan key eliminating the range, no need
918 : * to check additional ones.
919 : */
920 187326 : if (!addrange)
921 52230 : break;
922 : }
923 : }
924 : }
925 :
926 : /* add the pages in the range to the output bitmap, if needed */
927 191652 : if (addrange)
928 : {
929 : uint64 pageno;
930 :
931 138048 : for (pageno = heapBlk;
932 286008 : pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1; /* the last range is clamped to the table size; nblocks > 0 here because the outer loop condition held */
933 147960 : pageno++)
934 : {
935 147960 : MemoryContextSwitchTo(oldcxt); /* tbm_add_page runs with the caller's context current, not perRangeCxt */
936 147960 : tbm_add_page(tbm, pageno);
937 147960 : totalpages++;
938 147960 : MemoryContextSwitchTo(perRangeCxt);
939 : }
940 : }
941 : }
942 :
943 2946 : MemoryContextSwitchTo(oldcxt);
944 2946 : MemoryContextDelete(perRangeCxt);
945 :
946 2946 : if (buf != InvalidBuffer) /* buf is only unlocked inside the loop; it is released once, here */
947 2946 : ReleaseBuffer(buf);
948 :
949 : /*
950 : * XXX We have an approximation of the number of *pages* that our scan
951 : * returns, but we don't have a precise idea of the number of heap tuples
952 : * involved.
953 : */
954 2946 : return totalpages * 10;
955 : }
955 :
956 : /*
957 : * Re-initialize state for a BRIN index scan
958 : */
959 : void
960 2946 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
961 : ScanKey orderbys, int norderbys)
962 : {
963 : /*
964 : * Other index AMs preprocess the scan keys at this point, or sometime
965 : * early during the scan; this lets them optimize by removing redundant
966 : * keys, or doing early returns when they are impossible to satisfy; see
967 : * _bt_preprocess_keys for an example. Something like that could be added
968 : * here someday, too.
969 : */
970 :
971 2946 : if (scankey && scan->numberOfKeys > 0)
972 2946 : memcpy(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData));
973 2946 : }
974 :
975 : /*
976 : * Close down a BRIN index scan
977 : */
978 : void
979 2946 : brinendscan(IndexScanDesc scan)
980 : {
981 2946 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
982 :
983 2946 : brinRevmapTerminate(opaque->bo_rmAccess);
984 2946 : brin_free_desc(opaque->bo_bdesc);
985 2946 : pfree(opaque);
986 2946 : }
987 :
988 : /*
989 : * Per-heap-tuple callback for table_index_build_scan.
990 : *
991 : * Note we don't worry about the page range at the end of the table here; it is
992 : * present in the build state struct after we're called the last time, but not
993 : * inserted into the index. Caller must ensure to do so, if appropriate.
994 : */
995 : static void
996 728466 : brinbuildCallback(Relation index,
997 : ItemPointer tid,
998 : Datum *values,
999 : bool *isnull,
1000 : bool tupleIsAlive,
1001 : void *brstate)
1002 : {
1003 728466 : BrinBuildState *state = (BrinBuildState *) brstate;
1004 : BlockNumber thisblock;
1005 :
1006 728466 : thisblock = ItemPointerGetBlockNumber(tid);
1007 :
1008 : /*
1009 : * If we're in a block that belongs to a future range, summarize what
1010 : * we've got and start afresh. Note the scan might have skipped many
1011 : * pages, if they were devoid of live tuples; make sure to insert index
1012 : * tuples for those too.
1013 : */
1014 730762 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
1015 : {
1016 :
1017 : BRIN_elog((DEBUG2,
1018 : "brinbuildCallback: completed a range: %u--%u",
1019 : state->bs_currRangeStart,
1020 : state->bs_currRangeStart + state->bs_pagesPerRange));
1021 :
1022 : /* create the index tuple and insert it */
1023 2296 : form_and_insert_tuple(state);
1024 :
1025 : /* set state to correspond to the next range */
1026 2296 : state->bs_currRangeStart += state->bs_pagesPerRange;
1027 :
1028 : /* re-initialize state for it */
1029 2296 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1030 : }
1031 :
1032 : /* Accumulate the current tuple into the running state */
1033 728466 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1034 : values, isnull);
1035 728466 : }
1036 :
1037 : /*
1038 : * Per-heap-tuple callback for table_index_build_scan with parallelism.
1039 : *
1040 : * A version of the callback used by parallel index builds. The main difference
1041 : * is that instead of writing the BRIN tuples into the index, we write them
1042 : * into a shared tuplesort, and leave the insertion up to the leader (which may
1043 : * reorder them a bit etc.). The callback also does not generate empty ranges,
1044 : * those will be added by the leader when merging results from workers.
1045 : */
1046 : static void
1047 7962 : brinbuildCallbackParallel(Relation index,
1048 : ItemPointer tid,
1049 : Datum *values,
1050 : bool *isnull,
1051 : bool tupleIsAlive,
1052 : void *brstate)
1053 : {
1054 7962 : BrinBuildState *state = (BrinBuildState *) brstate;
1055 : BlockNumber thisblock;
1056 :
1057 7962 : thisblock = ItemPointerGetBlockNumber(tid);
1058 :
1059 : /*
1060 : * If we're in a block that belongs to a different range, summarize what
1061 : * we've got and start afresh. Note the scan might have skipped many
1062 : * pages, if they were devoid of live tuples; we do not create empty BRIN
1063 : * ranges here - the leader is responsible for filling them in.
1064 : *
1065 : * Unlike serial builds, parallel index builds allow synchronized seqscans
1066 : * (because that's what parallel scans do). This means the block may wrap
1067 : * around to the beginning of the relation, so the condition needs to
1068 : * check for both future and past ranges.
1069 : */
1070 7962 : if ((thisblock < state->bs_currRangeStart) ||
1071 7962 : (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1))
1072 : {
1073 :
1074 : BRIN_elog((DEBUG2,
1075 : "brinbuildCallbackParallel: completed a range: %u--%u",
1076 : state->bs_currRangeStart,
1077 : state->bs_currRangeStart + state->bs_pagesPerRange));
1078 :
1079 : /* create the index tuple and write it into the tuplesort */
1080 42 : form_and_spill_tuple(state);
1081 :
1082 : /*
1083 : * Set state to correspond to the next range (for this block).
1084 : *
1085 : * This skips ranges that are either empty (and so we don't get any
1086 : * tuples to summarize), or processed by other workers. We can't
1087 : * differentiate those cases here easily, so we leave it up to the
1088 : * leader to fill empty ranges where needed.
1089 : */
1090 : state->bs_currRangeStart
1091 42 : = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
1092 :
1093 : /* re-initialize state for it */
1094 42 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1095 : }
1096 :
1097 : /* Accumulate the current tuple into the running state */
1098 7962 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1099 : values, isnull);
1100 7962 : }
1101 :
1102 : /*
1103 : * brinbuild() -- build a new BRIN index.
1104 : */
1105 : IndexBuildResult *
1106 368 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1107 : {
1108 : IndexBuildResult *result;
1109 : double reltuples;
1110 : double idxtuples;
1111 : BrinRevmap *revmap;
1112 : BrinBuildState *state;
1113 : Buffer meta;
1114 : BlockNumber pagesPerRange;
1115 :
1116 : /*
1117 : * We expect to be called exactly once for any index relation.
1118 : */
1119 368 : if (RelationGetNumberOfBlocks(index) != 0)
1120 0 : elog(ERROR, "index \"%s\" already contains data",
1121 : RelationGetRelationName(index));
1122 :
1123 : /*
1124 : * Critical section not required, because on error the creation of the
1125 : * whole relation will be rolled back.
1126 : */
1127 :
1128 368 : meta = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
1129 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1130 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
1131 :
1132 368 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
1133 : BRIN_CURRENT_VERSION);
1134 368 : MarkBufferDirty(meta);
1135 :
1136 368 : if (RelationNeedsWAL(index))
1137 : {
1138 : xl_brin_createidx xlrec;
1139 : XLogRecPtr recptr;
1140 : Page page;
1141 :
1142 254 : xlrec.version = BRIN_CURRENT_VERSION;
1143 254 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
1144 :
1145 254 : XLogBeginInsert();
1146 254 : XLogRegisterData(&xlrec, SizeOfBrinCreateIdx);
1147 254 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
1148 :
1149 254 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
1150 :
1151 254 : page = BufferGetPage(meta);
1152 254 : PageSetLSN(page, recptr);
1153 : }
1154 :
1155 368 : UnlockReleaseBuffer(meta);
1156 :
1157 : /*
1158 : * Initialize our state, including the deformed tuple state.
1159 : */
1160 368 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1161 368 : state = initialize_brin_buildstate(index, revmap, pagesPerRange,
1162 : RelationGetNumberOfBlocks(heap));
1163 :
1164 : /*
1165 : * Attempt to launch parallel worker scan when required
1166 : *
1167 : * XXX plan_create_index_workers makes the number of workers dependent on
1168 : * maintenance_work_mem, requiring 32MB for each worker. That makes sense
1169 : * for btree, but not for BRIN, which can do with much less memory. So
1170 : * maybe make that somehow less strict, optionally?
1171 : */
1172 368 : if (indexInfo->ii_ParallelWorkers > 0)
1173 10 : _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
1174 : indexInfo->ii_ParallelWorkers);
1175 :
1176 : /*
1177 : * If parallel build requested and at least one worker process was
1178 : * successfully launched, set up coordination state, wait for workers to
1179 : * complete. Then read all tuples from the shared tuplesort and insert
1180 : * them into the index.
1181 : *
1182 : * In serial mode, simply scan the table and build the index one index
1183 : * tuple at a time.
1184 : */
1185 368 : if (state->bs_leader)
1186 : {
1187 : SortCoordinate coordinate;
1188 :
1189 8 : coordinate = palloc0_object(SortCoordinateData);
1190 8 : coordinate->isWorker = false;
1191 8 : coordinate->nParticipants =
1192 8 : state->bs_leader->nparticipanttuplesorts;
1193 8 : coordinate->sharedsort = state->bs_leader->sharedsort;
1194 :
1195 : /*
1196 : * Begin leader tuplesort.
1197 : *
1198 : * In cases where parallelism is involved, the leader receives the
1199 : * same share of maintenance_work_mem as a serial sort (it is
1200 : * generally treated in the same way as a serial sort once we return).
1201 : * Parallel worker Tuplesortstates will have received only a fraction
1202 : * of maintenance_work_mem, though.
1203 : *
1204 : * We rely on the lifetime of the Leader Tuplesortstate almost not
1205 : * overlapping with any worker Tuplesortstate's lifetime. There may
1206 : * be some small overlap, but that's okay because we rely on leader
1207 : * Tuplesortstate only allocating a small, fixed amount of memory
1208 : * here. When its tuplesort_performsort() is called (by our caller),
1209 : * and significant amounts of memory are likely to be used, all
1210 : * workers must have already freed almost all memory held by their
1211 : * Tuplesortstates (they are about to go away completely, too). The
1212 : * overall effect is that maintenance_work_mem always represents an
1213 : * absolute high watermark on the amount of memory used by a CREATE
1214 : * INDEX operation, regardless of the use of parallelism or any other
1215 : * factor.
1216 : */
1217 8 : state->bs_sortstate =
1218 8 : tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
1219 : TUPLESORT_NONE);
1220 :
1221 : /* scan the relation and merge per-worker results */
1222 8 : reltuples = _brin_parallel_merge(state);
1223 :
1224 8 : _brin_end_parallel(state->bs_leader, state);
1225 : }
1226 : else /* no parallel index build */
1227 : {
1228 : /*
1229 : * Now scan the relation. No syncscan allowed here because we want
1230 : * the heap blocks in physical order (we want to produce the ranges
1231 : * starting from block 0, and the callback also relies on this to not
1232 : * generate summary for the same range twice).
1233 : */
1234 360 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
1235 : brinbuildCallback, state, NULL);
1236 :
1237 : /*
1238 : * process the final batch
1239 : *
1240 : * XXX Note this does not update state->bs_currRangeStart, i.e. it
1241 : * stays set to the last range added to the index. This is OK, because
1242 : * that's what brin_fill_empty_ranges expects.
1243 : */
1244 360 : form_and_insert_tuple(state);
1245 :
1246 : /*
1247 : * Backfill the final ranges with empty data.
1248 : *
1249 : * This saves us from doing what amounts to full table scans when the
1250 : * index with a predicate like WHERE (nonnull_column IS NULL), or
1251 : * other very selective predicates.
1252 : */
1253 360 : brin_fill_empty_ranges(state,
1254 : state->bs_currRangeStart,
1255 : state->bs_maxRangeStart);
1256 : }
1257 :
1258 : /* release resources */
1259 368 : idxtuples = state->bs_numtuples;
1260 368 : brinRevmapTerminate(state->bs_rmAccess);
1261 368 : terminate_brin_buildstate(state);
1262 :
1263 : /*
1264 : * Return statistics
1265 : */
1266 368 : result = palloc_object(IndexBuildResult);
1267 :
1268 368 : result->heap_tuples = reltuples;
1269 368 : result->index_tuples = idxtuples;
1270 :
1271 368 : return result;
1272 : }
1273 :
1274 : void
1275 6 : brinbuildempty(Relation index)
1276 : {
1277 : Buffer metabuf;
1278 :
1279 : /* An empty BRIN index has a metapage only. */
1280 6 : metabuf = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
1281 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1282 :
1283 : /* Initialize and xlog metabuffer. */
1284 6 : START_CRIT_SECTION();
1285 6 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
1286 : BRIN_CURRENT_VERSION);
1287 6 : MarkBufferDirty(metabuf);
1288 6 : log_newpage_buffer(metabuf, true);
1289 6 : END_CRIT_SECTION();
1290 :
1291 6 : UnlockReleaseBuffer(metabuf);
1292 6 : }
1293 :
1294 : /*
1295 : * brinbulkdelete
1296 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
1297 : * there's not a lot we can do here.
1298 : *
1299 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
1300 : * tuple is deleted), meaning the need to re-run summarization on the affected
1301 : * range. Would need to add an extra flag in brintuples for that.
1302 : */
1303 : IndexBulkDeleteResult *
1304 20 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
1305 : IndexBulkDeleteCallback callback, void *callback_state)
1306 : {
1307 : /* allocate stats if first time through, else re-use existing struct */
1308 20 : if (stats == NULL)
1309 20 : stats = palloc0_object(IndexBulkDeleteResult);
1310 :
1311 20 : return stats;
1312 : }
1313 :
1314 : /*
1315 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
1316 : * ranges that are currently unsummarized.
1317 : */
1318 : IndexBulkDeleteResult *
1319 112 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
1320 : {
1321 : Relation heapRel;
1322 :
1323 : /* No-op in ANALYZE ONLY mode */
1324 112 : if (info->analyze_only)
1325 6 : return stats;
1326 :
1327 106 : if (!stats)
1328 92 : stats = palloc0_object(IndexBulkDeleteResult);
1329 106 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
1330 : /* rest of stats is initialized by zeroing */
1331 :
1332 106 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
1333 : AccessShareLock);
1334 :
1335 106 : brin_vacuum_scan(info->index, info->strategy);
1336 :
1337 106 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
1338 : &stats->num_index_tuples, &stats->num_index_tuples);
1339 :
1340 106 : table_close(heapRel, AccessShareLock);
1341 :
1342 106 : return stats;
1343 : }
1344 :
1345 : /*
1346 : * reloptions processor for BRIN indexes
1347 : */
1348 : bytea *
1349 1204 : brinoptions(Datum reloptions, bool validate)
1350 : {
1351 : static const relopt_parse_elt tab[] = {
1352 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
1353 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
1354 : };
1355 :
1356 1204 : return (bytea *) build_reloptions(reloptions, validate,
1357 : RELOPT_KIND_BRIN,
1358 : sizeof(BrinOptions),
1359 : tab, lengthof(tab));
1360 : }
1361 :
1362 : /*
1363 : * SQL-callable function to scan through an index and summarize all ranges
1364 : * that are not currently summarized.
1365 : */
1366 : Datum
1367 76 : brin_summarize_new_values(PG_FUNCTION_ARGS)
1368 : {
1369 76 : Datum relation = PG_GETARG_DATUM(0);
1370 :
1371 76 : return DirectFunctionCall2(brin_summarize_range,
1372 : relation,
1373 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
1374 : }
1375 :
1376 : /*
1377 : * SQL-callable function to summarize the indicated page range, if not already
1378 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
1379 : * unsummarized ranges are summarized.
1380 : */
1381 : Datum
1382 210 : brin_summarize_range(PG_FUNCTION_ARGS)
1383 : {
1384 210 : Oid indexoid = PG_GETARG_OID(0);
1385 210 : int64 heapBlk64 = PG_GETARG_INT64(1);
1386 : BlockNumber heapBlk;
1387 : Oid heapoid;
1388 : Relation indexRel;
1389 : Relation heapRel;
1390 : Oid save_userid;
1391 : int save_sec_context;
1392 : int save_nestlevel;
1393 210 : double numSummarized = 0;
1394 :
1395 210 : if (RecoveryInProgress())
1396 0 : ereport(ERROR,
1397 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1398 : errmsg("recovery is in progress"),
1399 : errhint("BRIN control functions cannot be executed during recovery.")));
1400 :
1401 210 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
1402 36 : ereport(ERROR,
1403 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1404 : errmsg("block number out of range: %" PRId64, heapBlk64)));
1405 174 : heapBlk = (BlockNumber) heapBlk64;
1406 :
1407 : /*
1408 : * We must lock table before index to avoid deadlocks. However, if the
1409 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1410 : * Rather than emitting a not-very-helpful error message, postpone
1411 : * complaining, expecting that the is-it-an-index test below will fail.
1412 : */
1413 174 : heapoid = IndexGetRelation(indexoid, true);
1414 174 : if (OidIsValid(heapoid))
1415 : {
1416 156 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1417 :
1418 : /*
1419 : * Autovacuum calls us. For its benefit, switch to the table owner's
1420 : * userid, so that any index functions are run as that user. Also
1421 : * lock down security-restricted operations and arrange to make GUC
1422 : * variable changes local to this command. This is harmless, albeit
1423 : * unnecessary, when called from SQL, because we fail shortly if the
1424 : * user does not own the index.
1425 : */
1426 156 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
1427 156 : SetUserIdAndSecContext(heapRel->rd_rel->relowner,
1428 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
1429 156 : save_nestlevel = NewGUCNestLevel();
1430 156 : RestrictSearchPath();
1431 : }
1432 : else
1433 : {
1434 18 : heapRel = NULL;
1435 : /* Set these just to suppress "uninitialized variable" warnings */
1436 18 : save_userid = InvalidOid;
1437 18 : save_sec_context = -1;
1438 18 : save_nestlevel = -1;
1439 : }
1440 :
1441 174 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1442 :
1443 : /* Must be a BRIN index */
1444 156 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1445 156 : indexRel->rd_rel->relam != BRIN_AM_OID)
1446 18 : ereport(ERROR,
1447 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1448 : errmsg("\"%s\" is not a BRIN index",
1449 : RelationGetRelationName(indexRel))));
1450 :
1451 : /* User must own the index (comparable to privileges needed for VACUUM) */
1452 138 : if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
1453 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1454 0 : RelationGetRelationName(indexRel));
1455 :
1456 : /*
1457 : * Since we did the IndexGetRelation call above without any lock, it's
1458 : * barely possible that a race against an index drop/recreation could have
1459 : * netted us the wrong table. Recheck.
1460 : */
1461 138 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1462 0 : ereport(ERROR,
1463 : (errcode(ERRCODE_UNDEFINED_TABLE),
1464 : errmsg("could not open parent table of index \"%s\"",
1465 : RelationGetRelationName(indexRel))));
1466 :
1467 : /* see gin_clean_pending_list() */
1468 138 : if (indexRel->rd_index->indisvalid)
1469 138 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
1470 : else
1471 0 : ereport(DEBUG1,
1472 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473 : errmsg("index \"%s\" is not valid",
1474 : RelationGetRelationName(indexRel))));
1475 :
1476 : /* Roll back any GUC changes executed by index functions */
1477 138 : AtEOXact_GUC(false, save_nestlevel);
1478 :
1479 : /* Restore userid and security context */
1480 138 : SetUserIdAndSecContext(save_userid, save_sec_context);
1481 :
1482 138 : index_close(indexRel, ShareUpdateExclusiveLock);
1483 138 : table_close(heapRel, ShareUpdateExclusiveLock);
1484 :
1485 138 : PG_RETURN_INT32((int32) numSummarized);
1486 : }
1487 :
1488 : /*
1489 : * SQL-callable interface to mark a range as no longer summarized
1490 : */
1491 : Datum
1492 104 : brin_desummarize_range(PG_FUNCTION_ARGS)
1493 : {
1494 104 : Oid indexoid = PG_GETARG_OID(0);
1495 104 : int64 heapBlk64 = PG_GETARG_INT64(1);
1496 : BlockNumber heapBlk;
1497 : Oid heapoid;
1498 : Relation heapRel;
1499 : Relation indexRel;
1500 : bool done;
1501 :
1502 104 : if (RecoveryInProgress())
1503 0 : ereport(ERROR,
1504 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1505 : errmsg("recovery is in progress"),
1506 : errhint("BRIN control functions cannot be executed during recovery.")));
1507 :
1508 104 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
1509 18 : ereport(ERROR,
1510 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1511 : errmsg("block number out of range: %" PRId64,
1512 : heapBlk64)));
1513 86 : heapBlk = (BlockNumber) heapBlk64;
1514 :
1515 : /*
1516 : * We must lock table before index to avoid deadlocks. However, if the
1517 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1518 : * Rather than emitting a not-very-helpful error message, postpone
1519 : * complaining, expecting that the is-it-an-index test below will fail.
1520 : *
1521 : * Unlike brin_summarize_range(), autovacuum never calls this. Hence, we
1522 : * don't switch userid.
1523 : */
1524 86 : heapoid = IndexGetRelation(indexoid, true);
1525 86 : if (OidIsValid(heapoid))
1526 86 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1527 : else
1528 0 : heapRel = NULL;
1529 :
1530 86 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1531 :
1532 : /* Must be a BRIN index */
1533 86 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1534 86 : indexRel->rd_rel->relam != BRIN_AM_OID)
1535 0 : ereport(ERROR,
1536 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1537 : errmsg("\"%s\" is not a BRIN index",
1538 : RelationGetRelationName(indexRel))));
1539 :
1540 : /* User must own the index (comparable to privileges needed for VACUUM) */
1541 86 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1542 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1543 0 : RelationGetRelationName(indexRel));
1544 :
1545 : /*
1546 : * Since we did the IndexGetRelation call above without any lock, it's
1547 : * barely possible that a race against an index drop/recreation could have
1548 : * netted us the wrong table. Recheck.
1549 : */
1550 86 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1551 0 : ereport(ERROR,
1552 : (errcode(ERRCODE_UNDEFINED_TABLE),
1553 : errmsg("could not open parent table of index \"%s\"",
1554 : RelationGetRelationName(indexRel))));
1555 :
1556 : /* see gin_clean_pending_list() */
1557 86 : if (indexRel->rd_index->indisvalid)
1558 : {
1559 : /* the revmap does the hard work */
1560 : do
1561 : {
1562 86 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1563 : }
1564 86 : while (!done);
1565 : }
1566 : else
1567 0 : ereport(DEBUG1,
1568 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1569 : errmsg("index \"%s\" is not valid",
1570 : RelationGetRelationName(indexRel))));
1571 :
1572 86 : index_close(indexRel, ShareUpdateExclusiveLock);
1573 86 : table_close(heapRel, ShareUpdateExclusiveLock);
1574 :
1575 86 : PG_RETURN_VOID();
1576 : }
1577 :
1578 : /*
1579 : * Build a BrinDesc used to create or scan a BRIN index
1580 : */
1581 : BrinDesc *
1582 4586 : brin_build_desc(Relation rel)
1583 : {
1584 : BrinOpcInfo **opcinfo;
1585 : BrinDesc *bdesc;
1586 : TupleDesc tupdesc;
1587 4586 : int totalstored = 0;
1588 : int keyno;
1589 : long totalsize;
1590 : MemoryContext cxt;
1591 : MemoryContext oldcxt;
1592 :
1593 4586 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1594 : "brin desc cxt",
1595 : ALLOCSET_SMALL_SIZES);
1596 4586 : oldcxt = MemoryContextSwitchTo(cxt);
1597 4586 : tupdesc = RelationGetDescr(rel);
1598 :
1599 : /*
1600 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1601 : * the number of columns stored, since the number is opclass-defined.
1602 : */
1603 4586 : opcinfo = palloc_array(BrinOpcInfo *, tupdesc->natts);
1604 76172 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1605 : {
1606 : FmgrInfo *opcInfoFn;
1607 71586 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1608 :
1609 71586 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1610 :
1611 143172 : opcinfo[keyno] = (BrinOpcInfo *)
1612 71586 : DatumGetPointer(FunctionCall1(opcInfoFn, ObjectIdGetDatum(attr->atttypid)));
1613 71586 : totalstored += opcinfo[keyno]->oi_nstored;
1614 : }
1615 :
1616 : /* Allocate our result struct and fill it in */
1617 4586 : totalsize = offsetof(BrinDesc, bd_info) +
1618 4586 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1619 :
1620 4586 : bdesc = palloc(totalsize);
1621 4586 : bdesc->bd_context = cxt;
1622 4586 : bdesc->bd_index = rel;
1623 4586 : bdesc->bd_tupdesc = tupdesc;
1624 4586 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1625 4586 : bdesc->bd_totalstored = totalstored;
1626 :
1627 76172 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1628 71586 : bdesc->bd_info[keyno] = opcinfo[keyno];
1629 4586 : pfree(opcinfo);
1630 :
1631 4586 : MemoryContextSwitchTo(oldcxt);
1632 :
1633 4586 : return bdesc;
1634 : }
1635 :
1636 : void
1637 3450 : brin_free_desc(BrinDesc *bdesc)
1638 : {
1639 : /* make sure the tupdesc is still valid */
1640 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1641 : /* no need for retail pfree */
1642 3450 : MemoryContextDelete(bdesc->bd_context);
1643 3450 : }
1644 :
1645 : /*
1646 : * Fetch index's statistical data into *stats
1647 : */
1648 : void
1649 10730 : brinGetStats(Relation index, BrinStatsData *stats)
1650 : {
1651 : Buffer metabuffer;
1652 : Page metapage;
1653 : BrinMetaPageData *metadata;
1654 :
1655 10730 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1656 10730 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1657 10730 : metapage = BufferGetPage(metabuffer);
1658 10730 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1659 :
1660 10730 : stats->pagesPerRange = metadata->pagesPerRange;
1661 10730 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1662 :
1663 10730 : UnlockReleaseBuffer(metabuffer);
1664 10730 : }
1665 :
1666 : /*
1667 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1668 : */
1669 : static BrinBuildState *
1670 472 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1671 : BlockNumber pagesPerRange, BlockNumber tablePages)
1672 : {
1673 : BrinBuildState *state;
1674 472 : BlockNumber lastRange = 0;
1675 :
1676 472 : state = palloc_object(BrinBuildState);
1677 :
1678 472 : state->bs_irel = idxRel;
1679 472 : state->bs_numtuples = 0;
1680 472 : state->bs_reltuples = 0;
1681 472 : state->bs_currentInsertBuf = InvalidBuffer;
1682 472 : state->bs_pagesPerRange = pagesPerRange;
1683 472 : state->bs_currRangeStart = 0;
1684 472 : state->bs_rmAccess = revmap;
1685 472 : state->bs_bdesc = brin_build_desc(idxRel);
1686 472 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1687 472 : state->bs_leader = NULL;
1688 472 : state->bs_worker_id = 0;
1689 472 : state->bs_sortstate = NULL;
1690 472 : state->bs_context = CurrentMemoryContext;
1691 472 : state->bs_emptyTuple = NULL;
1692 472 : state->bs_emptyTupleLen = 0;
1693 :
1694 : /* Remember the memory context to use for an empty tuple, if needed. */
1695 472 : state->bs_context = CurrentMemoryContext;
1696 472 : state->bs_emptyTuple = NULL;
1697 472 : state->bs_emptyTupleLen = 0;
1698 :
1699 : /*
1700 : * Calculate the start of the last page range. Page numbers are 0-based,
1701 : * so to calculate the index we need to subtract one. The integer division
1702 : * gives us the index of the page range.
1703 : */
1704 472 : if (tablePages > 0)
1705 348 : lastRange = ((tablePages - 1) / pagesPerRange) * pagesPerRange;
1706 :
1707 : /* Now calculate the start of the next range. */
1708 472 : state->bs_maxRangeStart = lastRange + state->bs_pagesPerRange;
1709 :
1710 472 : return state;
1711 : }
1712 :
1713 : /*
1714 : * Release resources associated with a BrinBuildState.
1715 : */
1716 : static void
1717 460 : terminate_brin_buildstate(BrinBuildState *state)
1718 : {
1719 : /*
1720 : * Release the last index buffer used. We might as well ensure that
1721 : * whatever free space remains in that page is available in FSM, too.
1722 : */
1723 460 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1724 : {
1725 : Page page;
1726 : Size freespace;
1727 : BlockNumber blk;
1728 :
1729 368 : page = BufferGetPage(state->bs_currentInsertBuf);
1730 368 : freespace = PageGetFreeSpace(page);
1731 368 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1732 368 : ReleaseBuffer(state->bs_currentInsertBuf);
1733 368 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1734 368 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1735 : }
1736 :
1737 460 : brin_free_desc(state->bs_bdesc);
1738 460 : pfree(state->bs_dtuple);
1739 460 : pfree(state);
1740 460 : }
1741 :
1742 : /*
1743 : * On the given BRIN index, summarize the heap page range that corresponds
1744 : * to the heap block number given.
1745 : *
1746 : * This routine can run in parallel with insertions into the heap. To avoid
1747 : * missing those values from the summary tuple, we first insert a placeholder
1748 : * index tuple into the index, then execute the heap scan; transactions
1749 : * concurrent with the scan update the placeholder tuple. After the scan, we
1750 : * union the placeholder tuple with the one computed by this routine. The
1751 : * update of the index value happens in a loop, so that if somebody updates
1752 : * the placeholder tuple after we read it, we detect the case and try again.
1753 : * This ensures that the concurrently inserted tuples are not lost.
1754 : *
1755 : * A further corner case is this routine being asked to summarize the partial
1756 : * range at the end of the table. heapNumBlks is the (possibly outdated)
1757 : * table size; if we notice that the requested range lies beyond that size,
1758 : * we re-compute the table size after inserting the placeholder tuple, to
1759 : * avoid missing pages that were appended recently.
1760 : */
1761 : static void
1762 2948 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1763 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1764 : {
1765 : Buffer phbuf;
1766 : BrinTuple *phtup;
1767 : Size phsz;
1768 : OffsetNumber offset;
1769 : BlockNumber scanNumBlks;
1770 :
1771 : /*
1772 : * Insert the placeholder tuple, keeping its buffer and offset for the update below
1773 : */
1774 2948 : phbuf = InvalidBuffer;
1775 2948 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1776 2948 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1777 : state->bs_rmAccess, &phbuf,
1778 : heapBlk, phtup, phsz);
1779 :
1780 : /*
1781 : * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1782 : * cannot shrink concurrently (but it can grow).
1783 : */
1784 : Assert(heapBlk % state->bs_pagesPerRange == 0);
1785 2948 : if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1786 : {
1787 : /*
1788 : * If we're asked to scan what we believe to be the final range on the
1789 : * table (i.e. a range that might be partial) we need to recompute our
1790 : * idea of what the latest page is after inserting the placeholder
1791 : * tuple. Anyone that grows the table later will update the
1792 : * placeholder tuple, so it doesn't matter that we won't scan these
1793 : * pages ourselves. Careful: the table might have been extended
1794 : * beyond the current range, so clamp our result.
1795 : *
1796 : * Fortunately, this should occur infrequently.
1797 : */
1798 24 : scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1799 : state->bs_pagesPerRange);
1800 : }
1801 : else
1802 : {
1803 : /* Easy case: range is known to be complete */
1804 2924 : scanNumBlks = state->bs_pagesPerRange;
1805 : }
1806 :
1807 : /*
1808 : * Execute the partial heap scan covering the heap blocks in the specified
1809 : * page range, summarizing the heap tuples in it. This scan stops just
1810 : * short of brinbuildCallback creating the new index entry.
1811 : *
1812 : * Note that it is critical we use the "any visible" mode of
1813 : * table_index_build_range_scan here: otherwise, we would miss tuples
1814 : * inserted by transactions that are still in progress, among other corner
1815 : * cases.
1816 : */
1817 2948 : state->bs_currRangeStart = heapBlk;
1818 2948 : table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1819 : heapBlk, scanNumBlks,
1820 : brinbuildCallback, state, NULL);
1821 :
1822 : /*
1823 : * Now we update the values obtained by the scan with the placeholder
1824 : * tuple. We do this in a loop which only terminates if we're able to
1825 : * update the placeholder tuple successfully; if we are not, this means
1826 : * somebody else modified the placeholder tuple after we read it.
1827 : */
1828 : for (;;)
1829 0 : {
1830 : BrinTuple *newtup;
1831 : Size newsize;
1832 : bool didupdate;
1833 : bool samepage;
1834 :
1835 2948 : CHECK_FOR_INTERRUPTS();
1836 :
1837 : /*
1838 : * Form the summary tuple from the scan results and try to replace the placeholder tuple with it.
1839 : */
1840 2948 : newtup = brin_form_tuple(state->bs_bdesc,
1841 : heapBlk, state->bs_dtuple, &newsize);
1842 2948 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1843 : didupdate =
1844 2948 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1845 : state->bs_rmAccess, heapBlk, phbuf, offset,
1846 : phtup, phsz, newtup, newsize, samepage);
1847 2948 : brin_free_tuple(phtup);
1848 2948 : brin_free_tuple(newtup);
1849 :
1850 : /* If the update succeeded, we're done. */
1851 2948 : if (didupdate)
1852 2948 : break;
1853 :
1854 : /*
1855 : * If the update didn't work, it might be because somebody updated the
1856 : * placeholder tuple concurrently. Extract the new version, union it
1857 : * with the values we have from the scan, and start over. (There are
1858 : * other reasons for the update to fail, but it's simple to treat them
1859 : * the same.)
1860 : */
1861 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1862 : &offset, &phsz, BUFFER_LOCK_SHARE);
1863 : /* the placeholder tuple must exist */
1864 0 : if (phtup == NULL)
1865 0 : elog(ERROR, "missing placeholder tuple");
1866 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1867 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1868 :
1869 : /* merge it into the tuple from the heap scan */
1870 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1871 : }
1872 :
1873 2948 : ReleaseBuffer(phbuf);
1874 2948 : }
1875 :
1876 : /*
1877 : * Summarize page ranges that are not already summarized. If pageRange is
1878 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1879 : * page range containing the given heap page number is scanned.
1880 : * If include_partial is true, then the partial range at the end of the table
1881 : * is summarized, otherwise not.
1882 : *
1883 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1884 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1885 : * incremented.
1886 : */
1887 : static void
1888 244 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1889 : bool include_partial, double *numSummarized, double *numExisting)
1890 : {
1891 : BrinRevmap *revmap;
1892 244 : BrinBuildState *state = NULL;
1893 244 : IndexInfo *indexInfo = NULL;
1894 : BlockNumber heapNumBlocks;
1895 : BlockNumber pagesPerRange;
1896 : Buffer buf;
1897 : BlockNumber startBlk;
1898 :
1899 244 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1900 :
1901 : /* determine range of pages to process; startBlk is aligned down to a range start */
1902 244 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1903 244 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1904 164 : startBlk = 0;
1905 : else
1906 : {
1907 80 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1908 80 : heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
1909 : }
1910 244 : if (startBlk > heapNumBlocks)
1911 : {
1912 : /* Nothing to do if start point is beyond end of table */
1913 0 : brinRevmapTerminate(revmap);
1914 0 : return;
1915 : }
1916 :
1917 : /*
1918 : * Scan the revmap to find unsummarized items.
1919 : */
1920 244 : buf = InvalidBuffer;
1921 19612 : for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1922 : {
1923 : BrinTuple *tup;
1924 : OffsetNumber off;
1925 :
1926 : /*
1927 : * Unless requested to summarize even a partial range, go away now if
1928 : * we think the next range is partial. Caller would pass true when it
1929 : * is typically run once bulk data loading is done
1930 : * (brin_summarize_new_values), and false when it is typically the
1931 : * result of arbitrarily-scheduled maintenance command (vacuuming).
1932 : */
1933 19446 : if (!include_partial &&
1934 2698 : (startBlk + pagesPerRange > heapNumBlocks))
1935 78 : break;
1936 :
1937 19368 : CHECK_FOR_INTERRUPTS();
1938 :
1939 19368 : tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1940 : BUFFER_LOCK_SHARE);
1941 19368 : if (tup == NULL)
1942 : {
1943 : /* no revmap entry for this heap range. Summarize it. */
1944 2948 : if (state == NULL)
1945 : {
1946 : /* first time through */
1947 : Assert(!indexInfo);
1948 92 : state = initialize_brin_buildstate(index, revmap,
1949 : pagesPerRange,
1950 : InvalidBlockNumber);
1951 92 : indexInfo = BuildIndexInfo(index);
1952 : }
1953 2948 : summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1954 :
1955 : /* and re-initialize state for the next range */
1956 2948 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1957 :
1958 2948 : if (numSummarized)
1959 2948 : *numSummarized += 1.0;
1960 : }
1961 : else
1962 : {
1963 16420 : if (numExisting)
1964 2520 : *numExisting += 1.0;
1965 16420 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1966 : }
1967 : }
1968 :
1969 244 : if (BufferIsValid(buf))
1970 172 : ReleaseBuffer(buf);
1971 :
1972 : /* free resources; state and indexInfo exist only if some range was summarized */
1973 244 : brinRevmapTerminate(revmap);
1974 244 : if (state)
1975 : {
1976 92 : terminate_brin_buildstate(state);
1977 92 : pfree(indexInfo);
1978 : }
1979 : }
1980 :
1981 : /*
1982 : * Convert the deformed tuple in the build state into the on-disk format,
1983 : * insert it into the index (making the revmap point to it), and count it in bs_numtuples.
1984 : */
1985 : static void
1986 2656 : form_and_insert_tuple(BrinBuildState *state)
1987 : {
1988 : BrinTuple *tup;
1989 : Size size;
1990 :
1991 2656 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1992 : state->bs_dtuple, &size);
1993 2656 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1994 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1995 : tup, size);
1996 2656 : state->bs_numtuples++;
1997 :
1998 2656 : pfree(tup);
1999 2656 : }
2000 :
2001 : /*
2002 : * Given a deformed tuple in the build state, convert it into the on-disk
2003 : * format and write it to a (shared) tuplesort (the leader will insert it
2004 : * into the index later).
2005 : */
2006 : static void
2007 62 : form_and_spill_tuple(BrinBuildState *state)
2008 : {
2009 : BrinTuple *tup;
2010 : Size size;
2011 :
2012 : /* don't spill (or count) empty ranges in a parallel build */
2013 62 : if (state->bs_dtuple->bt_empty_range)
2014 18 : return;
2015 :
2016 44 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
2017 : state->bs_dtuple, &size);
2018 :
2019 : /* write the BRIN tuple to the tuplesort */
2020 44 : tuplesort_putbrintuple(state->bs_sortstate, tup, size);
2021 :
2022 44 : state->bs_numtuples++;
2023 :
2024 44 : pfree(tup);
2025 : }
2026 :
2027 : /*
2028 : * Given a deformed tuple "a" and an on-disk tuple "b", adjust "a" so that
2029 : * it's consistent with the summary values in both.
2030 : */
2031 : static void
2032 4 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
2033 : {
2034 : int keyno;
2035 : BrinMemTuple *db;
2036 : MemoryContext cxt;
2037 : MemoryContext oldcxt;
2038 :
2039 : /* Use our own memory context to avoid retail pfree */
2040 4 : cxt = AllocSetContextCreate(CurrentMemoryContext,
2041 : "brin union",
2042 : ALLOCSET_DEFAULT_SIZES);
2043 4 : oldcxt = MemoryContextSwitchTo(cxt);
2044 4 : db = brin_deform_tuple(bdesc, b, NULL);
2045 4 : MemoryContextSwitchTo(oldcxt);
2046 :
2047 : /*
2048 : * Check if the ranges are empty.
2049 : *
2050 : * If at least one of them is empty, we don't need to call per-key union
2051 : * functions at all. If "b" is empty, we just use "a" as the result (it
2052 : * might be empty too, but that's fine). If "a" is empty but "b" is not,
2053 : * we use "b" as the result (but we have to copy the data into "a" first).
2054 : *
2055 : * Only when both ranges are non-empty, we actually do the per-key merge.
2056 : */
2057 :
2058 : /* If "b" is empty - ignore it and just use "a" (even if it's empty etc.). */
2059 4 : if (db->bt_empty_range)
2060 : {
2061 : /* skip the per-key merge */
2062 0 : MemoryContextDelete(cxt);
2063 0 : return;
2064 : }
2065 :
2066 : /*
2067 : * Now we know "b" is not empty. If "a" is empty, then "b" is the result.
2068 : * But we need to copy the data from "b" to "a" first, because that's how
2069 : * we pass result out.
2070 : *
2071 : * We have to copy all the global/per-key flags etc. too.
2072 : */
2073 4 : if (a->bt_empty_range)
2074 : {
2075 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2076 : {
2077 : int i;
2078 0 : BrinValues *col_a = &a->bt_columns[keyno];
2079 0 : BrinValues *col_b = &db->bt_columns[keyno];
2080 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2081 :
2082 0 : col_a->bv_allnulls = col_b->bv_allnulls;
2083 0 : col_a->bv_hasnulls = col_b->bv_hasnulls;
2084 :
2085 : /* If "b" has no data, we're done. */
2086 0 : if (col_b->bv_allnulls)
2087 0 : continue;
2088 :
2089 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2090 0 : col_a->bv_values[i] =
2091 0 : datumCopy(col_b->bv_values[i],
2092 0 : opcinfo->oi_typcache[i]->typbyval,
2093 0 : opcinfo->oi_typcache[i]->typlen);
2094 : }
2095 :
2096 : /* "a" started empty, but "b" was not empty, so remember that */
2097 0 : a->bt_empty_range = false;
2098 :
2099 : /* skip the per-key merge */
2100 0 : MemoryContextDelete(cxt);
2101 0 : return;
2102 : }
2103 :
2104 : /* Now we know neither range is empty. */
2105 20 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2106 : {
2107 : FmgrInfo *unionFn;
2108 16 : BrinValues *col_a = &a->bt_columns[keyno];
2109 16 : BrinValues *col_b = &db->bt_columns[keyno];
2110 16 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2111 :
2112 16 : if (opcinfo->oi_regular_nulls)
2113 : {
2114 : /* Does the "b" summary represent any NULL values? */
2115 16 : bool b_has_nulls = (col_b->bv_hasnulls || col_b->bv_allnulls);
2116 :
2117 : /* Adjust "hasnulls". */
2118 16 : if (!col_a->bv_allnulls && b_has_nulls)
2119 0 : col_a->bv_hasnulls = true;
2120 :
2121 : /* If there are no values in B, there's nothing left to do. */
2122 16 : if (col_b->bv_allnulls)
2123 0 : continue;
2124 :
2125 : /*
2126 : * Adjust "allnulls". If A doesn't have values, just copy the
2127 : * values from B into A, and we're done. We cannot run the
2128 : * operators in this case, because values in A might contain
2129 : * garbage. Note we already established that B contains values.
2130 : *
2131 : * Also adjust "hasnulls" in order not to forget the summary
2132 : * represents NULL values. This is not redundant with the earlier
2133 : * update, because that only happens when allnulls=false.
2134 : */
2135 16 : if (col_a->bv_allnulls)
2136 0 : {
2137 : int i;
2138 :
2139 0 : col_a->bv_allnulls = false;
2140 0 : col_a->bv_hasnulls = true;
2141 :
2142 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2143 0 : col_a->bv_values[i] =
2144 0 : datumCopy(col_b->bv_values[i],
2145 0 : opcinfo->oi_typcache[i]->typbyval,
2146 0 : opcinfo->oi_typcache[i]->typlen);
2147 :
2148 0 : continue;
2149 : }
2150 : }
2151 :
2152 16 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
2153 : BRIN_PROCNUM_UNION);
2154 16 : FunctionCall3Coll(unionFn,
2155 16 : bdesc->bd_index->rd_indcollation[keyno],
2156 : PointerGetDatum(bdesc),
2157 : PointerGetDatum(col_a),
2158 : PointerGetDatum(col_b));
2159 : }
2160 :
2161 4 : MemoryContextDelete(cxt);
2162 : }
2163 :
2164 : /*
2165 : * brin_vacuum_scan
2166 : * Do a complete scan of the index during VACUUM.
2167 : *
2168 : * This routine scans the complete index looking for uncataloged index pages,
2169 : * i.e. those that might have been lost due to a crash after index extension
2170 : * and such.
2171 : */
2172 : static void
2173 106 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
2174 : {
2175 : BlockRangeReadStreamPrivate p;
2176 : ReadStream *stream;
2177 : Buffer buf;
2178 : /* Have the stream cover the index from block 0 up to its current number of blocks. */
2179 106 : p.current_blocknum = 0;
2180 106 : p.last_exclusive = RelationGetNumberOfBlocks(idxrel);
2181 :
2182 : /*
2183 : * It is safe to use batchmode as block_range_read_stream_cb takes no
2184 : * locks.
2185 : */
2186 106 : stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
2187 : READ_STREAM_FULL |
2188 : READ_STREAM_USE_BATCHING,
2189 : strategy,
2190 : idxrel,
2191 : MAIN_FORKNUM,
2192 : block_range_read_stream_cb,
2193 : &p,
2194 : 0);
2195 :
2196 : /*
2197 : * Scan the index in physical order, and clean up any possible mess in
2198 : * each page.
2199 : */
2200 582 : while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
2201 : {
2202 476 : CHECK_FOR_INTERRUPTS();
2203 :
2204 476 : brin_page_cleanup(idxrel, buf);
2205 :
2206 476 : ReleaseBuffer(buf);
2207 : }
2208 :
2209 106 : read_stream_end(stream);
2210 :
2211 : /*
2212 : * Update all upper pages in the index's FSM, as well. This ensures not
2213 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
2214 : * but also that any pre-existing damage or out-of-dateness is repaired.
2215 : */
2216 106 : FreeSpaceMapVacuum(idxrel);
2217 106 : }
2218 :
2219 : static bool
2220 784390 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
2221 : const Datum *values, const bool *nulls)
2222 : {
2223 : int keyno;
2224 : /* Merge the given values/nulls into the summary in dtup; returns whether dtup was modified. */
2225 : /* If the range starts empty, we're certainly going to modify it. */
2226 784390 : bool modified = dtup->bt_empty_range;
2227 :
2228 : /*
2229 : * Compare the key values of the new tuple to the stored index values; our
2230 : * deformed tuple will get updated if the new tuple doesn't fit the
2231 : * original range (note this means we can't break out of the loop early).
2232 : * Make a note of whether this happens, so that we know to insert the
2233 : * modified tuple later.
2234 : */
2235 1848888 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2236 : {
2237 : Datum result;
2238 : BrinValues *bval;
2239 : FmgrInfo *addValue;
2240 : bool has_nulls;
2241 :
2242 1064498 : bval = &dtup->bt_columns[keyno];
2243 :
2244 : /*
2245 : * Does the range have actual NULL values? Either of the flags can be
2246 : * set, but we ignore the state before adding first row.
2247 : *
2248 : * We have to remember this, because we'll modify the flags and we
2249 : * need to know if the range started as empty.
2250 : */
2251 2092228 : has_nulls = ((!dtup->bt_empty_range) &&
2252 1027730 : (bval->bv_hasnulls || bval->bv_allnulls));
2253 :
2254 : /*
2255 : * If the value we're adding is NULL, handle it locally. Otherwise
2256 : * call the BRIN_PROCNUM_ADDVALUE procedure.
2257 : */
2258 1064498 : if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
2259 : {
2260 : /*
2261 : * If the new value is null, we record that we saw it if it's the
2262 : * first one; otherwise, there's nothing to do.
2263 : */
2264 18766 : if (!bval->bv_hasnulls)
2265 : {
2266 3644 : bval->bv_hasnulls = true;
2267 3644 : modified = true;
2268 : }
2269 :
2270 18766 : continue;
2271 : }
2272 :
2273 1045732 : addValue = index_getprocinfo(idxRel, keyno + 1,
2274 : BRIN_PROCNUM_ADDVALUE);
2275 1045732 : result = FunctionCall4Coll(addValue,
2276 1045732 : idxRel->rd_indcollation[keyno],
2277 : PointerGetDatum(bdesc),
2278 : PointerGetDatum(bval),
2279 1045732 : values[keyno],
2280 1045732 : BoolGetDatum(nulls[keyno]));
2281 : /* if that returned true, we need to insert the updated tuple */
2282 1045732 : modified |= DatumGetBool(result);
2283 :
2284 : /*
2285 : * If the range had actual NULL values (i.e. did not start empty),
2286 : * make sure we don't forget about the NULL values. Either the
2287 : * allnulls flag is still set to true, or (if the opclass cleared it)
2288 : * we need to set hasnulls=true.
2289 : *
2290 : * XXX This can only happen when the opclass modified the tuple, so
2291 : * the modified flag should be set.
2292 : */
2293 1045732 : if (has_nulls && !(bval->bv_hasnulls || bval->bv_allnulls))
2294 : {
2295 : Assert(modified);
2296 4 : bval->bv_hasnulls = true;
2297 : }
2298 : }
2299 :
2300 : /*
2301 : * After updating summaries for all the keys, mark it as not empty.
2302 : *
2303 : * If we're actually changing the flag value (i.e. tuple started as
2304 : * empty), we should have modified the tuple. So we should not see empty
2305 : * range that was not modified.
2306 : */
2307 : Assert(!dtup->bt_empty_range || modified);
2308 784390 : dtup->bt_empty_range = false;
2309 :
2310 784390 : return modified;
2311 : }
2312 :
2313 : static bool
2314 189936 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
2315 : {
2316 : int keyno;
2317 :
2318 : /*
2319 : * Check the null flags in bval against each IS [NOT] NULL scan key; return
2320 : * false as soon as one key rules the range out, and true otherwise.
2321 : */
2322 191172 : for (keyno = 0; keyno < nnullkeys; keyno++)
2323 : {
2324 2232 : ScanKey key = nullkeys[keyno];
2325 :
2326 : Assert(key->sk_attno == bval->bv_attno);
2327 :
2328 : /* Handle only IS NULL/IS NOT NULL tests */
2329 2232 : if (!(key->sk_flags & SK_ISNULL))
2330 0 : continue;
2331 :
2332 2232 : if (key->sk_flags & SK_SEARCHNULL)
2333 : {
2334 : /* IS NULL scan key, but range has no NULLs */
2335 1116 : if (!bval->bv_allnulls && !bval->bv_hasnulls)
2336 978 : return false;
2337 : }
2338 1116 : else if (key->sk_flags & SK_SEARCHNOTNULL)
2339 : {
2340 : /*
2341 : * For IS NOT NULL, we can only skip ranges that are known to have
2342 : * only nulls.
2343 : */
2344 1116 : if (bval->bv_allnulls)
2345 18 : return false;
2346 : }
2347 : else
2348 : {
2349 : /*
2350 : * Neither IS NULL nor IS NOT NULL was used; assume all indexable
2351 : * operators are strict and thus return false with NULL value in
2352 : * the scan key.
2353 : */
2354 0 : return false;
2355 : }
2356 : }
2357 :
2358 188940 : return true;
2359 : }
2360 :
2361 : /*
2362 : * Create parallel context, and launch workers for leader.
2363 : *
2364 : * buildstate argument should be initialized (with the exception of the
2365 : * tuplesort states, which may later be created based on shared
2366 : * state initially set up here).
2367 : *
2368 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
2369 : *
2370 : * request is the target number of parallel worker processes to launch.
2371 : *
2372 : * Sets buildstate's BrinLeader, which caller must use to shut down parallel
2373 : * mode by passing it to _brin_end_parallel() at the very end of its index
2374 : * build. If not even a single worker process can be launched, this is
2375 : * never set, and caller should proceed with a serial index build.
2376 : */
2377 : static void
2378 10 : _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2379 : bool isconcurrent, int request)
2380 : {
2381 : ParallelContext *pcxt;
2382 : int scantuplesortstates;
2383 : Snapshot snapshot;
2384 : Size estbrinshared;
2385 : Size estsort;
2386 : BrinShared *brinshared;
2387 : Sharedsort *sharedsort;
2388 10 : BrinLeader *brinleader = palloc0_object(BrinLeader);
2389 : WalUsage *walusage;
2390 : BufferUsage *bufferusage;
2391 10 : bool leaderparticipates = true;
2392 : int querylen;
2393 :
2394 : #ifdef DISABLE_LEADER_PARTICIPATION
2395 : leaderparticipates = false;
2396 : #endif
2397 :
2398 : /*
2399 : * Enter parallel mode, and create context for parallel build of brin
2400 : * index
2401 : */
2402 10 : EnterParallelMode();
2403 : Assert(request > 0);
2404 10 : pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
2405 : request);
2406 :
2407 10 : scantuplesortstates = leaderparticipates ? request + 1 : request;
2408 :
2409 : /*
2410 : * Prepare for scan of the base relation. In a normal index build, we use
2411 : * SnapshotAny because we must retrieve all tuples and do our own time
2412 : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
2413 : * concurrent build, we take a regular MVCC snapshot and index whatever's
2414 : * live according to that.
2415 : */
2416 10 : if (!isconcurrent)
2417 10 : snapshot = SnapshotAny;
2418 : else
2419 0 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
2420 :
2421 : /*
2422 : * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace, and PARALLEL_KEY_TUPLESORT tuplesort workspace.
2423 : */
2424 10 : estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
2425 10 : shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
2426 10 : estsort = tuplesort_estimate_shared(scantuplesortstates);
2427 10 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
2428 :
2429 10 : shm_toc_estimate_keys(&pcxt->estimator, 2);
2430 :
2431 : /*
2432 : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
2433 : * and PARALLEL_KEY_BUFFER_USAGE.
2434 : *
2435 : * If there are no extensions loaded that care, we could skip this. We
2436 : * have no way of knowing whether anyone's looking at pgWalUsage or
2437 : * pgBufferUsage, so do it unconditionally.
2438 : */
2439 10 : shm_toc_estimate_chunk(&pcxt->estimator,
2440 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2441 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2442 10 : shm_toc_estimate_chunk(&pcxt->estimator,
2443 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2444 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2445 :
2446 : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
2447 10 : if (debug_query_string)
2448 : {
2449 10 : querylen = strlen(debug_query_string);
2450 10 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
2451 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2452 : }
2453 : else
2454 0 : querylen = 0; /* keep compiler quiet */
2455 :
2456 : /* Everyone's had a chance to ask for space, so now create the DSM */
2457 10 : InitializeParallelDSM(pcxt);
2458 :
2459 : /* If no DSM segment was available, back out (do serial build) */
2460 10 : if (pcxt->seg == NULL)
2461 : {
2462 0 : if (IsMVCCSnapshot(snapshot))
2463 0 : UnregisterSnapshot(snapshot);
2464 0 : DestroyParallelContext(pcxt);
2465 0 : ExitParallelMode();
2466 0 : return;
2467 : }
2468 :
2469 : /* Store shared build state, for which we reserved space */
2470 10 : brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
2471 : /* Initialize immutable state */
2472 10 : brinshared->heaprelid = RelationGetRelid(heap);
2473 10 : brinshared->indexrelid = RelationGetRelid(index);
2474 10 : brinshared->isconcurrent = isconcurrent;
2475 10 : brinshared->scantuplesortstates = scantuplesortstates;
2476 10 : brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
2477 10 : brinshared->queryid = pgstat_get_my_query_id();
2478 10 : ConditionVariableInit(&brinshared->workersdonecv);
2479 10 : SpinLockInit(&brinshared->mutex);
2480 :
2481 : /* Initialize mutable state */
2482 10 : brinshared->nparticipantsdone = 0;
2483 10 : brinshared->reltuples = 0.0;
2484 10 : brinshared->indtuples = 0.0;
2485 :
2486 10 : table_parallelscan_initialize(heap,
2487 : ParallelTableScanFromBrinShared(brinshared),
2488 : snapshot);
2489 :
2490 : /*
2491 : * Store shared tuplesort-private state, for which we reserved space.
2492 : * Then, initialize opaque state using tuplesort routine.
2493 : */
2494 10 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
2495 10 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
2496 : pcxt->seg);
2497 :
2498 : /*
2499 : * Insert the shared build state and the shared tuplesort state into the
2500 : * TOC under their keys.
2501 : */
2502 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
2503 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
2504 :
2505 : /* Store query string for workers */
2506 10 : if (debug_query_string)
2507 : {
2508 : char *sharedquery;
2509 :
2510 10 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
2511 10 : memcpy(sharedquery, debug_query_string, querylen + 1);
2512 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
2513 : }
2514 :
2515 : /*
2516 : * Allocate space for each worker's WalUsage and BufferUsage; no need to
2517 : * initialize.
2518 : */
2519 10 : walusage = shm_toc_allocate(pcxt->toc,
2520 10 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2521 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
2522 10 : bufferusage = shm_toc_allocate(pcxt->toc,
2523 10 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2524 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
2525 :
2526 : /* Launch workers, saving status for leader/caller */
2527 10 : LaunchParallelWorkers(pcxt);
2528 10 : brinleader->pcxt = pcxt;
2529 10 : brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
2530 10 : if (leaderparticipates)
2531 10 : brinleader->nparticipanttuplesorts++;
2532 10 : brinleader->brinshared = brinshared;
2533 10 : brinleader->sharedsort = sharedsort;
2534 10 : brinleader->snapshot = snapshot;
2535 10 : brinleader->walusage = walusage;
2536 10 : brinleader->bufferusage = bufferusage;
2537 :
2538 : /* If no workers were successfully launched, back out (do serial build) */
2539 10 : if (pcxt->nworkers_launched == 0)
2540 : {
2541 2 : _brin_end_parallel(brinleader, NULL);
2542 2 : return;
2543 : }
2544 :
2545 : /* Save leader state now that it's clear build will be parallel */
2546 8 : buildstate->bs_leader = brinleader;
2547 :
2548 : /* Join heap scan ourselves */
2549 8 : if (leaderparticipates)
2550 8 : _brin_leader_participate_as_worker(buildstate, heap, index);
2551 :
2552 : /*
2553 : * Caller needs to wait for all launched workers when we return. Make
2554 : * sure that the failure-to-start case will not hang forever.
2555 : */
2556 8 : WaitForParallelWorkersToAttach(pcxt);
2557 : }
2558 :
2559 : /*
2560 : * Shut down workers, destroy parallel context, and end parallel mode.
2561 : */
2562 : static void
2563 10 : _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2564 : {
2565 : int i;
2566 :
2567 : /* Shutdown worker processes */
2568 10 : WaitForParallelWorkersToFinish(brinleader->pcxt);
2569 :
2570 : /*
2571 : * Next, accumulate buffer and WAL usage. (This must wait for the workers to finish,
2572 : * or we might get incomplete data.)
2573 : */
2574 22 : for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
2575 12 : InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
2576 :
2577 : /* Free last reference to MVCC snapshot, if one was used */
2578 10 : if (IsMVCCSnapshot(brinleader->snapshot))
2579 0 : UnregisterSnapshot(brinleader->snapshot);
2580 10 : DestroyParallelContext(brinleader->pcxt);
2581 10 : ExitParallelMode();
2582 10 : }
2583 :
2584 : /*
2585 : * Within leader, wait for end of heap scan.
2586 : *
2587 : * When called, parallel heap scan started by _brin_begin_parallel() will
2588 : * already be underway within worker processes (when leader participates
2589 : * as a worker, we should end up here just as workers are finishing).
2590 : *
2591 : * Returns the total number of heap tuples scanned.
2592 : */
2593 : static double
2594 8 : _brin_parallel_heapscan(BrinBuildState *state)
2595 : {
2596 8 : BrinShared *brinshared = state->bs_leader->brinshared;
2597 : int nparticipanttuplesorts;
2598 : /* Sleep on workersdonecv until nparticipantsdone reaches the number of participating tuplesorts. */
2599 8 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
2600 : for (;;)
2601 : {
2602 24 : SpinLockAcquire(&brinshared->mutex);
2603 24 : if (brinshared->nparticipantsdone == nparticipanttuplesorts)
2604 : {
2605 : /* copy the data into leader state */
2606 8 : state->bs_reltuples = brinshared->reltuples;
2607 8 : state->bs_numtuples = brinshared->indtuples;
2608 :
2609 8 : SpinLockRelease(&brinshared->mutex);
2610 8 : break;
2611 : }
2612 16 : SpinLockRelease(&brinshared->mutex);
2613 :
2614 16 : ConditionVariableSleep(&brinshared->workersdonecv,
2615 : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
2616 : }
2617 :
2618 8 : ConditionVariableCancelSleep();
2619 :
2620 8 : return state->bs_reltuples;
2621 : }
2622 :
2623 : /*
2624 : * Within leader, wait for end of heap scan and merge per-worker results.
2625 : *
2626 : * After waiting for all workers to finish, merge the per-worker results into
2627 : * the complete index. The results from each worker are sorted by block number
2628 : * (start of the page range). While combining the per-worker results we merge
2629 : * summaries for the same page range, and also fill-in empty summaries for
2630 : * ranges without any tuples.
2631 : *
2632 : * Returns the total number of heap tuples scanned.
2633 : */
2634 : static double
2635 8 : _brin_parallel_merge(BrinBuildState *state)
2636 : {
2637 : BrinTuple *btup;
2638 8 : BrinMemTuple *memtuple = NULL;
2639 : Size tuplen;
2640 8 : BlockNumber prevblkno = InvalidBlockNumber;
2641 : MemoryContext rangeCxt,
2642 : oldCxt;
2643 : double reltuples;
2644 :
2645 : /* wait for workers to scan table and produce partial results */
2646 8 : reltuples = _brin_parallel_heapscan(state);
2647 :
2648 : /* do the actual sort in the leader */
2649 8 : tuplesort_performsort(state->bs_sortstate);
2650 :
2651 : /*
2652 : * Initialize BrinMemTuple we'll use to union summaries from workers (in
2653 : * case they happened to produce parts of the same page range).
2654 : */
2655 8 : memtuple = brin_new_memtuple(state->bs_bdesc);
2656 :
2657 : /*
2658 : * Create a memory context we'll reset to combine results for a single
2659 : * page range (received from the workers). We don't expect a huge number of
2660 : * overlaps under regular circumstances (because for large tables the
2661 : * chunk size is likely larger than the BRIN page range), but it can
2662 : * happen, and the union functions may do all kinds of stuff. So we better
2663 : * reset the context once in a while.
2664 : */
2665 8 : rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
2666 : "brin union",
2667 : ALLOCSET_DEFAULT_SIZES);
2668 8 : oldCxt = MemoryContextSwitchTo(rangeCxt);
2669 :
2670 : /*
2671 : * Read the BRIN tuples from the shared tuplesort, sorted by block number.
2672 : * That probably gives us an index that is cheaper to scan, thanks to
2673 : * mostly getting data from the same index page as before.
2674 : */
2675 52 : while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
2676 : {
2677 : /* Ranges should be multiples of pages_per_range for the index. */
2678 : Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
2679 :
2680 : /*
2681 : * Do we need to union summaries for the same page range?
2682 : *
2683 : * If this is the first brin tuple we read, then just deform it into
2684 : * the memtuple, and continue with the next one from tuplesort. We
2685 : * however may need to insert empty summaries into the index.
2686 : *
2687 : * If it's the same block as the last we saw, we simply union the brin
2688 : * tuple into it, and we're done - we don't even need to insert empty
2689 : * ranges, because that was done earlier when we saw the first brin
2690 : * tuple (for this range).
2691 : *
2692 : * Finally, if it's not the first brin tuple, and it's not the same
2693 : * page range, we need to do the insert and then deform the tuple into
2694 : * the memtuple. Then we'll insert empty ranges before the new brin
2695 : * tuple, if needed.
2696 : */
2697 44 : if (prevblkno == InvalidBlockNumber)
2698 : {
2699 : /* First brin tuples, just deform into memtuple. */
2700 2 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2701 :
2702 : /* continue to insert empty pages before thisblock */
2703 : }
2704 42 : else if (memtuple->bt_blkno == btup->bt_blkno)
2705 : {
2706 : /*
2707 : * Not the first brin tuple, but same page range as the previous
2708 : * one, so we can merge it into the memtuple.
2709 : */
2710 4 : union_tuples(state->bs_bdesc, memtuple, btup);
2711 4 : continue;
2712 : }
2713 : else
2714 : {
2715 : BrinTuple *tmp;
2716 : Size len;
2717 :
2718 : /*
2719 : * We got brin tuple for a different page range, so form a brin
2720 : * tuple from the memtuple, insert it, and re-init the memtuple
2721 : * from the new brin tuple.
2722 : */
2723 38 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2724 : memtuple, &len);
2725 :
2726 38 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2727 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2728 :
2729 : /*
2730 : * Reset the per-output-range context. This frees all the memory
2731 : * possibly allocated by the union functions, and also the BRIN
2732 : * tuple we just formed and inserted.
2733 : */
2734 38 : MemoryContextReset(rangeCxt);
2735 :
2736 38 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2737 :
2738 : /* continue to insert empty pages before thisblock */
2739 : }
2740 :
2741 : /* Fill empty ranges for all ranges missing in the tuplesort. */
2742 40 : brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
2743 :
2744 40 : prevblkno = btup->bt_blkno;
2745 : }
2746 :
2747 8 : tuplesort_end(state->bs_sortstate);
2748 :
2749 : /* Fill the BRIN tuple for the last page range with data. */
2750 8 : if (prevblkno != InvalidBlockNumber)
2751 : {
2752 : BrinTuple *tmp;
2753 : Size len;
2754 :
2755 2 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2756 : memtuple, &len);
2757 :
2758 2 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2759 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2760 :
2761 2 : pfree(tmp);
2762 : }
2763 :
2764 : /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
2765 8 : brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
2766 :
2767 : /*
2768 : * Switch back to the original memory context, and destroy the one we
2769 : * created to isolate the union_tuple calls.
2770 : */
2771 8 : MemoryContextSwitchTo(oldCxt);
2772 8 : MemoryContextDelete(rangeCxt);
2773 :
2774 8 : return reltuples;
2775 : }
2776 :
2777 : /*
2778 : * Returns size of shared memory required to store state for a parallel
2779 : * brin index build based on the snapshot its parallel scan will use.
2780 : */
2781 : static Size
2782 10 : _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
2783 : {
2784 : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
2785 10 : return add_size(BUFFERALIGN(sizeof(BrinShared)),
2786 : table_parallelscan_estimate(heap, snapshot));
2787 : }
2788 :
2789 : /*
2790 : * Within leader, participate as a parallel worker.
2791 : */
2792 : static void
2793 8 : _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
2794 : {
2795 8 : BrinLeader *brinleader = buildstate->bs_leader;
2796 : int sortmem;
2797 :
2798 : /*
2799 : * Might as well use reliable figure when doling out maintenance_work_mem
2800 : * (when requested number of workers were not launched, this will be
2801 : * somewhat higher than it is for other workers).
2802 : */
2803 8 : sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
2804 :
2805 : /* Perform work common to all participants */
2806 8 : _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
2807 : brinleader->sharedsort, heap, index, sortmem, true);
2808 8 : }
2809 :
2810 : /*
2811 : * Perform a worker's portion of a parallel sort.
2812 : *
2813 : * This generates a tuplesort for the worker portion of the table.
2814 : *
2815 : * sortmem is the amount of working memory to use within each worker,
2816 : * expressed in KBs.
2817 : *
2818 : * When this returns, workers are done, and need only release resources.
2819 : */
2820 : static void
2821 20 : _brin_parallel_scan_and_build(BrinBuildState *state,
2822 : BrinShared *brinshared, Sharedsort *sharedsort,
2823 : Relation heap, Relation index,
2824 : int sortmem, bool progress)
2825 : {
2826 : SortCoordinate coordinate;
2827 : TableScanDesc scan;
2828 : double reltuples;
2829 : IndexInfo *indexInfo;
2830 :
2831 : /* Initialize local tuplesort coordination state */
2832 20 : coordinate = palloc0_object(SortCoordinateData);
2833 20 : coordinate->isWorker = true;
2834 20 : coordinate->nParticipants = -1;
2835 20 : coordinate->sharedsort = sharedsort;
2836 :
2837 : /* Begin "partial" tuplesort */
2838 20 : state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
2839 : TUPLESORT_NONE);
2840 :
2841 : /* Join parallel scan */
2842 20 : indexInfo = BuildIndexInfo(index);
2843 20 : indexInfo->ii_Concurrent = brinshared->isconcurrent;
2844 :
2845 20 : scan = table_beginscan_parallel(heap,
2846 : ParallelTableScanFromBrinShared(brinshared));
2847 :
2848 20 : reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
2849 : brinbuildCallbackParallel, state, scan);
2850 :
2851 : /* insert the last item */
2852 20 : form_and_spill_tuple(state);
2853 :
2854 : /* sort the BRIN ranges built by this worker */
2855 20 : tuplesort_performsort(state->bs_sortstate);
2856 :
2857 20 : state->bs_reltuples += reltuples;
2858 :
2859 : /*
2860 : * Done. Record ambuild statistics.
2861 : */
2862 20 : SpinLockAcquire(&brinshared->mutex);
2863 20 : brinshared->nparticipantsdone++;
2864 20 : brinshared->reltuples += state->bs_reltuples;
2865 20 : brinshared->indtuples += state->bs_numtuples;
2866 20 : SpinLockRelease(&brinshared->mutex);
2867 :
2868 : /* Notify leader */
2869 20 : ConditionVariableSignal(&brinshared->workersdonecv);
2870 :
2871 20 : tuplesort_end(state->bs_sortstate);
2872 20 : }
2873 :
2874 : /*
2875 : * Perform work within a launched parallel process.
2876 : */
2877 : void
2878 12 : _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2879 : {
2880 : char *sharedquery;
2881 : BrinShared *brinshared;
2882 : Sharedsort *sharedsort;
2883 : BrinBuildState *buildstate;
2884 : Relation heapRel;
2885 : Relation indexRel;
2886 : LOCKMODE heapLockmode;
2887 : LOCKMODE indexLockmode;
2888 : WalUsage *walusage;
2889 : BufferUsage *bufferusage;
2890 : int sortmem;
2891 :
2892 : /*
2893 : * The only possible status flag that can be set to the parallel worker is
2894 : * PROC_IN_SAFE_IC.
2895 : */
2896 : Assert((MyProc->statusFlags == 0) ||
2897 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2898 :
2899 : /* Set debug_query_string for individual workers first */
2900 12 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2901 12 : debug_query_string = sharedquery;
2902 :
2903 : /* Report the query string from leader */
2904 12 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2905 :
2906 : /* Look up brin shared state */
2907 12 : brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
2908 :
2909 : /* Open relations using lock modes known to be obtained by index.c */
2910 12 : if (!brinshared->isconcurrent)
2911 : {
2912 12 : heapLockmode = ShareLock;
2913 12 : indexLockmode = AccessExclusiveLock;
2914 : }
2915 : else
2916 : {
2917 0 : heapLockmode = ShareUpdateExclusiveLock;
2918 0 : indexLockmode = RowExclusiveLock;
2919 : }
2920 :
2921 : /* Track query ID */
2922 12 : pgstat_report_query_id(brinshared->queryid, false);
2923 :
2924 : /* Open relations within worker */
2925 12 : heapRel = table_open(brinshared->heaprelid, heapLockmode);
2926 12 : indexRel = index_open(brinshared->indexrelid, indexLockmode);
2927 :
2928 12 : buildstate = initialize_brin_buildstate(indexRel, NULL,
2929 : brinshared->pagesPerRange,
2930 : InvalidBlockNumber);
2931 :
2932 : /* Look up shared state private to tuplesort.c */
2933 12 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2934 12 : tuplesort_attach_shared(sharedsort, seg);
2935 :
2936 : /* Prepare to track buffer usage during parallel execution */
2937 12 : InstrStartParallelQuery();
2938 :
2939 : /*
2940 : * Might as well use reliable figure when doling out maintenance_work_mem
2941 : * (when requested number of workers were not launched, this will be
2942 : * somewhat higher than it is for other workers).
2943 : */
2944 12 : sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
2945 :
2946 12 : _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
2947 : heapRel, indexRel, sortmem, false);
2948 :
2949 : /* Report WAL/buffer usage during parallel execution */
2950 12 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2951 12 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2952 12 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2953 12 : &walusage[ParallelWorkerNumber]);
2954 :
2955 12 : index_close(indexRel, indexLockmode);
2956 12 : table_close(heapRel, heapLockmode);
2957 12 : }
2958 :
2959 : /*
2960 : * brin_build_empty_tuple
2961 : * Maybe initialize a BRIN tuple representing empty range.
2962 : *
2963 : * Returns a BRIN tuple representing an empty page range starting at the
2964 : * specified block number. The empty tuple is initialized only once, when it's
2965 : * needed for the first time, stored in the memory context bs_context to ensure
2966 : * proper life span, and reused on following calls. All empty tuples are
2967 : * exactly the same except for the bt_blkno field, which is set to the value
2968 : * in blkno parameter.
2969 : */
2970 : static void
2971 20 : brin_build_empty_tuple(BrinBuildState *state, BlockNumber blkno)
2972 : {
2973 : /* First time an empty tuple is requested? If yes, initialize it. */
2974 20 : if (state->bs_emptyTuple == NULL)
2975 : {
2976 : MemoryContext oldcxt;
2977 10 : BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
2978 :
2979 : /* Allocate the tuple in context for the whole index build. */
2980 10 : oldcxt = MemoryContextSwitchTo(state->bs_context);
2981 :
2982 10 : state->bs_emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple,
2983 : &state->bs_emptyTupleLen);
2984 :
2985 10 : MemoryContextSwitchTo(oldcxt);
2986 : }
2987 : else
2988 : {
2989 : /* If we already have an empty tuple, just update the block. */
2990 10 : state->bs_emptyTuple->bt_blkno = blkno;
2991 : }
2992 20 : }
2993 :
2994 : /*
2995 : * brin_fill_empty_ranges
2996 : * Add BRIN index tuples representing empty page ranges.
2997 : *
2998 : * prevRange/nextRange determine for which page ranges to add empty summaries.
2999 : * Both boundaries are exclusive, i.e. only ranges starting at blkno for which
3000 : * (prevRange < blkno < nextRange) will be added to the index.
3001 : *
3002 : * If prevRange is InvalidBlockNumber, this means there was no previous page
3003 : * range (i.e. the first empty range to add is for blkno=0).
3004 : *
3005 : * The empty tuple is built only once, and then reused for all future calls.
3006 : */
3007 : static void
3008 408 : brin_fill_empty_ranges(BrinBuildState *state,
3009 : BlockNumber prevRange, BlockNumber nextRange)
3010 : {
3011 : BlockNumber blkno;
3012 :
3013 : /*
3014 : * If we already summarized some ranges, we need to start with the next
3015 : * one. Otherwise start from the first range of the table.
3016 : */
3017 408 : blkno = (prevRange == InvalidBlockNumber) ? 0 : (prevRange + state->bs_pagesPerRange);
3018 :
3019 : /* Generate empty ranges until we hit the next non-empty range. */
3020 428 : while (blkno < nextRange)
3021 : {
3022 : /* Did we already build the empty tuple? If not, do it now. */
3023 20 : brin_build_empty_tuple(state, blkno);
3024 :
3025 20 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
3026 : &state->bs_currentInsertBuf,
3027 20 : blkno, state->bs_emptyTuple, state->bs_emptyTupleLen);
3028 :
3029 : /* try next page range */
3030 20 : blkno += state->bs_pagesPerRange;
3031 : }
3032 408 : }
|