Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "miscadmin.h"
32 : #include "pgstat.h"
33 : #include "postmaster/autovacuum.h"
34 : #include "storage/bufmgr.h"
35 : #include "storage/freespace.h"
36 : #include "tcop/tcopprot.h"
37 : #include "utils/acl.h"
38 : #include "utils/datum.h"
39 : #include "utils/fmgrprotos.h"
40 : #include "utils/guc.h"
41 : #include "utils/index_selfuncs.h"
42 : #include "utils/memutils.h"
43 : #include "utils/rel.h"
44 : #include "utils/tuplesort.h"
45 :
46 : /* Magic numbers for parallel state sharing
 *
 * One distinct 64-bit key per piece of state exchanged during a parallel
 * build: going by the names, the BrinShared struct, the tuplesort state, the
 * query text, and the WAL and buffer usage counters.
 *
 * NOTE(review): presumably these are the shm_toc keys for the parallel
 * build's shared memory segment; the insert/lookup calls are outside this
 * view -- confirm.
 */
47 : #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
48 : #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
49 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
50 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
51 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
52 :
53 : /*
54 : * Status for index builds performed in parallel. This is allocated in a
55 : * dynamic shared memory segment.
 *
 * The struct is followed, in the same allocation, by the parallel table scan
 * descriptor; use ParallelTableScanFromBrinShared() to locate it.
56 : */
57 : typedef struct BrinShared
58 : {
59 : /*
60 : * These fields are not modified during the build. They primarily exist
61 : * for the benefit of worker processes that need to create state
62 : * corresponding to that used by the leader.
63 : */
64 : Oid heaprelid;
65 : Oid indexrelid;
66 : bool isconcurrent;
67 : BlockNumber pagesPerRange;
68 : int scantuplesortstates;
69 :
70 : /* Query ID, for report in worker processes */
71 : uint64 queryid;
72 :
73 : /*
74 : * workersdonecv is used to monitor the progress of workers. All parallel
75 : * participants must indicate that they are done before leader can use
76 : * results built by the workers (and before leader can write the data into
77 : * the index).
78 : */
79 : ConditionVariable workersdonecv;
80 :
81 : /*
82 : * mutex protects all fields before heapdesc.
 *
 * NOTE(review): this struct has no "heapdesc" member (the parallel table
 * scan descriptor is stored after the struct rather than embedded in it).
 * Presumably the mutex covers the mutable fields declared below it
 * (nparticipantsdone, reltuples, indtuples) -- confirm against the code
 * that takes the lock.
83 : *
84 : * These fields contain status information of interest to BRIN index
85 : * builds that must work just the same when an index is built in parallel.
86 : */
87 : slock_t mutex;
88 :
89 : /*
90 : * Mutable state that is maintained by workers, and reported back to
91 : * leader at end of the scans.
92 : *
93 : * nparticipantsdone is number of worker processes finished.
94 : *
95 : * reltuples is the total number of input heap tuples.
96 : *
97 : * indtuples is the total number of tuples that made it into the index.
98 : */
99 : int nparticipantsdone;
100 : double reltuples;
101 : double indtuples;
102 :
103 : /*
104 : * ParallelTableScanDescData data follows. Can't directly embed here, as
105 : * implementations of the parallel table scan desc interface might need
106 : * stronger alignment.
107 : */
108 : } BrinShared;
109 :
/*
 * Return pointer to a BrinShared's parallel table scan.
 *
 * The scan descriptor is not a member of BrinShared; it is located in the
 * same allocation, at the first BUFFERALIGN'd offset past the struct.
 *
 * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
 * MAXALIGN.
 *
 * The whole expansion is parenthesized so that the macro behaves as a single
 * primary expression: without the outer parentheses, a postfix operator
 * applied to the result (e.g. "->") would bind to the char-pointer operand
 * before the cast is applied.
 */
#define ParallelTableScanFromBrinShared(shared) \
	((ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared))))
118 :
119 : /*
120 : * Status for leader in parallel index build.
121 : */
122 : typedef struct BrinLeader
123 : {
124 : /* parallel context itself */
125 : ParallelContext *pcxt;
126 :
127 : /*
128 : * nparticipanttuplesorts is the exact number of worker processes
129 : * successfully launched, plus one leader process if it participates as a
130 : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
131 : * participating as a worker).
132 : */
133 : int nparticipanttuplesorts;
134 :
135 : /*
136 : * Leader process convenience pointers to shared state (leader avoids TOC
137 : * lookups).
138 : *
139 : * brinshared is the shared state for entire build. sharedsort is the
140 : * shared, tuplesort-managed state passed to each process tuplesort.
141 : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
142 : * required.
 *
 * NOTE(review): walusage and bufferusage are not described above.
 * Presumably they point at the shared WAL and buffer usage counters
 * (cf. PARALLEL_KEY_WAL_USAGE and PARALLEL_KEY_BUFFER_USAGE); the code
 * that fills them in is outside this view -- confirm.
143 : */
144 : BrinShared *brinshared;
145 : Sharedsort *sharedsort;
146 : Snapshot snapshot;
147 : WalUsage *walusage;
148 : BufferUsage *bufferusage;
149 : } BrinLeader;
150 :
151 : /*
152 : * We use a BrinBuildState during initial construction of a BRIN index.
153 : * The running state is kept in a BrinMemTuple.
 *
 * NOTE(review): most members are not documented individually.  Going by the
 * names, bs_numtuples/bs_reltuples are running tuple counts, and
 * bs_emptyTuple/bs_emptyTupleLen hold a preformed tuple (and its length)
 * for empty ranges (cf. brin_fill_empty_ranges) -- confirm against the code
 * that fills them in, which is outside this view.
154 : */
155 : typedef struct BrinBuildState
156 : {
157 : Relation bs_irel;
158 : double bs_numtuples;
159 : double bs_reltuples;
160 : Buffer bs_currentInsertBuf;
161 : BlockNumber bs_pagesPerRange;
162 : BlockNumber bs_currRangeStart;
163 : BlockNumber bs_maxRangeStart;
164 : BrinRevmap *bs_rmAccess;
165 : BrinDesc *bs_bdesc;
166 : BrinMemTuple *bs_dtuple;
167 :
168 : BrinTuple *bs_emptyTuple;
169 : Size bs_emptyTupleLen;
170 : MemoryContext bs_context;
171 :
172 : /*
173 : * bs_leader is only present when a parallel index build is performed, and
174 : * only in the leader process. (Actually, only the leader process has a
175 : * BrinBuildState.)
 *
 * NOTE(review): the parenthetical looks stale.  The bs_sortstate comment
 * below says the sort state is used by workers, and
 * _brin_parallel_scan_and_build() takes a BrinBuildState, which suggests
 * workers have a build state of their own -- confirm.
 *
 * bs_worker_id is undocumented; presumably it identifies this participant
 * within a parallel build (its assignments are outside this view).
176 : */
177 : BrinLeader *bs_leader;
178 : int bs_worker_id;
179 :
180 : /*
181 : * The sortstate is used by workers (including the leader). It has to be
182 : * part of the build state, because that's the only thing passed to the
183 : * build callback etc.
184 : */
185 : Tuplesortstate *bs_sortstate;
186 : } BrinBuildState;
187 :
188 : /*
189 : * We use a BrinInsertState to capture running state spanning multiple
190 : * brininsert invocations, within the same command.
 *
 * It is created by initialize_brin_insertstate(), cached in
 * IndexInfo->ii_AmCache, and released by brininsertcleanup().
191 : */
192 : typedef struct BrinInsertState
193 : {
/* revmap accessor obtained from brinRevmapInitialize() */
194 : BrinRevmap *bis_rmAccess;
/* index descriptor obtained from brin_build_desc() */
195 : BrinDesc *bis_desc;
/* filled in by brinRevmapInitialize(); used to find a block's range start */
196 : BlockNumber bis_pages_per_range;
197 : } BrinInsertState;
198 :
199 : /*
200 : * Struct used as "opaque" during index scans
 *
 * Allocated and filled in by brinbeginscan(), which stores it in the scan
 * descriptor's opaque field; bringetbitmap() reads it back from there.
201 : */
202 : typedef struct BrinOpaque
203 : {
/* pages per range, as reported by brinRevmapInitialize() */
204 : BlockNumber bo_pagesPerRange;
/* revmap accessor obtained from brinRevmapInitialize() */
205 : BrinRevmap *bo_rmAccess;
/* index descriptor obtained from brin_build_desc() */
206 : BrinDesc *bo_bdesc;
207 : } BrinOpaque;
208 :
/*
 * Alias for InvalidBlockNumber.
 *
 * NOTE(review): presumably passed as the pageRange argument of
 * brinsummarize() to request all ranges rather than a single one; the users
 * of this symbol are outside this view -- confirm.
 */
