Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/relation.h"
23 : #include "access/reloptions.h"
24 : #include "access/relscan.h"
25 : #include "access/table.h"
26 : #include "access/tableam.h"
27 : #include "access/xloginsert.h"
28 : #include "catalog/index.h"
29 : #include "catalog/pg_am.h"
30 : #include "commands/vacuum.h"
31 : #include "miscadmin.h"
32 : #include "pgstat.h"
33 : #include "postmaster/autovacuum.h"
34 : #include "storage/bufmgr.h"
35 : #include "storage/freespace.h"
36 : #include "tcop/tcopprot.h" /* pgrminclude ignore */
37 : #include "utils/acl.h"
38 : #include "utils/datum.h"
39 : #include "utils/fmgrprotos.h"
40 : #include "utils/guc.h"
41 : #include "utils/index_selfuncs.h"
42 : #include "utils/memutils.h"
43 : #include "utils/rel.h"
44 : #include "utils/tuplesort.h"
45 :
46 : /* Magic numbers for parallel state sharing; each key below has a distinct value */
47 : #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
48 : #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
49 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
50 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
51 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
52 :
53 : /*
54 : * Status for index builds performed in parallel. This is allocated in a
55 : * dynamic shared memory segment.
56 : */
57 : typedef struct BrinShared
58 : {
59 : /*
60 : * These fields are not modified during the build. They primarily exist
61 : * for the benefit of worker processes that need to create state
62 : * corresponding to that used by the leader.
63 : */
64 : Oid heaprelid;
65 : Oid indexrelid;
66 : bool isconcurrent;
67 : BlockNumber pagesPerRange;
68 : int scantuplesortstates;
69 :
70 : /*
71 : * workersdonecv is used to monitor the progress of workers. All parallel
72 : * participants must indicate that they are done before leader can use
73 : * results built by the workers (and before leader can write the data into
74 : * the index).
75 : */
76 : ConditionVariable workersdonecv;
77 :
78 : /*
79 : * mutex protects all fields before heapdesc. (NOTE(review): no field named heapdesc is declared in this struct; presumably the ParallelTableScanDescData that follows the struct is meant -- confirm.)
80 : *
81 : * These fields contain status information of interest to BRIN index
82 : * builds that must work just the same when an index is built in parallel.
83 : */
84 : slock_t mutex;
85 :
86 : /*
87 : * Mutable state that is maintained by workers, and reported back to
88 : * leader at end of the scans.
89 : *
90 : * nparticipantsdone is the number of worker processes that have finished.
91 : *
92 : * reltuples is the total number of input heap tuples.
93 : *
94 : * indtuples is the total number of tuples that made it into the index.
95 : */
96 : int nparticipantsdone;
97 : double reltuples;
98 : double indtuples;
99 :
100 : /*
101 : * ParallelTableScanDescData data follows. Can't directly embed here, as
102 : * implementations of the parallel table scan desc interface might need
103 : * stronger alignment.
104 : */
105 : } BrinShared;
106 :
107 : /*
108 : * Return pointer to a BrinShared's parallel table scan descriptor, which starts BUFFERALIGN(sizeof(BrinShared)) bytes past the start of the struct.
109 : *
110 : * cf. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
111 : * MAXALIGN.
112 : */
113 : #define ParallelTableScanFromBrinShared(shared) \
114 : (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
115 :
116 : /*
117 : * Status for leader in parallel index build.
118 : */
119 : typedef struct BrinLeader
120 : {
121 : /* parallel context itself */
122 : ParallelContext *pcxt;
123 :
124 : /*
125 : * nparticipanttuplesorts is the exact number of worker processes
126 : * successfully launched, plus one leader process if it participates as a
127 : * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
128 : * participating as a worker).
129 : */
130 : int nparticipanttuplesorts;
131 :
132 : /*
133 : * Leader process convenience pointers to shared state (leader avoids TOC
134 : * lookups).
135 : *
136 : * brinshared is the shared state for entire build. sharedsort is the
137 : * shared, tuplesort-managed state passed to each process tuplesort.
138 : * snapshot is the snapshot used by the scan iff an MVCC snapshot is
139 : * required. walusage and bufferusage are not described here; presumably they point at the shared areas registered under PARALLEL_KEY_WAL_USAGE and PARALLEL_KEY_BUFFER_USAGE -- TODO confirm.
140 : */
141 : BrinShared *brinshared;
142 : Sharedsort *sharedsort;
143 : Snapshot snapshot;
144 : WalUsage *walusage;
145 : BufferUsage *bufferusage;
146 : } BrinLeader;
147 :
148 : /*
149 : * We use a BrinBuildState during initial construction of a BRIN index.
150 : * The running state is kept in a BrinMemTuple.
151 : */
152 : typedef struct BrinBuildState
153 : {
154 : Relation bs_irel;
155 : double bs_numtuples;
156 : double bs_reltuples;
157 : Buffer bs_currentInsertBuf;
158 : BlockNumber bs_pagesPerRange;
159 : BlockNumber bs_currRangeStart;
160 : BlockNumber bs_maxRangeStart;
161 : BrinRevmap *bs_rmAccess;
162 : BrinDesc *bs_bdesc;
163 : BrinMemTuple *bs_dtuple;
164 :
165 : BrinTuple *bs_emptyTuple;
166 : Size bs_emptyTupleLen;
167 : MemoryContext bs_context;
168 :
169 : /*
170 : * bs_leader is only present when a parallel index build is performed, and
171 : * only in the leader process. (Actually, only the leader process has a
172 : * BrinBuildState.) NOTE(review): that parenthetical appears to conflict with the bs_sortstate comment below, which says workers use the build state too -- confirm.
173 : */
174 : BrinLeader *bs_leader;
175 : int bs_worker_id;
176 :
177 : /*
178 : * The sortstate is used by workers (including the leader). It has to be
179 : * part of the build state, because that's the only thing passed to the
180 : * build callback etc.
181 : */
182 : Tuplesortstate *bs_sortstate;
183 : } BrinBuildState;
184 :
185 : /*
186 : * We use a BrinInsertState to capture running state spanning multiple
187 : * brininsert invocations, within the same command. It is created by initialize_brin_insertstate and cached in indexInfo->ii_AmCache.
188 : */
189 : typedef struct BrinInsertState
190 : {
191 : BrinRevmap *bis_rmAccess; /* revmap access object, from brinRevmapInitialize */
192 : BrinDesc *bis_desc; /* BRIN descriptor, from brin_build_desc */
193 : BlockNumber bis_pages_per_range; /* filled in by brinRevmapInitialize */
194 : } BrinInsertState;
195 :
196 : /*
197 : * Struct used as "opaque" during index scans; allocated in brinbeginscan and freed in brinendscan
198 : */
199 : typedef struct BrinOpaque
200 : {
201 : BlockNumber bo_pagesPerRange; /* filled in by brinRevmapInitialize */
202 : BrinRevmap *bo_rmAccess; /* revmap access object, from brinRevmapInitialize */
203 : BrinDesc *bo_bdesc; /* BRIN descriptor, from brin_build_desc */
204 : } BrinOpaque;
205 :
206 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber /* presumably a page-range argument meaning "all ranges" -- TODO confirm against callers */
207 :
208 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
209 : BrinRevmap *revmap,
210 : BlockNumber pagesPerRange,
211 : BlockNumber tablePages);
212 : static BrinInsertState *initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo);
213 : static void terminate_brin_buildstate(BrinBuildState *state);
214 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
215 : bool include_partial, double *numSummarized, double *numExisting);
216 : static void form_and_insert_tuple(BrinBuildState *state);
217 : static void form_and_spill_tuple(BrinBuildState *state);
218 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
219 : BrinTuple *b);
220 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
221 : static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
222 : BrinMemTuple *dtup, const Datum *values, const bool *nulls);
223 : static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
224 : static void brin_fill_empty_ranges(BrinBuildState *state,
225 : BlockNumber prevRange, BlockNumber nextRange);
226 :
227 : /* forward declarations of the routines used for parallel index builds */
228 : static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
229 : bool isconcurrent, int request);
230 : static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
231 : static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
232 : static double _brin_parallel_heapscan(BrinBuildState *buildstate);
233 : static double _brin_parallel_merge(BrinBuildState *buildstate);
234 : static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
235 : Relation heap, Relation index);
236 : static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
237 : BrinShared *brinshared,
238 : Sharedsort *sharedsort,
239 : Relation heap, Relation index,
240 : int sortmem, bool progress);
241 :
242 : /*
243 : * BRIN handler function: return IndexAmRoutine with access method parameters
244 : * and callbacks. The node is freshly allocated with makeNode on every call. BRIN supplies amgetbitmap but leaves amgettuple NULL, and sets amcanbuildparallel while leaving amcanparallel false.
245 : */
246 : Datum
247 2198 : brinhandler(PG_FUNCTION_ARGS)
248 : {
249 2198 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
250 :
251 2198 : amroutine->amstrategies = 0;
252 2198 : amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
253 2198 : amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
254 2198 : amroutine->amcanorder = false;
255 2198 : amroutine->amcanorderbyop = false;
256 2198 : amroutine->amcanbackward = false;
257 2198 : amroutine->amcanunique = false;
258 2198 : amroutine->amcanmulticol = true;
259 2198 : amroutine->amoptionalkey = true;
260 2198 : amroutine->amsearcharray = false;
261 2198 : amroutine->amsearchnulls = true;
262 2198 : amroutine->amstorage = true;
263 2198 : amroutine->amclusterable = false;
264 2198 : amroutine->ampredlocks = false;
265 2198 : amroutine->amcanparallel = false;
266 2198 : amroutine->amcanbuildparallel = true;
267 2198 : amroutine->amcaninclude = false;
268 2198 : amroutine->amusemaintenanceworkmem = false;
269 2198 : amroutine->amsummarizing = true;
270 2198 : amroutine->amparallelvacuumoptions =
271 : VACUUM_OPTION_PARALLEL_CLEANUP;
272 2198 : amroutine->amkeytype = InvalidOid;
273 :
274 2198 : amroutine->ambuild = brinbuild;
275 2198 : amroutine->ambuildempty = brinbuildempty;
276 2198 : amroutine->aminsert = brininsert;
277 2198 : amroutine->aminsertcleanup = brininsertcleanup;
278 2198 : amroutine->ambulkdelete = brinbulkdelete;
279 2198 : amroutine->amvacuumcleanup = brinvacuumcleanup;
280 2198 : amroutine->amcanreturn = NULL;
281 2198 : amroutine->amcostestimate = brincostestimate;
282 2198 : amroutine->amoptions = brinoptions;
283 2198 : amroutine->amproperty = NULL;
284 2198 : amroutine->ambuildphasename = NULL;
285 2198 : amroutine->amvalidate = brinvalidate;
286 2198 : amroutine->amadjustmembers = NULL;
287 2198 : amroutine->ambeginscan = brinbeginscan;
288 2198 : amroutine->amrescan = brinrescan;
289 2198 : amroutine->amgettuple = NULL;
290 2198 : amroutine->amgetbitmap = bringetbitmap;
291 2198 : amroutine->amendscan = brinendscan;
292 2198 : amroutine->ammarkpos = NULL;
293 2198 : amroutine->amrestrpos = NULL;
294 2198 : amroutine->amestimateparallelscan = NULL;
295 2198 : amroutine->aminitparallelscan = NULL;
296 2198 : amroutine->amparallelrescan = NULL;
297 :
298 2198 : PG_RETURN_POINTER(amroutine);
299 : }
300 :
301 : /*
302 : * Initialize a BrinInsertState to maintain state to be used across multiple
303 : * tuple inserts, within the same command. The state is allocated in indexInfo->ii_Context and is both returned and stored in indexInfo->ii_AmCache, where later brininsert calls find it.
304 : */
305 : static BrinInsertState *
306 1090 : initialize_brin_insertstate(Relation idxRel, IndexInfo *indexInfo)
307 : {
308 : BrinInsertState *bistate;
309 : MemoryContext oldcxt;
310 :
311 1090 : oldcxt = MemoryContextSwitchTo(indexInfo->ii_Context);
312 1090 : bistate = palloc0(sizeof(BrinInsertState));
313 1090 : bistate->bis_desc = brin_build_desc(idxRel);
314 1090 : bistate->bis_rmAccess = brinRevmapInitialize(idxRel,
315 : &bistate->bis_pages_per_range);
316 1090 : indexInfo->ii_AmCache = bistate;
317 1090 : MemoryContextSwitchTo(oldcxt);
318 :
319 1090 : return bistate;
320 : }
321 :
322 : /*
323 : * A tuple in the heap is being inserted. To keep a brin index up to date,
324 : * we need to obtain the relevant index tuple and compare its stored values
325 : * with those of the new tuple. If the tuple values are not consistent with
326 : * the summary tuple, we need to update the index tuple.
327 : *
328 : * If autosummarization is enabled, check if we need to summarize the previous
329 : * page range.
330 : *
331 : * If the range is not currently summarized (i.e. the revmap returns NULL for
332 : * it), there's nothing to do for this tuple. The return value is always false; checkUnique, indexUnchanged and heapRel are not examined here.
333 : */
334 : bool
335 126006 : brininsert(Relation idxRel, Datum *values, bool *nulls,
336 : ItemPointer heaptid, Relation heapRel,
337 : IndexUniqueCheck checkUnique,
338 : bool indexUnchanged,
339 : IndexInfo *indexInfo)
340 : {
341 : BlockNumber pagesPerRange;
342 : BlockNumber origHeapBlk;
343 : BlockNumber heapBlk;
344 126006 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
345 : BrinRevmap *revmap;
346 : BrinDesc *bdesc;
347 126006 : Buffer buf = InvalidBuffer;
348 126006 : MemoryContext tupcxt = NULL;
349 126006 : MemoryContext oldcxt = CurrentMemoryContext;
350 126006 : bool autosummarize = BrinGetAutoSummarize(idxRel);
351 :
352 : /*
353 : * If first time through in this statement, initialize the insert state
354 : * that we keep for all the inserts in the command.
355 : */
356 126006 : if (!bistate)
357 1090 : bistate = initialize_brin_insertstate(idxRel, indexInfo);
358 :
359 126006 : revmap = bistate->bis_rmAccess;
360 126006 : bdesc = bistate->bis_desc;
361 126006 : pagesPerRange = bistate->bis_pages_per_range;
362 :
363 : /*
364 : * origHeapBlk is the block number where the insertion occurred. heapBlk
365 : * is the first block in the corresponding page range.
366 : */
367 126006 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
368 126006 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
369 :
370 : for (;;)
371 0 : {
372 126006 : bool need_insert = false;
373 : OffsetNumber off;
374 : BrinTuple *brtup;
375 : BrinMemTuple *dtup;
376 :
377 126006 : CHECK_FOR_INTERRUPTS();
378 :
379 : /*
380 : * If auto-summarization is enabled and we just inserted the first
381 : * tuple into the first block of a new non-first page range, request a
382 : * summarization run of the previous range. (heapBlk - 1 is the last block of that previous range.)
383 : */
384 126006 : if (autosummarize &&
385 156 : heapBlk > 0 &&
386 156 : heapBlk == origHeapBlk &&
387 156 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
388 : {
389 8 : BlockNumber lastPageRange = heapBlk - 1;
390 : BrinTuple *lastPageTuple;
391 :
392 : lastPageTuple =
393 8 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
394 : NULL, BUFFER_LOCK_SHARE);
395 8 : if (!lastPageTuple)
396 : {
397 : bool recorded;
398 :
399 6 : recorded = AutoVacuumRequestWork(AVW_BRINSummarizeRange,
400 : RelationGetRelid(idxRel),
401 : lastPageRange);
402 6 : if (!recorded)
403 0 : ereport(LOG,
404 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
405 : errmsg("request for BRIN range summarization for index \"%s\" page %u was not recorded",
406 : RelationGetRelationName(idxRel),
407 : lastPageRange)));
408 : }
409 : else
410 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
411 : }
412 :
413 126006 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
414 : NULL, BUFFER_LOCK_SHARE);
415 :
416 : /* if range is unsummarized, there's nothing to do */
417 126006 : if (!brtup)
418 78120 : break;
419 :
420 : /* First time through in this brininsert call? If so, create a private context for the allocations below; it is reset before each retry and deleted on exit. */
421 47886 : if (tupcxt == NULL)
422 : {
423 47886 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
424 : "brininsert cxt",
425 : ALLOCSET_DEFAULT_SIZES);
426 47886 : MemoryContextSwitchTo(tupcxt);
427 : }
428 :
429 47886 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
430 :
431 47886 : need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
432 :
433 47886 : if (!need_insert)
434 : {
435 : /*
436 : * The tuple is consistent with the new values, so there's nothing
437 : * to do.
438 : */
439 23978 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
440 : }
441 : else
442 : {
443 23908 : Page page = BufferGetPage(buf);
444 23908 : ItemId lp = PageGetItemId(page, off);
445 : Size origsz;
446 : BrinTuple *origtup;
447 : Size newsz;
448 : BrinTuple *newtup;
449 : bool samepage;
450 :
451 : /*
452 : * Make a copy of the old tuple, so that we can compare it after
453 : * re-acquiring the lock.
454 : */
455 23908 : origsz = ItemIdGetLength(lp);
456 23908 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
457 :
458 : /*
459 : * Before releasing the lock, check if we can attempt a same-page
460 : * update. Another process could insert a tuple concurrently in
461 : * the same page though, so downstream we must be prepared to cope
462 : * if this turns out to not be possible after all.
463 : */
464 23908 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
465 23908 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
466 23908 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
467 :
468 : /*
469 : * Try to update the tuple. If this doesn't work for whatever
470 : * reason, we need to restart from the top; the revmap might be
471 : * pointing at a different tuple for this block now, so we need to
472 : * recompute to ensure both our new heap tuple and the other
473 : * inserter's are covered by the combined tuple. It might be that
474 : * we don't need to update at all.
475 : */
476 23908 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
477 : buf, off, origtup, origsz, newtup, newsz,
478 : samepage))
479 : {
480 : /* no luck; discard this attempt's allocations and start over */
481 0 : MemoryContextReset(tupcxt);
482 0 : continue;
483 : }
484 : }
485 :
486 : /* success! */
487 47886 : break;
488 : }
489 :
490 126006 : if (BufferIsValid(buf))
491 47888 : ReleaseBuffer(buf);
492 126006 : MemoryContextSwitchTo(oldcxt);
493 126006 : if (tupcxt != NULL)
494 47886 : MemoryContextDelete(tupcxt);
495 :
496 126006 : return false;
497 : }
498 :
499 : /*
500 : * Callback to clean up the BrinInsertState once all tuple inserts are done. Does nothing when indexInfo->ii_AmCache is NULL, i.e. no insert state was ever set up.
501 : */
502 : void
503 1096 : brininsertcleanup(Relation index, IndexInfo *indexInfo)
504 : {
505 1096 : BrinInsertState *bistate = (BrinInsertState *) indexInfo->ii_AmCache;
506 :
507 : /* bail out if cache not initialized */
508 1096 : if (indexInfo->ii_AmCache == NULL)
509 6 : return;
510 :
511 : /*
512 : * Clean up the revmap. Note that the brinDesc has already been cleaned up
513 : * as part of its own memory context. The bistate struct itself is not freed here; only its pointers are cleared.
514 : */
515 1090 : brinRevmapTerminate(bistate->bis_rmAccess);
516 1090 : bistate->bis_rmAccess = NULL;
517 1090 : bistate->bis_desc = NULL;
518 : }
519 :
520 : /*
521 : * Initialize state for a BRIN index scan. The revmap access object, pages-per-range value and BRIN descriptor are kept in a BrinOpaque hung off scan->opaque.
522 : *
523 : * We read the metapage here to determine the pages-per-range number that this
524 : * index was built with. Note that since this cannot be changed while we're
525 : * holding lock on index, it's not necessary to recompute it during brinrescan.
526 : */
527 : IndexScanDesc
528 2946 : brinbeginscan(Relation r, int nkeys, int norderbys)
529 : {
530 : IndexScanDesc scan;
531 : BrinOpaque *opaque;
532 :
533 2946 : scan = RelationGetIndexScan(r, nkeys, norderbys);
534 :
535 2946 : opaque = palloc_object(BrinOpaque);
536 2946 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
537 2946 : opaque->bo_bdesc = brin_build_desc(r);
538 2946 : scan->opaque = opaque;
539 :
540 2946 : return scan;
541 : }
542 :
543 : /*
544 : * Execute the index scan.
545 : *
546 : * This works by reading index TIDs from the revmap, and obtaining the index
547 : * tuples pointed to by them; the summary values in the index tuples are
548 : * compared to the scan keys. We return into the TID bitmap all the pages in
549 : * ranges corresponding to index tuples that match the scan keys.
550 : *
551 : * If a TID from the revmap is read as InvalidTID, we know that range is
552 : * unsummarized. Pages in those ranges need to be returned regardless of scan
553 : * keys. The return value is ten times the number of pages added to the bitmap; the page count is kept in an int64 so that the multiplication cannot overflow on very large tables.
554 : */
555 : int64
556 2946 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
557 : {
558 2946 : Relation idxRel = scan->indexRelation;
559 2946 : Buffer buf = InvalidBuffer;
560 : BrinDesc *bdesc;
561 : Oid heapOid;
562 : Relation heapRel;
563 : BrinOpaque *opaque;
564 : BlockNumber nblocks;
565 : BlockNumber heapBlk;
566 2946 : int64 totalpages = 0;
567 : FmgrInfo *consistentFn;
568 : MemoryContext oldcxt;
569 : MemoryContext perRangeCxt;
570 : BrinMemTuple *dtup;
571 2946 : BrinTuple *btup = NULL;
572 2946 : Size btupsz = 0;
573 : ScanKey **keys,
574 : **nullkeys;
575 : int *nkeys,
576 : *nnullkeys;
577 : char *ptr;
578 : Size len;
579 : char *tmp PG_USED_FOR_ASSERTS_ONLY;
580 :
581 2946 : opaque = (BrinOpaque *) scan->opaque;
582 2946 : bdesc = opaque->bo_bdesc;
583 2946 : pgstat_count_index_scan(idxRel);
584 :
585 : /*
586 : * We need to know the size of the table so that we know how long to
587 : * iterate on the revmap.
588 : */
589 2946 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
590 2946 : heapRel = table_open(heapOid, AccessShareLock);
591 2946 : nblocks = RelationGetNumberOfBlocks(heapRel);
592 2946 : table_close(heapRel, AccessShareLock);
593 :
594 : /*
595 : * Make room for the consistent support procedures of indexed columns. We
596 : * don't look them up here; we do that lazily the first time we see a scan
597 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
598 : */
599 2946 : consistentFn = palloc0_array(FmgrInfo, bdesc->bd_tupdesc->natts);
600 :
601 : /*
602 : * Make room for per-attribute lists of scan keys that we'll pass to the
603 : * consistent support procedure. We don't know which attributes have scan
604 : * keys, so we allocate space for all attributes. That may use more memory
605 : * but it's probably cheaper than determining which attributes are used.
606 : *
607 : * We keep null and regular keys separate, so that we can pass just the
608 : * regular keys to the consistent function easily.
609 : *
610 : * To reduce the allocation overhead, we allocate one big chunk and then
611 : * carve it into smaller arrays ourselves. All the pieces have exactly the
612 : * same lifetime, so that's OK.
613 : *
614 : * XXX The widest index can have 32 attributes, so the amount of wasted
615 : * memory is negligible. We could invent a more compact approach (with
616 : * just space for used attributes) but that would make the matching more
617 : * complex so it's not a good trade-off.
618 : */
619 2946 : len =
620 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* regular keys */
621 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
622 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts) +
623 2946 : MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts) + /* NULL keys */
624 2946 : MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys) * bdesc->bd_tupdesc->natts +
625 2946 : MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
626 :
627 2946 : ptr = palloc(len);
628 2946 : tmp = ptr;
629 :
630 2946 : keys = (ScanKey **) ptr;
631 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
632 :
633 2946 : nullkeys = (ScanKey **) ptr;
634 2946 : ptr += MAXALIGN(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
635 :
636 2946 : nkeys = (int *) ptr;
637 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
638 :
639 2946 : nnullkeys = (int *) ptr;
640 2946 : ptr += MAXALIGN(sizeof(int) * bdesc->bd_tupdesc->natts);
641 :
642 69978 : for (int i = 0; i < bdesc->bd_tupdesc->natts; i++)
643 : {
644 67032 : keys[i] = (ScanKey *) ptr;
645 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
646 :
647 67032 : nullkeys[i] = (ScanKey *) ptr;
648 67032 : ptr += MAXALIGN(sizeof(ScanKey) * scan->numberOfKeys);
649 : }
650 :
651 : Assert(tmp + len == ptr);
652 :
653 : /* zero the number of keys */
654 2946 : memset(nkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
655 2946 : memset(nnullkeys, 0, sizeof(int) * bdesc->bd_tupdesc->natts);
656 :
657 : /* Preprocess the scan keys - split them into per-attribute arrays. */
658 5892 : for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
659 : {
660 2946 : ScanKey key = &scan->keyData[keyno];
661 2946 : AttrNumber keyattno = key->sk_attno;
662 :
663 : /*
664 : * The collation of the scan key must match the collation used in the
665 : * index column (but only if the search is not IS NULL/ IS NOT NULL).
666 : * Otherwise we shouldn't be using this index ...
667 : */
668 : Assert((key->sk_flags & SK_ISNULL) ||
669 : (key->sk_collation ==
670 : TupleDescAttr(bdesc->bd_tupdesc,
671 : keyattno - 1)->attcollation));
672 :
673 : /*
674 : * First time we see this index attribute, so init as needed.
675 : *
676 : * This is a bit of an overkill - we don't know how many scan keys are
677 : * there for this attribute, so we simply allocate the largest number
678 : * possible (as if all keys were for this attribute). This may waste a
679 : * bit of memory, but we only expect small number of scan keys in
680 : * general, so this should be negligible, and repeated repalloc calls
681 : * are not free either.
682 : */
683 2946 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
684 : {
685 : FmgrInfo *tmp;
686 :
687 : /* First time we see this attribute, so no key/null keys. */
688 : Assert(nkeys[keyattno - 1] == 0);
689 : Assert(nnullkeys[keyattno - 1] == 0);
690 :
691 2946 : tmp = index_getprocinfo(idxRel, keyattno,
692 : BRIN_PROCNUM_CONSISTENT);
693 2946 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
694 : CurrentMemoryContext);
695 : }
696 :
697 : /* Add key to the proper per-attribute array. */
698 2946 : if (key->sk_flags & SK_ISNULL)
699 : {
700 36 : nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
701 36 : nnullkeys[keyattno - 1]++;
702 : }
703 : else
704 : {
705 2910 : keys[keyattno - 1][nkeys[keyattno - 1]] = key;
706 2910 : nkeys[keyattno - 1]++;
707 : }
708 : }
709 :
710 : /* allocate an initial in-memory tuple outside the per-range memcxt (which is only created below), so it survives the per-range resets */
711 2946 : dtup = brin_new_memtuple(bdesc);
712 :
713 : /*
714 : * Setup and use a per-range memory context, which is reset every time we
715 : * loop below. This avoids having to free the tuples within the loop.
716 : */
717 2946 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
718 : "bringetbitmap cxt",
719 : ALLOCSET_DEFAULT_SIZES);
720 2946 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
721 :
722 : /*
723 : * Now scan the revmap. We start by querying for heap page 0,
724 : * incrementing by the number of pages per range; this gives us a full
725 : * view of the table.
726 : */
727 194598 : for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
728 : {
729 : bool addrange;
730 191652 : bool gottuple = false;
731 : BrinTuple *tup;
732 : OffsetNumber off;
733 : Size size;
734 :
735 191652 : CHECK_FOR_INTERRUPTS();
736 :
737 191652 : MemoryContextReset(perRangeCxt);
738 :
739 191652 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
740 : &off, &size, BUFFER_LOCK_SHARE);
741 191652 : if (tup)
742 : {
743 189936 : gottuple = true;
744 189936 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
745 189936 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
746 : }
747 :
748 : /*
749 : * For page ranges with no indexed tuple, we must return the whole
750 : * range; otherwise, compare it to the scan keys.
751 : */
752 191652 : if (!gottuple)
753 : {
754 1716 : addrange = true;
755 : }
756 : else
757 : {
758 189936 : dtup = brin_deform_tuple(bdesc, btup, dtup);
759 189936 : if (dtup->bt_placeholder)
760 : {
761 : /*
762 : * Placeholder tuples are always returned, regardless of the
763 : * values stored in them.
764 : */
765 0 : addrange = true;
766 : }
767 : else
768 : {
769 : int attno;
770 :
771 : /*
772 : * Compare scan keys with summary values stored for the range.
773 : * If scan keys are matched, the page range must be added to
774 : * the bitmap. We initially assume the range needs to be
775 : * added; in particular this serves the case where there are
776 : * no keys.
777 : */
778 189936 : addrange = true;
779 4704070 : for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
780 : {
781 : BrinValues *bval;
782 : Datum add;
783 : Oid collation;
784 :
785 : /*
786 : * skip attributes without any scan keys (both regular and
787 : * IS [NOT] NULL)
788 : */
789 4567734 : if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
790 4377798 : continue;
791 :
792 189936 : bval = &dtup->bt_columns[attno - 1];
793 :
794 : /*
795 : * If the BRIN tuple indicates that this range is empty,
796 : * we can skip it: there's nothing to match. We don't
797 : * need to examine the next columns.
798 : */
799 189936 : if (dtup->bt_empty_range)
800 : {
801 0 : addrange = false;
802 0 : break;
803 : }
804 :
805 : /*
806 : * First check if there are any IS [NOT] NULL scan keys,
807 : * and if we're violating them. In that case we can
808 : * terminate early, without invoking the support function.
809 : *
810 : * As there may be more keys, we can only determine
811 : * mismatch within this loop.
812 : */
813 189936 : if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
814 189936 : !check_null_keys(bval, nullkeys[attno - 1],
815 189936 : nnullkeys[attno - 1]))
816 : {
817 : /*
818 : * If any of the IS [NOT] NULL keys failed, the page
819 : * range as a whole can't pass. So terminate the loop.
820 : */
821 996 : addrange = false;
822 996 : break;
823 : }
824 :
825 : /*
826 : * So either there are no IS [NOT] NULL keys, or all
827 : * passed. If there are no regular scan keys, we're done -
828 : * the page range matches. If there are regular keys, but
829 : * the page range is marked as 'all nulls' it can't
830 : * possibly pass (we're assuming the operators are
831 : * strict).
832 : */
833 :
834 : /* No regular scan keys - page range as a whole passes. */
835 188940 : if (!nkeys[attno - 1])
836 1236 : continue;
837 :
838 : Assert((nkeys[attno - 1] > 0) &&
839 : (nkeys[attno - 1] <= scan->numberOfKeys));
840 :
841 : /* If it is all nulls, it cannot possibly be consistent. */
842 187704 : if (bval->bv_allnulls)
843 : {
844 378 : addrange = false;
845 378 : break;
846 : }
847 :
848 : /*
849 : * Collation from the first key (has to be the same for
850 : * all keys for the same attribute).
851 : */
852 187326 : collation = keys[attno - 1][0]->sk_collation;
853 :
854 : /*
855 : * Check whether the scan key is consistent with the page
856 : * range values; if so, have the pages in the range added
857 : * to the output bitmap.
858 : *
859 : * The opclass may or may not support processing of
860 : * multiple scan keys. We can determine that based on the
861 : * number of arguments - functions with extra parameter
862 : * (number of scan keys) do support this, otherwise we
863 : * have to simply pass the scan keys one by one.
864 : */
865 187326 : if (consistentFn[attno - 1].fn_nargs >= 4)
866 : {
867 : /* Check all keys at once */
868 39594 : add = FunctionCall4Coll(&consistentFn[attno - 1],
869 : collation,
870 : PointerGetDatum(bdesc),
871 : PointerGetDatum(bval),
872 39594 : PointerGetDatum(keys[attno - 1]),
873 39594 : Int32GetDatum(nkeys[attno - 1]));
874 39594 : addrange = DatumGetBool(add);
875 : }
876 : else
877 : {
878 : /*
879 : * Check keys one by one
880 : *
881 : * When there are multiple scan keys, failure to meet
882 : * the criteria for a single one of them is enough to
883 : * discard the range as a whole, so break out of the
884 : * loop as soon as a false return value is obtained.
885 : */
886 : int keyno;
887 :
888 258078 : for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
889 : {
890 147732 : add = FunctionCall3Coll(&consistentFn[attno - 1],
891 147732 : keys[attno - 1][keyno]->sk_collation,
892 : PointerGetDatum(bdesc),
893 : PointerGetDatum(bval),
894 147732 : PointerGetDatum(keys[attno - 1][keyno]));
895 147732 : addrange = DatumGetBool(add);
896 147732 : if (!addrange)
897 37386 : break;
898 : }
899 : }
900 :
901 : /*
902 : * If we found a scan key eliminating the range, no need
903 : * to check additional ones.
904 : */
905 187326 : if (!addrange)
906 52226 : break;
907 : }
908 : }
909 : }
910 :
911 : /* add the pages in the range to the output bitmap, if needed; the bitmap is modified in the caller's context, not the per-range one */
912 191652 : if (addrange)
913 : {
914 : BlockNumber pageno;
915 :
916 138052 : for (pageno = heapBlk;
917 286032 : pageno <= Min(nblocks, heapBlk + opaque->bo_pagesPerRange) - 1;
918 147980 : pageno++)
919 : {
920 147980 : MemoryContextSwitchTo(oldcxt);
921 147980 : tbm_add_page(tbm, pageno);
922 147980 : totalpages++;
923 147980 : MemoryContextSwitchTo(perRangeCxt);
924 : }
925 : }
926 : }
927 :
928 2946 : MemoryContextSwitchTo(oldcxt);
929 2946 : MemoryContextDelete(perRangeCxt);
930 :
931 2946 : if (buf != InvalidBuffer)
932 2946 : ReleaseBuffer(buf);
933 :
934 : /*
935 : * XXX We have an approximation of the number of *pages* that our scan
936 : * returns, but we don't have a precise idea of the number of heap tuples
937 : * involved. totalpages is an int64, so the multiplication below is done in 64-bit arithmetic.
938 : */
939 2946 : return totalpages * 10;
940 : }
941 :
942 : /*
943 : * Re-initialize state for a BRIN index scan
944 : */
945 : void
946 2946 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
947 : ScanKey orderbys, int norderbys)
948 : {
949 : /*
950 : * Other index AMs preprocess the scan keys at this point, or sometime
951 : * early during the scan; this lets them optimize by removing redundant
952 : * keys, or doing early returns when they are impossible to satisfy; see
953 : * _bt_preprocess_keys for an example. Something like that could be added
954 : * here someday, too.
955 : */
956 :
957 2946 : if (scankey && scan->numberOfKeys > 0)
958 2946 : memmove(scan->keyData, scankey,
959 2946 : scan->numberOfKeys * sizeof(ScanKeyData));
960 2946 : }
961 :
962 : /*
963 : * Close down a BRIN index scan
964 : */
965 : void
966 2946 : brinendscan(IndexScanDesc scan)
967 : {
968 2946 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
969 :
970 2946 : brinRevmapTerminate(opaque->bo_rmAccess);
971 2946 : brin_free_desc(opaque->bo_bdesc);
972 2946 : pfree(opaque);
973 2946 : }
974 :
975 : /*
976 : * Per-heap-tuple callback for table_index_build_scan.
977 : *
978 : * Note we don't worry about the page range at the end of the table here; it is
979 : * present in the build state struct after we're called the last time, but not
980 : * inserted into the index. Caller must ensure to do so, if appropriate.
981 : */
982 : static void
983 728304 : brinbuildCallback(Relation index,
984 : ItemPointer tid,
985 : Datum *values,
986 : bool *isnull,
987 : bool tupleIsAlive,
988 : void *brstate)
989 : {
990 728304 : BrinBuildState *state = (BrinBuildState *) brstate;
991 : BlockNumber thisblock;
992 :
993 728304 : thisblock = ItemPointerGetBlockNumber(tid);
994 :
995 : /*
996 : * If we're in a block that belongs to a future range, summarize what
997 : * we've got and start afresh. Note the scan might have skipped many
998 : * pages, if they were devoid of live tuples; make sure to insert index
999 : * tuples for those too.
1000 : */
1001 730572 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
1002 : {
1003 :
1004 : BRIN_elog((DEBUG2,
1005 : "brinbuildCallback: completed a range: %u--%u",
1006 : state->bs_currRangeStart,
1007 : state->bs_currRangeStart + state->bs_pagesPerRange));
1008 :
1009 : /* create the index tuple and insert it */
1010 2268 : form_and_insert_tuple(state);
1011 :
1012 : /* set state to correspond to the next range */
1013 2268 : state->bs_currRangeStart += state->bs_pagesPerRange;
1014 :
1015 : /* re-initialize state for it */
1016 2268 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1017 : }
1018 :
1019 : /* Accumulate the current tuple into the running state */
1020 728304 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1021 : values, isnull);
1022 728304 : }
1023 :
1024 : /*
1025 : * Per-heap-tuple callback for table_index_build_scan with parallelism.
1026 : *
1027 : * A version of the callback used by parallel index builds. The main difference
1028 : * is that instead of writing the BRIN tuples into the index, we write them
1029 : * into a shared tuplesort, and leave the insertion up to the leader (which may
1030 : * reorder them a bit etc.). The callback also does not generate empty ranges,
1031 : * those will be added by the leader when merging results from workers.
1032 : */
1033 : static void
1034 7962 : brinbuildCallbackParallel(Relation index,
1035 : ItemPointer tid,
1036 : Datum *values,
1037 : bool *isnull,
1038 : bool tupleIsAlive,
1039 : void *brstate)
1040 : {
1041 7962 : BrinBuildState *state = (BrinBuildState *) brstate;
1042 : BlockNumber thisblock;
1043 :
1044 7962 : thisblock = ItemPointerGetBlockNumber(tid);
1045 :
1046 : /*
1047 : * If we're in a block that belongs to a different range, summarize what
1048 : * we've got and start afresh. Note the scan might have skipped many
1049 : * pages, if they were devoid of live tuples; we do not create empty BRIN
1050 : * ranges here - the leader is responsible for filling them in.
1051 : *
1052 : * Unlike serial builds, parallel index builds allow synchronized seqscans
1053 : * (because that's what parallel scans do). This means the block may wrap
1054 : * around to the beginning of the relation, so the condition needs to
1055 : * check for both future and past ranges.
1056 : */
1057 7962 : if ((thisblock < state->bs_currRangeStart) ||
1058 7962 : (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1))
1059 : {
1060 :
1061 : BRIN_elog((DEBUG2,
1062 : "brinbuildCallbackParallel: completed a range: %u--%u",
1063 : state->bs_currRangeStart,
1064 : state->bs_currRangeStart + state->bs_pagesPerRange));
1065 :
1066 : /* create the index tuple and write it into the tuplesort */
1067 28 : form_and_spill_tuple(state);
1068 :
1069 : /*
1070 : * Set state to correspond to the next range (for this block).
1071 : *
1072 : * This skips ranges that are either empty (and so we don't get any
1073 : * tuples to summarize), or processed by other workers. We can't
1074 : * differentiate those cases here easily, so we leave it up to the
1075 : * leader to fill empty ranges where needed.
1076 : */
1077 : state->bs_currRangeStart
1078 28 : = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
1079 :
1080 : /* re-initialize state for it */
1081 28 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1082 : }
1083 :
1084 : /* Accumulate the current tuple into the running state */
1085 7962 : (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
1086 : values, isnull);
1087 7962 : }
1088 :
1089 : /*
1090 : * brinbuild() -- build a new BRIN index.
1091 : */
1092 : IndexBuildResult *
1093 338 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
1094 : {
1095 : IndexBuildResult *result;
1096 : double reltuples;
1097 : double idxtuples;
1098 : BrinRevmap *revmap;
1099 : BrinBuildState *state;
1100 : Buffer meta;
1101 : BlockNumber pagesPerRange;
1102 :
1103 : /*
1104 : * We expect to be called exactly once for any index relation.
1105 : */
1106 338 : if (RelationGetNumberOfBlocks(index) != 0)
1107 0 : elog(ERROR, "index \"%s\" already contains data",
1108 : RelationGetRelationName(index));
1109 :
1110 : /*
1111 : * Critical section not required, because on error the creation of the
1112 : * whole relation will be rolled back.
1113 : */
1114 :
1115 338 : meta = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
1116 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1117 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
1118 :
1119 338 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
1120 : BRIN_CURRENT_VERSION);
1121 338 : MarkBufferDirty(meta);
1122 :
1123 338 : if (RelationNeedsWAL(index))
1124 : {
1125 : xl_brin_createidx xlrec;
1126 : XLogRecPtr recptr;
1127 : Page page;
1128 :
1129 174 : xlrec.version = BRIN_CURRENT_VERSION;
1130 174 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
1131 :
1132 174 : XLogBeginInsert();
1133 174 : XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
1134 174 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
1135 :
1136 174 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
1137 :
1138 174 : page = BufferGetPage(meta);
1139 174 : PageSetLSN(page, recptr);
1140 : }
1141 :
1142 338 : UnlockReleaseBuffer(meta);
1143 :
1144 : /*
1145 : * Initialize our state, including the deformed tuple state.
1146 : */
1147 338 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1148 338 : state = initialize_brin_buildstate(index, revmap, pagesPerRange,
1149 : RelationGetNumberOfBlocks(heap));
1150 :
1151 : /*
1152 : * Attempt to launch parallel worker scan when required
1153 : *
1154 : * XXX plan_create_index_workers makes the number of workers dependent on
1155 : * maintenance_work_mem, requiring 32MB for each worker. That makes sense
1156 : * for btree, but not for BRIN, which can do with much less memory. So
1157 : * maybe make that somehow less strict, optionally?
1158 : */
1159 338 : if (indexInfo->ii_ParallelWorkers > 0)
1160 4 : _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
1161 : indexInfo->ii_ParallelWorkers);
1162 :
1163 : /*
1164 : * If parallel build requested and at least one worker process was
1165 : * successfully launched, set up coordination state, wait for workers to
1166 : * complete. Then read all tuples from the shared tuplesort and insert
1167 : * them into the index.
1168 : *
1169 : * In serial mode, simply scan the table and build the index one index
1170 : * tuple at a time.
1171 : */
1172 338 : if (state->bs_leader)
1173 : {
1174 : SortCoordinate coordinate;
1175 :
1176 2 : coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
1177 2 : coordinate->isWorker = false;
1178 2 : coordinate->nParticipants =
1179 2 : state->bs_leader->nparticipanttuplesorts;
1180 2 : coordinate->sharedsort = state->bs_leader->sharedsort;
1181 :
1182 : /*
1183 : * Begin leader tuplesort.
1184 : *
1185 : * In cases where parallelism is involved, the leader receives the
1186 : * same share of maintenance_work_mem as a serial sort (it is
1187 : * generally treated in the same way as a serial sort once we return).
1188 : * Parallel worker Tuplesortstates will have received only a fraction
1189 : * of maintenance_work_mem, though.
1190 : *
1191 : * We rely on the lifetime of the Leader Tuplesortstate almost not
1192 : * overlapping with any worker Tuplesortstate's lifetime. There may
1193 : * be some small overlap, but that's okay because we rely on leader
1194 : * Tuplesortstate only allocating a small, fixed amount of memory
1195 : * here. When its tuplesort_performsort() is called (by our caller),
1196 : * and significant amounts of memory are likely to be used, all
1197 : * workers must have already freed almost all memory held by their
1198 : * Tuplesortstates (they are about to go away completely, too). The
1199 : * overall effect is that maintenance_work_mem always represents an
1200 : * absolute high watermark on the amount of memory used by a CREATE
1201 : * INDEX operation, regardless of the use of parallelism or any other
1202 : * factor.
1203 : */
1204 2 : state->bs_sortstate =
1205 2 : tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
1206 : TUPLESORT_NONE);
1207 :
1208 : /* scan the relation and merge per-worker results */
1209 2 : reltuples = _brin_parallel_merge(state);
1210 :
1211 2 : _brin_end_parallel(state->bs_leader, state);
1212 : }
1213 : else /* no parallel index build */
1214 : {
1215 : /*
1216 : * Now scan the relation. No syncscan allowed here because we want
1217 : * the heap blocks in physical order (we want to produce the ranges
1218 : * starting from block 0, and the callback also relies on this to not
1219 : * generate summary for the same range twice).
1220 : */
1221 336 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
1222 : brinbuildCallback, (void *) state, NULL);
1223 :
1224 : /*
1225 : * process the final batch
1226 : *
1227 : * XXX Note this does not update state->bs_currRangeStart, i.e. it
1228 : * stays set to the last range added to the index. This is OK, because
1229 : * that's what brin_fill_empty_ranges expects.
1230 : */
1231 336 : form_and_insert_tuple(state);
1232 :
1233 : /*
1234 : * Backfill the final ranges with empty data.
1235 : *
1236 : * This saves us from doing what amounts to full table scans when the
1237 : * index with a predicate like WHERE (nonnull_column IS NULL), or
1238 : * other very selective predicates.
1239 : */
1240 336 : brin_fill_empty_ranges(state,
1241 : state->bs_currRangeStart,
1242 : state->bs_maxRangeStart);
1243 : }
1244 :
1245 : /* release resources */
1246 338 : idxtuples = state->bs_numtuples;
1247 338 : brinRevmapTerminate(state->bs_rmAccess);
1248 338 : terminate_brin_buildstate(state);
1249 :
1250 : /*
1251 : * Return statistics
1252 : */
1253 338 : result = palloc_object(IndexBuildResult);
1254 :
1255 338 : result->heap_tuples = reltuples;
1256 338 : result->index_tuples = idxtuples;
1257 :
1258 338 : return result;
1259 : }
1260 :
1261 : void
1262 6 : brinbuildempty(Relation index)
1263 : {
1264 : Buffer metabuf;
1265 :
1266 : /* An empty BRIN index has a metapage only. */
1267 6 : metabuf = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
1268 : EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
1269 :
1270 : /* Initialize and xlog metabuffer. */
1271 6 : START_CRIT_SECTION();
1272 6 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
1273 : BRIN_CURRENT_VERSION);
1274 6 : MarkBufferDirty(metabuf);
1275 6 : log_newpage_buffer(metabuf, true);
1276 6 : END_CRIT_SECTION();
1277 :
1278 6 : UnlockReleaseBuffer(metabuf);
1279 6 : }
1280 :
1281 : /*
1282 : * brinbulkdelete
1283 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
1284 : * there's not a lot we can do here.
1285 : *
1286 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
1287 : * tuple is deleted), meaning the need to re-run summarization on the affected
1288 : * range. Would need to add an extra flag in brintuples for that.
1289 : */
1290 : IndexBulkDeleteResult *
1291 22 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
1292 : IndexBulkDeleteCallback callback, void *callback_state)
1293 : {
1294 : /* allocate stats if first time through, else re-use existing struct */
1295 22 : if (stats == NULL)
1296 22 : stats = palloc0_object(IndexBulkDeleteResult);
1297 :
1298 22 : return stats;
1299 : }
1300 :
1301 : /*
1302 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
1303 : * ranges that are currently unsummarized.
1304 : */
1305 : IndexBulkDeleteResult *
1306 90 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
1307 : {
1308 : Relation heapRel;
1309 :
1310 : /* No-op in ANALYZE ONLY mode */
1311 90 : if (info->analyze_only)
1312 4 : return stats;
1313 :
1314 86 : if (!stats)
1315 70 : stats = palloc0_object(IndexBulkDeleteResult);
1316 86 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
1317 : /* rest of stats is initialized by zeroing */
1318 :
1319 86 : heapRel = table_open(IndexGetRelation(RelationGetRelid(info->index), false),
1320 : AccessShareLock);
1321 :
1322 86 : brin_vacuum_scan(info->index, info->strategy);
1323 :
1324 86 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES, false,
1325 : &stats->num_index_tuples, &stats->num_index_tuples);
1326 :
1327 86 : table_close(heapRel, AccessShareLock);
1328 :
1329 86 : return stats;
1330 : }
1331 :
1332 : /*
1333 : * reloptions processor for BRIN indexes
1334 : */
1335 : bytea *
1336 854 : brinoptions(Datum reloptions, bool validate)
1337 : {
1338 : static const relopt_parse_elt tab[] = {
1339 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
1340 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
1341 : };
1342 :
1343 854 : return (bytea *) build_reloptions(reloptions, validate,
1344 : RELOPT_KIND_BRIN,
1345 : sizeof(BrinOptions),
1346 : tab, lengthof(tab));
1347 : }
1348 :
1349 : /*
1350 : * SQL-callable function to scan through an index and summarize all ranges
1351 : * that are not currently summarized.
1352 : */
1353 : Datum
1354 76 : brin_summarize_new_values(PG_FUNCTION_ARGS)
1355 : {
1356 76 : Datum relation = PG_GETARG_DATUM(0);
1357 :
1358 76 : return DirectFunctionCall2(brin_summarize_range,
1359 : relation,
1360 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
1361 : }
1362 :
1363 : /*
1364 : * SQL-callable function to summarize the indicated page range, if not already
1365 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
1366 : * unsummarized ranges are summarized.
1367 : */
1368 : Datum
1369 204 : brin_summarize_range(PG_FUNCTION_ARGS)
1370 : {
1371 204 : Oid indexoid = PG_GETARG_OID(0);
1372 204 : int64 heapBlk64 = PG_GETARG_INT64(1);
1373 : BlockNumber heapBlk;
1374 : Oid heapoid;
1375 : Relation indexRel;
1376 : Relation heapRel;
1377 : Oid save_userid;
1378 : int save_sec_context;
1379 : int save_nestlevel;
1380 204 : double numSummarized = 0;
1381 :
1382 204 : if (RecoveryInProgress())
1383 0 : ereport(ERROR,
1384 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1385 : errmsg("recovery is in progress"),
1386 : errhint("BRIN control functions cannot be executed during recovery.")));
1387 :
1388 204 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
1389 36 : ereport(ERROR,
1390 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1391 : errmsg("block number out of range: %lld",
1392 : (long long) heapBlk64)));
1393 168 : heapBlk = (BlockNumber) heapBlk64;
1394 :
1395 : /*
1396 : * We must lock table before index to avoid deadlocks. However, if the
1397 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1398 : * Rather than emitting a not-very-helpful error message, postpone
1399 : * complaining, expecting that the is-it-an-index test below will fail.
1400 : */
1401 168 : heapoid = IndexGetRelation(indexoid, true);
1402 168 : if (OidIsValid(heapoid))
1403 : {
1404 150 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1405 :
1406 : /*
1407 : * Autovacuum calls us. For its benefit, switch to the table owner's
1408 : * userid, so that any index functions are run as that user. Also
1409 : * lock down security-restricted operations and arrange to make GUC
1410 : * variable changes local to this command. This is harmless, albeit
1411 : * unnecessary, when called from SQL, because we fail shortly if the
1412 : * user does not own the index.
1413 : */
1414 150 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
1415 150 : SetUserIdAndSecContext(heapRel->rd_rel->relowner,
1416 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
1417 150 : save_nestlevel = NewGUCNestLevel();
1418 150 : RestrictSearchPath();
1419 : }
1420 : else
1421 : {
1422 18 : heapRel = NULL;
1423 : /* Set these just to suppress "uninitialized variable" warnings */
1424 18 : save_userid = InvalidOid;
1425 18 : save_sec_context = -1;
1426 18 : save_nestlevel = -1;
1427 : }
1428 :
1429 168 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1430 :
1431 : /* Must be a BRIN index */
1432 150 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1433 150 : indexRel->rd_rel->relam != BRIN_AM_OID)
1434 18 : ereport(ERROR,
1435 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1436 : errmsg("\"%s\" is not a BRIN index",
1437 : RelationGetRelationName(indexRel))));
1438 :
1439 : /* User must own the index (comparable to privileges needed for VACUUM) */
1440 132 : if (heapRel != NULL && !object_ownercheck(RelationRelationId, indexoid, save_userid))
1441 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1442 0 : RelationGetRelationName(indexRel));
1443 :
1444 : /*
1445 : * Since we did the IndexGetRelation call above without any lock, it's
1446 : * barely possible that a race against an index drop/recreation could have
1447 : * netted us the wrong table. Recheck.
1448 : */
1449 132 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1450 0 : ereport(ERROR,
1451 : (errcode(ERRCODE_UNDEFINED_TABLE),
1452 : errmsg("could not open parent table of index \"%s\"",
1453 : RelationGetRelationName(indexRel))));
1454 :
1455 : /* see gin_clean_pending_list() */
1456 132 : if (indexRel->rd_index->indisvalid)
1457 132 : brinsummarize(indexRel, heapRel, heapBlk, true, &numSummarized, NULL);
1458 : else
1459 0 : ereport(DEBUG1,
1460 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1461 : errmsg("index \"%s\" is not valid",
1462 : RelationGetRelationName(indexRel))));
1463 :
1464 : /* Roll back any GUC changes executed by index functions */
1465 132 : AtEOXact_GUC(false, save_nestlevel);
1466 :
1467 : /* Restore userid and security context */
1468 132 : SetUserIdAndSecContext(save_userid, save_sec_context);
1469 :
1470 132 : relation_close(indexRel, ShareUpdateExclusiveLock);
1471 132 : relation_close(heapRel, ShareUpdateExclusiveLock);
1472 :
1473 132 : PG_RETURN_INT32((int32) numSummarized);
1474 : }
1475 :
1476 : /*
1477 : * SQL-callable interface to mark a range as no longer summarized
1478 : */
1479 : Datum
1480 104 : brin_desummarize_range(PG_FUNCTION_ARGS)
1481 : {
1482 104 : Oid indexoid = PG_GETARG_OID(0);
1483 104 : int64 heapBlk64 = PG_GETARG_INT64(1);
1484 : BlockNumber heapBlk;
1485 : Oid heapoid;
1486 : Relation heapRel;
1487 : Relation indexRel;
1488 : bool done;
1489 :
1490 104 : if (RecoveryInProgress())
1491 0 : ereport(ERROR,
1492 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1493 : errmsg("recovery is in progress"),
1494 : errhint("BRIN control functions cannot be executed during recovery.")));
1495 :
1496 104 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
1497 18 : ereport(ERROR,
1498 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
1499 : errmsg("block number out of range: %lld",
1500 : (long long) heapBlk64)));
1501 86 : heapBlk = (BlockNumber) heapBlk64;
1502 :
1503 : /*
1504 : * We must lock table before index to avoid deadlocks. However, if the
1505 : * passed indexoid isn't an index then IndexGetRelation() will fail.
1506 : * Rather than emitting a not-very-helpful error message, postpone
1507 : * complaining, expecting that the is-it-an-index test below will fail.
1508 : *
1509 : * Unlike brin_summarize_range(), autovacuum never calls this. Hence, we
1510 : * don't switch userid.
1511 : */
1512 86 : heapoid = IndexGetRelation(indexoid, true);
1513 86 : if (OidIsValid(heapoid))
1514 86 : heapRel = table_open(heapoid, ShareUpdateExclusiveLock);
1515 : else
1516 0 : heapRel = NULL;
1517 :
1518 86 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
1519 :
1520 : /* Must be a BRIN index */
1521 86 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1522 86 : indexRel->rd_rel->relam != BRIN_AM_OID)
1523 0 : ereport(ERROR,
1524 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1525 : errmsg("\"%s\" is not a BRIN index",
1526 : RelationGetRelationName(indexRel))));
1527 :
1528 : /* User must own the index (comparable to privileges needed for VACUUM) */
1529 86 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1530 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1531 0 : RelationGetRelationName(indexRel));
1532 :
1533 : /*
1534 : * Since we did the IndexGetRelation call above without any lock, it's
1535 : * barely possible that a race against an index drop/recreation could have
1536 : * netted us the wrong table. Recheck.
1537 : */
1538 86 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
1539 0 : ereport(ERROR,
1540 : (errcode(ERRCODE_UNDEFINED_TABLE),
1541 : errmsg("could not open parent table of index \"%s\"",
1542 : RelationGetRelationName(indexRel))));
1543 :
1544 : /* see gin_clean_pending_list() */
1545 86 : if (indexRel->rd_index->indisvalid)
1546 : {
1547 : /* the revmap does the hard work */
1548 : do
1549 : {
1550 86 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
1551 : }
1552 86 : while (!done);
1553 : }
1554 : else
1555 0 : ereport(DEBUG1,
1556 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1557 : errmsg("index \"%s\" is not valid",
1558 : RelationGetRelationName(indexRel))));
1559 :
1560 86 : relation_close(indexRel, ShareUpdateExclusiveLock);
1561 86 : relation_close(heapRel, ShareUpdateExclusiveLock);
1562 :
1563 86 : PG_RETURN_VOID();
1564 : }
1565 :
1566 : /*
1567 : * Build a BrinDesc used to create or scan a BRIN index
1568 : */
1569 : BrinDesc *
1570 4502 : brin_build_desc(Relation rel)
1571 : {
1572 : BrinOpcInfo **opcinfo;
1573 : BrinDesc *bdesc;
1574 : TupleDesc tupdesc;
1575 4502 : int totalstored = 0;
1576 : int keyno;
1577 : long totalsize;
1578 : MemoryContext cxt;
1579 : MemoryContext oldcxt;
1580 :
1581 4502 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1582 : "brin desc cxt",
1583 : ALLOCSET_SMALL_SIZES);
1584 4502 : oldcxt = MemoryContextSwitchTo(cxt);
1585 4502 : tupdesc = RelationGetDescr(rel);
1586 :
1587 : /*
1588 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1589 : * the number of columns stored, since the number is opclass-defined.
1590 : */
1591 4502 : opcinfo = palloc_array(BrinOpcInfo *, tupdesc->natts);
1592 75840 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1593 : {
1594 : FmgrInfo *opcInfoFn;
1595 71338 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1596 :
1597 71338 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1598 :
1599 142676 : opcinfo[keyno] = (BrinOpcInfo *)
1600 71338 : DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1601 71338 : totalstored += opcinfo[keyno]->oi_nstored;
1602 : }
1603 :
1604 : /* Allocate our result struct and fill it in */
1605 4502 : totalsize = offsetof(BrinDesc, bd_info) +
1606 4502 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1607 :
1608 4502 : bdesc = palloc(totalsize);
1609 4502 : bdesc->bd_context = cxt;
1610 4502 : bdesc->bd_index = rel;
1611 4502 : bdesc->bd_tupdesc = tupdesc;
1612 4502 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1613 4502 : bdesc->bd_totalstored = totalstored;
1614 :
1615 75840 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1616 71338 : bdesc->bd_info[keyno] = opcinfo[keyno];
1617 4502 : pfree(opcinfo);
1618 :
1619 4502 : MemoryContextSwitchTo(oldcxt);
1620 :
1621 4502 : return bdesc;
1622 : }
1623 :
1624 : void
1625 3404 : brin_free_desc(BrinDesc *bdesc)
1626 : {
1627 : /* make sure the tupdesc is still valid */
1628 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1629 : /* no need for retail pfree */
1630 3404 : MemoryContextDelete(bdesc->bd_context);
1631 3404 : }
1632 :
1633 : /*
1634 : * Fetch index's statistical data into *stats
1635 : */
1636 : void
1637 10730 : brinGetStats(Relation index, BrinStatsData *stats)
1638 : {
1639 : Buffer metabuffer;
1640 : Page metapage;
1641 : BrinMetaPageData *metadata;
1642 :
1643 10730 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1644 10730 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1645 10730 : metapage = BufferGetPage(metabuffer);
1646 10730 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1647 :
1648 10730 : stats->pagesPerRange = metadata->pagesPerRange;
1649 10730 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1650 :
1651 10730 : UnlockReleaseBuffer(metabuffer);
1652 10730 : }
1653 :
1654 : /*
1655 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1656 : */
1657 : static BrinBuildState *
1658 422 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1659 : BlockNumber pagesPerRange, BlockNumber tablePages)
1660 : {
1661 : BrinBuildState *state;
1662 422 : BlockNumber lastRange = 0;
1663 :
1664 422 : state = palloc_object(BrinBuildState);
1665 :
1666 422 : state->bs_irel = idxRel;
1667 422 : state->bs_numtuples = 0;
1668 422 : state->bs_reltuples = 0;
1669 422 : state->bs_currentInsertBuf = InvalidBuffer;
1670 422 : state->bs_pagesPerRange = pagesPerRange;
1671 422 : state->bs_currRangeStart = 0;
1672 422 : state->bs_rmAccess = revmap;
1673 422 : state->bs_bdesc = brin_build_desc(idxRel);
1674 422 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1675 422 : state->bs_leader = NULL;
1676 422 : state->bs_worker_id = 0;
1677 422 : state->bs_sortstate = NULL;
1678 422 : state->bs_context = CurrentMemoryContext;
1679 422 : state->bs_emptyTuple = NULL;
1680 422 : state->bs_emptyTupleLen = 0;
1681 :
1682 : /* Remember the memory context to use for an empty tuple, if needed. */
1683 422 : state->bs_context = CurrentMemoryContext;
1684 422 : state->bs_emptyTuple = NULL;
1685 422 : state->bs_emptyTupleLen = 0;
1686 :
1687 : /*
1688 : * Calculate the start of the last page range. Page numbers are 0-based,
1689 : * so to calculate the index we need to subtract one. The integer division
1690 : * gives us the index of the page range.
1691 : */
1692 422 : if (tablePages > 0)
1693 326 : lastRange = ((tablePages - 1) / pagesPerRange) * pagesPerRange;
1694 :
1695 : /* Now calculate the start of the next range. */
1696 422 : state->bs_maxRangeStart = lastRange + state->bs_pagesPerRange;
1697 :
1698 422 : return state;
1699 : }
1700 :
1701 : /*
1702 : * Release resources associated with a BrinBuildState.
1703 : */
1704 : static void
1705 416 : terminate_brin_buildstate(BrinBuildState *state)
1706 : {
1707 : /*
1708 : * Release the last index buffer used. We might as well ensure that
1709 : * whatever free space remains in that page is available in FSM, too.
1710 : */
1711 416 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1712 : {
1713 : Page page;
1714 : Size freespace;
1715 : BlockNumber blk;
1716 :
1717 338 : page = BufferGetPage(state->bs_currentInsertBuf);
1718 338 : freespace = PageGetFreeSpace(page);
1719 338 : blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
1720 338 : ReleaseBuffer(state->bs_currentInsertBuf);
1721 338 : RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
1722 338 : FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
1723 : }
1724 :
1725 416 : brin_free_desc(state->bs_bdesc);
1726 416 : pfree(state->bs_dtuple);
1727 416 : pfree(state);
1728 416 : }
1729 :
1730 : /*
1731 : * On the given BRIN index, summarize the heap page range that corresponds
1732 : * to the heap block number given.
1733 : *
1734 : * This routine can run in parallel with insertions into the heap. To avoid
1735 : * missing those values from the summary tuple, we first insert a placeholder
1736 : * index tuple into the index, then execute the heap scan; transactions
1737 : * concurrent with the scan update the placeholder tuple. After the scan, we
1738 : * union the placeholder tuple with the one computed by this routine. The
1739 : * update of the index value happens in a loop, so that if somebody updates
1740 : * the placeholder tuple after we read it, we detect the case and try again.
1741 : * This ensures that the concurrently inserted tuples are not lost.
1742 : *
1743 : * A further corner case is this routine being asked to summarize the partial
1744 : * range at the end of the table. heapNumBlocks is the (possibly outdated)
1745 : * table size; if we notice that the requested range lies beyond that size,
1746 : * we re-compute the table size after inserting the placeholder tuple, to
1747 : * avoid missing pages that were appended recently.
1748 : */
1749 : static void
1750 2934 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1751 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1752 : {
1753 : Buffer phbuf;
1754 : BrinTuple *phtup;
1755 : Size phsz;
1756 : OffsetNumber offset;
1757 : BlockNumber scanNumBlks;
1758 :
1759 : /*
1760 : * Insert the placeholder tuple
1761 : */
1762 2934 : phbuf = InvalidBuffer;
1763 2934 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1764 2934 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1765 : state->bs_rmAccess, &phbuf,
1766 : heapBlk, phtup, phsz);
1767 :
1768 : /*
1769 : * Compute range end. We hold ShareUpdateExclusive lock on table, so it
1770 : * cannot shrink concurrently (but it can grow).
1771 : */
1772 : Assert(heapBlk % state->bs_pagesPerRange == 0);
1773 2934 : if (heapBlk + state->bs_pagesPerRange > heapNumBlks)
1774 : {
1775 : /*
1776 : * If we're asked to scan what we believe to be the final range on the
1777 : * table (i.e. a range that might be partial) we need to recompute our
1778 : * idea of what the latest page is after inserting the placeholder
1779 : * tuple. Anyone that grows the table later will update the
1780 : * placeholder tuple, so it doesn't matter that we won't scan these
1781 : * pages ourselves. Careful: the table might have been extended
1782 : * beyond the current range, so clamp our result.
1783 : *
1784 : * Fortunately, this should occur infrequently.
1785 : */
1786 24 : scanNumBlks = Min(RelationGetNumberOfBlocks(heapRel) - heapBlk,
1787 : state->bs_pagesPerRange);
1788 : }
1789 : else
1790 : {
1791 : /* Easy case: range is known to be complete */
1792 2910 : scanNumBlks = state->bs_pagesPerRange;
1793 : }
1794 :
1795 : /*
1796 : * Execute the partial heap scan covering the heap blocks in the specified
1797 : * page range, summarizing the heap tuples in it. This scan stops just
1798 : * short of brinbuildCallback creating the new index entry.
1799 : *
1800 : * Note that it is critical we use the "any visible" mode of
1801 : * table_index_build_range_scan here: otherwise, we would miss tuples
1802 : * inserted by transactions that are still in progress, among other corner
1803 : * cases.
1804 : */
1805 2934 : state->bs_currRangeStart = heapBlk;
1806 2934 : table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true, false,
1807 : heapBlk, scanNumBlks,
1808 : brinbuildCallback, (void *) state, NULL);
1809 :
1810 : /*
1811 : * Now we update the values obtained by the scan with the placeholder
1812 : * tuple. We do this in a loop which only terminates if we're able to
1813 : * update the placeholder tuple successfully; if we are not, this means
1814 : * somebody else modified the placeholder tuple after we read it.
1815 : */
1816 : for (;;)
1817 0 : {
1818 : BrinTuple *newtup;
1819 : Size newsize;
1820 : bool didupdate;
1821 : bool samepage;
1822 :
1823 2934 : CHECK_FOR_INTERRUPTS();
1824 :
1825 : /*
1826 : * Update the summary tuple and try to update.
1827 : */
1828 2934 : newtup = brin_form_tuple(state->bs_bdesc,
1829 : heapBlk, state->bs_dtuple, &newsize);
1830 2934 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1831 : didupdate =
1832 2934 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1833 : state->bs_rmAccess, heapBlk, phbuf, offset,
1834 : phtup, phsz, newtup, newsize, samepage);
1835 2934 : brin_free_tuple(phtup);
1836 2934 : brin_free_tuple(newtup);
1837 :
1838 : /* If the update succeeded, we're done. */
1839 2934 : if (didupdate)
1840 2934 : break;
1841 :
1842 : /*
1843 : * If the update didn't work, it might be because somebody updated the
1844 : * placeholder tuple concurrently. Extract the new version, union it
1845 : * with the values we have from the scan, and start over. (There are
1846 : * other reasons for the update to fail, but it's simple to treat them
1847 : * the same.)
1848 : */
1849 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1850 : &offset, &phsz, BUFFER_LOCK_SHARE);
1851 : /* the placeholder tuple must exist */
1852 0 : if (phtup == NULL)
1853 0 : elog(ERROR, "missing placeholder tuple");
1854 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1855 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1856 :
1857 : /* merge it into the tuple from the heap scan */
1858 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1859 : }
1860 :
1861 2934 : ReleaseBuffer(phbuf);
1862 2934 : }
1863 :
1864 : /*
1865 : * Summarize page ranges that are not already summarized. If pageRange is
1866 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1867 : * page range containing the given heap page number is scanned.
1868 : * If include_partial is true, then the partial range at the end of the table
1869 : * is summarized, otherwise not.
1870 : *
1871 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1872 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1873 : * incremented.
1874 : */
1875 : static void
1876 218 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1877 : bool include_partial, double *numSummarized, double *numExisting)
1878 : {
1879 : BrinRevmap *revmap;
1880 218 : BrinBuildState *state = NULL;
1881 218 : IndexInfo *indexInfo = NULL;
1882 : BlockNumber heapNumBlocks;
1883 : BlockNumber pagesPerRange;
1884 : Buffer buf;
1885 : BlockNumber startBlk;
1886 :
1887 218 : revmap = brinRevmapInitialize(index, &pagesPerRange);
1888 :
1889 : /* determine range of pages to process */
1890 218 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1891 218 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1892 144 : startBlk = 0;
1893 : else
1894 : {
1895 74 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1896 74 : heapNumBlocks = Min(heapNumBlocks, startBlk + pagesPerRange);
1897 : }
1898 218 : if (startBlk > heapNumBlocks)
1899 : {
1900 : /* Nothing to do if start point is beyond end of table */
1901 0 : brinRevmapTerminate(revmap);
1902 0 : return;
1903 : }
1904 :
1905 : /*
1906 : * Scan the revmap to find unsummarized items.
1907 : */
1908 218 : buf = InvalidBuffer;
1909 18944 : for (; startBlk < heapNumBlocks; startBlk += pagesPerRange)
1910 : {
1911 : BrinTuple *tup;
1912 : OffsetNumber off;
1913 :
1914 : /*
1915 : * Unless requested to summarize even a partial range, go away now if
1916 : * we think the next range is partial. Caller would pass true when it
1917 : * is typically run once bulk data loading is done
1918 : * (brin_summarize_new_values), and false when it is typically the
1919 : * result of arbitrarily-scheduled maintenance command (vacuuming).
1920 : */
1921 18792 : if (!include_partial &&
1922 2050 : (startBlk + pagesPerRange > heapNumBlocks))
1923 66 : break;
1924 :
1925 18726 : CHECK_FOR_INTERRUPTS();
1926 :
1927 18726 : tup = brinGetTupleForHeapBlock(revmap, startBlk, &buf, &off, NULL,
1928 : BUFFER_LOCK_SHARE);
1929 18726 : if (tup == NULL)
1930 : {
1931 : /* no revmap entry for this heap range. Summarize it. */
1932 2934 : if (state == NULL)
1933 : {
1934 : /* first time through */
1935 : Assert(!indexInfo);
1936 78 : state = initialize_brin_buildstate(index, revmap,
1937 : pagesPerRange,
1938 : InvalidBlockNumber);
1939 78 : indexInfo = BuildIndexInfo(index);
1940 : }
1941 2934 : summarize_range(indexInfo, state, heapRel, startBlk, heapNumBlocks);
1942 :
1943 : /* and re-initialize state for the next range */
1944 2934 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1945 :
1946 2934 : if (numSummarized)
1947 2934 : *numSummarized += 1.0;
1948 : }
1949 : else
1950 : {
1951 15792 : if (numExisting)
1952 1892 : *numExisting += 1.0;
1953 15792 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1954 : }
1955 : }
1956 :
1957 218 : if (BufferIsValid(buf))
1958 152 : ReleaseBuffer(buf);
1959 :
1960 : /* free resources */
1961 218 : brinRevmapTerminate(revmap);
1962 218 : if (state)
1963 : {
1964 78 : terminate_brin_buildstate(state);
1965 78 : pfree(indexInfo);
1966 : }
1967 : }
1968 :
1969 : /*
1970 : * Given a deformed tuple in the build state, convert it into the on-disk
1971 : * format and insert it into the index, making the revmap point to it.
1972 : */
1973 : static void
1974 2604 : form_and_insert_tuple(BrinBuildState *state)
1975 : {
1976 : BrinTuple *tup;
1977 : Size size;
1978 :
1979 2604 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1980 : state->bs_dtuple, &size);
1981 2604 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1982 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1983 : tup, size);
1984 2604 : state->bs_numtuples++;
1985 :
1986 2604 : pfree(tup);
1987 2604 : }
1988 :
1989 : /*
1990 : * Given a deformed tuple in the build state, convert it into the on-disk
1991 : * format and write it to a (shared) tuplesort (the leader will insert it
1992 : * into the index later).
1993 : */
1994 : static void
1995 36 : form_and_spill_tuple(BrinBuildState *state)
1996 : {
1997 : BrinTuple *tup;
1998 : Size size;
1999 :
2000 : /* don't insert empty tuples in parallel build */
2001 36 : if (state->bs_dtuple->bt_empty_range)
2002 6 : return;
2003 :
2004 30 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
2005 : state->bs_dtuple, &size);
2006 :
2007 : /* write the BRIN tuple to the tuplesort */
2008 30 : tuplesort_putbrintuple(state->bs_sortstate, tup, size);
2009 :
2010 30 : state->bs_numtuples++;
2011 :
2012 30 : pfree(tup);
2013 : }
2014 :
2015 : /*
2016 : * Given two deformed tuples, adjust the first one so that it's consistent
2017 : * with the summary values in both.
2018 : */
2019 : static void
2020 0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
2021 : {
2022 : int keyno;
2023 : BrinMemTuple *db;
2024 : MemoryContext cxt;
2025 : MemoryContext oldcxt;
2026 :
2027 : /* Use our own memory context to avoid retail pfree */
2028 0 : cxt = AllocSetContextCreate(CurrentMemoryContext,
2029 : "brin union",
2030 : ALLOCSET_DEFAULT_SIZES);
2031 0 : oldcxt = MemoryContextSwitchTo(cxt);
2032 0 : db = brin_deform_tuple(bdesc, b, NULL);
2033 0 : MemoryContextSwitchTo(oldcxt);
2034 :
2035 : /*
2036 : * Check if the ranges are empty.
2037 : *
2038 : * If at least one of them is empty, we don't need to call per-key union
2039 : * functions at all. If "b" is empty, we just use "a" as the result (it
2040 : * might be empty fine, but that's fine). If "a" is empty but "b" is not,
2041 : * we use "b" as the result (but we have to copy the data into "a" first).
2042 : *
2043 : * Only when both ranges are non-empty, we actually do the per-key merge.
2044 : */
2045 :
2046 : /* If "b" is empty - ignore it and just use "a" (even if it's empty etc.). */
2047 0 : if (db->bt_empty_range)
2048 : {
2049 : /* skip the per-key merge */
2050 0 : MemoryContextDelete(cxt);
2051 0 : return;
2052 : }
2053 :
2054 : /*
2055 : * Now we know "b" is not empty. If "a" is empty, then "b" is the result.
2056 : * But we need to copy the data from "b" to "a" first, because that's how
2057 : * we pass result out.
2058 : *
2059 : * We have to copy all the global/per-key flags etc. too.
2060 : */
2061 0 : if (a->bt_empty_range)
2062 : {
2063 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2064 : {
2065 : int i;
2066 0 : BrinValues *col_a = &a->bt_columns[keyno];
2067 0 : BrinValues *col_b = &db->bt_columns[keyno];
2068 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2069 :
2070 0 : col_a->bv_allnulls = col_b->bv_allnulls;
2071 0 : col_a->bv_hasnulls = col_b->bv_hasnulls;
2072 :
2073 : /* If "b" has no data, we're done. */
2074 0 : if (col_b->bv_allnulls)
2075 0 : continue;
2076 :
2077 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2078 0 : col_a->bv_values[i] =
2079 0 : datumCopy(col_b->bv_values[i],
2080 0 : opcinfo->oi_typcache[i]->typbyval,
2081 0 : opcinfo->oi_typcache[i]->typlen);
2082 : }
2083 :
2084 : /* "a" started empty, but "b" was not empty, so remember that */
2085 0 : a->bt_empty_range = false;
2086 :
2087 : /* skip the per-key merge */
2088 0 : MemoryContextDelete(cxt);
2089 0 : return;
2090 : }
2091 :
2092 : /* Now we know neither range is empty. */
2093 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2094 : {
2095 : FmgrInfo *unionFn;
2096 0 : BrinValues *col_a = &a->bt_columns[keyno];
2097 0 : BrinValues *col_b = &db->bt_columns[keyno];
2098 0 : BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
2099 :
2100 0 : if (opcinfo->oi_regular_nulls)
2101 : {
2102 : /* Does the "b" summary represent any NULL values? */
2103 0 : bool b_has_nulls = (col_b->bv_hasnulls || col_b->bv_allnulls);
2104 :
2105 : /* Adjust "hasnulls". */
2106 0 : if (!col_a->bv_allnulls && b_has_nulls)
2107 0 : col_a->bv_hasnulls = true;
2108 :
2109 : /* If there are no values in B, there's nothing left to do. */
2110 0 : if (col_b->bv_allnulls)
2111 0 : continue;
2112 :
2113 : /*
2114 : * Adjust "allnulls". If A doesn't have values, just copy the
2115 : * values from B into A, and we're done. We cannot run the
2116 : * operators in this case, because values in A might contain
2117 : * garbage. Note we already established that B contains values.
2118 : *
2119 : * Also adjust "hasnulls" in order not to forget the summary
2120 : * represents NULL values. This is not redundant with the earlier
2121 : * update, because that only happens when allnulls=false.
2122 : */
2123 0 : if (col_a->bv_allnulls)
2124 : {
2125 : int i;
2126 :
2127 0 : col_a->bv_allnulls = false;
2128 0 : col_a->bv_hasnulls = true;
2129 :
2130 0 : for (i = 0; i < opcinfo->oi_nstored; i++)
2131 0 : col_a->bv_values[i] =
2132 0 : datumCopy(col_b->bv_values[i],
2133 0 : opcinfo->oi_typcache[i]->typbyval,
2134 0 : opcinfo->oi_typcache[i]->typlen);
2135 :
2136 0 : continue;
2137 : }
2138 : }
2139 :
2140 0 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
2141 : BRIN_PROCNUM_UNION);
2142 0 : FunctionCall3Coll(unionFn,
2143 0 : bdesc->bd_index->rd_indcollation[keyno],
2144 : PointerGetDatum(bdesc),
2145 : PointerGetDatum(col_a),
2146 : PointerGetDatum(col_b));
2147 : }
2148 :
2149 0 : MemoryContextDelete(cxt);
2150 : }
2151 :
2152 : /*
2153 : * brin_vacuum_scan
2154 : * Do a complete scan of the index during VACUUM.
2155 : *
2156 : * This routine scans the complete index looking for uncataloged index pages,
2157 : * i.e. those that might have been lost due to a crash after index extension
2158 : * and such.
2159 : */
2160 : static void
2161 86 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
2162 : {
2163 : BlockNumber nblocks;
2164 : BlockNumber blkno;
2165 :
2166 : /*
2167 : * Scan the index in physical order, and clean up any possible mess in
2168 : * each page.
2169 : */
2170 86 : nblocks = RelationGetNumberOfBlocks(idxrel);
2171 458 : for (blkno = 0; blkno < nblocks; blkno++)
2172 : {
2173 : Buffer buf;
2174 :
2175 372 : CHECK_FOR_INTERRUPTS();
2176 :
2177 372 : buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
2178 : RBM_NORMAL, strategy);
2179 :
2180 372 : brin_page_cleanup(idxrel, buf);
2181 :
2182 372 : ReleaseBuffer(buf);
2183 : }
2184 :
2185 : /*
2186 : * Update all upper pages in the index's FSM, as well. This ensures not
2187 : * only that we propagate leaf-page FSM updates made by brin_page_cleanup,
2188 : * but also that any pre-existing damage or out-of-dateness is repaired.
2189 : */
2190 86 : FreeSpaceMapVacuum(idxrel);
2191 86 : }
2192 :
2193 : static bool
2194 784152 : add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
2195 : const Datum *values, const bool *nulls)
2196 : {
2197 : int keyno;
2198 :
2199 : /* If the range starts empty, we're certainly going to modify it. */
2200 784152 : bool modified = dtup->bt_empty_range;
2201 :
2202 : /*
2203 : * Compare the key values of the new tuple to the stored index values; our
2204 : * deformed tuple will get updated if the new tuple doesn't fit the
2205 : * original range (note this means we can't break out of the loop early).
2206 : * Make a note of whether this happens, so that we know to insert the
2207 : * modified tuple later.
2208 : */
2209 1847856 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
2210 : {
2211 : Datum result;
2212 : BrinValues *bval;
2213 : FmgrInfo *addValue;
2214 : bool has_nulls;
2215 :
2216 1063704 : bval = &dtup->bt_columns[keyno];
2217 :
2218 : /*
2219 : * Does the range have actual NULL values? Either of the flags can be
2220 : * set, but we ignore the state before adding first row.
2221 : *
2222 : * We have to remember this, because we'll modify the flags and we
2223 : * need to know if the range started as empty.
2224 : */
2225 2090972 : has_nulls = ((!dtup->bt_empty_range) &&
2226 1027268 : (bval->bv_hasnulls || bval->bv_allnulls));
2227 :
2228 : /*
2229 : * If the value we're adding is NULL, handle it locally. Otherwise
2230 : * call the BRIN_PROCNUM_ADDVALUE procedure.
2231 : */
2232 1063704 : if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
2233 : {
2234 : /*
2235 : * If the new value is null, we record that we saw it if it's the
2236 : * first one; otherwise, there's nothing to do.
2237 : */
2238 18616 : if (!bval->bv_hasnulls)
2239 : {
2240 3544 : bval->bv_hasnulls = true;
2241 3544 : modified = true;
2242 : }
2243 :
2244 18616 : continue;
2245 : }
2246 :
2247 1045088 : addValue = index_getprocinfo(idxRel, keyno + 1,
2248 : BRIN_PROCNUM_ADDVALUE);
2249 1045088 : result = FunctionCall4Coll(addValue,
2250 1045088 : idxRel->rd_indcollation[keyno],
2251 : PointerGetDatum(bdesc),
2252 : PointerGetDatum(bval),
2253 1045088 : values[keyno],
2254 1045088 : nulls[keyno]);
2255 : /* if that returned true, we need to insert the updated tuple */
2256 1045088 : modified |= DatumGetBool(result);
2257 :
2258 : /*
2259 : * If the range was had actual NULL values (i.e. did not start empty),
2260 : * make sure we don't forget about the NULL values. Either the
2261 : * allnulls flag is still set to true, or (if the opclass cleared it)
2262 : * we need to set hasnulls=true.
2263 : *
2264 : * XXX This can only happen when the opclass modified the tuple, so
2265 : * the modified flag should be set.
2266 : */
2267 1045088 : if (has_nulls && !(bval->bv_hasnulls || bval->bv_allnulls))
2268 : {
2269 : Assert(modified);
2270 4 : bval->bv_hasnulls = true;
2271 : }
2272 : }
2273 :
2274 : /*
2275 : * After updating summaries for all the keys, mark it as not empty.
2276 : *
2277 : * If we're actually changing the flag value (i.e. tuple started as
2278 : * empty), we should have modified the tuple. So we should not see empty
2279 : * range that was not modified.
2280 : */
2281 : Assert(!dtup->bt_empty_range || modified);
2282 784152 : dtup->bt_empty_range = false;
2283 :
2284 784152 : return modified;
2285 : }
2286 :
2287 : static bool
2288 189936 : check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
2289 : {
2290 : int keyno;
2291 :
2292 : /*
2293 : * First check if there are any IS [NOT] NULL scan keys, and if we're
2294 : * violating them.
2295 : */
2296 191172 : for (keyno = 0; keyno < nnullkeys; keyno++)
2297 : {
2298 2232 : ScanKey key = nullkeys[keyno];
2299 :
2300 : Assert(key->sk_attno == bval->bv_attno);
2301 :
2302 : /* Handle only IS NULL/IS NOT NULL tests */
2303 2232 : if (!(key->sk_flags & SK_ISNULL))
2304 0 : continue;
2305 :
2306 2232 : if (key->sk_flags & SK_SEARCHNULL)
2307 : {
2308 : /* IS NULL scan key, but range has no NULLs */
2309 1116 : if (!bval->bv_allnulls && !bval->bv_hasnulls)
2310 978 : return false;
2311 : }
2312 1116 : else if (key->sk_flags & SK_SEARCHNOTNULL)
2313 : {
2314 : /*
2315 : * For IS NOT NULL, we can only skip ranges that are known to have
2316 : * only nulls.
2317 : */
2318 1116 : if (bval->bv_allnulls)
2319 18 : return false;
2320 : }
2321 : else
2322 : {
2323 : /*
2324 : * Neither IS NULL nor IS NOT NULL was used; assume all indexable
2325 : * operators are strict and thus return false with NULL value in
2326 : * the scan key.
2327 : */
2328 0 : return false;
2329 : }
2330 : }
2331 :
2332 188940 : return true;
2333 : }
2334 :
2335 : /*
2336 : * Create parallel context, and launch workers for leader.
2337 : *
2338 : * buildstate argument should be initialized (with the exception of the
2339 : * tuplesort states, which may later be created based on shared
2340 : * state initially set up here).
2341 : *
2342 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
2343 : *
2344 : * request is the target number of parallel worker processes to launch.
2345 : *
2346 : * Sets buildstate's BrinLeader, which caller must use to shut down parallel
2347 : * mode by passing it to _brin_end_parallel() at the very end of its index
2348 : * build. If not even a single worker process can be launched, this is
2349 : * never set, and caller should proceed with a serial index build.
2350 : */
2351 : static void
2352 4 : _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
2353 : bool isconcurrent, int request)
2354 : {
2355 : ParallelContext *pcxt;
2356 : int scantuplesortstates;
2357 : Snapshot snapshot;
2358 : Size estbrinshared;
2359 : Size estsort;
2360 : BrinShared *brinshared;
2361 : Sharedsort *sharedsort;
2362 4 : BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
2363 : WalUsage *walusage;
2364 : BufferUsage *bufferusage;
2365 4 : bool leaderparticipates = true;
2366 : int querylen;
2367 :
2368 : #ifdef DISABLE_LEADER_PARTICIPATION
2369 : leaderparticipates = false;
2370 : #endif
2371 :
2372 : /*
2373 : * Enter parallel mode, and create context for parallel build of brin
2374 : * index
2375 : */
2376 4 : EnterParallelMode();
2377 : Assert(request > 0);
2378 4 : pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
2379 : request);
2380 :
2381 4 : scantuplesortstates = leaderparticipates ? request + 1 : request;
2382 :
2383 : /*
2384 : * Prepare for scan of the base relation. In a normal index build, we use
2385 : * SnapshotAny because we must retrieve all tuples and do our own time
2386 : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
2387 : * concurrent build, we take a regular MVCC snapshot and index whatever's
2388 : * live according to that.
2389 : */
2390 4 : if (!isconcurrent)
2391 4 : snapshot = SnapshotAny;
2392 : else
2393 0 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
2394 :
2395 : /*
2396 : * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
2397 : */
2398 4 : estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
2399 4 : shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
2400 4 : estsort = tuplesort_estimate_shared(scantuplesortstates);
2401 4 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
2402 :
2403 4 : shm_toc_estimate_keys(&pcxt->estimator, 2);
2404 :
2405 : /*
2406 : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
2407 : * and PARALLEL_KEY_BUFFER_USAGE.
2408 : *
2409 : * If there are no extensions loaded that care, we could skip this. We
2410 : * have no way of knowing whether anyone's looking at pgWalUsage or
2411 : * pgBufferUsage, so do it unconditionally.
2412 : */
2413 4 : shm_toc_estimate_chunk(&pcxt->estimator,
2414 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2415 4 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2416 4 : shm_toc_estimate_chunk(&pcxt->estimator,
2417 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2418 4 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2419 :
2420 : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
2421 4 : if (debug_query_string)
2422 : {
2423 4 : querylen = strlen(debug_query_string);
2424 4 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
2425 4 : shm_toc_estimate_keys(&pcxt->estimator, 1);
2426 : }
2427 : else
2428 0 : querylen = 0; /* keep compiler quiet */
2429 :
2430 : /* Everyone's had a chance to ask for space, so now create the DSM */
2431 4 : InitializeParallelDSM(pcxt);
2432 :
2433 : /* If no DSM segment was available, back out (do serial build) */
2434 4 : if (pcxt->seg == NULL)
2435 : {
2436 0 : if (IsMVCCSnapshot(snapshot))
2437 0 : UnregisterSnapshot(snapshot);
2438 0 : DestroyParallelContext(pcxt);
2439 0 : ExitParallelMode();
2440 0 : return;
2441 : }
2442 :
2443 : /* Store shared build state, for which we reserved space */
2444 4 : brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
2445 : /* Initialize immutable state */
2446 4 : brinshared->heaprelid = RelationGetRelid(heap);
2447 4 : brinshared->indexrelid = RelationGetRelid(index);
2448 4 : brinshared->isconcurrent = isconcurrent;
2449 4 : brinshared->scantuplesortstates = scantuplesortstates;
2450 4 : brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
2451 4 : ConditionVariableInit(&brinshared->workersdonecv);
2452 4 : SpinLockInit(&brinshared->mutex);
2453 :
2454 : /* Initialize mutable state */
2455 4 : brinshared->nparticipantsdone = 0;
2456 4 : brinshared->reltuples = 0.0;
2457 4 : brinshared->indtuples = 0.0;
2458 :
2459 4 : table_parallelscan_initialize(heap,
2460 : ParallelTableScanFromBrinShared(brinshared),
2461 : snapshot);
2462 :
2463 : /*
2464 : * Store shared tuplesort-private state, for which we reserved space.
2465 : * Then, initialize opaque state using tuplesort routine.
2466 : */
2467 4 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
2468 4 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
2469 : pcxt->seg);
2470 :
2471 : /*
2472 : * Store shared tuplesort-private state, for which we reserved space.
2473 : * Then, initialize opaque state using tuplesort routine.
2474 : */
2475 4 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
2476 4 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
2477 :
2478 : /* Store query string for workers */
2479 4 : if (debug_query_string)
2480 : {
2481 : char *sharedquery;
2482 :
2483 4 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
2484 4 : memcpy(sharedquery, debug_query_string, querylen + 1);
2485 4 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
2486 : }
2487 :
2488 : /*
2489 : * Allocate space for each worker's WalUsage and BufferUsage; no need to
2490 : * initialize.
2491 : */
2492 4 : walusage = shm_toc_allocate(pcxt->toc,
2493 4 : mul_size(sizeof(WalUsage), pcxt->nworkers));
2494 4 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
2495 4 : bufferusage = shm_toc_allocate(pcxt->toc,
2496 4 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
2497 4 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
2498 :
2499 : /* Launch workers, saving status for leader/caller */
2500 4 : LaunchParallelWorkers(pcxt);
2501 4 : brinleader->pcxt = pcxt;
2502 4 : brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
2503 4 : if (leaderparticipates)
2504 4 : brinleader->nparticipanttuplesorts++;
2505 4 : brinleader->brinshared = brinshared;
2506 4 : brinleader->sharedsort = sharedsort;
2507 4 : brinleader->snapshot = snapshot;
2508 4 : brinleader->walusage = walusage;
2509 4 : brinleader->bufferusage = bufferusage;
2510 :
2511 : /* If no workers were successfully launched, back out (do serial build) */
2512 4 : if (pcxt->nworkers_launched == 0)
2513 : {
2514 2 : _brin_end_parallel(brinleader, NULL);
2515 2 : return;
2516 : }
2517 :
2518 : /* Save leader state now that it's clear build will be parallel */
2519 2 : buildstate->bs_leader = brinleader;
2520 :
2521 : /* Join heap scan ourselves */
2522 2 : if (leaderparticipates)
2523 2 : _brin_leader_participate_as_worker(buildstate, heap, index);
2524 :
2525 : /*
2526 : * Caller needs to wait for all launched workers when we return. Make
2527 : * sure that the failure-to-start case will not hang forever.
2528 : */
2529 2 : WaitForParallelWorkersToAttach(pcxt);
2530 : }
2531 :
2532 : /*
2533 : * Shut down workers, destroy parallel context, and end parallel mode.
2534 : */
2535 : static void
2536 4 : _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
2537 : {
2538 : int i;
2539 :
2540 : /* Shutdown worker processes */
2541 4 : WaitForParallelWorkersToFinish(brinleader->pcxt);
2542 :
2543 : /*
2544 : * Next, accumulate WAL usage. (This must wait for the workers to finish,
2545 : * or we might get incomplete data.)
2546 : */
2547 10 : for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
2548 6 : InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
2549 :
2550 : /* Free last reference to MVCC snapshot, if one was used */
2551 4 : if (IsMVCCSnapshot(brinleader->snapshot))
2552 0 : UnregisterSnapshot(brinleader->snapshot);
2553 4 : DestroyParallelContext(brinleader->pcxt);
2554 4 : ExitParallelMode();
2555 4 : }
2556 :
2557 : /*
2558 : * Within leader, wait for end of heap scan.
2559 : *
2560 : * When called, parallel heap scan started by _brin_begin_parallel() will
2561 : * already be underway within worker processes (when leader participates
2562 : * as a worker, we should end up here just as workers are finishing).
2563 : *
2564 : * Returns the total number of heap tuples scanned.
2565 : */
2566 : static double
2567 2 : _brin_parallel_heapscan(BrinBuildState *state)
2568 : {
2569 2 : BrinShared *brinshared = state->bs_leader->brinshared;
2570 : int nparticipanttuplesorts;
2571 :
2572 2 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
2573 : for (;;)
2574 : {
2575 6 : SpinLockAcquire(&brinshared->mutex);
2576 6 : if (brinshared->nparticipantsdone == nparticipanttuplesorts)
2577 : {
2578 : /* copy the data into leader state */
2579 2 : state->bs_reltuples = brinshared->reltuples;
2580 2 : state->bs_numtuples = brinshared->indtuples;
2581 :
2582 2 : SpinLockRelease(&brinshared->mutex);
2583 2 : break;
2584 : }
2585 4 : SpinLockRelease(&brinshared->mutex);
2586 :
2587 4 : ConditionVariableSleep(&brinshared->workersdonecv,
2588 : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
2589 : }
2590 :
2591 2 : ConditionVariableCancelSleep();
2592 :
2593 2 : return state->bs_reltuples;
2594 : }
2595 :
2596 : /*
2597 : * Within leader, wait for end of heap scan and merge per-worker results.
2598 : *
2599 : * After waiting for all workers to finish, merge the per-worker results into
2600 : * the complete index. The results from each worker are sorted by block number
2601 : * (start of the page range). While combining the per-worker results we merge
2602 : * summaries for the same page range, and also fill-in empty summaries for
2603 : * ranges without any tuples.
2604 : *
2605 : * Returns the total number of heap tuples scanned.
2606 : */
2607 : static double
2608 2 : _brin_parallel_merge(BrinBuildState *state)
2609 : {
2610 : BrinTuple *btup;
2611 2 : BrinMemTuple *memtuple = NULL;
2612 : Size tuplen;
2613 2 : BlockNumber prevblkno = InvalidBlockNumber;
2614 : MemoryContext rangeCxt,
2615 : oldCxt;
2616 : double reltuples;
2617 :
2618 : /* wait for workers to scan table and produce partial results */
2619 2 : reltuples = _brin_parallel_heapscan(state);
2620 :
2621 : /* do the actual sort in the leader */
2622 2 : tuplesort_performsort(state->bs_sortstate);
2623 :
2624 : /*
2625 : * Initialize BrinMemTuple we'll use to union summaries from workers (in
2626 : * case they happened to produce parts of the same page range).
2627 : */
2628 2 : memtuple = brin_new_memtuple(state->bs_bdesc);
2629 :
2630 : /*
2631 : * Create a memory context we'll reset to combine results for a single
2632 : * page range (received from the workers). We don't expect huge number of
2633 : * overlaps under regular circumstances, because for large tables the
2634 : * chunk size is likely larger than the BRIN page range), but it can
2635 : * happen, and the union functions may do all kinds of stuff. So we better
2636 : * reset the context once in a while.
2637 : */
2638 2 : rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
2639 : "brin union",
2640 : ALLOCSET_DEFAULT_SIZES);
2641 2 : oldCxt = MemoryContextSwitchTo(rangeCxt);
2642 :
2643 : /*
2644 : * Read the BRIN tuples from the shared tuplesort, sorted by block number.
2645 : * That probably gives us an index that is cheaper to scan, thanks to
2646 : * mostly getting data from the same index page as before.
2647 : */
2648 32 : while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
2649 : {
2650 : /* Ranges should be multiples of pages_per_range for the index. */
2651 : Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
2652 :
2653 : /*
2654 : * Do we need to union summaries for the same page range?
2655 : *
2656 : * If this is the first brin tuple we read, then just deform it into
2657 : * the memtuple, and continue with the next one from tuplesort. We
2658 : * however may need to insert empty summaries into the index.
2659 : *
2660 : * If it's the same block as the last we saw, we simply union the brin
2661 : * tuple into it, and we're done - we don't even need to insert empty
2662 : * ranges, because that was done earlier when we saw the first brin
2663 : * tuple (for this range).
2664 : *
2665 : * Finally, if it's not the first brin tuple, and it's not the same
2666 : * page range, we need to do the insert and then deform the tuple into
2667 : * the memtuple. Then we'll insert empty ranges before the new brin
2668 : * tuple, if needed.
2669 : */
2670 30 : if (prevblkno == InvalidBlockNumber)
2671 : {
2672 : /* First brin tuple, just deform it into memtuple. */
2673 2 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2674 :
2675 : /* continue to insert empty pages before this block */
2676 : }
2677 28 : else if (memtuple->bt_blkno == btup->bt_blkno)
2678 : {
2679 : /*
2680 : * Not the first brin tuple, but same page range as the previous
2681 : * one, so we can merge it into the memtuple.
2682 : */
2683 0 : union_tuples(state->bs_bdesc, memtuple, btup);
2684 0 : continue;
2685 : }
2686 : else
2687 : {
2688 : BrinTuple *tmp;
2689 : Size len;
2690 :
2691 : /*
2692 : * We got brin tuple for a different page range, so form a brin
2693 : * tuple from the memtuple, insert it, and re-init the memtuple
2694 : * from the new brin tuple.
2695 : */
2696 28 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2697 : memtuple, &len);
2698 :
2699 28 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2700 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2701 :
2702 : /*
2703 : * Reset the per-output-range context. This frees all the memory
2704 : * possibly allocated by the union functions, and also the BRIN
2705 : * tuple we just formed and inserted.
2706 : */
2707 28 : MemoryContextReset(rangeCxt);
2708 :
2709 28 : memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
2710 :
2711 : /* continue to insert empty pages before this block */
2712 : }
2713 :
2714 : /* Fill empty ranges for all ranges missing in the tuplesort. */
2715 30 : brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);
2716 :
2717 30 : prevblkno = btup->bt_blkno;
2718 : }
2719 :
2720 2 : tuplesort_end(state->bs_sortstate);
2721 :
2722 : /* Fill the BRIN tuple for the last page range with data. */
2723 2 : if (prevblkno != InvalidBlockNumber)
2724 : {
2725 : BrinTuple *tmp;
2726 : Size len;
2727 :
2728 2 : tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
2729 : memtuple, &len);
2730 :
2731 2 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2732 : &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
2733 :
2734 2 : pfree(tmp);
2735 : }
2736 :
2737 : /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
2738 2 : brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
2739 :
2740 : /*
2741 : * Switch back to the original memory context, and destroy the one we
2742 : * created to isolate the union_tuple calls.
2743 : */
2744 2 : MemoryContextSwitchTo(oldCxt);
2745 2 : MemoryContextDelete(rangeCxt);
2746 :
2747 2 : return reltuples;
2748 : }
2749 :
2750 : /*
2751 : * Returns the size of shared memory required to store the state of a parallel
2752 : * brin index build: the buffer-aligned size of BrinShared plus the result of table_parallelscan_estimate() for the given heap and snapshot.
2753 : */
2754 : static Size
2755 4 : _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
2756 : {
2757 : /* see shm_toc_allocate for why BUFFERALIGN is used */
2758 4 : return add_size(BUFFERALIGN(sizeof(BrinShared)),
2759 : table_parallelscan_estimate(heap, snapshot));
2760 : }
2761 :
2762 : /*
2763 : * Within leader, participate as a parallel worker: compute the leader's sort memory and run the scan-and-build step common to all participants.
2764 : */
2765 : static void
2766 2 : _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
2767 : {
2768 2 : BrinLeader *brinleader = buildstate->bs_leader;
2769 : int sortmem;
2770 :
2771 : /*
2772 : * Divide maintenance_work_mem by the leader's nparticipanttuplesorts. Might as well use a reliable figure when doling out memory
2773 : * (when the requested number of workers were not launched, this will be
2774 : * somewhat higher than it is for other workers).
2775 : */
2776 2 : sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
2777 :
2778 : /* Perform work common to all participants; the last argument (progress) is true for the leader */
2779 2 : _brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
2780 : brinleader->sharedsort, heap, index, sortmem, true);
2781 2 : }
2782 :
2783 : /*
2784 : * Perform a worker's portion of a parallel sort.
2785 : *
2786 : * This begins a worker tuplesort, runs table_index_build_scan() over a parallel scan with brinbuildCallbackParallel as the callback, calls form_and_spill_tuple() for the last item, and sorts the result.
2787 : * It then adds this participant's counters to brinshared under its spinlock and signals workersdonecv.
2788 : * sortmem is the amount of working memory to use within each worker,
2789 : * expressed in KBs.
2790 : *
2791 : * When this returns, workers are done, and need only release resources. NOTE(review): the progress parameter is not referenced in the body; table_index_build_scan() receives literal true for both boolean arguments -- confirm this is intended.
2792 : */
2793 : static void
2794 8 : _brin_parallel_scan_and_build(BrinBuildState *state,
2795 : BrinShared *brinshared, Sharedsort *sharedsort,
2796 : Relation heap, Relation index,
2797 : int sortmem, bool progress)
2798 : {
2799 : SortCoordinate coordinate;
2800 : TableScanDesc scan;
2801 : double reltuples;
2802 : IndexInfo *indexInfo;
2803 :
2804 : /* Initialize local tuplesort coordination state: mark this participant as a worker using the given sharedsort */
2805 8 : coordinate = palloc0(sizeof(SortCoordinateData));
2806 8 : coordinate->isWorker = true;
2807 8 : coordinate->nParticipants = -1;
2808 8 : coordinate->sharedsort = sharedsort;
2809 :
2810 : /* Begin "partial" tuplesort, limited to sortmem */
2811 8 : state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
2812 : TUPLESORT_NONE);
2813 :
2814 : /* Join parallel scan, copying the concurrent-build flag from shared state into the IndexInfo */
2815 8 : indexInfo = BuildIndexInfo(index);
2816 8 : indexInfo->ii_Concurrent = brinshared->isconcurrent;
2817 :
2818 8 : scan = table_beginscan_parallel(heap,
2819 : ParallelTableScanFromBrinShared(brinshared));
2820 :
2821 8 : reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
2822 : brinbuildCallbackParallel, state, scan);
2823 :
2824 : /* form and spill the last item */
2825 8 : form_and_spill_tuple(state);
2826 :
2827 : /* sort the BRIN ranges built by this worker */
2828 8 : tuplesort_performsort(state->bs_sortstate);
2829 :
2830 8 : state->bs_reltuples += reltuples;
2831 :
2832 : /*
2833 : * Done. Record ambuild statistics in the shared state while holding its spinlock.
2834 : */
2835 8 : SpinLockAcquire(&brinshared->mutex);
2836 8 : brinshared->nparticipantsdone++;
2837 8 : brinshared->reltuples += state->bs_reltuples;
2838 8 : brinshared->indtuples += state->bs_numtuples;
2839 8 : SpinLockRelease(&brinshared->mutex);
2840 :
2841 : /* Notify leader that this participant is done */
2842 8 : ConditionVariableSignal(&brinshared->workersdonecv);
2843 :
2844 8 : tuplesort_end(state->bs_sortstate);
2845 8 : }
2846 :
2847 : /*
2848 : * Perform work within a launched parallel process: look up the shared state in the toc, open the heap and index, run the scan-and-build step, report buffer/WAL usage, and close the relations.
2849 : */
2850 : void
2851 6 : _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2852 : {
2853 : char *sharedquery;
2854 : BrinShared *brinshared;
2855 : Sharedsort *sharedsort;
2856 : BrinBuildState *buildstate;
2857 : Relation heapRel;
2858 : Relation indexRel;
2859 : LOCKMODE heapLockmode;
2860 : LOCKMODE indexLockmode;
2861 : WalUsage *walusage;
2862 : BufferUsage *bufferusage;
2863 : int sortmem;
2864 :
2865 : /*
2866 : * The only possible status flag that can be set to the parallel worker is
2867 : * PROC_IN_SAFE_IC.
2868 : */
2869 : Assert((MyProc->statusFlags == 0) ||
2870 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2871 :
2872 : /* Set debug_query_string for individual workers first */
2873 6 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2874 6 : debug_query_string = sharedquery;
2875 :
2876 : /* Report the query string from leader */
2877 6 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2878 :
2879 : /* Look up brin shared state */
2880 6 : brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
2881 :
2882 : /* Choose lock modes known to be obtained by index.c, depending on whether the build is concurrent */
2883 6 : if (!brinshared->isconcurrent)
2884 : {
2885 6 : heapLockmode = ShareLock;
2886 6 : indexLockmode = AccessExclusiveLock;
2887 : }
2888 : else
2889 : {
2890 0 : heapLockmode = ShareUpdateExclusiveLock;
2891 0 : indexLockmode = RowExclusiveLock;
2892 : }
2893 :
2894 : /* Open relations within worker, using the relation OIDs stored in shared state */
2895 6 : heapRel = table_open(brinshared->heaprelid, heapLockmode);
2896 6 : indexRel = index_open(brinshared->indexrelid, indexLockmode);
2897 :
2898 6 : buildstate = initialize_brin_buildstate(indexRel, NULL,
2899 : brinshared->pagesPerRange,
2900 : InvalidBlockNumber);
2901 :
2902 : /* Look up shared state private to tuplesort.c and attach to it */
2903 6 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2904 6 : tuplesort_attach_shared(sharedsort, seg);
2905 :
2906 : /* Prepare to track buffer usage during parallel execution */
2907 6 : InstrStartParallelQuery();
2908 :
2909 : /*
2910 : * Divide maintenance_work_mem by brinshared->scantuplesortstates to get this worker's sort memory.
2911 : * NOTE(review): presumably scantuplesortstates is the planned number of participants, whereas the leader
2912 : * divides by nparticipanttuplesorts (so the leader's share may be somewhat higher) -- confirm.
2913 : */
2914 6 : sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
2915 :
2916 6 : _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
2917 : heapRel, indexRel, sortmem, false);
2918 :
2919 : /* Report WAL/buffer usage during parallel execution into this worker's slots, indexed by ParallelWorkerNumber */
2920 6 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2921 6 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2922 6 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2923 6 : &walusage[ParallelWorkerNumber]);
2924 :
2925 6 : index_close(indexRel, indexLockmode);
2926 6 : table_close(heapRel, heapLockmode);
2927 6 : }
2928 :
2929 : /*
2930 : * brin_build_empty_tuple
2931 : * Maybe initialize a BRIN tuple representing an empty range.
2932 : *
2933 : * Makes state->bs_emptyTuple a BRIN tuple representing an empty page range starting at the
2934 : * specified block number (nothing is returned to the caller). The empty tuple is initialized only once, when it's
2935 : * needed for the first time, stored in the memory context bs_context to ensure
2936 : * proper life span (with its length in bs_emptyTupleLen), and reused on following calls. All empty tuples are
2937 : * exactly the same except for the bt_blkno field, which is set to the value
2938 : * in the blkno parameter.
2939 : */
2940 : static void
2941 10 : brin_build_empty_tuple(BrinBuildState *state, BlockNumber blkno)
2942 : {
2943 : /* First time an empty tuple is requested? If yes, initialize it. */
2944 10 : if (state->bs_emptyTuple == NULL)
2945 : {
2946 : MemoryContext oldcxt;
2947 4 : BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
2948 :
2949 : /* Form the tuple in the context for the whole index build (dtuple above was created before this switch). */
2950 4 : oldcxt = MemoryContextSwitchTo(state->bs_context);
2951 :
2952 4 : state->bs_emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple,
2953 : &state->bs_emptyTupleLen);
2954 :
2955 4 : MemoryContextSwitchTo(oldcxt);
2956 : }
2957 : else
2958 : {
2959 : /* If we already have an empty tuple, just update the block number. */
2960 6 : state->bs_emptyTuple->bt_blkno = blkno;
2961 : }
2962 10 : }
2963 :
2964 : /*
2965 : * brin_fill_empty_ranges
2966 : * Add BRIN index tuples representing empty page ranges.
2967 : *
2968 : * prevRange/nextRange determine for which page ranges to add empty summaries.
2969 : * Both boundaries are exclusive, i.e. only ranges starting at blkno for which
2970 : * (prevRange < blkno < nextRange) will be added to the index, stepping by bs_pagesPerRange.
2971 : *
2972 : * If prevRange is InvalidBlockNumber, this means there was no previous page
2973 : * range (i.e. the first empty range to add is for blkno=0).
2974 : *
2975 : * The empty tuple is built only once (see brin_build_empty_tuple), and then reused for all future calls.
2976 : */
2977 : static void
2978 368 : brin_fill_empty_ranges(BrinBuildState *state,
2979 : BlockNumber prevRange, BlockNumber nextRange)
2980 : {
2981 : BlockNumber blkno;
2982 :
2983 : /*
2984 : * If we already summarized some ranges, we need to start with the next
2985 : * one. Otherwise start from the first range of the table.
2986 : */
2987 368 : blkno = (prevRange == InvalidBlockNumber) ? 0 : (prevRange + state->bs_pagesPerRange);
2988 :
2989 : /* Generate empty ranges until we hit the next non-empty range. */
2990 378 : while (blkno < nextRange)
2991 : {
2992 : /* Build the empty tuple on first use, or just repoint it at blkno. */
2993 10 : brin_build_empty_tuple(state, blkno);
2994 :
2995 10 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
2996 : &state->bs_currentInsertBuf,
2997 : blkno, state->bs_emptyTuple, state->bs_emptyTupleLen);
2998 :
2999 : /* advance to the next page range */
3000 10 : blkno += state->bs_pagesPerRange;
3001 : }
3002 368 : }
|