209 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
210 :
/* Forward declarations of file-local helper functions */
211 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
212 : BrinRevmap *revmap,
213 : BlockNumber pagesPerRange,
214 : BlockNumber tablePages);
215 : static BrinInsertState *initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo);
216 : static void terminate_brin_buildstate(BrinBuildState *state);
217 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
218 : bool include_partial, double *numSummarized, double *numExisting);
219 : static void form_and_insert_tuple(BrinBuildState *state);
220 : static void form_and_spill_tuple(BrinBuildState *state);
221 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
222 : BrinTuple *b);
223 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
224 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
225 : BrinMemTuple *dtup, const Datum *values, const bool *nulls);
226 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
227 : static void brin_fill_empty_ranges(BrinBuildState *state,
228 : BlockNumber prevRange, BlockNumber nextRange);
229 :
230 : /* parallel index builds */
231 : static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
232 : bool isconcurrent, int request);
233 : static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
234 : static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
235 : static double _brin_parallel_heapscan(BrinBuildState *state);
236 : static double _brin_parallel_merge(BrinBuildState *state);
237 : static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
238 : Relation heap, Relation index);
239 : static void _brin_parallel_scan_and_build(BrinBuildState *state,
240 : BrinShared *brinshared,
241 : Sharedsort *sharedsort,
242 : Relation heap, Relation index,
243 : int sortmem, bool progress);
244 :
245 : /*
246 : * BRIN handler function: return IndexAmRoutine with access method parameters
247 : * and callbacks.
248 : */
249 : Datum
250 2410 : brinhandler(PG_FUNCTION_ARGS)
251 : {
252 2410 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
253 :
254 2410 : amroutine->amstrategies = 0;
255 2410 : amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
256 2410 : amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
257 2410 : amroutine->amcanorder = false;
258 2410 : amroutine->amcanorderbyop = false;
259 2410 : amroutine->amcanbackward = false;
260 2410 : amroutine->amcanunique = false;
261 2410 : amroutine->amcanmulticol = true;
262 2410 : amroutine->amoptionalkey = true;
263 2410 : amroutine->amsearcharray = false;
264 2410 : amroutine->amsearchnulls = true;
265 2410 : amroutine->amstorage = true;
266 2410 : amroutine->amclusterable = false;
267 2410 : amroutine->ampredlocks = false;
268 2410 : amroutine->amcanparallel = false;
269 2410 : amroutine->amcanbuildparallel = true;
270 2410 : amroutine->amcaninclude = false;
271 2410 : amroutine->amusemaintenanceworkmem = false;
272 2410 : amroutine->amsummarizing = true;
273 2410 : amroutine->amparallelvacuumoptions =
274 : VACUUM_OPTION_PARALLEL_CLEANUP;
275 2410 : amroutine->amkeytype = InvalidOid;
276 :
277 2410 : amroutine->ambuild = brinbuild;
278 2410 : amroutine->ambuildempty = brinbuildempty;
279 2410 : amroutine->aminsert = brininsert;
280 2410 : amroutine->aminsertcleanup = brininsertcleanup;
281 2410 : amroutine->ambulkdelete = brinbulkdelete;
282 2410 : amroutine->amvacuumcleanup = brinvacuumcleanup;
283 2410 : amroutine->amcanreturn = NULL;
284 2410 : amroutine->amcostestimate = brincostestimate;
285 2410 : amroutine->amgettreeheight = NULL;
286 2410 : amroutine->amoptions = brinoptions;
287 2410 : amroutine->amproperty = NULL;
288 2410 : amroutine->ambuildphasename = NULL;
289 2410 : amroutine->amvalidate = brinvalidate;
290 2410 : amroutine->amadjustmembers = NULL;
291 2410 : amroutine->ambeginscan = brinbeginscan;
292 2410 : amroutine->amrescan = brinrescan;
293 2410 : amroutine->amgettuple = NULL;
294 2410 : amroutine->amgetbitmap = bringetbitmap;
295 2410 : amroutine->amendscan = brinendscan;
296 2410 : amroutine->ammarkpos = NULL;
297 2410 : amroutine->amrestrpos = NULL;
298 2410 : amroutine->amestimateparallelscan = NULL;
299 2410 : amroutine->aminitparallelscan = NULL;
300 2410 : amroutine->amparallelrescan = NULL;
301 2410 : amroutine->amtranslatestrategy = NULL;
302 2410 : amroutine->amtranslatecmptype = NULL;
303 :
304 2410 : PG_RETURN_POINTER(amroutine);
305 : }
306 :
307 : /*
308 : * Initialize a BrinInsertState to maintain state to be used across multiple
309 : * tuple inserts, within the same command.
310 : */
311 : static BrinInsertState *
312 1120 : initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo)
313 : {
314 : BrinInsertState *bistate;
315 : MemoryContext oldcxt;
316 :
317 1120 : oldcxt = MemoryContextSwitchTo(indexInfo->ii_Context);
318 1120 : bistate = palloc0(sizeof(BrinInsertState));
319 1120 : bistate->bis_desc = brin_build_desc(idxRel);
320 1120 : bistate->bis_rmAccess = brinRevmapInitialize(idxRel,
321 : &bistate->bis_pages_per_range);
322 1120 : indexInfo->ii_AmCache = bistate;
323 1120 : MemoryContextSwitchTo(oldcxt);
324 :
325 1120 : return bistate;
326 : }
327 :
328 : /*
329 : * A tuple in the heap is being inserted. To keep a brin index up to date,
330 : * we need to obtain the relevant index tuple and compare its stored values
331 : * with those of the new tuple. If the tuple values are not consistent with
332 : * the summary tuple, we need to update the index tuple.
333 : *
334 : * If autosummarization is enabled, check if we need to summarize the previous
335 : * page range.
336 : *
337 : * If the range is not currently summarized (i.e. the revmap returns NULL for
338 : * it), there's nothing to do for this tuple.
 *
 * idxRel is the BRIN index; values/nulls are the new tuple's index column
 * values, handed to add_values_to_range(); heaptid is the new heap tuple's
 * TID, whose block number selects the page range.  indexInfo->ii_AmCache
 * holds the per-command BrinInsertState, created here on first use.
 * heapRel, checkUnique and indexUnchanged are not referenced by this
 * function.
 *
 * Always returns false.
339 : */
340 : bool
341 126038 : brininsert(Relation idxRel, Datum *values, bool *nulls,
342 : ItemPointer heaptid, Relation heapRel,
343 : IndexUniqueCheck checkUnique,
344 : bool indexUnchanged,
345 : IndexInfo *indexInfo)
346 : {
347 : BlockNumber pagesPerRange;
348 : BlockNumber origHeapBlk;
349 : BlockNumber heapBlk;
350 126038 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
351 : BrinRevmap *revmap;
352 : BrinDesc *bdesc;
353 126038 : Buffer buf = InvalidBuffer;
354 126038 : MemoryContext tupcxt = NULL;
355 126038 : MemoryContext oldcxt = CurrentMemoryContext;
356 126038 : bool autosummarize = BrinGetAutoSummarize(idxRel);
357 :
358 : /*
359 : * If first time through in this statement, initialize the insert state
360 : * that we keep for all the inserts in the command.
361 : */
362 126038 : if (!bistate)
363 1120 : bistate = initialize_brin_insertstate(idxRel, indexInfo);
364 :
365 126038 : revmap = bistate->bis_rmAccess;
366 126038 : bdesc = bistate->bis_desc;
367 126038 : pagesPerRange = bistate->bis_pages_per_range;
368 :
369 : /*
370 : * origHeapBlk is the block number where the insertion occurred. heapBlk
371 : * is the first block in the corresponding page range.
372 : */
373 126038 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
374 126038 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
375 :
/*
 * Retry loop: we leave it via "break" when the range is unsummarized or the
 * summary is known to cover the new values, and come back to the top via
 * "continue" only when brin_doupdate() reports failure.
 */
376 : for (;;)
377 0 : {
378 126038 : bool need_insert = false;
379 : OffsetNumber off;
380 : BrinTuple *brtup;
381 : BrinMemTuple *dtup;
382 :
383 126038 : CHECK_FOR_INTERRUPTS();
384 :
385 : /*
386 : * If auto-summarization is enabled and we just inserted the first
387 : * tuple into the first block of a new non-first page range, request a
388 : * summarization run of the previous range.
 *
 * Note that this check sits inside the retry loop, so it is evaluated
 * again on every retry.
389 : */
390 126038 : if (autosummarize &&
391 156 : heapBlk > 0 &&
392 156 : heapBlk == origHeapBlk &&
393 156 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
394 : {
/* last block of the preceding range, since heapBlk starts this one */
395 8 : BlockNumber lastPageRange = heapBlk - 1;
396 : BrinTuple *lastPageTuple;
397 :
398 : lastPageTuple =
399 8 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
400 : NULL, BUFFER_LOCK_SHARE);
401 8 : if (!lastPageTuple)
402 : {
403 : bool recorded;
404 :
405 6 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
406 : RelationGetRelid(idxRel),
407 : lastPageRange);
/* a request that could not be recorded is only logged, not an error */
408 6 : if (!recorded)
409 0 : ereport(LOG,
410 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
411 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
412 : RelationGetRelationName(idxRel),
413 : lastPageRange)));
414 : }
415 : else
416 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
417 : }
418 :
419 126038 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
420 : NULL, BUFFER_LOCK_SHARE);
421 :
422 : /* if range is unsummarized, there's nothing to do */
423 126038 : if (!brtup)
424 78120 : break;
425 :
426 : /* First time through in this brininsert call? */
427 47918 : if (tupcxt == NULL)
428 : {
429 47918 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
430 : "brininsert cxt",
431 : ALLOCSET_DEFAULT_SIZES);
432 47918 : MemoryContextSwitchTo(tupcxt);
433 : }
434 :
435 47918 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
436 :
437 47918 : need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
438 :
439 47918 : if (!need_insert)
440 : {
441 : /*
442 : * The tuple is consistent with the new values, so there's nothing
443 : * to do.
444 : */
445 23992 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
446 : }
447 : else
448 : {
449 23926 : Page page = BufferGetPage(buf);
450 23926 : ItemId lp = PageGetItemId(page, off);
451 : Size origsz;
452 : BrinTuple *origtup;
453 : Size newsz;
454 : BrinTuple *newtup;
455 : bool samepage;
456 :
457 : /*
458 : * Make a copy of the old tuple, so that we can compare it after
459 : * re-acquiring the lock.
460 : */
461 23926 : origsz = ItemIdGetLength(lp);
462 23926 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
463 :
464 : /*
465 : * Before releasing the lock, check if we can attempt a same-page
466 : * update. Another process could insert a tuple concurrently in
467 : * the same page though, so downstream we must be prepared to cope
468 : * if this turns out to not be possible after all.
469 : */
470 23926 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
471 23926 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
472 23926 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
473 :
474 : /*
475 : * Try to update the tuple. If this doesn't work for whatever
476 : * reason, we need to restart from the top; the revmap might be
477 : * pointing at a different tuple for this block now, so we need to
478 : * recompute to ensure both our new heap tuple and the other
479 : * inserter's are covered by the combined tuple. It might be that
480 : * we don't need to update at all.
481 : */
482 23926 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
483 : buf, off, origtup, origsz, newtup, newsz,
484 : samepage))
485 : {
486 : /* no luck; start over */
/* the reset discards dtup, origtup and newtup from this attempt */
487 0 : MemoryContextReset(tupcxt);
488 0 : continue;
489 : }
490 : }
491 :
492 : /* success! */
493 47918 : break;
494 : }
495 :
/*
 * Common exit: drop the buffer if one was obtained, go back to the caller's
 * memory context, and delete the per-call context if it was created.
 */
496 126038 : if (BufferIsValid(buf))
497 47920 : ReleaseBuffer(buf);
498 126038 : MemoryContextSwitchTo(oldcxt);
499 126038 : if (tupcxt != NULL)
500 47918 : MemoryContextDelete(tupcxt);
501 :
502 126038 : return false;
503 : }
504 :
505 : /*
506 : * Callback to clean up the BrinInsertState once all tuple inserts are done.
507 : */
508 : void
509 1154 : brininsertcleanup(Relation index, IndexInfo *indexInfo)
510 : {
511 1154 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
512 :
513 : /* bail out if cache not initialized */
514 1154 : if (bistate == NULL)
515 34 : return;
516 :
517 : /* do this first to avoid dangling pointer if we fail partway through */
518 1120 : indexInfo->ii_AmCache = NULL;
519 :
520 : /*
521 : * Clean up the revmap. Note that the brinDesc has already been cleaned up
522 : * as part of its own memory context.
523 : */
524 1120 : brinRevmapTerminate(bistate->bis_rmAccess);
525 1120 : pfree(bistate);
526 : }
527 :
528 : /*
529 : * Initialize state for a BRIN index scan.
530 : *
531 : * We read the metapage here to determine the pages-per-range number that this
532 : * index was built with. Note that since this cannot be changed while we're
533 : * holding lock on index, it's not necessary to recompute it during brinrescan.
534 : */
535 : IndexScanDesc
536 2946 : brinbeginscan(Relation r, int nkeys, int norderbys)
537 : {
538 : IndexScanDesc scan;
539 : BrinOpaque *opaque;
540 :
541 2946 : scan = RelationGetIndexScan(r, nkeys, norderbys);
542 :
543 2946 : opaque = palloc_object(BrinOpaque);
544 2946 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
545 2946 : opaque->bo_bdesc = brin_build_desc(r);
546 2946 : scan->opaque = opaque;
547 :
548 2946 : return scan;
549 : }
550 :
551 : /*
552 : * Execute the index scan.
553 : *
554 : * This works by reading index TIDs from the revmap, and obtaining the index
555 : * tuples pointed to by them; the summary values in the index tuples are
556 : * compared to the scan keys. We return into the TID bitmap all the pages in
557 : * ranges corresponding to index tuples that match the scan keys.
558 : *
559 : * If a TID from the revmap is read as InvalidTID, we know that range is
560 : * unsummarized. Pages in those ranges need to be returned regardless of scan
561 : * keys.
 *
 * scan is the scan descriptor; its opaque field holds the BrinOpaque set up
 * by brinbeginscan, and its keyData/numberOfKeys supply the scan keys.  tbm
 * is the bitmap that receives the selected heap pages.
 *
 * Returns ten times the number of pages added to the bitmap (see the XXX
 * note at the end of the function).
562 : */
563 : int64
564 2946 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
565 : {
566 2946 : Relation idxRel = scan->indexRelation;
567 2946 : Buffer buf = InvalidBuffer;
568 : BrinDesc *bdesc;
569 : Oid heapOid;
570 : Relation heapRel;
571 : BrinOpaque *opaque;
572 : BlockNumber nblocks;
573 : BlockNumber heapBlk;
574 2946 : int64 totalpages = 0;
575 : FmgrInfo *consistentFn;
576 : MemoryContext oldcxt;
577 : MemoryContext perRangeCxt;
578 : BrinMemTuple *dtup;
579 2946 : BrinTuple *btup = NULL;
580 2946 : Size btupsz = 0;
581 : ScanKey **keys,
582 : **nullkeys;
583 : int *nkeys,
584 : *nnullkeys;
585 : char *ptr;
586 : Size len;
587 : char *tmp PG_USED_FOR_ASSERTS_ONLY;
588 :
589 2946 : opaque = (BrinOpaque *) scan->opaque;
590 2946 : bdesc = opaque->bo_bdesc;
591 2946 : pgstat_count_index_scan(idxRel);
592 :
593 : /*
594 : * We need to know the size of the table so that we know how long to
595 : * iterate on the revmap.
596 : */
597 2946 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
598 2946 : heapRel = table_open(heapOid, AccessShareLock);
599 2946 : nblocks = RelationGetNumberOfBlocks(heapRel);
600 2946 : table_close(heapRel, AccessShareLock);
601 :
602 : /*
603 : * Make room for the consistent support procedures of indexed columns. We
604 : * don't look them up here; we do that lazily the first time we see a scan
605 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
606 : */
607 2946 : consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
608 :
609 : /*
610 : * Make room for per-attribute lists of scan keys that we'll pass to the
611 : * consistent support procedure. We don't know which attributes have scan
612 : * keys, so we allocate space for all attributes. That may use more memory
613 : * but it's probably cheaper than determining which attributes are used.
614 : *
615 : * We keep null and regular keys separate, so that we can pass just the
616 : * regular keys to the consistent function easily.
617 : *
618 : * To reduce the allocation overhead, we allocate one big chunk and then
619 : * carve it into smaller arrays ourselves. All the pieces have exactly the
620 : * same lifetime, so that's OK.
621 : *
622 : * XXX The widest index can have 32 attributes, so the amount of wasted
623 : * memory is negligible. We could invent a more compact approach (with
624 : * just space for used attributes) but that would make the matching more
625 : * complex so it's not a good trade-off.
 *
 * Note that the terms of the size computation below are not listed in the
 * order in which the chunk is carved up.  The carving order is: the two
 * arrays of per-attribute pointers (keys, nullkeys), the two counter
 * arrays (nkeys, nnullkeys), and then, for each attribute in turn, its
 * regular-key array followed by its null-key array.  Only the total has
 * to agree, which the Assert after the carving loop checks.
626 : */
627 2946 : len =
628 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* regular keys */
629 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
630 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
631 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* NULL keys */
632 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
633 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
634 :
635 2946 : ptr = palloc(len);
/* remember the start of the chunk, for the size cross-check below */
636 2946 : tmp = ptr;
637 :
638 2946 : keys = (ScanKey **) ptr;
639 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
640 :
641 2946 : nullkeys = (ScanKey **) ptr;
642 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
643 :
644 2946 : nkeys = (int *) ptr;
645 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
646 :
647 2946 : nnullkeys = (int *) ptr;
648 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
649 :
650 69978 : for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
651 : {
652 67032 : keys[i] = (ScanKey *) ptr;
653 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
654 :
655 67032 : nullkeys[i] = (ScanKey *) ptr;
656 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
657 : }
658 :
/* the carving must have consumed exactly the computed length */
659 : Assert(tmp + len == ptr);
660 :
661 : /* zero the number of keys */
662 2946 : memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
663 2946 : memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
664 :
665 : /* Preprocess the scan keys - split them into per-attribute arrays. */
666 5892 : for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
667 : {
668 2946 : ScanKey key = &scan->keyData[keyno];
669 2946 : AttrNumber keyattno = key->sk_attno;
670 :
671 : /*
672 : * The collation of the scan key must match the collation used in the
673 : * index column (but only if the search is not IS NULL/ IS NOT NULL).
674 : * Otherwise we shouldn't be using this index ...
675 : */
676 : Assert((key->sk_flags & SK_ISNULL) ||
677 : (key->sk_collation ==
678 : TupleDescAttr(bdesc->bd_tupdesc,
679 : keyattno - 1)->attcollation));
680 :
681 : /*
682 : * First time we see this index attribute, so init as needed.
683 : *
684 : * This is a bit of an overkill - we don't know how many scan keys are
685 : * there for this attribute, so we simply allocate the largest number
686 : * possible (as if all keys were for this attribute). This may waste a
687 : * bit of memory, but we only expect small number of scan keys in
688 : * general, so this should be negligible, and repeated repalloc calls
689 : * are not free either.
690 : */
691 2946 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
692 : {
/* NB: this local shadows the function-level "tmp" declared above */
693 : FmgrInfo *tmp;
694 :
695 : /* First time we see this attribute, so no key/null keys. */
696 : Assert(nkeys[keyattno - 1] == 0);
697 : Assert(nnullkeys[keyattno - 1] == 0);
698 :
699 2946 : tmp = index_getprocinfo(idxRel, keyattno,
700 : BRIN_PROCNUM_CONSISTENT);
701 2946 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
702 : CurrentMemoryContext);
703 : }
704 :
705 : /* Add key to the proper per-attribute array. */
706 2946 : if (key->sk_flags & SK_ISNULL)
707 : {
708 36 : nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
709 36 : nnullkeys[keyattno - 1]++;
710 : }
711 : else
712 : {
713 2910 : keys[keyattno - 1][nkeys[keyattno - 1]] = key;
714 2910 : nkeys[keyattno - 1]++;
715 : }
716 : }
717 :
718 : /*
 * Allocate an initial in-memory tuple, outside of the per-range memcxt:
 * this happens before we switch to that context below, and the tuple is
 * handed back to brin_deform_tuple() for every range.
 */
719 2946 : dtup = brin_new_memtuple(bdesc);
720 :
721 : /*
722 : * Setup and use a per-range memory context, which is reset every time we
723 : * loop below. This avoids having to free the tuples within the loop.
724 : */
725 2946 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
726 : "bringetbitmap cxt",
727 : ALLOCSET_DEFAULT_SIZES);
728 2946 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
729 :
730 : /*
731 : * Now scan the revmap. We start by querying for heap page 0,
732 : * incrementing by the number of pages per range; this gives us a full
733 : * view of the table.
 *
 * NOTE(review): heapBlk is advanced with plain BlockNumber arithmetic.
 * Assuming BlockNumber is a fixed-width unsigned type, the increment
 * could wrap around when nblocks lies within one range of the type's
 * maximum, in which case the loop would not terminate -- confirm whether
 * such table sizes are reachable.
734 : */
735 194598 : for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
736 : {
737 : bool addrange;
738 191652 : bool gottuple = false;
739 : BrinTuple *tup;
740 : OffsetNumber off;
741 : Size size;
742 :
743 191652 : CHECK_FOR_INTERRUPTS();
744 :
745 191652 : MemoryContextReset(perRangeCxt);
746 :
747 191652 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
748 : &off, &size, BUFFER_LOCK_SHARE);
749 191652 : if (tup)
750 : {
/* work on a private copy, so the buffer lock can be dropped at once */
751 189936 : gottuple = true;
752 189936 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
753 189936 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
754 : }
755 :
756 : /*
757 : * For page ranges with no indexed tuple, we must return the whole
758 : * range; otherwise, compare it to the scan keys.
759 : */
760 191652 : if (!gottuple)
761 : {
762 1716 : addrange = true;
763 : }
764 : else
765 : {
766 189936 : dtup = brin_deform_tuple(bdesc, btup, dtup);
767 189936 : if (dtup->bt_placeholder)
768 : {
769 : /*
770 : * Placeholder tuples are always returned, regardless of the
771 : * values stored in them.
772 : */
773 0 : addrange = true;
774 : }
775 : else
776 : {
777 : int attno;
778 :
779 : /*
780 : * Compare scan keys with summary values stored for the range.
781 : * If scan keys are matched, the page range must be added to
782 : * the bitmap. We initially assume the range needs to be
783 : * added; in particular this serves the case where there are
784 : * no keys.
785 : */
786 189936 : addrange = true;
787 4704068 : for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
788 : {
789 : BrinValues *bval;
790 : Datum add;
791 : Oid collation;
792 :
793 : /*
794 : * skip attributes without any scan keys (both regular and
795 : * IS [NOT] NULL)
796 : */
797 4567734 : if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
798 4377798 : continue;
799 :
800 189936 : bval = &dtup->bt_columns[attno - 1];
801 :
802 : /*
803 : * If the BRIN tuple indicates that this range is empty,
804 : * we can skip it: there's nothing to match. We don't
805 : * need to examine the next columns.
806 : */
807 189936 : if (dtup->bt_empty_range)
808 : {
809 0 : addrange = false;
810 0 : break;
811 : }
812 :
813 : /*
814 : * First check if there are any IS [NOT] NULL scan keys,
815 : * and if we're violating them. In that case we can
816 : * terminate early, without invoking the support function.
817 : *
818 : * As there may be more keys, we can only determine
819 : * mismatch within this loop.
820 : */
821 189936 : if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
822 189936 : !check_null_keys(bval, nullkeys[attno - 1],
823 189936 : nnullkeys[attno - 1]))
824 : {
825 : /*
826 : * If any of the IS [NOT] NULL keys failed, the page
827 : * range as a whole can't pass. So terminate the loop.
828 : */
829 996 : addrange = false;
830 996 : break;
831 : }
832 :
833 : /*
834 : * So either there are no IS [NOT] NULL keys, or all
835 : * passed. If there are no regular scan keys, we're done -
836 : * the page range matches. If there are regular keys, but
837 : * the page range is marked as 'all nulls' it can't
838 : * possibly pass (we're assuming the operators are
839 : * strict).
840 : */
841 :
842 : /* No regular scan keys - page range as a whole passes. */
843 188940 : if (!nkeys[attno - 1])
844 1236 : continue;
845 :
846 : Assert((nkeys[attno - 1] > 0) &&
847 : (nkeys[attno - 1] <= scan->numberOfKeys));
848 :
849 : /* If it is all nulls, it cannot possibly be consistent. */
850 187704 : if (bval->bv_allnulls)
851 : {
852 378 : addrange = false;
853 378 : break;
854 : }
855 :
856 : /*
857 : * Collation from the first key (has to be the same for
858 : * all keys for the same attribute).
859 : */
860 187326 : collation = keys[attno - 1][0]->sk_collation;
861 :
862 : /*
863 : * Check whether the scan key is consistent with the page
864 : * range values; if so, have the pages in the range added
865 : * to the output bitmap.
866 : *
867 : * The opclass may or may not support processing of
868 : * multiple scan keys. We can determine that based on the
869 : * number of arguments - functions with extra parameter
870 : * (number of scan keys) do support this, otherwise we
871 : * have to simply pass the scan keys one by one.
872 : */
873 187326 : if (consistentFn[attno - 1].fn_nargs >= 4)
874 : {
875 : /* Check all keys at once */
876 39594 : add = FunctionCall4Coll(&consistentFn[attno - 1],
877 : collation,
878 : PointerGetDatum(bdesc),
879 : PointerGetDatum(bval),
880 39594 : PointerGetDatum(keys[attno - 1]),
881 39594 : Int32GetDatum(nkeys[attno - 1]));
882 39594 : addrange = DatumGetBool(add);
883 : }
884 : else
885 : {
886 : /*
887 : * Check keys one by one
888 : *
889 : * When there are multiple scan keys, failure to meet
890 : * the criteria for a single one of them is enough to
891 : * discard the range as a whole, so break out of the
892 : * loop as soon as a false return value is obtained.
893 : */
894 : int keyno;
895 :
896 258078 : for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
897 : {
898 147732 : add = FunctionCall3Coll(&consistentFn[attno - 1],
899 147732 : keys[attno - 1][keyno]->sk_collation,
900 : PointerGetDatum(bdesc),
901 : PointerGetDatum(bval),
902 147732 : PointerGetDatum(keys[attno - 1][keyno]));
903 147732 : addrange = DatumGetBool(add);
904 147732 : if (!addrange)
905 37386 : break;
906 : }
907 : }
908 :
909 : /*
910 : * If we found a scan key eliminating the range, no need
911 : * to check additional ones.
912 : */
913 187326 : if (!addrange)
914 52228 : break;
915 : }
916 : }
917 : }
918 :
919 : /* add the pages in the range to the output bitmap, if needed */
920 191652 : if (addrange)
921 : {
922 : BlockNumber pageno;
923 :
/*
 * The upper bound clamps the range to the table size, so that a final
 * partial range contributes only pages that exist.  tbm_add_page() is
 * called with the caller's memory context current, not the per-range one
 * that is reset on every iteration.
 */
924 138050 : for (pageno = heapBlk;
925 286020 : pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
926 147970 : pageno++)
927 : {
928 147970 : MemoryContextSwitchTo(oldcxt);
929 147970 : tbm_add_page(tbm, pageno);
930 147970 : totalpages++;
931 147970 : MemoryContextSwitchTo(perRangeCxt);
932 : }
933 : }
934 : }
935 :
936 2946 : MemoryContextSwitchTo(oldcxt);
937 2946 : MemoryContextDelete(perRangeCxt);
938 :
939 2946 : if (buf != InvalidBuffer)
940 2946 : ReleaseBuffer(buf);
941 :
942 : /*
943 : * XXX We have an approximation of the number of *pages* that our scan
944 : * returns, but we don't have a precise idea of the number of heap tuples
945 : * involved.
 *
 * The multiplier below is a fixed constant; presumably it stands for an
 * assumed number of heap tuples per page.
946 : */
947 2946 : return totalpages * 10;
948 : }
949 :
950 : /*
951 : * Re-initialize state for a BRIN index scan
952 : */
953 : void
954 2946 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
955 : ScanKey orderbys, int norderbys)
956 : {
957 : /*
958 : * Other index AMs preprocess the scan keys at this point, or sometime
959 : * early during the scan; this lets them optimize by removing redundant
960 : * keys, or doing early returns when they are impossible to satisfy; see
961 : * _bt_preprocess_keys for an example. Something like that could be added
962 : * here someday, too.
963 : */
964 :
965 2946 : if (scankey && scan->numberOfKeys > 0)
966 2946 : memcpy(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData));
967 2946 : }
968 :
969 : /*
970 : * Close down a BRIN index scan
971 : */
972 : void
973 2946 : brinendscan(IndexScanDesc scan)
974 : {
975 2946 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
976 :
977 2946 : brinRevmapTerminate(opaque->bo_rmAccess);
978 2946 : brin_free_desc(opaque->bo_bdesc);
979 2946 : pfree(opaque);
980 2946 : }
981 :
982 : /*
983 : * Per-heap-tuple callback for table_index_build_scan.
984 : *
985 : * Note we don't worry about the page range at the end of the table here; it is
986 : * present in the build state struct after we're called the last time, but not
987 : * inserted into the index. Caller must ensure to do so, if appropriate.
988 : */
989 : static void
990 728306 : brinbuildCallback(Relation index,
991 : ItemPointer tid,
992 : Datum *values,
993 : bool *isnull,
994 : bool tupleIsAlive,
995 : void *brstate)
996 : {
997 728306 : BrinBuildState *state = (BrinBuildState *) brstate;
998 : BlockNumber thisblock;
999 :
1000 728306 : thisblock = ItemPointerGetBlockNumber(tid);
1001 :
1002 : /*
1003 : * If we're in a block that belongs to a future range, summarize what
1004 : * we've got and start afresh. Note the scan might have skipped many
1005 : * pages, if they were devoid of live tuples; make sure to insert index
1006 : * tuples for those too.
1007 : */
1008 730602 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
1009 : {
1010 :
1011 : BRIN_elog((DEBUG2,
1012 : "brinbuildCallback: completed a range: %u--%u",
1013 : state->bs_currRangeStart,
1014 : state->bs_currRangeStart + state->bs_pagesPerRange));
1015 :
1016 : /* create the index tuple and insert it */
1017 2296 : form_and_insert_tuple(state);
1018 :
1019 : /* set state to correspond to the next range */
1020 2296 : state->bs_currRangeStart += state->bs_pagesPerRange;
1021 :
1022 : /* re-initialize state for it */
1023 2296 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1024 : }
1025 :
1026 : /* Accumulate the current tuple into the running state */
1027 728306 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1028 : values, isnull);
1029 728306 : }
1030 :
1031 : /*
1032 : * Per-heap-tuple callback for table_index_build_scan with parallelism.
1033 : *
1034 : * A version of the callback used by parallel index builds. The main difference
1035 : * is that instead of writing the BRIN tuples into the index, we write them
1036 : * into a shared tuplesort, and leave the insertion up to the leader (which may
1037 : * reorder them a bit etc.). The callback also does not generate empty ranges,
1038 : * those will be added by the leader when merging results from workers.
1039 : */
1040 : static void
1041 7962 : brinbuildCallbackParallel(Relation index,
1042 : ItemPointer tid,
1043 : Datum *values,
1044 : bool *isnull,
1045 : bool tupleIsAlive,
1046 : void *brstate)
1047 : {
1048 7962 : BrinBuildState *state = (BrinBuildState *) brstate;
1049 : BlockNumber thisblock;
1050 :
1051 7962 : thisblock = ItemPointerGetBlockNumber(tid);
1052 :
1053 : /*
1054 : * If we're in a block that belongs to a different range, summarize what
1055 : * we've got and start afresh. Note the scan might have skipped many
1056 : * pages, if they were devoid of live tuples; we do not create empty BRIN
1057 : * ranges here - the leader is responsible for filling them in.
1058 : *
1059 : * Unlike serial builds, parallel index builds allow synchronized seqscans
1060 : * (because that's what parallel scans do). This means the block may wrap
1061 : * around to the beginning of the relation, so the condition needs to
1062 : * check for both future and past ranges.
1063 : */
1064 7962 : if ((thisblock < state->bs_currRangeStart) ||
1065 7962 : (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1))
1066 : {
1067 :
1068 : BRIN_elog((DEBUG2,
1069 : "brinbuildCallbackParallel: completed a range: %u--%u",
1070 : state->bs_currRangeStart,
1071 : state->bs_currRangeStart + state->bs_pagesPerRange));
1072 :
1073 : /* create the index tuple and write it into the tuplesort */
1074 38 : form_and_spill_tuple(state);
1075 :
1076 : /*
1077 : * Set state to correspond to the next range (for this block).
1078 : *
1079 : * This skips ranges that are either empty (and so we don't get any
1080 : * tuples to summarize), or processed by other workers. We can't
1081 : * differentiate those cases here easily, so we leave it up to the
1082 : * leader to fill empty ranges where needed.
1083 : */
1084 : state->bs_currRangeStart
1085 38 : = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
1086 :
1087 : /* re-initialize state for it */
1088 38 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1089 : }
1090 :
1091 : /* Accumulate the current tuple into the running state */
1092 7962 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1093 : values, isnull);
1094 7962 : }
1095 :
1096 : /*
1097 : * brinbuild() -- build a new BRIN index.
1098 : */
1099 : IndexBuildResult *
1100 366 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1101 : {
1102 : IndexBuildResult *result;
1103 : double reltuples;
1104 : double idxtuples;
1105 : BrinRevmap *revmap;
1106 : BrinBuildState *state;
1107 : Buffer meta;
1108 : BlockNumber pagesPerRange;
1109 :
1110 : /*
1111 : * We expect to be called exactly once for any index relation.
1112 : */
1113 366 : if (RelationGetNumberOfBlocks(index) != 0)
1114 0 : elog(ERROR, "index \"%s\" already contains data",
1115 : RelationGetRelationName(index));
1116 :
1117 : /*
1118 : * Critical section not required, because on error the creation of the
1119 : * whole relation will be rolled back.
1120 : */
1121 :
1122 366 : meta = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
1123 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1124 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
1125 :
1126 366 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
1127 : BRIN_CURRENT_VERSION);
1128 366 : MarkBufferDirty(meta);
1129 :
1130 366 : if (RelationNeedsWAL(index))
1131 : {
1132 : xl_brin_createidx xlrec;
1133 : XLogRecPtr recptr;
1134 : Page page;
1135 :
1136 180 : xlrec.version = BRIN_CURRENT_VERSION;
1137 180 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
1138 :
1139 180 : XLogBeginInsert();
1140 180 : XLogRegisterData(&xlrec, SizeOfBrinCreateIdx);
1141 180 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
1142 :
1143 180 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
1144 :
1145 180 : page = BufferGetPage(meta);
1146 180 : PageSetLSN(page, recptr);
1147 : }
1148 :
1149 366 : UnlockReleaseBuffer(meta);
1150 :
1151 : /*
1152 : * Initialize our state, including the deformed tuple state.
1153 : */
1154 366 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1155 366 : state = initialize_brin_buildstate(index, revmap, pagesPerRange,
1156 : RelationGetNumberOfBlocks(heap));
1157 :
1158 : /*
1159 : * Attempt to launch parallel worker scan when required
1160 : *
1161 : * XXX plan_create_index_workers makes the number of workers dependent on
1162 : * maintenance_work_mem, requiring 32MB for each worker. That makes sense
1163 : * for btree, but not for BRIN, which can do with much less memory. So
1164 : * maybe make that somehow less strict, optionally?
1165 : */
1166 366 : if (indexInfo->ii_ParallelWorkers > 0)
1167 10 : _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
1168 : indexInfo->ii_ParallelWorkers);
1169 :
1170 : /*
1171 : * If parallel build requested and at least one worker process was
1172 : * successfully launched, set up coordination state, wait for workers to
1173 : * complete. Then read all tuples from the shared tuplesort and insert
1174 : * them into the index.
1175 : *
1176 : * In serial mode, simply scan the table and build the index one index
1177 : * tuple at a time.
1178 : */
1179 366 : if (state->bs_leader)
1180 : {
1181 : SortCoordinate coordinate;
1182 :
1183 8 : coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
1184 8 : coordinate->isWorker = false;
1185 8 : coordinate->nParticipants =
1186 8 : state->bs_leader->nparticipanttuplesorts;
1187 8 : coordinate->sharedsort = state->bs_leader->sharedsort;
1188 :
1189 : /*
1190 : * Begin leader tuplesort.
1191 : *
1192 : * In cases where parallelism is involved, the leader receives the
1193 : * same share of maintenance_work_mem as a serial sort (it is
1194 : * generally treated in the same way as a serial sort once we return).
1195 : * Parallel worker Tuplesortstates will have received only a fraction
1196 : * of maintenance_work_mem, though.
1197 : *
1198 : * We rely on the lifetime of the Leader Tuplesortstate almost not
1199 : * overlapping with any worker Tuplesortstate's lifetime. There may
1200 : * be some small overlap, but that's okay because we rely on leader
1201 : * Tuplesortstate only allocating a small, fixed amount of memory
1202 : * here. When its tuplesort_performsort() is called (by our caller),
1203 : * and significant amounts of memory are likely to be used, all
1204 : * workers must have already freed almost all memory held by their
1205 : * Tuplesortstates (they are about to go away completely, too). The
1206 : * overall effect is that maintenance_work_mem always represents an
1207 : * absolute high watermark on the amount of memory used by a CREATE
1208 : * INDEX operation, regardless of the use of parallelism or any other
1209 : * factor.
1210 : */
1211 8 : state->bs_sortstate =
1212 8 : tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
1213 : TUPLESORT_NONE);
1214 :
1215 : /* scan the relation and merge per-worker results */
1216 8 : reltuples = _brin_parallel_merge(state);
1217 :
1218 8 : _brin_end_parallel(state->bs_leader, state);
1219 : }
1220 : else /* no parallel index build */
1221 : {
1222 : /*
1223 : * Now scan the relation. No syncscan allowed here because we want
1224 : * the heap blocks in physical order (we want to produce the ranges
1225 : * starting from block 0, and the callback also relies on this to not
1226 : * generate summary for the same range twice).
1227 : */
1228 358 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
1229 : brinbuildCallback, state, NULL);
1230 :
1231 : /*
1232 : * process the final batch
1233 : *
1234 : * XXX Note this does not update state->bs_currRangeStart, i.e. it
1235 : * stays set to the last range added to the index. This is OK, because
1236 : * that's what brin_fill_empty_ranges expects.
1237 : */
1238 358 : form_and_insert_tuple(state);
1239 :
1240 : /*
1241 : * Backfill the final ranges with empty data.
1242 : *
1243 : * This saves us from doing what amounts to full table scans when the
1244 : * index with a predicate like WHERE (nonnull_column IS NULL), or
1245 : * other very selective predicates.
1246 : */
1247 358 : brin_fill_empty_ranges(state,
1248 : state->bs_currRangeStart,
1249 : state->bs_maxRangeStart);
1250 : }
1251 :
1252 : /* release resources */
1253 366 : idxtuples = state->bs_numtuples;
1254 366 : brinRevmapTerminate(state->bs_rmAccess);
1255 366 : terminate_brin_buildstate(state);
1256 :
1257 : /*
1258 : * Return statistics
1259 : */
1260 366 : result = palloc_object(IndexBuildResult);
1261 :
1262 366 : result->heap_tuples = reltuples;
1263 366 : result->index_tuples = idxtuples;
1264 :
1265 366 : return result;
1266 : }
1267 :
1268 : void
1269 6 : brinbuildempty(Relation index)
1270 : {
1271 : Buffer metabuf;
1272 :
1273 : /* An empty BRIN index has a metapage only. */
1274 6 : metabuf = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
1275 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1276 :
1277 : /* Initialize and xlog metabuffer. */
1278 6 : START_CRIT_SECTION();
1279 6 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
1280 : BRIN_CURRENT_VERSION);
1281 6 : MarkBufferDirty(metabuf);
1282 6 : log_newpage_buffer(metabuf, true);
1283 6 : END_CRIT_SECTION();
1284 :
1285 6 : UnlockReleaseBuffer(metabuf);
1286 6 : }
1287 :
1288 : /*
1289 : * brinbulkdelete
1290 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
1291 : * there's not a lot we can do here.
1292 : *
1293 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
1294 : * tuple is deleted), meaning the need to re-run summarization on the affected
1295 : * range. Would need to add an extra flag in brintuples for that.
1296 : */
1297 : IndexBulkDeleteResult *
1298 22 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
1299 : IndexBulkDeleteCallback callback, void *callback_state)
1300 : {
1301 : /* allocate stats if first time through, else re-use existing struct */
1302 22 : if (stats == NULL)
1303 22 : stats = palloc0_object(IndexBulkDeleteResult);
1304 :
1305 22 : return stats;
1306 : }
1307 :
1308 : /*
1309 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
1310 : * ranges that are currently unsummarized.
1311 : */
1312 : IndexBulkDeleteResult *
1313 90 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
1314 : {
1315 : Relation heapRel;
1316 :
1317 : /* No-op in ANALYZE ONLY mode */
1318 90 : if (info->analyze_only)
1319 4 : return stats;
1320 :
1321 86 : if (!stats)
1322 70 : stats = palloc0_object(IndexBulkDeleteResult);
1323 86 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
1324 : /* rest of stats is initialized by zeroing */
1325 :
1326 86 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
1327 : AccessShareLock);
1328 :
1329 86 : brin_vacuum_scan(info->index, info->strategy);
1330 :
1331 86 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
1332 : &stats->num_index_tuples, &stats->num_index_tuples);
1333 :
1334 86 : table_close(heapRel, AccessShareLock);
1335 :
1336 86 : return stats;
1337 : }
1338 :
1339 : /*
1340 : * reloptions processor for BRIN indexes
1341 : */
1342 : bytea *
1343 1164 : brinoptions(Datum reloptions, bool validate)
1344 : {
1345 : static const relopt_parse_elt tab[] = {
1346 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
1347 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
1348 : };
1349 :
1350 1164 : return (bytea *) build_reloptions(reloptions, validate,
1351 : RELOPT_KIND_BRIN,
1352 : sizeof(BrinOptions),
1353 : tab, lengthof(tab));
1354 : }
1355 :
1356 : /*
1357 : * SQL-callable function to scan through an index and summarize all ranges
1358 : * that are not currently summarized.
1359 : */
1360 : Datum
1361 76 : brin_summarize_new_values(PG_FUNCTION_ARGS)
1362 : {
1363 76 : Datum relation = PG_GETARG_DATUM(0);
1364 :
1365 76 : return DirectFunctionCall2(brin_summarize_range,
1366 : relation,
1367 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
1368 : }
1369 :
1370 : /*
1371 : * SQL-callable function to summarize the indicated page range, if not already
1372 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
1373 : * unsummarized ranges are summarized.
1374 : */
1375 : Datum
1376 204 : brin_summarize_range(PG_FUNCTION_ARGS)
1377 : {
1378 204 : Oid indexoid = PG_GETARG_OID(0);
1379 204 : int64 heapBlk64 = PG_GETARG_INT64(1);
1380 : BlockNumber heapBlk;
1381 : Oid heapoid;
1382 : Relation indexRel;
1383 : Relation heapRel;
1384 : Oid save_userid;
1385 : int save_sec_context;
1386 : int save_nestlevel;
1387 204 : double numSummarized = 0;
1388 :
1389 204 : if (RecoveryInProgress())
1390 0 : ereport(ERROR,
1391 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1392 : errmsg("recovery is in progress"),
1393 : errhint("BRIN control functions cannot be executed during recovery.")));
1394 :
1395 204 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
1396 36 : ereport(ERROR,
1397 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1398 : errmsg("block number out of range: %lld",
1399 : (long long) heapBlk64)));
1400 168 : heapBlk = (BlockNumber) heapBlk64;
1401 :
1402 : /*
1403 : * We must lock table before index to avoid deadlocks. However, if the
1404 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1405 : * Rather than emitting a not-very-helpful error message, postpone
1406 : * complaining, expecting that the is-it-an-index test below will fail.
1407 : */
1408 168 : heapoid = IndexGetRelation(indexoid, true);
1409 168 : if (OidIsValid(heapoid))
1410 : {
1411 150 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1412 :
1413 : /*
1414 : * Autovacuum calls us. For its benefit, switch to the table owner's
1415 : * userid, so that any index functions are run as that user. Also
1416 : * lock down security-restricted operations and arrange to make GUC
1417 : * variable changes local to this command. This is harmless, albeit
1418 : * unnecessary, when called from SQL, because we fail shortly if the
1419 : * user does not own the index.
1420 : */
1421 150 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
1422 150 : SetUserIdAndSecContext(heapRel->rd_rel->relowner,
1423 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
1424 150 : save_nestlevel = NewGUCNestLevel();
1425 150 : RestrictSearchPath();
1426 : }
1427 : else
1428 : {
1429 18 : heapRel = NULL;
1430 : /* Set these just to suppress "uninitialized variable" warnings */
1431 18 : save_userid = InvalidOid;
1432 18 : save_sec_context = -1;
1433 18 : save_nestlevel = -1;
1434 : }
1435 :
1436 168 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1437 :
1438 : /* Must be a BRIN index */
1439 150 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1440 150 : indexRel->rd_rel->relam != BRIN_AM_OID)
1441 18 : ereport(ERROR,
1442 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1443 : errmsg("\"%s\" is not a BRIN index",
1444 : RelationGetRelationName(indexRel))));
1445 :
1446 : /* User must own the index (comparable to privileges needed for VACUUM) */
1447 132 : if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
1448 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1449 0 : RelationGetRelationName(indexRel));
1450 :
1451 : /*
1452 : * Since we did the IndexGetRelation call above without any lock, it's
1453 : * barely possible that a race against an index drop/recreation could have
1454 : * netted us the wrong table. Recheck.
1455 : */
1456 132 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1457 0 : ereport(ERROR,
1458 : (errcode(ERRCODE_UNDEFINED_TABLE),
1459 : errmsg("could not open parent table of index \"%s\"",
1460 : RelationGetRelationName(indexRel))));
1461 :
1462 : /* see gin_clean_pending_list() */
1463 132 : if (indexRel->rd_index->indisvalid)
1464 132 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
1465 : else
1466 0 : ereport(DEBUG1,
1467 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1468 : errmsg("index \"%s\" is not valid",
1469 : RelationGetRelationName(indexRel))));
1470 :
1471 : /* Roll back any GUC changes executed by index functions */
1472 132 : AtEOXact_GUC(false, save_nestlevel);
1473 :
1474 : /* Restore userid and security context */
1475 132 : SetUserIdAndSecContext(save_userid, save_sec_context);
1476 :
1477 132 : relation_close(indexRel, ShareUpdateExclusiveLock);
1478 132 : relation_close(heapRel, ShareUpdateExclusiveLock);
1479 :
1480 132 : PG_RETURN_INT32((int32) numSummarized);
1481 : }
1482 :
1483 : /*
1484 : * SQL-callable interface to mark a range as no longer summarized
1485 : */
1486 : Datum
1487 104 : brin_desummarize_range(PG_FUNCTION_ARGS)
1488 : {
1489 104 : Oid indexoid = PG_GETARG_OID(0);
1490 104 : int64 heapBlk64 = PG_GETARG_INT64(1);
1491 : BlockNumber heapBlk;
1492 : Oid heapoid;
1493 : Relation heapRel;
1494 : Relation indexRel;
1495 : bool done;
1496 :
1497 104 : if (RecoveryInProgress())
1498 0 : ereport(ERROR,
1499 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1500 : errmsg("recovery is in progress"),
1501 : errhint("BRIN control functions cannot be executed during recovery.")));
1502 :
1503 104 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
1504 18 : ereport(ERROR,
1505 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1506 : errmsg("block number out of range: %lld",
1507 : (long long) heapBlk64)));
1508 86 : heapBlk = (BlockNumber) heapBlk64;
1509 :
1510 : /*
1511 : * We must lock table before index to avoid deadlocks. However, if the
1512 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1513 : * Rather than emitting a not-very-helpful error message, postpone
1514 : * complaining, expecting that the is-it-an-index test below will fail.
1515 : *
1516 : * Unlike brin_summarize_range(), autovacuum never calls this. Hence, we
1517 : * don't switch userid.
1518 : */
1519 86 : heapoid = IndexGetRelation(indexoid, true);
1520 86 : if (OidIsValid(heapoid))
1521 86 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1522 : else
1523 0 : heapRel = NULL;
1524 :
1525 86 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1526 :
1527 : /* Must be a BRIN index */
1528 86 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1529 86 : indexRel->rd_rel->relam != BRIN_AM_OID)
1530 0 : ereport(ERROR,
1531 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1532 : errmsg("\"%s\" is not a BRIN index",
1533 : RelationGetRelationName(indexRel))));
1534 :
1535 : /* User must own the index (comparable to privileges needed for VACUUM) */
1536 86 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1537 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1538 0 : RelationGetRelationName(indexRel));
1539 :
1540 : /*
1541 : * Since we did the IndexGetRelation call above without any lock, it's
1542 : * barely possible that a race against an index drop/recreation could have
1543 : * netted us the wrong table. Recheck.
1544 : */
1545 86 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1546 0 : ereport(ERROR,
1547 : (errcode(ERRCODE_UNDEFINED_TABLE),
1548 : errmsg("could not open parent table of index \"%s\"",
1549 : RelationGetRelationName(indexRel))));
1550 :
1551 : /* see gin_clean_pending_list() */
1552 86 : if (indexRel->rd_index->indisvalid)
1553 : {
1554 : /* the revmap does the hard work */
1555 : do
1556 : {
1557 86 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1558 : }
1559 86 : while (!done);
1560 : }
1561 : else
1562 0 : ereport(DEBUG1,
1563 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1564 : errmsg("index \"%s\" is not valid",
1565 : RelationGetRelationName(indexRel))));
1566 :
1567 86 : relation_close(indexRel, ShareUpdateExclusiveLock);
1568 86 : relation_close(heapRel, ShareUpdateExclusiveLock);
1569 :
1570 86 : PG_RETURN_VOID();
1571 : }
1572 :
1573 : /*
1574 : * Build a BrinDesc used to create or scan a BRIN index
1575 : */
1576 : BrinDesc *
1577 4568 : brin_build_desc(Relation rel)
1578 : {
1579 : BrinOpcInfo **opcinfo;
1580 : BrinDesc *bdesc;
1581 : TupleDesc tupdesc;
1582 4568 : int totalstored = 0;
1583 : int keyno;
1584 : long totalsize;
1585 : MemoryContext cxt;
1586 : MemoryContext oldcxt;
1587 :
1588 4568 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1589 : "brin desc cxt",
1590 : ALLOCSET_SMALL_SIZES);
1591 4568 : oldcxt = MemoryContextSwitchTo(cxt);
1592 4568 : tupdesc = RelationGetDescr(rel);
1593 :
1594 : /*
1595 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1596 : * the number of columns stored, since the number is opclass-defined.
1597 : */
1598 4568 : opcinfo = palloc_array(BrinOpcInfo *, tupdesc->natts);
1599 75972 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1600 : {
1601 : FmgrInfo *opcInfoFn;
1602 71404 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1603 :
1604 71404 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1605 :
1606 142808 : opcinfo[keyno] = (BrinOpcInfo *)
1607 71404 : DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1608 71404 : totalstored += opcinfo[keyno]->oi_nstored;
1609 : }
1610 :
1611 : /* Allocate our result struct and fill it in */
1612 4568 : totalsize = offsetof(BrinDesc, bd_info) +
1613 4568 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1614 :
1615 4568 : bdesc = palloc(totalsize);
1616 4568 : bdesc->bd_context = cxt;
1617 4568 : bdesc->bd_index = rel;
1618 4568 : bdesc->bd_tupdesc = tupdesc;
1619 4568 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1620 4568 : bdesc->bd_totalstored = totalstored;
1621 :
1622 75972 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1623 71404 : bdesc->bd_info[keyno] = opcinfo[keyno];
1624 4568 : pfree(opcinfo);
1625 :
1626 4568 : MemoryContextSwitchTo(oldcxt);
1627 :
1628 4568 : return bdesc;
1629 : }
1630 :
1631 : void
1632 3434 : brin_free_desc(BrinDesc *bdesc)
1633 : {
1634 : /* make sure the tupdesc is still valid */
1635 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1636 : /* no need for retail pfree */
1637 3434 : MemoryContextDelete(bdesc->bd_context);
1638 3434 : }
1639 :
1640 : /*
1641 : * Fetch index's statistical data into *stats
1642 : */
1643 : void
1644 10730 : brinGetStats(Relation index, BrinStatsData *stats)
1645 : {
1646 : Buffer metabuffer;
1647 : Page metapage;
1648 : BrinMetaPageData *metadata;
1649 :
1650 10730 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1651 10730 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1652 10730 : metapage = BufferGetPage(metabuffer);
1653 10730 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1654 :
1655 10730 : stats->pagesPerRange = metadata->pagesPerRange;
1656 10730 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1657 :
1658 10730 : UnlockReleaseBuffer(metabuffer);
1659 10730 : }
1660 :
1661 : /*
1662 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1663 : */
1664 : static BrinBuildState *
1665 456 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1666 : BlockNumber pagesPerRange, BlockNumber tablePages)
1667 : {
1668 : BrinBuildState *state;
1669 456 : BlockNumber lastRange = 0;
1670 :
1671 456 : state = palloc_object(BrinBuildState);
1672 :
1673 456 : state->bs_irel = idxRel;
1674 456 : state->bs_numtuples = 0;
1675 456 : state->bs_reltuples = 0;
1676 456 : state->bs_currentInsertBuf = InvalidBuffer;
1677 456 : state->bs_pagesPerRange = pagesPerRange;
1678 456 : state->bs_currRangeStart = 0;
1679 456 : state->bs_rmAccess = revmap;
1680 456 : state->bs_bdesc = brin_build_desc(idxRel);
1681 456 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1682 456 : state->bs_leader = NULL;
1683 456 : state->bs_worker_id = 0;
1684 456 : state->bs_sortstate = NULL;
1685 456 : state->bs_context = CurrentMemoryContext;
1686 456 : state->bs_emptyTuple = NULL;
1687 456 : state->bs_emptyTupleLen = 0;
1688 :
1689 : /* Remember the memory context to use for an empty tuple, if needed. */
1690 456 : state->bs_context = CurrentMemoryContext;
1691 456 : state->bs_emptyTuple = NULL;
1692 456 : state->bs_emptyTupleLen = 0;
1693 :
1694 : /*
1695 : * Calculate the start of the last page range. Page numbers are 0-based,
1696 : * so to calculate the index we need to subtract one. The integer division
1697 : * gives us the index of the page range.
1698 : */
1699 456 : if (tablePages > 0)
1700 334 : lastRange = ((tablePages - 1) / pagesPerRange) * pagesPerRange;
1701 :
1702 : /* Now calculate the start of the next range. */
1703 456 : state->bs_maxRangeStart = lastRange + state->bs_pagesPerRange;
1704 :
1705 456 : return state;
1706 : }
1707 :
1708 : /*
1709 : * Release resources associated with a BrinBuildState.
1710 : */
1711 : static void
1712 444 : terminate_brin_buildstate(BrinBuildState *state)
1713 : {
1714 : /*
1715 : * Release the last index buffer used. We might as well ensure that
1716 : * whatever free space remains in that page is available in FSM, too.
1717 : */
1718 444 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1719 : {
1720 : Page page;
1721 : Size freespace;
1722 : BlockNumber blk;
1723 :
1724 366 : page = BufferGetPage(state->bs_currentInsertBuf);
1725 366 : freespace = PageGetFreeSpace(page);
1726 366 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1727 366 : ReleaseBuffer(state->bs_currentInsertBuf);
1728 366 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1729 366 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1730 : }
1731 :
1732 444 : brin_free_desc(state->bs_bdesc);
1733 444 : pfree(state->bs_dtuple);
1734 444 : pfree(state);
1735 444 : }
1736 :
1737 : /*
1738 : * On the given BRIN index, summarize the heap page range that corresponds
1739 : * to the heap block number given.
1740 : *
1741 : * This routine can run in parallel with insertions into the heap. To avoid
1742 : * missing those values from the summary tuple, we first insert a placeholder
1743 : * index tuple into the index, then execute the heap scan; transactions
1744 : * concurrent with the scan update the placeholder tuple. After the scan, we
1745 : * union the placeholder tuple with the one computed by this routine. The
1746 : * update of the index value happens in a loop, so that if somebody updates
1747 : * the placeholder tuple after we read it, we detect the case and try again.
1748 : * This ensures that the concurrently inserted tuples are not lost.
1749 : *
1750 : * A further corner case is this routine being asked to summarize the partial
1751 : * range at the end of the table. heapNumBlocks is the (possibly outdated)
1752 : * table size; if we notice that the requested range lies beyond that size,
1753 : * we re-compute the table size after inserting the placeholder tuple, to
1754 : * avoid missing pages that were appended recently.
1755 : */
1756 : static void
1757 2934 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1758 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1759 : {
1760 : Buffer phbuf;
1761 : BrinTuple *phtup;
1762 : Size phsz;
1763 : OffsetNumber offset;
1764 : BlockNumber scanNumBlks;
1765 :
1766 : /*
1767 : * Insert the placeholder tuple
1768 : */
1769 2934 : phbuf = InvalidBuffer;
1770 2934 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1771 2934 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1772 : state->bs_rmAccess, &phbuf,
1773 : heapBlk, phtup, phsz);
1774 :
1775 : /*
1776 : * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1777 : * cannot shrink concurrently (but it can grow).
1778 : */
1779 : Assert(heapBlk % state->bs_pagesPerRange == 0);
1780 2934 : if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1781 : {
1782 : /*
1783 : * If we're asked to scan what we believe to be the final range on the
1784 : * table (i.e. a range that might be partial) we need to recompute our
1785 : * idea of what the latest page is after inserting the placeholder
1786 : * tuple. Anyone that grows the table later will update the
1787 : * placeholder tuple, so it doesn't matter that we won't scan these
1788 : * pages ourselves. Careful: the table might have been extended
1789 : * beyond the current range, so clamp our result.
1790 : *
1791 : * Fortunately, this should occur infrequently.
1792 : */
1793 24 : scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1794 : state->bs_pagesPerRange);
1795 : }
1796 : else
1797 : {
1798 : /* Easy case: range is known to be complete */
1799 2910 : scanNumBlks = state->bs_pagesPerRange;
1800 : }
1801 :
1802 : /*
1803 : * Execute the partial heap scan covering the heap blocks in the specified
1804 : * page range, summarizing the heap tuples in it. This scan stops just
1805 : * short of brinbuildCallback creating the new index entry.
1806 : *
1807 : * Note that it is critical we use the "any visible" mode of
1808 : * table_index_build_range_scan here: otherwise, we would miss tuples
1809 : * inserted by transactions that are still in progress, among other corner
1810 : * cases.
1811 : */
1812 2934 : state->bs_currRangeStart = heapBlk;
1813 2934 : table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1814 : heapBlk, scanNumBlks,
1815 : brinbuildCallback, state, NULL);
1816 :
1817 : /*
1818 : * Now we update the values obtained by the scan with the placeholder
1819 : * tuple. We do this in a loop which only terminates if we're able to
1820 : * update the placeholder tuple successfully; if we are not, this means
1821 : * somebody else modified the placeholder tuple after we read it.
1822 : */
1823 : for (;;)
1824 0 : {
1825 : BrinTuple *newtup;
1826 : Size newsize;
1827 : bool didupdate;
1828 : bool samepage;
1829 :
1830 2934 : CHECK_FOR_INTERRUPTS();
1831 :
1832 : /*
1833 : * Update the summary tuple and try to update.
1834 : */
1835 2934 : newtup = brin_form_tuple(state->bs_bdesc,
1836 : heapBlk, state->bs_dtuple, &newsize);
1837 2934 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1838 : didupdate =
1839 2934 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1840 : state->bs_rmAccess, heapBlk, phbuf, offset,
1841 : phtup, phsz, newtup, newsize, samepage);
1842 2934 : brin_free_tuple(phtup);
1843 2934 : brin_free_tuple(newtup);
1844 :
1845 : /* If the update succeeded, we're done. */
1846 2934 : if (didupdate)
1847 2934 : break;
1848 :
1849 : /*
1850 : * If the update didn't work, it might be because somebody updated the
1851 : * placeholder tuple concurrently. Extract the new version, union it
1852 : * with the values we have from the scan, and start over. (There are
1853 : * other reasons for the update to fail, but it's simple to treat them
1854 : * the same.)
1855 : */
1856 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1857 : &offset, &phsz, BUFFER_LOCK_SHARE);
1858 : /* the placeholder tuple must exist */
1859 0 : if (phtup == NULL)
1860 0 : elog(ERROR, "missing placeholder tuple");
1861 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1862 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1863 :
1864 : /* merge it into the tuple from the heap scan */
1865 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1866 : }
1867 :
1868 2934 : ReleaseBuffer(phbuf);
1869 2934 : }
1870 :
1871 : /*
1872 : * Summarize page ranges that are not already summarized. If pageRange is
1873 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1874 : * page range containing the given heap page number is scanned.
1875 : * If include_partial is true, then the partial range at the end of the table
1876 : * is summarized, otherwise not.
1877 : *
1878 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1879 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1880 : * incremented.
1881 : */
1882 : static void
1883 218 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1884 : bool include_partial, double *numSummarized, double *numExisting)
1885 : {
1886 : BrinRevmap *revmap;
1887 218 : BrinBuildState *state = NULL;
1888 218 : IndexInfo *indexInfo = NULL;
1889 : BlockNumber heapNumBlocks;
1890 : BlockNumber pagesPerRange;
1891 : Buffer buf;
1892 : BlockNumber startBlk;
1893 :
1894 218 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1895 :
1896 : /* determine range of pages to process */
1897 218 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1898 218 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1899 144 : startBlk = 0;
1900 : else
1901 : {
1902 74 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1903 74 : heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
1904 : }
1905 218 : if (startBlk > heapNumBlocks)
1906 : {
1907 : /* Nothing to do if start point is beyond end of table */
1908 0 : brinRevmapTerminate(revmap);
1909 0 : return;
1910 : }
1911 :
1912 : /*
1913 : * Scan the revmap to find unsummarized items.
1914 : */
1915 218 : buf = InvalidBuffer;
1916 18944 : for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1917 : {
1918 : BrinTuple *tup;
1919 : OffsetNumber off;
1920 :
1921 : /*
1922 : * Unless requested to summarize even a partial range, go away now if
1923 : * we think the next range is partial. Caller would pass true when it
1924 : * is typically run once bulk data loading is done
1925 : * (brin_summarize_new_values), and false when it is typically the
1926 : * result of arbitrarily-scheduled maintenance command (vacuuming).
1927 : */
1928 18792 : if (!include_partial &&
1929 2050 : (startBlk + pagesPerRange > heapNumBlocks))
1930 66 : break;
1931 :
1932 18726 : CHECK_FOR_INTERRUPTS();
1933 :
1934 18726 : tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1935 : BUFFER_LOCK_SHARE);
1936 18726 : if (tup == NULL)
1937 : {
1938 : /* no revmap entry for this heap range. Summarize it. */
1939 2934 : if (state == NULL)
1940 : {
1941 : /* first time through */
1942 : Assert(!indexInfo);
1943 78 : state = initialize_brin_buildstate(index, revmap,
1944 : pagesPerRange,
1945 : InvalidBlockNumber);
1946 78 : indexInfo = BuildIndexInfo(index);
1947 : }
1948 2934 : summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1949 :
1950 : /* and re-initialize state for the next range */
1951 2934 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1952 :
1953 2934 : if (numSummarized)
1954 2934 : *numSummarized += 1.0;
1955 : }
1956 : else
1957 : {
1958 15792 : if (numExisting)
1959 1892 : *numExisting += 1.0;
1960 15792 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1961 : }
1962 : }
1963 :
1964 218 : if (BufferIsValid(buf))
1965 152 : ReleaseBuffer(buf);
1966 :
1967 : /* free resources */
1968 218 : brinRevmapTerminate(revmap);
1969 218 : if (state)
1970 : {
1971 78 : terminate_brin_buildstate(state);
1972 78 : pfree(indexInfo);
1973 : }
1974 : }
1975 :
1976 : /*
1977 : * Given a deformed tuple in the build state, convert it into the on-disk
1978 : * format and insert it into the index, making the revmap point to it.
1979 : */
1980 : static void
1981 2654 : form_and_insert_tuple(BrinBuildState *state)
1982 : {
1983 : BrinTuple *tup;
1984 : Size size;
1985 :
1986 2654 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1987 : state->bs_dtuple, &size);
1988 2654 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1989 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1990 : tup, size);
1991 2654 : state->bs_numtuples++;
1992 :
1993 2654 : pfree(tup);
1994 2654 : }
1995 :
1996 : /*
1997 : * Given a deformed tuple in the build state, convert it into the on-disk
1998 : * format and write it to a (shared) tuplesort (the leader will insert it
1999 : * into the index later).
2000 : */
2001 : static void
2002 58 : form_and_spill_tuple(BrinBuildState *state)
2003 : {
2004 : BrinTuple *tup;
2005 : Size size;
2006 :
2007 : /* don't insert empty tuples in parallel build */
2008 58 : if (state->bs_dtuple->bt_empty_range)
2009 18 : return;
2010 :
2011 40 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
2012 : state->bs_dtuple, &size);
2013 :
2014 : /* write the BRIN tuple to the tuplesort */
2015 40 : tuplesort_putbrintuple(state->bs_sortstate, tup, size);
2016 :
2017 40 : state->bs_numtuples++;
2018 :
2019 40 : pfree(tup);
2020 : }
2021 :
2022 : /*
2023 : * Given two deformed tuples, adjust the first one so that it's consistent
2024 : * with the summary values in both.
2025 : */
2026 : static void
2027 0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
2028 : {
2029 : int keyno;
2030 : BrinMemTuple *db;
2031 : MemoryContext cxt;
2032 : MemoryContext oldcxt;
2033 :
2034 : /* Use our own memory context to avoid retail pfree */
2035 0 : cxt = AllocSetContextCreate(CurrentMemoryContext,
2036 : "brin union",
2037 : ALLOCSET_DEFAULT_SIZES);
2038 0 : oldcxt = MemoryContextSwitchTo(cxt);
2039 0 : db = brin_deform_tuple(bdesc, b, NULL);
2040 0 : MemoryContextSwitchTo(oldcxt);
2041 :
2042 : /*
2043 : * Check if the ranges are empty.
2044 : *
2045 : * If at least one of them is empty, we don't need to call per-key union
2046 : * functions at all. If "b" is empty, we just use "a" as the result (it
2047 : * might be empty fine, but that's fine). If "a" is empty but "b" is not,
2048 : * we use "b" as the result (but we have to copy the data into "a" first).
2049 : *
2050 : * Only when both ranges are non-empty, we actually do the per-key merge.
2051 : */
2052 :
2053 : /* If "b" is empty - ignore it and just use "a" (even if it's empty etc.). */
2054 0 : if (db->bt_empty_range)
2055 : {
2056 : /* skip the per-key merge */
2057 0 : MemoryContextDelete(cxt);
2058 0 : return;
2059 : }
2060 :
2061 : /*
2062 : * Now we know "b" is not empty. If "a" is empty, then "b" is the result.
2063 : * But we need to copy the data from "b" to "a" first, because that's how
2064 : * we pass result out.
2065 : *
2066 : * We have to copy all the global/per-key flags etc. too.
2067 : */
2068 0 : if (a->bt_empty_range)
2069 : {
2070 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2071 : {
2072 : int i;
2073 0 : BrinValues *col_a = &a->bt_columns[keyno];
2074 0 : BrinValues *col_b = &db->bt_columns[keyno];
2075 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2076 :
2077 0 : col_a->bv_allnulls = col_b->bv_allnulls;
2078 0 : col_a->bv_hasnulls = col_b->bv_hasnulls;
2079 :
2080 : /* If "b" has no data, we're done. */
2081 0 : if (col_b->bv_allnulls)
2082 0 : continue;
2083 :
2084 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2085 0 : col_a->bv_values[i] =
2086 0 : datumCopy(col_b->bv_values[i],
2087 0 : opcinfo->oi_typcache[i]->typbyval,
2088 0 : opcinfo->oi_typcache[i]->typlen);
2089 : }
2090 :
2091 : /* "a" started empty, but "b" was not empty, so remember that */
2092 0 : a->bt_empty_range = false;
2093 :
2094 : /* skip the per-key merge */
2095 0 : MemoryContextDelete(cxt);
2096 0 : return;
2097 : }
2098 :
2099 : /* Now we know neither range is empty. */
2100 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2101 : {
2102 : FmgrInfo *unionFn;
2103 0 : BrinValues *col_a = &a->bt_columns[keyno];
2104 0 : BrinValues *col_b = &db->bt_columns[keyno];
2105 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2106 :
2107 0 : if (opcinfo->oi_regular_nulls)
2108 : {
2109 : /* Does the "b" summary represent any NULL values? */
2110 0 : bool b_has_nulls = (col_b->bv_hasnulls || col_b->bv_allnulls);
2111 :
2112 : /* Adjust "hasnulls". */
2113 0 : if (!col_a->bv_allnulls && b_has_nulls)
2114 0 : col_a->bv_hasnulls = true;
2115 :
2116 : /* If there are no values in B, there's nothing left to do. */
2117 0 : if (col_b->bv_allnulls)
2118 0 : continue;
2119 :
2120 : /*
2121 : * Adjust "allnulls". If A doesn't have values, just copy the
2122 : * values from B into A, and we're done. We cannot run the
2123 : * operators in this case, because values in A might contain
2124 : * garbage. Note we already established that B contains values.
2125 : *
2126 : * Also adjust "hasnulls" in order not to forget the summary
2127 : * represents NULL values. This is not redundant with the earlier
2128 : * update, because that only happens when allnulls=false.
2129 : */
2130 0 : if (col_a->bv_allnulls)
2131 : {
2132 : int i;
2133 :
2134 0 : col_a->bv_allnulls = false;
2135 0 : col_a->bv_hasnulls = true;
2136 :
2137 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2138 0 : col_a->bv_values[i] =
2139 0 : datumCopy(col_b->bv_values[i],
2140 0 : opcinfo->oi_typcache[i]->typbyval,
2141 0 : opcinfo->oi_typcache[i]->typlen);
2142 :
2143 0 : continue;
2144 : }
2145 : }
2146 :
2147 0 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
2148 : BRIN_PROCNUM_UNION);
2149 0 : FunctionCall3Coll(unionFn,
2150 0 : bdesc->bd_index->rd_indcollation[keyno],
2151 : PointerGetDatum(bdesc),
2152 : PointerGetDatum(col_a),
2153 : PointerGetDatum(col_b));
2154 : }
2155 :
2156 0 : MemoryContextDelete(cxt);
2157 : }
2158 :
2159 : /*
2160 : * brin_vacuum_scan
2161 : * Do a complete scan of the index during VACUUM.
2162 : *
2163 : * This routine scans the complete index looking for uncataloged index pages,
2164 : * i.e. those that might have been lost due to a crash after index extension
2165 : * and such.
2166 : */
2167 : static void
2168 86 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
2169 : {
2170 : BlockNumber nblocks;
2171 : BlockNumber blkno;
2172 :
2173 : /*
2174 : * Scan the index in physical order, and clean up any possible mess in
2175 : * each page.
2176 : */
2177 86 : nblocks = RelationGetNumberOfBlocks(idxrel);
2178 458 : for (blkno = 0; blkno < nblocks; blkno++)
2179 : {
2180 : Buffer buf;
2181 :
2182 372 : CHECK_FOR_INTERRUPTS();
2183 :
2184 372 : buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
2185 : RBM_NORMAL, strategy);
2186 :
2187 372 : brin_page_cleanup(idxrel, buf);
2188 :
2189 372 : ReleaseBuffer(buf);
2190 : }
2191 :
2192 : /*
2193 : * Update all upper pages in the index's FSM, as well. This ensures not
2194 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
2195 : * but also that any pre-existing damage or out-of-dateness is repaired.
2196 : */
2197 86 : FreeSpaceMapVacuum(idxrel);
2198 86 : }
2199 :
2200 : static bool
2201 784186 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
2202 : const Datum *values, const bool *nulls)
2203 : {
2204 : int keyno;
2205 :
2206 : /* If the range starts empty, we're certainly going to modify it. */
2207 784186 : bool modified = dtup->bt_empty_range;
2208 :
2209 : /*
2210 : * Compare the key values of the new tuple to the stored index values; our
2211 : * deformed tuple will get updated if the new tuple doesn't fit the
2212 : * original range (note this means we can't break out of the loop early).
2213 : * Make a note of whether this happens, so that we know to insert the
2214 : * modified tuple later.
2215 : */
2216 1847924 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2217 : {
2218 : Datum result;
2219 : BrinValues *bval;
2220 : FmgrInfo *addValue;
2221 : bool has_nulls;
2222 :
2223 1063738 : bval = &dtup->bt_columns[keyno];
2224 :
2225 : /*
2226 : * Does the range have actual NULL values? Either of the flags can be
2227 : * set, but we ignore the state before adding first row.
2228 : *
2229 : * We have to remember this, because we'll modify the flags and we
2230 : * need to know if the range started as empty.
2231 : */
2232 2090904 : has_nulls = ((!dtup->bt_empty_range) &&
2233 1027166 : (bval->bv_hasnulls || bval->bv_allnulls));
2234 :
2235 : /*
2236 : * If the value we're adding is NULL, handle it locally. Otherwise
2237 : * call the BRIN_PROCNUM_ADDVALUE procedure.
2238 : */
2239 1063738 : if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
2240 : {
2241 : /*
2242 : * If the new value is null, we record that we saw it if it's the
2243 : * first one; otherwise, there's nothing to do.
2244 : */
2245 18622 : if (!bval->bv_hasnulls)
2246 : {
2247 3572 : bval->bv_hasnulls = true;
2248 3572 : modified = true;
2249 : }
2250 :
2251 18622 : continue;
2252 : }
2253 :
2254 1045116 : addValue = index_getprocinfo(idxRel, keyno + 1,
2255 : BRIN_PROCNUM_ADDVALUE);
2256 1045116 : result = FunctionCall4Coll(addValue,
2257 1045116 : idxRel->rd_indcollation[keyno],
2258 : PointerGetDatum(bdesc),
2259 : PointerGetDatum(bval),
2260 1045116 : values[keyno],
2261 1045116 : nulls[keyno]);
2262 : /* if that returned true, we need to insert the updated tuple */
2263 1045116 : modified |= DatumGetBool(result);
2264 :
2265 : /*
2266 : * If the range was had actual NULL values (i.e. did not start empty),
2267 : * make sure we don't forget about the NULL values. Either the
2268 : * allnulls flag is still set to true, or (if the opclass cleared it)
2269 : * we need to set hasnulls=true.
2270 : *
2271 : * XXX This can only happen when the opclass modified the tuple, so
2272 : * the modified flag should be set.
2273 : */
2274 1045116 : if (has_nulls && !(bval->bv_hasnulls || bval->bv_allnulls))
2275 : {
2276 : Assert(modified);
2277 4 : bval->bv_hasnulls = true;
2278 : }
2279 : }
2280 :
2281 : /*
2282 : * After updating summaries for all the keys, mark it as not empty.
2283 : *
2284 : * If we're actually changing the flag value (i.e. tuple started as
2285 : * empty), we should have modified the tuple. So we should not see empty
2286 : * range that was not modified.
2287 : */
2288 : Assert(!dtup->bt_empty_range || modified);
2289 784186 : dtup->bt_empty_range = false;
2290 :
2291 784186 : return modified;
2292 : }
2293 :
2294 : static bool
2295 189936 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
2296 : {
2297 : int keyno;
2298 :
2299 : /*
2300 : * First check if there are any IS [NOT] NULL scan keys, and if we're
2301 : * violating them.
2302 : */
2303 191172 : for (keyno = 0; keyno < nnullkeys; keyno++)
2304 : {
2305 2232 : ScanKey key = nullkeys[keyno];
2306 :
2307 : Assert(key->sk_attno == bval->bv_attno);
2308 :
2309 : /* Handle only IS NULL/IS NOT NULL tests */
2310 2232 : if (!(key->sk_flags & SK_ISNULL))
2311 0 : continue;
2312 :
2313 2232 : if (key->sk_flags & SK_SEARCHNULL)
2314 : {
2315 : /* IS NULL scan key, but range has no NULLs */
2316 1116 : if (!bval->bv_allnulls && !bval->bv_hasnulls)
2317 978 : return false;
2318 : }
2319 1116 : else if (key->sk_flags & SK_SEARCHNOTNULL)
2320 : {
2321 : /*
2322 : * For IS NOT NULL, we can only skip ranges that are known to have
2323 : * only nulls.
2324 : */
2325 1116 : if (bval->bv_allnulls)
2326 18 : return false;
2327 : }
2328 : else
2329 : {
2330 : /*
2331 : * Neither IS NULL nor IS NOT NULL was used; assume all indexable
2332 : * operators are strict and thus return false with NULL value in
2333 : * the scan key.
2334 : */
2335 0 : return false;
2336 : }
2337 : }
2338 :
2339 188940 : return true;
2340 : }
2341 :
2342 : /*
2343 : * Create parallel context, and launch workers for leader.
2344 : *
2345 : * buildstate argument should be initialized (with the exception of the
2346 : * tuplesort states, which may later be created based on shared
2347 : * state initially set up here).
2348 : *
2349 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
2350 : *
2351 : * request is the target number of parallel worker processes to launch.
2352 : *
2353 : * Sets buildstate's BrinLeader, which caller must use to shut down parallel
2354 : * mode by passing it to _brin_end_parallel() at the very end of its index
2355 : * build. If not even a single worker process can be launched, this is
2356 : * never set, and caller should proceed with a serial index build.
2357 : */
2358 : static void
2359 10 : _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2360 : bool isconcurrent, int request)
2361 : {
2362 : ParallelContext *pcxt;
2363 : int scantuplesortstates;
2364 : Snapshot snapshot;
2365 : Size estbrinshared;
2366 : Size estsort;
2367 : BrinShared *brinshared;
2368 : Sharedsort *sharedsort;
2369 10 : BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
2370 : WalUsage *walusage;
2371 : BufferUsage *bufferusage;
2372 10 : bool leaderparticipates = true;
2373 : int querylen;
2374 :
2375 : #ifdef DISABLE_LEADER_PARTICIPATION
2376 : leaderparticipates = false;
2377 : #endif
2378 :
2379 : /*
2380 : * Enter parallel mode, and create context for parallel build of brin
2381 : * index
2382 : */
2383 10 : EnterParallelMode();
2384 : Assert(request > 0);
2385 10 : pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
2386 : request);
2387 :
2388 10 : scantuplesortstates = leaderparticipates ? request + 1 : request;
2389 :
2390 : /*
2391 : * Prepare for scan of the base relation. In a normal index build, we use
2392 : * SnapshotAny because we must retrieve all tuples and do our own time
2393 : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
2394 : * concurrent build, we take a regular MVCC snapshot and index whatever's
2395 : * live according to that.
2396 : */
2397 10 : if (!isconcurrent)
2398 10 : snapshot = SnapshotAny;
2399 : else
2400 0 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
2401 :
2402 : /*
2403 : * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
2404 : */
2405 10 : estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
2406 10 : shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
2407 10 : estsort = tuplesort_estimate_shared(scantuplesortstates);
2408 10 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
2409 :
2410 10 : shm_toc_estimate_keys(&pcxt->estimator, 2);
2411 :
2412 : /*
2413 : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
2414 : * and PARALLEL_KEY_BUFFER_USAGE.
2415 : *
2416 : * If there are no extensions loaded that care, we could skip this. We
2417 : * have no way of knowing whether anyone's looking at pgWalUsage or
2418 : * pgBufferUsage, so do it unconditionally.
2419 : */
2420 10 : shm_toc_estimate_chunk(&pcxt->estimator,
2421 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2422 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2423 10 : shm_toc_estimate_chunk(&pcxt->estimator,
2424 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2425 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2426 :
2427 : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
2428 10 : if (debug_query_string)
2429 : {
2430 10 : querylen = strlen(debug_query_string);
2431 10 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
2432 10 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2433 : }
2434 : else
2435 0 : querylen = 0; /* keep compiler quiet */
2436 :
2437 : /* Everyone's had a chance to ask for space, so now create the DSM */
2438 10 : InitializeParallelDSM(pcxt);
2439 :
2440 : /* If no DSM segment was available, back out (do serial build) */
2441 10 : if (pcxt->seg == NULL)
2442 : {
2443 0 : if (IsMVCCSnapshot(snapshot))
2444 0 : UnregisterSnapshot(snapshot);
2445 0 : DestroyParallelContext(pcxt);
2446 0 : ExitParallelMode();
2447 0 : return;
2448 : }
2449 :
2450 : /* Store shared build state, for which we reserved space */
2451 10 : brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
2452 : /* Initialize immutable state */
2453 10 : brinshared->heaprelid = RelationGetRelid(heap);
2454 10 : brinshared->indexrelid = RelationGetRelid(index);
2455 10 : brinshared->isconcurrent = isconcurrent;
2456 10 : brinshared->scantuplesortstates = scantuplesortstates;
2457 10 : brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
2458 10 : brinshared->queryid = pgstat_get_my_query_id();
2459 10 : ConditionVariableInit(&brinshared->workersdonecv);
2460 10 : SpinLockInit(&brinshared->mutex);
2461 :
2462 : /* Initialize mutable state */
2463 10 : brinshared->nparticipantsdone = 0;
2464 10 : brinshared->reltuples = 0.0;
2465 10 : brinshared->indtuples = 0.0;
2466 :
2467 10 : table_parallelscan_initialize(heap,
2468 : ParallelTableScanFromBrinShared(brinshared),
2469 : snapshot);
2470 :
2471 : /*
2472 : * Store shared tuplesort-private state, for which we reserved space.
2473 : * Then, initialize opaque state using tuplesort routine.
2474 : */
2475 10 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
2476 10 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
2477 : pcxt->seg);
2478 :
2479 : /*
2480 : * Store shared tuplesort-private state, for which we reserved space.
2481 : * Then, initialize opaque state using tuplesort routine.
2482 : */
2483 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
2484 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
2485 :
2486 : /* Store query string for workers */
2487 10 : if (debug_query_string)
2488 : {
2489 : char *sharedquery;
2490 :
2491 10 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
2492 10 : memcpy(sharedquery, debug_query_string, querylen + 1);
2493 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
2494 : }
2495 :
2496 : /*
2497 : * Allocate space for each worker's WalUsage and BufferUsage; no need to
2498 : * initialize.
2499 : */
2500 10 : walusage = shm_toc_allocate(pcxt->toc,
2501 10 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2502 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
2503 10 : bufferusage = shm_toc_allocate(pcxt->toc,
2504 10 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2505 10 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
2506 :
2507 : /* Launch workers, saving status for leader/caller */
2508 10 : LaunchParallelWorkers(pcxt);
2509 10 : brinleader->pcxt = pcxt;
2510 10 : brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
2511 10 : if (leaderparticipates)
2512 10 : brinleader->nparticipanttuplesorts++;
2513 10 : brinleader->brinshared = brinshared;
2514 10 : brinleader->sharedsort = sharedsort;
2515 10 : brinleader->snapshot = snapshot;
2516 10 : brinleader->walusage = walusage;
2517 10 : brinleader->bufferusage = bufferusage;
2518 :
2519 : /* If no workers were successfully launched, back out (do serial build) */
2520 10 : if (pcxt->nworkers_launched == 0)
2521 : {
2522 2 : _brin_end_parallel(brinleader, NULL);
2523 2 : return;
2524 : }
2525 :
2526 : /* Save leader state now that it's clear build will be parallel */
2527 8 : buildstate->bs_leader = brinleader;
2528 :
2529 : /* Join heap scan ourselves */
2530 8 : if (leaderparticipates)
2531 8 : _brin_leader_participate_as_worker(buildstate, heap, index);
2532 :
2533 : /*
2534 : * Caller needs to wait for all launched workers when we return. Make
2535 : * sure that the failure-to-start case will not hang forever.
2536 : */
2537 8 : WaitForParallelWorkersToAttach(pcxt);
2538 : }
2539 :
2540 : /*
2541 : * Shut down workers, destroy parallel context, and end parallel mode.
2542 : */
2543 : static void
2544 10 : _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2545 : {
2546 : int i;
2547 :
2548 : /* Shutdown worker processes */
2549 10 : WaitForParallelWorkersToFinish(brinleader->pcxt);
2550 :
2551 : /*
2552 : * Next, accumulate WAL usage. (This must wait for the workers to finish,
2553 : * or we might get incomplete data.)
2554 : */
2555 22 : for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
2556 12 : InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
2557 :
2558 : /* Free last reference to MVCC snapshot, if one was used */
2559 10 : if (IsMVCCSnapshot(brinleader->snapshot))
2560 0 : UnregisterSnapshot(brinleader->snapshot);
2561 10 : DestroyParallelContext(brinleader->pcxt);
2562 10 : ExitParallelMode();
2563 10 : }
2564 :
2565 : /*
2566 : * Within leader, wait for end of heap scan.
2567 : *
2568 : * When called, parallel heap scan started by _brin_begin_parallel() will
2569 : * already be underway within worker processes (when leader participates
2570 : * as a worker, we should end up here just as workers are finishing).
2571 : *
2572 : * Returns the total number of heap tuples scanned.
2573 : */
2574 : static double
2575 8 : _brin_parallel_heapscan(BrinBuildState *state)
2576 : {
2577 8 : BrinShared *brinshared = state->bs_leader->brinshared;
2578 : int nparticipanttuplesorts;
2579 :
2580 8 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
2581 : for (;;)
2582 : {
2583 24 : SpinLockAcquire(&brinshared->mutex);
2584 24 : if (brinshared->nparticipantsdone == nparticipanttuplesorts)
2585 : {
2586 : /* copy the data into leader state */
2587 8 : state->bs_reltuples = brinshared->reltuples;
2588 8 : state->bs_numtuples = brinshared->indtuples;
2589 :
2590 8 : SpinLockRelease(&brinshared->mutex);
2591 8 : break;
2592 : }
2593 16 : SpinLockRelease(&brinshared->mutex);
2594 :
2595 16 : ConditionVariableSleep(&brinshared->workersdonecv,
2596 : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
2597 : }
2598 :
2599 8 : ConditionVariableCancelSleep();
2600 :
2601 8 : return state->bs_reltuples;
2602 : }
2603 :
2604 : /*
2605 : * Within leader, wait for end of heap scan and merge per-worker results.
2606 : *
2607 : * After waiting for all workers to finish, merge the per-worker results into
2608 : * the complete index. The results from each worker are sorted by block number
2609 : * (start of the page range). While combining the per-worker results we merge
2610 : * summaries for the same page range, and also fill-in empty summaries for
2611 : * ranges without any tuples.
2612 : *
2613 : * Returns the total number of heap tuples scanned.
2614 : */
2615 : static double
2616 8 : _brin_parallel_merge(BrinBuildState *state)
2617 : {
2618 : BrinTuple *btup;
2619 8 : BrinMemTuple *memtuple = NULL;
2620 : Size tuplen;
2621 8 : BlockNumber prevblkno = InvalidBlockNumber;
2622 : MemoryContext rangeCxt,
2623 : oldCxt;
2624 : double reltuples;
2625 :
2626 : /* wait for workers to scan table and produce partial results */
2627 8 : reltuples = _brin_parallel_heapscan(state);
2628 :
2629 : /* do the actual sort in the leader */
2630 8 : tuplesort_performsort(state->bs_sortstate);
2631 :
2632 : /*
2633 : * Initialize BrinMemTuple we'll use to union summaries from workers (in
2634 : * case they happened to produce parts of the same page range).
2635 : */
2636 8 : memtuple = brin_new_memtuple(state->bs_bdesc);
2637 :
2638 : /*
2639 : * Create a memory context we'll reset to combine results for a single
2640 : * page range (received from the workers). We don't expect huge number of
2641 : * overlaps under regular circumstances, because for large tables the
2642 : * chunk size is likely larger than the BRIN page range), but it can
2643 : * happen, and the union functions may do all kinds of stuff. So we better
2644 : * reset the context once in a while.
2645 : */
2646 8 : rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
2647 : "brin union",
2648 : ALLOCSET_DEFAULT_SIZES);
2649 8 : oldCxt = MemoryContextSwitchTo(rangeCxt);
2650 :
2651 : /*
2652 : * Read the BRIN tuples from the shared tuplesort, sorted by block number.
2653 : * That probably gives us an index that is cheaper to scan, thanks to
2654 : * mostly getting data from the same index page as before.
2655 : */
2656 48 : while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
2657 : {
2658 : /* Ranges should be multiples of pages_per_range for the index. */
2659 : Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
2660 :
2661 : /*
2662 : * Do we need to union summaries for the same page range?
2663 : *
2664 : * If this is the first brin tuple we read, then just deform it into
2665 : * the memtuple, and continue with the next one from tuplesort. We
2666 : * however may need to insert empty summaries into the index.
2667 : *
2668 : * If it's the same block as the last we saw, we simply union the brin
2669 : * tuple into it, and we're done - we don't even need to insert empty
2670 : * ranges, because that was done earlier when we saw the first brin
2671 : * tuple (for this range).
2672 : *
2673 : * Finally, if it's not the first brin tuple, and it's not the same
2674 : * page range, we need to do the insert and then deform the tuple into
2675 : * the memtuple. Then we'll insert empty ranges before the new brin
2676 : * tuple, if needed.
2677 : */
2678 40 : if (prevblkno == InvalidBlockNumber)
2679 : {
2680 : /* First brin tuples, just deform into memtuple. */
2681 2 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2682 :
2683 : /* continue to insert empty pages before thisblock */
2684 : }
2685 38 : else if (memtuple->bt_blkno == btup->bt_blkno)
2686 : {
2687 : /*
2688 : * Not the first brin tuple, but same page range as the previous
2689 : * one, so we can merge it into the memtuple.
2690 : */
2691 0 : union_tuples(state->bs_bdesc, memtuple, btup);
2692 0 : continue;
2693 : }
2694 : else
2695 : {
2696 : BrinTuple *tmp;
2697 : Size len;
2698 :
2699 : /*
2700 : * We got brin tuple for a different page range, so form a brin
2701 : * tuple from the memtuple, insert it, and re-init the memtuple
2702 : * from the new brin tuple.
2703 : */
2704 38 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2705 : memtuple, &len);
2706 :
2707 38 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2708 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2709 :
2710 : /*
2711 : * Reset the per-output-range context. This frees all the memory
2712 : * possibly allocated by the union functions, and also the BRIN
2713 : * tuple we just formed and inserted.
2714 : */
2715 38 : MemoryContextReset(rangeCxt);
2716 :
2717 38 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2718 :
2719 : /* continue to insert empty pages before thisblock */
2720 : }
2721 :
2722 : /* Fill empty ranges for all ranges missing in the tuplesort. */
2723 40 : brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
2724 :
2725 40 : prevblkno = btup->bt_blkno;
2726 : }
2727 :
2728 8 : tuplesort_end(state->bs_sortstate);
2729 :
2730 : /* Fill the BRIN tuple for the last page range with data. */
2731 8 : if (prevblkno != InvalidBlockNumber)
2732 : {
2733 : BrinTuple *tmp;
2734 : Size len;
2735 :
2736 2 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2737 : memtuple, &len);
2738 :
2739 2 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2740 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2741 :
2742 2 : pfree(tmp);
2743 : }
2744 :
2745 : /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
2746 8 : brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
2747 :
2748 : /*
2749 : * Switch back to the original memory context, and destroy the one we
2750 : * created to isolate the union_tuple calls.
2751 : */
2752 8 : MemoryContextSwitchTo(oldCxt);
2753 8 : MemoryContextDelete(rangeCxt);
2754 :
2755 8 : return reltuples;
2756 : }
2757 :
2758 : /*
2759 : * Returns size of shared memory required to store state for a parallel
2760 : * brin index build based on the snapshot its parallel scan will use.
2761 : */
2762 : static Size
2763 10 : _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
2764 : {
2765 : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
2766 10 : return add_size(BUFFERALIGN(sizeof(BrinShared)),
2767 : table_parallelscan_estimate(heap, snapshot));
2768 : }
2769 :
2770 : /*
2771 : * Within leader, participate as a parallel worker.
2772 : */
2773 : static void
2774 8 : _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
2775 : {
2776 8 : BrinLeader *brinleader = buildstate->bs_leader;
2777 : int sortmem;
2778 :
2779 : /*
2780 : * Might as well use reliable figure when doling out maintenance_work_mem
2781 : * (when requested number of workers were not launched, this will be
2782 : * somewhat higher than it is for other workers).
2783 : */
2784 8 : sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
2785 :
2786 : /* Perform work common to all participants */
2787 8 : _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
2788 : brinleader->sharedsort, heap, index, sortmem, true);
2789 8 : }
2790 :
2791 : /*
2792 : * Perform a worker's portion of a parallel sort.
2793 : *
2794 : * This generates a tuplesort for the worker portion of the table.
2795 : *
2796 : * sortmem is the amount of working memory to use within each worker,
2797 : * expressed in KBs.
2798 : *
2799 : * When this returns, workers are done, and need only release resources.
2800 : */
2801 : static void
2802 20 : _brin_parallel_scan_and_build(BrinBuildState *state,
2803 : BrinShared *brinshared, Sharedsort *sharedsort,
2804 : Relation heap, Relation index,
2805 : int sortmem, bool progress)
2806 : {
2807 : SortCoordinate coordinate;
2808 : TableScanDesc scan;
2809 : double reltuples;
2810 : IndexInfo *indexInfo;
2811 :
2812 : /* Initialize local tuplesort coordination state */
2813 20 : coordinate = palloc0(sizeof(SortCoordinateData));
2814 20 : coordinate->isWorker = true;
2815 20 : coordinate->nParticipants = -1;
2816 20 : coordinate->sharedsort = sharedsort;
2817 :
2818 : /* Begin "partial" tuplesort */
2819 20 : state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
2820 : TUPLESORT_NONE);
2821 :
2822 : /* Join parallel scan */
2823 20 : indexInfo = BuildIndexInfo(index);
2824 20 : indexInfo->ii_Concurrent = brinshared->isconcurrent;
2825 :
2826 20 : scan = table_beginscan_parallel(heap,
2827 : ParallelTableScanFromBrinShared(brinshared));
2828 :
2829 20 : reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
2830 : brinbuildCallbackParallel, state, scan);
2831 :
2832 : /* insert the last item */
2833 20 : form_and_spill_tuple(state);
2834 :
2835 : /* sort the BRIN ranges built by this worker */
2836 20 : tuplesort_performsort(state->bs_sortstate);
2837 :
2838 20 : state->bs_reltuples += reltuples;
2839 :
2840 : /*
2841 : * Done. Record ambuild statistics.
2842 : */
2843 20 : SpinLockAcquire(&brinshared->mutex);
2844 20 : brinshared->nparticipantsdone++;
2845 20 : brinshared->reltuples += state->bs_reltuples;
2846 20 : brinshared->indtuples += state->bs_numtuples;
2847 20 : SpinLockRelease(&brinshared->mutex);
2848 :
2849 : /* Notify leader */
2850 20 : ConditionVariableSignal(&brinshared->workersdonecv);
2851 :
2852 20 : tuplesort_end(state->bs_sortstate);
2853 20 : }
2854 :
2855 : /*
2856 : * Perform work within a launched parallel process.
2857 : */
2858 : void
2859 12 : _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2860 : {
2861 : char *sharedquery;
2862 : BrinShared *brinshared;
2863 : Sharedsort *sharedsort;
2864 : BrinBuildState *buildstate;
2865 : Relation heapRel;
2866 : Relation indexRel;
2867 : LOCKMODE heapLockmode;
2868 : LOCKMODE indexLockmode;
2869 : WalUsage *walusage;
2870 : BufferUsage *bufferusage;
2871 : int sortmem;
2872 :
2873 : /*
2874 : * The only possible status flag that can be set to the parallel worker is
2875 : * PROC_IN_SAFE_IC.
2876 : */
2877 : Assert((MyProc->statusFlags == 0) ||
2878 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2879 :
2880 : /* Set debug_query_string for individual workers first */
2881 12 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2882 12 : debug_query_string = sharedquery;
2883 :
2884 : /* Report the query string from leader */
2885 12 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2886 :
2887 : /* Look up brin shared state */
2888 12 : brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
2889 :
2890 : /* Open relations using lock modes known to be obtained by index.c */
2891 12 : if (!brinshared->isconcurrent)
2892 : {
2893 12 : heapLockmode = ShareLock;
2894 12 : indexLockmode = AccessExclusiveLock;
2895 : }
2896 : else
2897 : {
2898 0 : heapLockmode = ShareUpdateExclusiveLock;
2899 0 : indexLockmode = RowExclusiveLock;
2900 : }
2901 :
2902 : /* Track query ID */
2903 12 : pgstat_report_query_id(brinshared->queryid, false);
2904 :
2905 : /* Open relations within worker */
2906 12 : heapRel = table_open(brinshared->heaprelid, heapLockmode);
2907 12 : indexRel = index_open(brinshared->indexrelid, indexLockmode);
2908 :
2909 12 : buildstate = initialize_brin_buildstate(indexRel, NULL,
2910 : brinshared->pagesPerRange,
2911 : InvalidBlockNumber);
2912 :
2913 : /* Look up shared state private to tuplesort.c */
2914 12 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2915 12 : tuplesort_attach_shared(sharedsort, seg);
2916 :
2917 : /* Prepare to track buffer usage during parallel execution */
2918 12 : InstrStartParallelQuery();
2919 :
2920 : /*
2921 : * Might as well use reliable figure when doling out maintenance_work_mem
2922 : * (when requested number of workers were not launched, this will be
2923 : * somewhat higher than it is for other workers).
2924 : */
2925 12 : sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
2926 :
2927 12 : _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
2928 : heapRel, indexRel, sortmem, false);
2929 :
2930 : /* Report WAL/buffer usage during parallel execution */
2931 12 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2932 12 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2933 12 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2934 12 : &walusage[ParallelWorkerNumber]);
2935 :
2936 12 : index_close(indexRel, indexLockmode);
2937 12 : table_close(heapRel, heapLockmode);
2938 12 : }
2939 :
2940 : /*
2941 : * brin_build_empty_tuple
2942 : * Maybe initialize a BRIN tuple representing empty range.
2943 : *
2944 : * Returns a BRIN tuple representing an empty page range starting at the
2945 : * specified block number. The empty tuple is initialized only once, when it's
2946 : * needed for the first time, stored in the memory context bs_context to ensure
2947 : * proper life span, and reused on following calls. All empty tuples are
2948 : * exactly the same except for the bt_blkno field, which is set to the value
2949 : * in blkno parameter.
2950 : */
2951 : static void
2952 20 : brin_build_empty_tuple(BrinBuildState *state, BlockNumber blkno)
2953 : {
2954 : /* First time an empty tuple is requested? If yes, initialize it. */
2955 20 : if (state->bs_emptyTuple == NULL)
2956 : {
2957 : MemoryContext oldcxt;
2958 10 : BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
2959 :
2960 : /* Allocate the tuple in context for the whole index build. */
2961 10 : oldcxt = MemoryContextSwitchTo(state->bs_context);
2962 :
2963 10 : state->bs_emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple,
2964 : &state->bs_emptyTupleLen);
2965 :
2966 10 : MemoryContextSwitchTo(oldcxt);
2967 : }
2968 : else
2969 : {
2970 : /* If we already have an empty tuple, just update the block. */
2971 10 : state->bs_emptyTuple->bt_blkno = blkno;
2972 : }
2973 20 : }
2974 :
2975 : /*
2976 : * brin_fill_empty_ranges
2977 : * Add BRIN index tuples representing empty page ranges.
2978 : *
2979 : * prevRange/nextRange determine for which page ranges to add empty summaries.
2980 : * Both boundaries are exclusive, i.e. only ranges starting at blkno for which
2981 : * (prevRange < blkno < nextRange) will be added to the index.
2982 : *
2983 : * If prevRange is InvalidBlockNumber, this means there was no previous page
2984 : * range (i.e. the first empty range to add is for blkno=0).
2985 : *
2986 : * The empty tuple is built only once, and then reused for all future calls.
2987 : */
2988 : static void
2989 406 : brin_fill_empty_ranges(BrinBuildState *state,
2990 : BlockNumber prevRange, BlockNumber nextRange)
2991 : {
2992 : BlockNumber blkno;
2993 :
2994 : /*
2995 : * If we already summarized some ranges, we need to start with the next
2996 : * one. Otherwise start from the first range of the table.
2997 : */
2998 406 : blkno = (prevRange == InvalidBlockNumber) ? 0 : (prevRange + state->bs_pagesPerRange);
2999 :
3000 : /* Generate empty ranges until we hit the next non-empty range. */
3001 426 : while (blkno < nextRange)
3002 : {
3003 : /* Did we already build the empty tuple? If not, do it now. */
3004 20 : brin_build_empty_tuple(state, blkno);
3005 :
3006 20 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
3007 : &state->bs_currentInsertBuf,
3008 : blkno, state->bs_emptyTuple, state->bs_emptyTupleLen);
3009 :
3010 : /* try next page range */
3011 20 : blkno += state->bs_pagesPerRange;
3012 : }
3013 406 : }
|