Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * gininsert.c
4 : * insert routines for the postgres inverted index access method.
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/gin/gininsert.c
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/gin_private.h"
18 : #include "access/gin_tuple.h"
19 : #include "access/parallel.h"
20 : #include "access/table.h"
21 : #include "access/tableam.h"
22 : #include "access/xloginsert.h"
23 : #include "catalog/index.h"
24 : #include "catalog/pg_collation.h"
25 : #include "commands/progress.h"
26 : #include "miscadmin.h"
27 : #include "nodes/execnodes.h"
28 : #include "pgstat.h"
29 : #include "storage/bufmgr.h"
30 : #include "storage/predicate.h"
31 : #include "tcop/tcopprot.h"
32 : #include "utils/datum.h"
33 : #include "utils/memutils.h"
34 : #include "utils/rel.h"
35 : #include "utils/builtins.h"
36 :
37 :
/*
 * Magic numbers for parallel state sharing.
 *
 * Each constant identifies one chunk of state that the leader of a parallel
 * GIN build places in the parallel context's shared memory table of contents
 * (space for these keys is reserved in _gin_begin_parallel via
 * shm_toc_estimate_keys).  NOTE(review): workers presumably look the chunks
 * up with the same keys — confirm in _gin_parallel_build_main.
 */
#define PARALLEL_KEY_GIN_SHARED			UINT64CONST(0xB000000000000001)
#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xB000000000000002)
#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xB000000000000003)
#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xB000000000000005)
44 :
/*
 * Status for index builds performed in parallel.  This is allocated in a
 * dynamic shared memory segment.
 *
 * The struct is immediately followed, at the first BUFFERALIGN'ed offset
 * past it, by the ParallelTableScanDescData of the parallel heap scan (see
 * ParallelTableScanFromGinBuildShared).
 */
typedef struct GinBuildShared
{
	/*
	 * These fields are not modified during the build.  They primarily exist
	 * for the benefit of worker processes that need to create state
	 * corresponding to that used by the leader.
	 */
	Oid			heaprelid;
	Oid			indexrelid;
	bool		isconcurrent;
	int			scantuplesortstates;

	/*
	 * workersdonecv is used to monitor the progress of workers.  All parallel
	 * participants must indicate that they are done before the leader can
	 * use results built by the workers (and before the leader can write the
	 * data into the index).
	 */
	ConditionVariable workersdonecv;

	/*
	 * mutex protects all following fields
	 *
	 * These fields contain status information of interest to GIN index builds
	 * that must work just the same when an index is built in parallel.
	 */
	slock_t		mutex;

	/*
	 * Mutable state that is maintained by workers, and reported back to the
	 * leader at the end of the scans.
	 *
	 * nparticipantsdone is the number of worker processes finished.
	 *
	 * reltuples is the total number of input heap tuples.
	 *
	 * indtuples is the total number of tuples that made it into the index.
	 */
	int			nparticipantsdone;
	double		reltuples;
	double		indtuples;

	/*
	 * ParallelTableScanDescData data follows.  Can't directly embed here, as
	 * implementations of the parallel table scan desc interface might need
	 * stronger alignment.
	 */
} GinBuildShared;
97 :
/*
 * Return pointer to a GinBuildShared's parallel table scan.
 *
 * The scan descriptor is stored in the same shared-memory chunk, right after
 * the GinBuildShared struct, starting at the first BUFFERALIGN'ed offset.
 * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
 * MAXALIGN.
 */
#define ParallelTableScanFromGinBuildShared(shared) \
	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GinBuildShared)))
106 :
/*
 * Status for leader in parallel index build.
 *
 * Allocated with palloc0 by _gin_begin_parallel() in the leader process only;
 * ginbuild() passes it to _gin_end_parallel() once the build is finished.
 */
typedef struct GinLeader
{
	/* parallel context itself */
	ParallelContext *pcxt;

	/*
	 * nparticipanttuplesorts is the exact number of worker processes
	 * successfully launched, plus one leader process if it participates as a
	 * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
	 * participating as a worker).
	 */
	int			nparticipanttuplesorts;

	/*
	 * Leader process convenience pointers to shared state (leader avoids TOC
	 * lookups).
	 *
	 * ginshared is the shared state for the entire build.  sharedsort is the
	 * shared, tuplesort-managed state passed to each process tuplesort.
	 * snapshot is the snapshot used by the scan iff an MVCC snapshot is
	 * required.
	 *
	 * walusage and bufferusage point at arrays in shared memory holding one
	 * WalUsage / BufferUsage entry per worker (sized by pcxt->nworkers in
	 * _gin_begin_parallel).  NOTE(review): presumably filled in by workers so
	 * the leader can account for their usage — confirm.
	 */
	GinBuildShared *ginshared;
	Sharedsort *sharedsort;
	Snapshot	snapshot;
	WalUsage   *walusage;
	BufferUsage *bufferusage;
} GinLeader;
138 :
/*
 * Working state of a GIN index build.
 *
 * ginbuild() keeps one of these on its stack and passes it as the opaque
 * "state" argument of the table-scan callbacks (ginBuildCallback and
 * ginBuildCallbackParallel).
 */
typedef struct
{
	GinState	ginstate;		/* GIN metadata for the index being built */
	double		indtuples;		/* number of index entries extracted so far */
	GinStatsData buildStats;	/* statistics written to the metapage at the
								 * end of the build (ginUpdateStats) */
	MemoryContext tmpCtx;		/* holds accumulated data not yet dumped out */
	MemoryContext funcCtx;		/* used for ginExtractEntries(); reset after
								 * each value */
	BuildAccumulator accum;		/* in-memory accumulator of (key, TID list) */
	ItemPointerData tid;		/* last heap TID seen by the parallel
								 * callback; used to detect scan wraparound */
	int			work_mem;		/* parallel builds: flush the accumulator once
								 * it reaches this many kilobytes */

	/*
	 * bs_leader is only present when a parallel index build is performed, and
	 * only in the leader process.
	 */
	GinLeader  *bs_leader;
	int			bs_worker_id;

	/* used to pass information from workers to leader */
	double		bs_numtuples;
	double		bs_reltuples;

	/*
	 * The sortstate is used by workers (including the leader). It has to be
	 * part of the build state, because that's the only thing passed to the
	 * build callback etc.
	 */
	Tuplesortstate *bs_sortstate;

	/*
	 * The sortstate used only within a single worker for the first merge pass
	 * happening there. In principle it doesn't need to be part of the build
	 * state and we could pass it around directly, but it's more convenient
	 * this way. And it's part of the build state, after all.
	 */
	Tuplesortstate *bs_worker_sort;
} GinBuildState;
176 :
177 :
/* parallel index builds */
static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
								bool isconcurrent, int request);
static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state);
static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static double _gin_parallel_heapscan(GinBuildState *buildstate);
static double _gin_parallel_merge(GinBuildState *buildstate);
static void _gin_leader_participate_as_worker(GinBuildState *buildstate,
											  Relation heap, Relation index);
static void _gin_parallel_scan_and_build(GinBuildState *buildstate,
										 GinBuildShared *ginshared,
										 Sharedsort *sharedsort,
										 Relation heap, Relation index,
										 int sortmem, bool progress);

/*
 * Access the contents of a GinTuple.  Judging by their names and signatures,
 * these return its TID array and its key datum respectively.
 */
static ItemPointer _gin_parse_tuple_items(GinTuple *a);
static Datum _gin_parse_tuple_key(GinTuple *a);

/*
 * Build a GinTuple holding one key and its TID list, suitable for feeding to
 * a GIN tuplesort; the tuple's size is returned in *len.
 */
static GinTuple *_gin_build_tuple(OffsetNumber attnum, unsigned char category,
								  Datum key, int16 typlen, bool typbyval,
								  ItemPointerData *items, uint32 nitems,
								  Size *len);
200 :
201 : /*
202 : * Adds array of item pointers to tuple's posting list, or
203 : * creates posting tree and tuple pointing to tree in case
204 : * of not enough space. Max size of tuple is defined in
205 : * GinFormTuple(). Returns a new, modified index tuple.
206 : * items[] must be in sorted order with no duplicates.
207 : */
208 : static IndexTuple
209 228878 : addItemPointersToLeafTuple(GinState *ginstate,
210 : IndexTuple old,
211 : ItemPointerData *items, uint32 nitem,
212 : GinStatsData *buildStats, Buffer buffer)
213 : {
214 : OffsetNumber attnum;
215 : Datum key;
216 : GinNullCategory category;
217 : IndexTuple res;
218 : ItemPointerData *newItems,
219 : *oldItems;
220 : int oldNPosting,
221 : newNPosting;
222 : GinPostingList *compressedList;
223 :
224 : Assert(!GinIsPostingTree(old));
225 :
226 228878 : attnum = gintuple_get_attrnum(ginstate, old);
227 228878 : key = gintuple_get_key(ginstate, old, &category);
228 :
229 : /* merge the old and new posting lists */
230 228878 : oldItems = ginReadTuple(ginstate, attnum, old, &oldNPosting);
231 :
232 228878 : newItems = ginMergeItemPointers(items, nitem,
233 : oldItems, oldNPosting,
234 : &newNPosting);
235 :
236 : /* Compress the posting list, and try to a build tuple with room for it */
237 228878 : res = NULL;
238 228878 : compressedList = ginCompressPostingList(newItems, newNPosting, GinMaxItemSize,
239 : NULL);
240 228878 : pfree(newItems);
241 228878 : if (compressedList)
242 : {
243 228878 : res = GinFormTuple(ginstate, attnum, key, category,
244 : (char *) compressedList,
245 228878 : SizeOfGinPostingList(compressedList),
246 : newNPosting,
247 : false);
248 228878 : pfree(compressedList);
249 : }
250 228878 : if (!res)
251 : {
252 : /* posting list would be too big, convert to posting tree */
253 : BlockNumber postingRoot;
254 :
255 : /*
256 : * Initialize posting tree with the old tuple's posting list. It's
257 : * surely small enough to fit on one posting-tree page, and should
258 : * already be in order with no duplicates.
259 : */
260 16 : postingRoot = createPostingTree(ginstate->index,
261 : oldItems,
262 : oldNPosting,
263 : buildStats,
264 : buffer);
265 :
266 : /* Now insert the TIDs-to-be-added into the posting tree */
267 16 : ginInsertItemPointers(ginstate->index, postingRoot,
268 : items, nitem,
269 : buildStats);
270 :
271 : /* And build a new posting-tree-only result tuple */
272 16 : res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
273 16 : GinSetPostingTree(res, postingRoot);
274 : }
275 228878 : pfree(oldItems);
276 :
277 228878 : return res;
278 : }
279 :
280 : /*
281 : * Build a fresh leaf tuple, either posting-list or posting-tree format
282 : * depending on whether the given items list will fit.
283 : * items[] must be in sorted order with no duplicates.
284 : *
285 : * This is basically the same logic as in addItemPointersToLeafTuple,
286 : * but working from slightly different input.
287 : */
288 : static IndexTuple
289 692604 : buildFreshLeafTuple(GinState *ginstate,
290 : OffsetNumber attnum, Datum key, GinNullCategory category,
291 : ItemPointerData *items, uint32 nitem,
292 : GinStatsData *buildStats, Buffer buffer)
293 : {
294 692604 : IndexTuple res = NULL;
295 : GinPostingList *compressedList;
296 :
297 : /* try to build a posting list tuple with all the items */
298 692604 : compressedList = ginCompressPostingList(items, nitem, GinMaxItemSize, NULL);
299 692604 : if (compressedList)
300 : {
301 692604 : res = GinFormTuple(ginstate, attnum, key, category,
302 : (char *) compressedList,
303 692604 : SizeOfGinPostingList(compressedList),
304 : nitem, false);
305 692604 : pfree(compressedList);
306 : }
307 692604 : if (!res)
308 : {
309 : /* posting list would be too big, build posting tree */
310 : BlockNumber postingRoot;
311 :
312 : /*
313 : * Build posting-tree-only result tuple. We do this first so as to
314 : * fail quickly if the key is too big.
315 : */
316 100 : res = GinFormTuple(ginstate, attnum, key, category, NULL, 0, 0, true);
317 :
318 : /*
319 : * Initialize a new posting tree with the TIDs.
320 : */
321 100 : postingRoot = createPostingTree(ginstate->index, items, nitem,
322 : buildStats, buffer);
323 :
324 : /* And save the root link in the result tuple */
325 100 : GinSetPostingTree(res, postingRoot);
326 : }
327 :
328 692604 : return res;
329 : }
330 :
/*
 * Insert one or more heap TIDs associated with the given key value.
 * This will either add a single key entry, or enlarge a pre-existing entry.
 *
 * attnum/key/category identify the entry.  items[] holds nitem heap TIDs.
 * It is handed unchanged to addItemPointersToLeafTuple/buildFreshLeafTuple,
 * which require it to be sorted with no duplicates.
 *
 * During an index build, buildStats is non-null and the counters
 * it contains should be incremented as needed.  A non-null buildStats also
 * marks the btree operation as part of a build (btree.isBuild).
 */
void
ginEntryInsert(GinState *ginstate,
			   OffsetNumber attnum, Datum key, GinNullCategory category,
			   ItemPointerData *items, uint32 nitem,
			   GinStatsData *buildStats)
{
	GinBtreeData btree;
	GinBtreeEntryInsertData insertdata;
	GinBtreeStack *stack;
	IndexTuple	itup;
	Page		page;

	insertdata.isDelete = false;

	ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
	btree.isBuild = (buildStats != NULL);

	/* locate the entry-tree leaf page where this key lives or would go */
	stack = ginFindLeafPage(&btree, false, false);
	page = BufferGetPage(stack->buffer);

	if (btree.findItem(&btree, stack))
	{
		/* found pre-existing entry */
		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));

		if (GinIsPostingTree(itup))
		{
			/* add entries to existing posting tree */
			BlockNumber rootPostingTree = GinGetPostingTree(itup);

			/*
			 * release all stack: the entry tuple itself does not change, so
			 * the entry-tree leaf lock is dropped before the posting tree is
			 * touched
			 */
			LockBuffer(stack->buffer, GIN_UNLOCK);
			freeGinBtreeStack(stack);

			/* insert into posting tree */
			ginInsertItemPointers(ginstate->index, rootPostingTree,
								  items, nitem,
								  buildStats);
			return;
		}

		/* report a serializable conflict on the leaf page we will modify */
		CheckForSerializableConflictIn(ginstate->index, NULL,
									   BufferGetBlockNumber(stack->buffer));
		/* modify an existing leaf entry */
		itup = addItemPointersToLeafTuple(ginstate, itup,
										  items, nitem, buildStats, stack->buffer);

		/* the rebuilt tuple must replace the old one rather than be added */
		insertdata.isDelete = true;
	}
	else
	{
		/* report a serializable conflict on the leaf page we will modify */
		CheckForSerializableConflictIn(ginstate->index, NULL,
									   BufferGetBlockNumber(stack->buffer));
		/* no match, so construct a new leaf entry */
		itup = buildFreshLeafTuple(ginstate, attnum, key, category,
								   items, nitem, buildStats, stack->buffer);

		/*
		 * nEntries counts leaf tuples, so increment it only when we make a
		 * new one.
		 */
		if (buildStats)
			buildStats->nEntries++;
	}

	/*
	 * Insert the new or modified leaf tuple.  NOTE(review): the stack is not
	 * freed here, so ginInsertValue presumably takes ownership of it and
	 * releases it — confirm.
	 */
	insertdata.entry = itup;
	ginInsertValue(&btree, stack, &insertdata, buildStats);
	pfree(itup);
}
408 :
409 : /*
410 : * Extract index entries for a single indexable item, and add them to the
411 : * BuildAccumulator's state.
412 : *
413 : * This function is used only during initial index creation.
414 : */
415 : static void
416 928274 : ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
417 : Datum value, bool isNull,
418 : ItemPointer heapptr)
419 : {
420 : Datum *entries;
421 : GinNullCategory *categories;
422 : int32 nentries;
423 : MemoryContext oldCtx;
424 :
425 928274 : oldCtx = MemoryContextSwitchTo(buildstate->funcCtx);
426 928274 : entries = ginExtractEntries(buildstate->accum.ginstate, attnum,
427 : value, isNull,
428 : &nentries, &categories);
429 928274 : MemoryContextSwitchTo(oldCtx);
430 :
431 928274 : ginInsertBAEntries(&buildstate->accum, heapptr, attnum,
432 : entries, categories, nentries);
433 :
434 928274 : buildstate->indtuples += nentries;
435 :
436 928274 : MemoryContextReset(buildstate->funcCtx);
437 928274 : }
438 :
439 : static void
440 927656 : ginBuildCallback(Relation index, ItemPointer tid, Datum *values,
441 : bool *isnull, bool tupleIsAlive, void *state)
442 : {
443 927656 : GinBuildState *buildstate = (GinBuildState *) state;
444 : MemoryContext oldCtx;
445 : int i;
446 :
447 927656 : oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
448 :
449 1855930 : for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++)
450 928274 : ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1),
451 928274 : values[i], isnull[i], tid);
452 :
453 : /* If we've maxed out our available memory, dump everything to the index */
454 927656 : if (buildstate->accum.allocatedMemory >= maintenance_work_mem * (Size) 1024)
455 : {
456 : ItemPointerData *list;
457 : Datum key;
458 : GinNullCategory category;
459 : uint32 nlist;
460 : OffsetNumber attnum;
461 :
462 0 : ginBeginBAScan(&buildstate->accum);
463 0 : while ((list = ginGetBAEntry(&buildstate->accum,
464 : &attnum, &key, &category, &nlist)) != NULL)
465 : {
466 : /* there could be many entries, so be willing to abort here */
467 0 : CHECK_FOR_INTERRUPTS();
468 0 : ginEntryInsert(&buildstate->ginstate, attnum, key, category,
469 : list, nlist, &buildstate->buildStats);
470 : }
471 :
472 0 : MemoryContextReset(buildstate->tmpCtx);
473 0 : ginInitBA(&buildstate->accum);
474 : }
475 :
476 927656 : MemoryContextSwitchTo(oldCtx);
477 927656 : }
478 :
479 : /*
480 : * ginFlushBuildState
481 : * Write all data from BuildAccumulator into the tuplesort.
482 : */
483 : static void
484 0 : ginFlushBuildState(GinBuildState *buildstate, Relation index)
485 : {
486 : ItemPointerData *list;
487 : Datum key;
488 : GinNullCategory category;
489 : uint32 nlist;
490 : OffsetNumber attnum;
491 0 : TupleDesc tdesc = RelationGetDescr(index);
492 :
493 0 : ginBeginBAScan(&buildstate->accum);
494 0 : while ((list = ginGetBAEntry(&buildstate->accum,
495 : &attnum, &key, &category, &nlist)) != NULL)
496 : {
497 : /* information about the key */
498 0 : Form_pg_attribute attr = TupleDescAttr(tdesc, (attnum - 1));
499 :
500 : /* GIN tuple and tuple length */
501 : GinTuple *tup;
502 : Size tuplen;
503 :
504 : /* there could be many entries, so be willing to abort here */
505 0 : CHECK_FOR_INTERRUPTS();
506 :
507 0 : tup = _gin_build_tuple(attnum, category,
508 0 : key, attr->attlen, attr->attbyval,
509 : list, nlist, &tuplen);
510 :
511 0 : tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
512 :
513 0 : pfree(tup);
514 : }
515 :
516 0 : MemoryContextReset(buildstate->tmpCtx);
517 0 : ginInitBA(&buildstate->accum);
518 0 : }
519 :
520 : /*
521 : * ginBuildCallbackParallel
522 : * Callback for the parallel index build.
523 : *
524 : * This is similar to the serial build callback ginBuildCallback, but
525 : * instead of writing the accumulated entries into the index, each worker
526 : * writes them into a (local) tuplesort.
527 : *
528 : * The worker then sorts and combines these entries, before writing them
529 : * into a shared tuplesort for the leader (see _gin_parallel_scan_and_build
530 : * for the whole process).
531 : */
532 : static void
533 0 : ginBuildCallbackParallel(Relation index, ItemPointer tid, Datum *values,
534 : bool *isnull, bool tupleIsAlive, void *state)
535 : {
536 0 : GinBuildState *buildstate = (GinBuildState *) state;
537 : MemoryContext oldCtx;
538 : int i;
539 :
540 0 : oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
541 :
542 : /*
543 : * if scan wrapped around - flush accumulated entries and start anew
544 : *
545 : * With parallel scans, we don't have a guarantee the scan does not start
546 : * half-way through the relation (serial builds disable sync scans and
547 : * always start from block 0, parallel scans require allow_sync=true).
548 : *
549 : * Building the posting lists assumes the TIDs are monotonic and never go
550 : * back, and the wrap around would break that. We handle that by detecting
551 : * the wraparound, and flushing all entries. This means we'll later see
552 : * two separate entries with non-overlapping TID lists (which can be
553 : * combined by merge sort).
554 : *
555 : * To detect a wraparound, we remember the last TID seen by each worker
556 : * (for any key). If the next TID seen by the worker is lower, the scan
557 : * must have wrapped around.
558 : */
559 0 : if (ItemPointerCompare(tid, &buildstate->tid) < 0)
560 0 : ginFlushBuildState(buildstate, index);
561 :
562 : /* remember the TID we're about to process */
563 0 : buildstate->tid = *tid;
564 :
565 0 : for (i = 0; i < buildstate->ginstate.origTupdesc->natts; i++)
566 0 : ginHeapTupleBulkInsert(buildstate, (OffsetNumber) (i + 1),
567 0 : values[i], isnull[i], tid);
568 :
569 : /*
570 : * If we've maxed out our available memory, dump everything to the
571 : * tuplesort. We use half the per-worker fraction of maintenance_work_mem,
572 : * the other half is used for the tuplesort.
573 : */
574 0 : if (buildstate->accum.allocatedMemory >= buildstate->work_mem * (Size) 1024)
575 0 : ginFlushBuildState(buildstate, index);
576 :
577 0 : MemoryContextSwitchTo(oldCtx);
578 0 : }
579 :
580 : IndexBuildResult *
581 404 : ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
582 : {
583 : IndexBuildResult *result;
584 : double reltuples;
585 : GinBuildState buildstate;
586 404 : GinBuildState *state = &buildstate;
587 : Buffer RootBuffer,
588 : MetaBuffer;
589 : ItemPointerData *list;
590 : Datum key;
591 : GinNullCategory category;
592 : uint32 nlist;
593 : MemoryContext oldCtx;
594 : OffsetNumber attnum;
595 :
596 404 : if (RelationGetNumberOfBlocks(index) != 0)
597 0 : elog(ERROR, "index \"%s\" already contains data",
598 : RelationGetRelationName(index));
599 :
600 404 : initGinState(&buildstate.ginstate, index);
601 404 : buildstate.indtuples = 0;
602 404 : memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
603 :
604 : /* Initialize fields for parallel build too. */
605 404 : buildstate.bs_numtuples = 0;
606 404 : buildstate.bs_reltuples = 0;
607 404 : buildstate.bs_leader = NULL;
608 404 : memset(&buildstate.tid, 0, sizeof(ItemPointerData));
609 :
610 : /* initialize the meta page */
611 404 : MetaBuffer = GinNewBuffer(index);
612 :
613 : /* initialize the root page */
614 404 : RootBuffer = GinNewBuffer(index);
615 :
616 404 : START_CRIT_SECTION();
617 404 : GinInitMetabuffer(MetaBuffer);
618 404 : MarkBufferDirty(MetaBuffer);
619 404 : GinInitBuffer(RootBuffer, GIN_LEAF);
620 404 : MarkBufferDirty(RootBuffer);
621 :
622 :
623 404 : UnlockReleaseBuffer(MetaBuffer);
624 404 : UnlockReleaseBuffer(RootBuffer);
625 404 : END_CRIT_SECTION();
626 :
627 : /* count the root as first entry page */
628 404 : buildstate.buildStats.nEntryPages++;
629 :
630 : /*
631 : * create a temporary memory context that is used to hold data not yet
632 : * dumped out to the index
633 : */
634 404 : buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
635 : "Gin build temporary context",
636 : ALLOCSET_DEFAULT_SIZES);
637 :
638 : /*
639 : * create a temporary memory context that is used for calling
640 : * ginExtractEntries(), and can be reset after each tuple
641 : */
642 404 : buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
643 : "Gin build temporary context for user-defined function",
644 : ALLOCSET_DEFAULT_SIZES);
645 :
646 404 : buildstate.accum.ginstate = &buildstate.ginstate;
647 404 : ginInitBA(&buildstate.accum);
648 :
649 : /* Report table scan phase started */
650 404 : pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
651 : PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN);
652 :
653 : /*
654 : * Attempt to launch parallel worker scan when required
655 : *
656 : * XXX plan_create_index_workers makes the number of workers dependent on
657 : * maintenance_work_mem, requiring 32MB for each worker. For GIN that's
658 : * reasonable too, because we sort the data just like btree. It does
659 : * ignore the memory used to accumulate data in memory (set by work_mem),
660 : * but there is no way to communicate that to plan_create_index_workers.
661 : */
662 404 : if (indexInfo->ii_ParallelWorkers > 0)
663 0 : _gin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
664 : indexInfo->ii_ParallelWorkers);
665 :
666 : /*
667 : * If parallel build requested and at least one worker process was
668 : * successfully launched, set up coordination state, wait for workers to
669 : * complete. Then read all tuples from the shared tuplesort and insert
670 : * them into the index.
671 : *
672 : * In serial mode, simply scan the table and build the index one index
673 : * tuple at a time.
674 : */
675 404 : if (state->bs_leader)
676 : {
677 : SortCoordinate coordinate;
678 :
679 0 : coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
680 0 : coordinate->isWorker = false;
681 0 : coordinate->nParticipants =
682 0 : state->bs_leader->nparticipanttuplesorts;
683 0 : coordinate->sharedsort = state->bs_leader->sharedsort;
684 :
685 : /*
686 : * Begin leader tuplesort.
687 : *
688 : * In cases where parallelism is involved, the leader receives the
689 : * same share of maintenance_work_mem as a serial sort (it is
690 : * generally treated in the same way as a serial sort once we return).
691 : * Parallel worker Tuplesortstates will have received only a fraction
692 : * of maintenance_work_mem, though.
693 : *
694 : * We rely on the lifetime of the Leader Tuplesortstate almost not
695 : * overlapping with any worker Tuplesortstate's lifetime. There may
696 : * be some small overlap, but that's okay because we rely on leader
697 : * Tuplesortstate only allocating a small, fixed amount of memory
698 : * here. When its tuplesort_performsort() is called (by our caller),
699 : * and significant amounts of memory are likely to be used, all
700 : * workers must have already freed almost all memory held by their
701 : * Tuplesortstates (they are about to go away completely, too). The
702 : * overall effect is that maintenance_work_mem always represents an
703 : * absolute high watermark on the amount of memory used by a CREATE
704 : * INDEX operation, regardless of the use of parallelism or any other
705 : * factor.
706 : */
707 0 : state->bs_sortstate =
708 0 : tuplesort_begin_index_gin(heap, index,
709 : maintenance_work_mem, coordinate,
710 : TUPLESORT_NONE);
711 :
712 : /* scan the relation in parallel and merge per-worker results */
713 0 : reltuples = _gin_parallel_merge(state);
714 :
715 0 : _gin_end_parallel(state->bs_leader, state);
716 : }
717 : else /* no parallel index build */
718 : {
719 : /*
720 : * Do the heap scan. We disallow sync scan here because
721 : * dataPlaceToPage prefers to receive tuples in TID order.
722 : */
723 404 : reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
724 : ginBuildCallback, &buildstate, NULL);
725 :
726 : /* dump remaining entries to the index */
727 404 : oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
728 404 : ginBeginBAScan(&buildstate.accum);
729 152162 : while ((list = ginGetBAEntry(&buildstate.accum,
730 : &attnum, &key, &category, &nlist)) != NULL)
731 : {
732 : /* there could be many entries, so be willing to abort here */
733 151758 : CHECK_FOR_INTERRUPTS();
734 151758 : ginEntryInsert(&buildstate.ginstate, attnum, key, category,
735 : list, nlist, &buildstate.buildStats);
736 : }
737 404 : MemoryContextSwitchTo(oldCtx);
738 : }
739 :
740 404 : MemoryContextDelete(buildstate.funcCtx);
741 404 : MemoryContextDelete(buildstate.tmpCtx);
742 :
743 : /*
744 : * Update metapage stats
745 : */
746 404 : buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
747 404 : ginUpdateStats(index, &buildstate.buildStats, true);
748 :
749 : /*
750 : * We didn't write WAL records as we built the index, so if WAL-logging is
751 : * required, write all pages to the WAL now.
752 : */
753 404 : if (RelationNeedsWAL(index))
754 : {
755 276 : log_newpage_range(index, MAIN_FORKNUM,
756 : 0, RelationGetNumberOfBlocks(index),
757 : true);
758 : }
759 :
760 : /*
761 : * Return statistics
762 : */
763 404 : result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
764 :
765 404 : result->heap_tuples = reltuples;
766 404 : result->index_tuples = buildstate.indtuples;
767 :
768 404 : return result;
769 : }
770 :
/*
 * ginbuildempty() -- build an empty gin index in the initialization fork
 *
 * The init fork is extended by two pages: the first becomes the metapage and
 * the second the (empty) root leaf page.  Both are initialized, marked dirty
 * and WAL-logged with log_newpage_buffer() inside a single critical section.
 */
void
ginbuildempty(Relation index)
{
	Buffer		RootBuffer,
				MetaBuffer;

	/*
	 * An empty GIN index has two pages.  EB_LOCK_FIRST hands back each new
	 * buffer already locked (they are released by UnlockReleaseBuffer
	 * below).  NOTE(review): EB_SKIP_EXTENSION_LOCK presumably is safe here
	 * because no other backend can access the init fork of an index being
	 * created — confirm.
	 */
	MetaBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
								   EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);
	RootBuffer = ExtendBufferedRel(BMR_REL(index), INIT_FORKNUM, NULL,
								   EB_LOCK_FIRST | EB_SKIP_EXTENSION_LOCK);

	/*
	 * Initialize and xlog metabuffer and root buffer.  Note the second
	 * argument of log_newpage_buffer differs: true for the metapage, false
	 * for the root page.
	 */
	START_CRIT_SECTION();
	GinInitMetabuffer(MetaBuffer);
	MarkBufferDirty(MetaBuffer);
	log_newpage_buffer(MetaBuffer, true);
	GinInitBuffer(RootBuffer, GIN_LEAF);
	MarkBufferDirty(RootBuffer);
	log_newpage_buffer(RootBuffer, false);
	END_CRIT_SECTION();

	/* Unlock and release the buffers. */
	UnlockReleaseBuffer(MetaBuffer);
	UnlockReleaseBuffer(RootBuffer);
}
800 :
801 : /*
802 : * Insert index entries for a single indexable item during "normal"
803 : * (non-fast-update) insertion
804 : */
805 : static void
806 54096 : ginHeapTupleInsert(GinState *ginstate, OffsetNumber attnum,
807 : Datum value, bool isNull,
808 : ItemPointer item)
809 : {
810 : Datum *entries;
811 : GinNullCategory *categories;
812 : int32 i,
813 : nentries;
814 :
815 54096 : entries = ginExtractEntries(ginstate, attnum, value, isNull,
816 : &nentries, &categories);
817 :
818 506998 : for (i = 0; i < nentries; i++)
819 452922 : ginEntryInsert(ginstate, attnum, entries[i], categories[i],
820 : item, 1, NULL);
821 54076 : }
822 :
823 : bool
824 319790 : gininsert(Relation index, Datum *values, bool *isnull,
825 : ItemPointer ht_ctid, Relation heapRel,
826 : IndexUniqueCheck checkUnique,
827 : bool indexUnchanged,
828 : IndexInfo *indexInfo)
829 : {
830 319790 : GinState *ginstate = (GinState *) indexInfo->ii_AmCache;
831 : MemoryContext oldCtx;
832 : MemoryContext insertCtx;
833 : int i;
834 :
835 : /* Initialize GinState cache if first call in this statement */
836 319790 : if (ginstate == NULL)
837 : {
838 2894 : oldCtx = MemoryContextSwitchTo(indexInfo->ii_Context);
839 2894 : ginstate = (GinState *) palloc(sizeof(GinState));
840 2894 : initGinState(ginstate, index);
841 2894 : indexInfo->ii_AmCache = ginstate;
842 2894 : MemoryContextSwitchTo(oldCtx);
843 : }
844 :
845 319790 : insertCtx = AllocSetContextCreate(CurrentMemoryContext,
846 : "Gin insert temporary context",
847 : ALLOCSET_DEFAULT_SIZES);
848 :
849 319790 : oldCtx = MemoryContextSwitchTo(insertCtx);
850 :
851 319790 : if (GinGetUseFastUpdate(index))
852 265688 : {
853 : GinTupleCollector collector;
854 :
855 265694 : memset(&collector, 0, sizeof(GinTupleCollector));
856 :
857 651466 : for (i = 0; i < ginstate->origTupdesc->natts; i++)
858 385772 : ginHeapTupleFastCollect(ginstate, &collector,
859 385772 : (OffsetNumber) (i + 1),
860 385772 : values[i], isnull[i],
861 : ht_ctid);
862 :
863 265694 : ginHeapTupleFastInsert(ginstate, &collector);
864 : }
865 : else
866 : {
867 108172 : for (i = 0; i < ginstate->origTupdesc->natts; i++)
868 54096 : ginHeapTupleInsert(ginstate, (OffsetNumber) (i + 1),
869 54096 : values[i], isnull[i],
870 : ht_ctid);
871 : }
872 :
873 319764 : MemoryContextSwitchTo(oldCtx);
874 319764 : MemoryContextDelete(insertCtx);
875 :
876 319764 : return false;
877 : }
878 :
879 : /*
880 : * Create parallel context, and launch workers for leader.
881 : *
882 : * buildstate argument should be initialized (with the exception of the
883 : * tuplesort states, which may later be created based on shared
884 : * state initially set up here).
885 : *
886 : * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
887 : *
888 : * request is the target number of parallel worker processes to launch.
889 : *
890 : * Sets buildstate's GinLeader, which caller must use to shut down parallel
891 : * mode by passing it to _gin_end_parallel() at the very end of its index
892 : * build. If not even a single worker process can be launched, this is
893 : * never set, and caller should proceed with a serial index build.
894 : */
895 : static void
896 0 : _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
897 : bool isconcurrent, int request)
898 : {
899 : ParallelContext *pcxt;
900 : int scantuplesortstates;
901 : Snapshot snapshot;
902 : Size estginshared;
903 : Size estsort;
904 : GinBuildShared *ginshared;
905 : Sharedsort *sharedsort;
906 0 : GinLeader *ginleader = (GinLeader *) palloc0(sizeof(GinLeader));
907 : WalUsage *walusage;
908 : BufferUsage *bufferusage;
909 0 : bool leaderparticipates = true;
910 : int querylen;
911 :
912 : #ifdef DISABLE_LEADER_PARTICIPATION
913 : leaderparticipates = false;
914 : #endif
915 :
916 : /*
917 : * Enter parallel mode, and create context for parallel build of gin index
918 : */
919 0 : EnterParallelMode();
920 : Assert(request > 0);
921 0 : pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main",
922 : request);
923 :
924 0 : scantuplesortstates = leaderparticipates ? request + 1 : request;
925 :
926 : /*
927 : * Prepare for scan of the base relation. In a normal index build, we use
928 : * SnapshotAny because we must retrieve all tuples and do our own time
929 : * qual checks (because we have to index RECENTLY_DEAD tuples). In a
930 : * concurrent build, we take a regular MVCC snapshot and index whatever's
931 : * live according to that.
932 : */
933 0 : if (!isconcurrent)
934 0 : snapshot = SnapshotAny;
935 : else
936 0 : snapshot = RegisterSnapshot(GetTransactionSnapshot());
937 :
938 : /*
939 : * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
940 : */
941 0 : estginshared = _gin_parallel_estimate_shared(heap, snapshot);
942 0 : shm_toc_estimate_chunk(&pcxt->estimator, estginshared);
943 0 : estsort = tuplesort_estimate_shared(scantuplesortstates);
944 0 : shm_toc_estimate_chunk(&pcxt->estimator, estsort);
945 :
946 0 : shm_toc_estimate_keys(&pcxt->estimator, 2);
947 :
948 : /*
949 : * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
950 : * and PARALLEL_KEY_BUFFER_USAGE.
951 : *
952 : * If there are no extensions loaded that care, we could skip this. We
953 : * have no way of knowing whether anyone's looking at pgWalUsage or
954 : * pgBufferUsage, so do it unconditionally.
955 : */
956 0 : shm_toc_estimate_chunk(&pcxt->estimator,
957 : mul_size(sizeof(WalUsage), pcxt->nworkers));
958 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
959 0 : shm_toc_estimate_chunk(&pcxt->estimator,
960 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
961 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
962 :
963 : /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
964 0 : if (debug_query_string)
965 : {
966 0 : querylen = strlen(debug_query_string);
967 0 : shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
968 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
969 : }
970 : else
971 0 : querylen = 0; /* keep compiler quiet */
972 :
973 : /* Everyone's had a chance to ask for space, so now create the DSM */
974 0 : InitializeParallelDSM(pcxt);
975 :
976 : /* If no DSM segment was available, back out (do serial build) */
977 0 : if (pcxt->seg == NULL)
978 : {
979 0 : if (IsMVCCSnapshot(snapshot))
980 0 : UnregisterSnapshot(snapshot);
981 0 : DestroyParallelContext(pcxt);
982 0 : ExitParallelMode();
983 0 : return;
984 : }
985 :
986 : /* Store shared build state, for which we reserved space */
987 0 : ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared);
988 : /* Initialize immutable state */
989 0 : ginshared->heaprelid = RelationGetRelid(heap);
990 0 : ginshared->indexrelid = RelationGetRelid(index);
991 0 : ginshared->isconcurrent = isconcurrent;
992 0 : ginshared->scantuplesortstates = scantuplesortstates;
993 :
994 0 : ConditionVariableInit(&ginshared->workersdonecv);
995 0 : SpinLockInit(&ginshared->mutex);
996 :
997 : /* Initialize mutable state */
998 0 : ginshared->nparticipantsdone = 0;
999 0 : ginshared->reltuples = 0.0;
1000 0 : ginshared->indtuples = 0.0;
1001 :
1002 0 : table_parallelscan_initialize(heap,
1003 : ParallelTableScanFromGinBuildShared(ginshared),
1004 : snapshot);
1005 :
1006 : /*
1007 : * Store shared tuplesort-private state, for which we reserved space.
1008 : * Then, initialize opaque state using tuplesort routine.
1009 : */
1010 0 : sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
1011 0 : tuplesort_initialize_shared(sharedsort, scantuplesortstates,
1012 : pcxt->seg);
1013 :
1014 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIN_SHARED, ginshared);
1015 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
1016 :
1017 : /* Store query string for workers */
1018 0 : if (debug_query_string)
1019 : {
1020 : char *sharedquery;
1021 :
1022 0 : sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
1023 0 : memcpy(sharedquery, debug_query_string, querylen + 1);
1024 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
1025 : }
1026 :
1027 : /*
1028 : * Allocate space for each worker's WalUsage and BufferUsage; no need to
1029 : * initialize.
1030 : */
1031 0 : walusage = shm_toc_allocate(pcxt->toc,
1032 0 : mul_size(sizeof(WalUsage), pcxt->nworkers));
1033 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
1034 0 : bufferusage = shm_toc_allocate(pcxt->toc,
1035 0 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
1036 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
1037 :
1038 : /* Launch workers, saving status for leader/caller */
1039 0 : LaunchParallelWorkers(pcxt);
1040 0 : ginleader->pcxt = pcxt;
1041 0 : ginleader->nparticipanttuplesorts = pcxt->nworkers_launched;
1042 0 : if (leaderparticipates)
1043 0 : ginleader->nparticipanttuplesorts++;
1044 0 : ginleader->ginshared = ginshared;
1045 0 : ginleader->sharedsort = sharedsort;
1046 0 : ginleader->snapshot = snapshot;
1047 0 : ginleader->walusage = walusage;
1048 0 : ginleader->bufferusage = bufferusage;
1049 :
1050 : /* If no workers were successfully launched, back out (do serial build) */
1051 0 : if (pcxt->nworkers_launched == 0)
1052 : {
1053 0 : _gin_end_parallel(ginleader, NULL);
1054 0 : return;
1055 : }
1056 :
1057 : /* Save leader state now that it's clear build will be parallel */
1058 0 : buildstate->bs_leader = ginleader;
1059 :
1060 : /* Join heap scan ourselves */
1061 0 : if (leaderparticipates)
1062 0 : _gin_leader_participate_as_worker(buildstate, heap, index);
1063 :
1064 : /*
1065 : * Caller needs to wait for all launched workers when we return. Make
1066 : * sure that the failure-to-start case will not hang forever.
1067 : */
1068 0 : WaitForParallelWorkersToAttach(pcxt);
1069 : }
1070 :
1071 : /*
1072 : * Shut down workers, destroy parallel context, and end parallel mode.
1073 : */
1074 : static void
1075 0 : _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
1076 : {
1077 : int i;
1078 :
1079 : /* Shutdown worker processes */
1080 0 : WaitForParallelWorkersToFinish(ginleader->pcxt);
1081 :
1082 : /*
1083 : * Next, accumulate WAL usage. (This must wait for the workers to finish,
1084 : * or we might get incomplete data.)
1085 : */
1086 0 : for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
1087 0 : InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
1088 :
1089 : /* Free last reference to MVCC snapshot, if one was used */
1090 0 : if (IsMVCCSnapshot(ginleader->snapshot))
1091 0 : UnregisterSnapshot(ginleader->snapshot);
1092 0 : DestroyParallelContext(ginleader->pcxt);
1093 0 : ExitParallelMode();
1094 0 : }
1095 :
1096 : /*
1097 : * Within leader, wait for end of heap scan.
1098 : *
1099 : * When called, parallel heap scan started by _gin_begin_parallel() will
1100 : * already be underway within worker processes (when leader participates
1101 : * as a worker, we should end up here just as workers are finishing).
1102 : *
1103 : * Returns the total number of heap tuples scanned.
1104 : */
1105 : static double
1106 0 : _gin_parallel_heapscan(GinBuildState *state)
1107 : {
1108 0 : GinBuildShared *ginshared = state->bs_leader->ginshared;
1109 : int nparticipanttuplesorts;
1110 :
1111 0 : nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
1112 : for (;;)
1113 : {
1114 0 : SpinLockAcquire(&ginshared->mutex);
1115 0 : if (ginshared->nparticipantsdone == nparticipanttuplesorts)
1116 : {
1117 : /* copy the data into leader state */
1118 0 : state->bs_reltuples = ginshared->reltuples;
1119 0 : state->bs_numtuples = ginshared->indtuples;
1120 :
1121 0 : SpinLockRelease(&ginshared->mutex);
1122 0 : break;
1123 : }
1124 0 : SpinLockRelease(&ginshared->mutex);
1125 :
1126 0 : ConditionVariableSleep(&ginshared->workersdonecv,
1127 : WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
1128 : }
1129 :
1130 0 : ConditionVariableCancelSleep();
1131 :
1132 0 : return state->bs_reltuples;
1133 : }
/*
 * Buffer used to accumulate TIDs from multiple GinTuples for the same key
 * (we read these from the tuplesort, sorted by the key).
 *
 * This is similar to BuildAccumulator in that it's used to collect TIDs
 * in memory before inserting them into the index, but it's much simpler
 * as it only deals with a single index key at a time.
 *
 * When adding TIDs to the buffer, we make sure to keep them sorted, both
 * during the initial table scan (and detecting when the scan wraps around),
 * and during merging (where we do mergesort).
 *
 * The key fields below are set from the first tuple stored into an empty
 * buffer (GinBufferStoreTuple) and cleared again by GinBufferReset.
 */
typedef struct GinBuffer
{
	OffsetNumber attnum;		/* index attribute number (1-based; used to
								 * pick ssup[attnum - 1]) */
	GinNullCategory category;	/* null category of the buffered key */
	Datum		key;			/* 0 if no key (and keylen == 0) */
	Size		keylen;			/* number of bytes (not typlen) */

	/* type info of the key's attribute, copied from the first GinTuple */
	int16		typlen;
	bool		typbyval;

	/*
	 * Number of TIDs to collect before attempting to write some out (see
	 * GinBufferShouldTrim).
	 */
	int			maxitems;

	/* array of TID values */
	int			nitems;			/* number of valid entries in items[] */
	int			nfrozen;		/* leading items[] entries that later merges
								 * can no longer change, so they may be
								 * written out and trimmed */
	SortSupport ssup;			/* for sorting/comparing keys; one entry per
								 * key attribute */
	ItemPointerData *items;		/* sorted TIDs; the array is kept (and reused)
								 * even when nitems drops back to 0 */
} GinBuffer;
1167 :
1168 : /*
1169 : * Check that TID array contains valid values, and that it's sorted (if we
1170 : * expect it to be).
1171 : */
1172 : static void
1173 0 : AssertCheckItemPointers(GinBuffer *buffer)
1174 : {
1175 : #ifdef USE_ASSERT_CHECKING
1176 : /* we should not have a buffer with no TIDs to sort */
1177 : Assert(buffer->items != NULL);
1178 : Assert(buffer->nitems > 0);
1179 :
1180 : for (int i = 0; i < buffer->nitems; i++)
1181 : {
1182 : Assert(ItemPointerIsValid(&buffer->items[i]));
1183 :
1184 : /* don't check ordering for the first TID item */
1185 : if (i == 0)
1186 : continue;
1187 :
1188 : Assert(ItemPointerCompare(&buffer->items[i - 1], &buffer->items[i]) < 0);
1189 : }
1190 : #endif
1191 0 : }
1192 :
1193 : /*
1194 : * GinBuffer checks
1195 : *
1196 : * Make sure the nitems/items fields are consistent (either the array is empty
1197 : * or not empty, the fields need to agree). If there are items, check ordering.
1198 : */
1199 : static void
1200 0 : AssertCheckGinBuffer(GinBuffer *buffer)
1201 : {
1202 : #ifdef USE_ASSERT_CHECKING
1203 : /* if we have any items, the array must exist */
1204 : Assert(!((buffer->nitems > 0) && (buffer->items == NULL)));
1205 :
1206 : /*
1207 : * The buffer may be empty, in which case we must not call the check of
1208 : * item pointers, because that assumes non-emptiness.
1209 : */
1210 : if (buffer->nitems == 0)
1211 : return;
1212 :
1213 : /* Make sure the item pointers are valid and sorted. */
1214 : AssertCheckItemPointers(buffer);
1215 : #endif
1216 0 : }
1217 :
1218 : /*
1219 : * GinBufferInit
1220 : * Initialize buffer to store tuples for a GIN index.
1221 : *
1222 : * Initialize the buffer used to accumulate TID for a single key at a time
1223 : * (we process the data sorted), so we know when we received all data for
1224 : * a given key.
1225 : *
1226 : * Initializes sort support procedures for all index attributes.
1227 : */
1228 : static GinBuffer *
1229 0 : GinBufferInit(Relation index)
1230 : {
1231 0 : GinBuffer *buffer = palloc0(sizeof(GinBuffer));
1232 : int i,
1233 : nKeys;
1234 0 : TupleDesc desc = RelationGetDescr(index);
1235 :
1236 : /*
1237 : * How many items can we fit into the memory limit? We don't want to end
1238 : * with too many TIDs. and 64kB seems more than enough. But maybe this
1239 : * should be tied to maintenance_work_mem or something like that?
1240 : */
1241 0 : buffer->maxitems = (64 * 1024L) / sizeof(ItemPointerData);
1242 :
1243 0 : nKeys = IndexRelationGetNumberOfKeyAttributes(index);
1244 :
1245 0 : buffer->ssup = palloc0(sizeof(SortSupportData) * nKeys);
1246 :
1247 : /*
1248 : * Lookup ordering operator for the index key data type, and initialize
1249 : * the sort support function.
1250 : */
1251 0 : for (i = 0; i < nKeys; i++)
1252 : {
1253 : Oid cmpFunc;
1254 0 : SortSupport sortKey = &buffer->ssup[i];
1255 0 : Form_pg_attribute att = TupleDescAttr(desc, i);
1256 :
1257 0 : sortKey->ssup_cxt = CurrentMemoryContext;
1258 0 : sortKey->ssup_collation = index->rd_indcollation[i];
1259 :
1260 0 : if (!OidIsValid(sortKey->ssup_collation))
1261 0 : sortKey->ssup_collation = DEFAULT_COLLATION_OID;
1262 :
1263 0 : sortKey->ssup_nulls_first = false;
1264 0 : sortKey->ssup_attno = i + 1;
1265 0 : sortKey->abbreviate = false;
1266 :
1267 : Assert(sortKey->ssup_attno != 0);
1268 :
1269 : /*
1270 : * If the compare proc isn't specified in the opclass definition, look
1271 : * up the index key type's default btree comparator.
1272 : */
1273 0 : cmpFunc = index_getprocid(index, i + 1, GIN_COMPARE_PROC);
1274 0 : if (cmpFunc == InvalidOid)
1275 : {
1276 : TypeCacheEntry *typentry;
1277 :
1278 0 : typentry = lookup_type_cache(att->atttypid,
1279 : TYPECACHE_CMP_PROC_FINFO);
1280 0 : if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
1281 0 : ereport(ERROR,
1282 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1283 : errmsg("could not identify a comparison function for type %s",
1284 : format_type_be(att->atttypid))));
1285 :
1286 0 : cmpFunc = typentry->cmp_proc_finfo.fn_oid;
1287 : }
1288 :
1289 0 : PrepareSortSupportComparisonShim(cmpFunc, sortKey);
1290 : }
1291 :
1292 0 : return buffer;
1293 : }
1294 :
1295 : /* Is the buffer empty, i.e. has no TID values in the array? */
1296 : static bool
1297 0 : GinBufferIsEmpty(GinBuffer *buffer)
1298 : {
1299 0 : return (buffer->nitems == 0);
1300 : }
1301 :
1302 : /*
1303 : * GinBufferKeyEquals
1304 : * Can the buffer store TIDs for the provided GIN tuple (same key)?
1305 : *
1306 : * Compare if the tuple matches the already accumulated data in the GIN
1307 : * buffer. Compare scalar fields first, before the actual key.
1308 : *
1309 : * Returns true if the key matches, and the TID belonds to the buffer, or
1310 : * false if the key does not match.
1311 : */
1312 : static bool
1313 0 : GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
1314 : {
1315 : int r;
1316 : Datum tupkey;
1317 :
1318 0 : AssertCheckGinBuffer(buffer);
1319 :
1320 0 : if (tup->attrnum != buffer->attnum)
1321 0 : return false;
1322 :
1323 : /* same attribute should have the same type info */
1324 : Assert(tup->typbyval == buffer->typbyval);
1325 : Assert(tup->typlen == buffer->typlen);
1326 :
1327 0 : if (tup->category != buffer->category)
1328 0 : return false;
1329 :
1330 : /*
1331 : * For NULL/empty keys, this means equality, for normal keys we need to
1332 : * compare the actual key value.
1333 : */
1334 0 : if (buffer->category != GIN_CAT_NORM_KEY)
1335 0 : return true;
1336 :
1337 : /*
1338 : * For the tuple, get either the first sizeof(Datum) bytes for byval
1339 : * types, or a pointer to the beginning of the data array.
1340 : */
1341 0 : tupkey = (buffer->typbyval) ? *(Datum *) tup->data : PointerGetDatum(tup->data);
1342 :
1343 0 : r = ApplySortComparator(buffer->key, false,
1344 : tupkey, false,
1345 0 : &buffer->ssup[buffer->attnum - 1]);
1346 :
1347 0 : return (r == 0);
1348 : }
1349 :
1350 : /*
1351 : * GinBufferShouldTrim
1352 : * Should we trim the list of item pointers?
1353 : *
1354 : * By trimming we understand writing out and removing the tuple IDs that
1355 : * we know can't change by future merges. We can deduce the TID up to which
1356 : * this is guaranteed from the "first" TID in each GIN tuple, which provides
1357 : * a "horizon" (for a given key) thanks to the sort.
1358 : *
1359 : * We don't want to do this too often - compressing longer TID lists is more
1360 : * efficient. But we also don't want to accumulate too many TIDs, for two
1361 : * reasons. First, it consumes memory and we might exceed maintenance_work_mem
1362 : * (or whatever limit applies), even if that's unlikely because TIDs are very
1363 : * small so we can fit a lot of them. Second, and more importantly, long TID
1364 : * lists are an issue if the scan wraps around, because a key may get a very
1365 : * wide list (with min/max TID for that key), forcing "full" mergesorts for
1366 : * every list merged into it (instead of the efficient append).
1367 : *
1368 : * So we look at two things when deciding if to trim - if the resulting list
1369 : * (after adding TIDs from the new tuple) would be too long, and if there is
1370 : * enough TIDs to trim (with values less than "first" TID from the new tuple),
1371 : * we do the trim. By enough we mean at least 128 TIDs (mostly an arbitrary
1372 : * number).
1373 : */
1374 : static bool
1375 0 : GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
1376 : {
1377 : /* not enough TIDs to trim (1024 is somewhat arbitrary number) */
1378 0 : if (buffer->nfrozen < 1024)
1379 0 : return false;
1380 :
1381 : /* no need to trim if we have not hit the memory limit yet */
1382 0 : if ((buffer->nitems + tup->nitems) < buffer->maxitems)
1383 0 : return false;
1384 :
1385 : /*
1386 : * OK, we have enough frozen TIDs to flush, and we have hit the memory
1387 : * limit, so it's time to write it out.
1388 : */
1389 0 : return true;
1390 : }
1391 :
1392 : /*
1393 : * GinBufferStoreTuple
1394 : * Add data (especially TID list) from a GIN tuple to the buffer.
1395 : *
1396 : * The buffer is expected to be empty (in which case it's initialized), or
1397 : * having the same key. The TID values from the tuple are combined with the
1398 : * stored values using a merge sort.
1399 : *
1400 : * The tuples (for the same key) are expected to be sorted by first TID. But
1401 : * this does not guarantee the lists do not overlap, especially in the leader,
1402 : * because the workers process interleaving data. There should be no overlaps
1403 : * in a single worker - it could happen when the parallel scan wraps around,
1404 : * but we detect that and flush the data (see ginBuildCallbackParallel).
1405 : *
1406 : * By sorting the GinTuple not only by key, but also by the first TID, we make
1407 : * it more less likely the lists will overlap during merge. We merge them using
1408 : * mergesort, but it's cheaper to just append one list to the other.
1409 : *
1410 : * How often can the lists overlap? There should be no overlaps in workers,
1411 : * and in the leader we can see overlaps between lists built by different
1412 : * workers. But the workers merge the items as much as possible, so there
1413 : * should not be too many.
1414 : */
1415 : static void
1416 0 : GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
1417 : {
1418 : ItemPointerData *items;
1419 : Datum key;
1420 :
1421 0 : AssertCheckGinBuffer(buffer);
1422 :
1423 0 : key = _gin_parse_tuple_key(tup);
1424 0 : items = _gin_parse_tuple_items(tup);
1425 :
1426 : /* if the buffer is empty, set the fields (and copy the key) */
1427 0 : if (GinBufferIsEmpty(buffer))
1428 : {
1429 0 : buffer->category = tup->category;
1430 0 : buffer->keylen = tup->keylen;
1431 0 : buffer->attnum = tup->attrnum;
1432 :
1433 0 : buffer->typlen = tup->typlen;
1434 0 : buffer->typbyval = tup->typbyval;
1435 :
1436 0 : if (tup->category == GIN_CAT_NORM_KEY)
1437 0 : buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen);
1438 : else
1439 0 : buffer->key = (Datum) 0;
1440 : }
1441 :
1442 : /*
1443 : * Try freeze TIDs at the beginning of the list, i.e. exclude them from
1444 : * the mergesort. We can do that with TIDs before the first TID in the new
1445 : * tuple we're about to add into the buffer.
1446 : *
1447 : * We do this incrementally when adding data into the in-memory buffer,
1448 : * and not later (e.g. when hitting a memory limit), because it allows us
1449 : * to skip the frozen data during the mergesort, making it cheaper.
1450 : */
1451 :
1452 : /*
1453 : * Check if the last TID in the current list is frozen. This is the case
1454 : * when merging non-overlapping lists, e.g. in each parallel worker.
1455 : */
1456 0 : if ((buffer->nitems > 0) &&
1457 0 : (ItemPointerCompare(&buffer->items[buffer->nitems - 1],
1458 : GinTupleGetFirst(tup)) == 0))
1459 0 : buffer->nfrozen = buffer->nitems;
1460 :
1461 : /*
1462 : * Now find the last TID we know to be frozen, i.e. the last TID right
1463 : * before the new GIN tuple.
1464 : *
1465 : * Start with the first not-yet-frozen tuple, and walk until we find the
1466 : * first TID that's higher. If we already know the whole list is frozen
1467 : * (i.e. nfrozen == nitems), this does nothing.
1468 : *
1469 : * XXX This might do a binary search for sufficiently long lists, but it
1470 : * does not seem worth the complexity. Overlapping lists should be rare
1471 : * common, TID comparisons are cheap, and we should quickly freeze most of
1472 : * the list.
1473 : */
1474 0 : for (int i = buffer->nfrozen; i < buffer->nitems; i++)
1475 : {
1476 : /* Is the TID after the first TID of the new tuple? Can't freeze. */
1477 0 : if (ItemPointerCompare(&buffer->items[i],
1478 : GinTupleGetFirst(tup)) > 0)
1479 0 : break;
1480 :
1481 0 : buffer->nfrozen++;
1482 : }
1483 :
1484 : /* add the new TIDs into the buffer, combine using merge-sort */
1485 : {
1486 : int nnew;
1487 : ItemPointer new;
1488 :
1489 : /*
1490 : * Resize the array - we do this first, because we'll dereference the
1491 : * first unfrozen TID, which would fail if the array is NULL. We'll
1492 : * still pass 0 as number of elements in that array though.
1493 : */
1494 0 : if (buffer->items == NULL)
1495 0 : buffer->items = palloc((buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
1496 : else
1497 0 : buffer->items = repalloc(buffer->items,
1498 0 : (buffer->nitems + tup->nitems) * sizeof(ItemPointerData));
1499 :
1500 0 : new = ginMergeItemPointers(&buffer->items[buffer->nfrozen], /* first unfronzen */
1501 0 : (buffer->nitems - buffer->nfrozen), /* num of unfrozen */
1502 0 : items, tup->nitems, &nnew);
1503 :
1504 : Assert(nnew == (tup->nitems + (buffer->nitems - buffer->nfrozen)));
1505 :
1506 0 : memcpy(&buffer->items[buffer->nfrozen], new,
1507 : nnew * sizeof(ItemPointerData));
1508 :
1509 0 : pfree(new);
1510 :
1511 0 : buffer->nitems += tup->nitems;
1512 :
1513 0 : AssertCheckItemPointers(buffer);
1514 : }
1515 :
1516 : /* free the decompressed TID list */
1517 0 : pfree(items);
1518 0 : }
1519 :
1520 : /*
1521 : * GinBufferReset
1522 : * Reset the buffer into a state as if it contains no data.
1523 : */
1524 : static void
1525 0 : GinBufferReset(GinBuffer *buffer)
1526 : {
1527 : Assert(!GinBufferIsEmpty(buffer));
1528 :
1529 : /* release byref values, do nothing for by-val ones */
1530 0 : if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
1531 0 : pfree(DatumGetPointer(buffer->key));
1532 :
1533 : /*
1534 : * Not required, but makes it more likely to trigger NULL derefefence if
1535 : * using the value incorrectly, etc.
1536 : */
1537 0 : buffer->key = (Datum) 0;
1538 :
1539 0 : buffer->attnum = 0;
1540 0 : buffer->category = 0;
1541 0 : buffer->keylen = 0;
1542 0 : buffer->nitems = 0;
1543 0 : buffer->nfrozen = 0;
1544 :
1545 0 : buffer->typlen = 0;
1546 0 : buffer->typbyval = 0;
1547 0 : }
1548 :
1549 : /*
1550 : * GinBufferTrim
1551 : * Discard the "frozen" part of the TID list (which should have been
1552 : * written to disk/index before this call).
1553 : */
1554 : static void
1555 0 : GinBufferTrim(GinBuffer *buffer)
1556 : {
1557 : Assert((buffer->nfrozen > 0) && (buffer->nfrozen <= buffer->nitems));
1558 :
1559 0 : memmove(&buffer->items[0], &buffer->items[buffer->nfrozen],
1560 0 : sizeof(ItemPointerData) * (buffer->nitems - buffer->nfrozen));
1561 :
1562 0 : buffer->nitems -= buffer->nfrozen;
1563 0 : buffer->nfrozen = 0;
1564 0 : }
1565 :
1566 : /*
1567 : * GinBufferFree
1568 : * Release memory associated with the GinBuffer (including TID array).
1569 : */
1570 : static void
1571 0 : GinBufferFree(GinBuffer *buffer)
1572 : {
1573 0 : if (buffer->items)
1574 0 : pfree(buffer->items);
1575 :
1576 : /* release byref values, do nothing for by-val ones */
1577 0 : if (!GinBufferIsEmpty(buffer) &&
1578 0 : (buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
1579 0 : pfree(DatumGetPointer(buffer->key));
1580 :
1581 0 : pfree(buffer);
1582 0 : }
1583 :
1584 : /*
1585 : * GinBufferCanAddKey
1586 : * Check if a given GIN tuple can be added to the current buffer.
1587 : *
1588 : * Returns true if the buffer is either empty or for the same index key.
1589 : */
1590 : static bool
1591 0 : GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup)
1592 : {
1593 : /* empty buffer can accept data for any key */
1594 0 : if (GinBufferIsEmpty(buffer))
1595 0 : return true;
1596 :
1597 : /* otherwise just data for the same key */
1598 0 : return GinBufferKeyEquals(buffer, tup);
1599 : }
1600 :
1601 : /*
1602 : * Within leader, wait for end of heap scan and merge per-worker results.
1603 : *
1604 : * After waiting for all workers to finish, merge the per-worker results into
1605 : * the complete index. The results from each worker are sorted by block number
1606 : * (start of the page range). While combinig the per-worker results we merge
1607 : * summaries for the same page range, and also fill-in empty summaries for
1608 : * ranges without any tuples.
1609 : *
1610 : * Returns the total number of heap tuples scanned.
1611 : */
1612 : static double
1613 0 : _gin_parallel_merge(GinBuildState *state)
1614 : {
1615 : GinTuple *tup;
1616 : Size tuplen;
1617 0 : double reltuples = 0;
1618 : GinBuffer *buffer;
1619 :
1620 : /* GIN tuples from workers, merged by leader */
1621 0 : double numtuples = 0;
1622 :
1623 : /* wait for workers to scan table and produce partial results */
1624 0 : reltuples = _gin_parallel_heapscan(state);
1625 :
1626 : /* Execute the sort */
1627 0 : pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
1628 : PROGRESS_GIN_PHASE_PERFORMSORT_2);
1629 :
1630 : /* do the actual sort in the leader */
1631 0 : tuplesort_performsort(state->bs_sortstate);
1632 :
1633 : /*
1634 : * Initialize buffer to combine entries for the same key.
1635 : *
1636 : * The leader is allowed to use the whole maintenance_work_mem buffer to
1637 : * combine data. The parallel workers already completed.
1638 : */
1639 0 : buffer = GinBufferInit(state->ginstate.index);
1640 :
1641 : /*
1642 : * Set the progress target for the next phase. Reset the block number
1643 : * values set by table_index_build_scan
1644 : */
1645 : {
1646 0 : const int progress_index[] = {
1647 : PROGRESS_CREATEIDX_SUBPHASE,
1648 : PROGRESS_CREATEIDX_TUPLES_TOTAL,
1649 : PROGRESS_SCAN_BLOCKS_TOTAL,
1650 : PROGRESS_SCAN_BLOCKS_DONE
1651 : };
1652 0 : const int64 progress_vals[] = {
1653 : PROGRESS_GIN_PHASE_MERGE_2,
1654 0 : state->bs_numtuples,
1655 : 0, 0
1656 : };
1657 :
1658 0 : pgstat_progress_update_multi_param(4, progress_index, progress_vals);
1659 : }
1660 :
1661 : /*
1662 : * Read the GIN tuples from the shared tuplesort, sorted by category and
1663 : * key. That probably gives us order matching how data is organized in the
1664 : * index.
1665 : *
1666 : * We don't insert the GIN tuples right away, but instead accumulate as
1667 : * many TIDs for the same key as possible, and then insert that at once.
1668 : * This way we don't need to decompress/recompress the posting lists, etc.
1669 : */
1670 0 : while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL)
1671 : {
1672 0 : CHECK_FOR_INTERRUPTS();
1673 :
1674 : /*
1675 : * If the buffer can accept the new GIN tuple, just store it there and
1676 : * we're done. If it's a different key (or maybe too much data) flush
1677 : * the current contents into the index first.
1678 : */
1679 0 : if (!GinBufferCanAddKey(buffer, tup))
1680 : {
1681 : /*
1682 : * Buffer is not empty and it's storing a different key - flush
1683 : * the data into the insert, and start a new entry for current
1684 : * GinTuple.
1685 : */
1686 0 : AssertCheckItemPointers(buffer);
1687 :
1688 0 : ginEntryInsert(&state->ginstate,
1689 0 : buffer->attnum, buffer->key, buffer->category,
1690 0 : buffer->items, buffer->nitems, &state->buildStats);
1691 :
1692 : /* discard the existing data */
1693 0 : GinBufferReset(buffer);
1694 : }
1695 :
1696 : /*
1697 : * We're about to add a GIN tuple to the buffer - check the memory
1698 : * limit first, and maybe write out some of the data into the index
1699 : * first, if needed (and possible). We only flush the part of the TID
1700 : * list that we know won't change, and only if there's enough data for
1701 : * compression to work well.
1702 : */
1703 0 : if (GinBufferShouldTrim(buffer, tup))
1704 : {
1705 : Assert(buffer->nfrozen > 0);
1706 :
1707 : /*
1708 : * Buffer is not empty and it's storing a different key - flush
1709 : * the data into the insert, and start a new entry for current
1710 : * GinTuple.
1711 : */
1712 0 : AssertCheckItemPointers(buffer);
1713 :
1714 0 : ginEntryInsert(&state->ginstate,
1715 0 : buffer->attnum, buffer->key, buffer->category,
1716 0 : buffer->items, buffer->nfrozen, &state->buildStats);
1717 :
1718 : /* truncate the data we've just discarded */
1719 0 : GinBufferTrim(buffer);
1720 : }
1721 :
1722 : /*
1723 : * Remember data for the current tuple (either remember the new key,
1724 : * or append if to the existing data).
1725 : */
1726 0 : GinBufferStoreTuple(buffer, tup);
1727 :
1728 : /* Report progress */
1729 0 : pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
1730 : ++numtuples);
1731 : }
1732 :
1733 : /* flush data remaining in the buffer (for the last key) */
1734 0 : if (!GinBufferIsEmpty(buffer))
1735 : {
1736 0 : AssertCheckItemPointers(buffer);
1737 :
1738 0 : ginEntryInsert(&state->ginstate,
1739 0 : buffer->attnum, buffer->key, buffer->category,
1740 0 : buffer->items, buffer->nitems, &state->buildStats);
1741 :
1742 : /* discard the existing data */
1743 0 : GinBufferReset(buffer);
1744 :
1745 : /* Report progress */
1746 0 : pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
1747 : ++numtuples);
1748 : }
1749 :
1750 : /* relase all the memory */
1751 0 : GinBufferFree(buffer);
1752 :
1753 0 : tuplesort_end(state->bs_sortstate);
1754 :
1755 0 : return reltuples;
1756 : }
1757 :
1758 : /*
1759 : * Returns size of shared memory required to store state for a parallel
1760 : * gin index build based on the snapshot its parallel scan will use.
1761 : */
1762 : static Size
1763 0 : _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
1764 : {
1765 : /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
1766 0 : return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
1767 : table_parallelscan_estimate(heap, snapshot));
1768 : }
1769 :
1770 : /*
1771 : * Within leader, participate as a parallel worker.
1772 : */
1773 : static void
1774 0 : _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Relation index)
1775 : {
1776 0 : GinLeader *ginleader = buildstate->bs_leader;
1777 : int sortmem;
1778 :
1779 : /*
1780 : * Might as well use reliable figure when doling out maintenance_work_mem
1781 : * (when requested number of workers were not launched, this will be
1782 : * somewhat higher than it is for other workers).
1783 : */
1784 0 : sortmem = maintenance_work_mem / ginleader->nparticipanttuplesorts;
1785 :
1786 : /* Perform work common to all participants */
1787 0 : _gin_parallel_scan_and_build(buildstate, ginleader->ginshared,
1788 : ginleader->sharedsort, heap, index,
1789 : sortmem, true);
1790 0 : }
1791 :
1792 : /*
1793 : * _gin_process_worker_data
1794 : * First phase of the key merging, happening in the worker.
1795 : *
1796 : * Depending on the number of distinct keys, the TID lists produced by the
1797 : * callback may be very short (due to frequent evictions in the callback).
1798 : * But combining many tiny lists is expensive, so we try to do as much as
1799 : * possible in the workers and only then pass the results to the leader.
1800 : *
1801 : * We read the tuples sorted by the key, and merge them into larger lists.
1802 : * At the moment there's no memory limit, so this will just produce one
1803 : * huge (sorted) list per key in each worker. Which means the leader will
1804 : * do a very limited number of mergesorts, which is good.
1805 : */
1806 : static void
1807 0 : _gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort,
1808 : bool progress)
1809 : {
1810 : GinTuple *tup;
1811 : Size tuplen;
1812 :
1813 : GinBuffer *buffer;
1814 :
1815 : /*
1816 : * Initialize buffer to combine entries for the same key.
1817 : *
1818 : * The workers are limited to the same amount of memory as during the sort
1819 : * in ginBuildCallbackParallel. But this probably should be the 32MB used
1820 : * during planning, just like there.
1821 : */
1822 0 : buffer = GinBufferInit(state->ginstate.index);
1823 :
1824 : /* sort the raw per-worker data */
1825 0 : if (progress)
1826 0 : pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
1827 : PROGRESS_GIN_PHASE_PERFORMSORT_1);
1828 :
1829 0 : tuplesort_performsort(state->bs_worker_sort);
1830 :
1831 : /* reset the number of GIN tuples produced by this worker */
1832 0 : state->bs_numtuples = 0;
1833 :
1834 0 : if (progress)
1835 0 : pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
1836 : PROGRESS_GIN_PHASE_MERGE_1);
1837 :
1838 : /*
1839 : * Read the GIN tuples from the shared tuplesort, sorted by the key, and
1840 : * merge them into larger chunks for the leader to combine.
1841 : */
1842 0 : while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL)
1843 : {
1844 :
1845 0 : CHECK_FOR_INTERRUPTS();
1846 :
1847 : /*
1848 : * If the buffer can accept the new GIN tuple, just store it there and
1849 : * we're done. If it's a different key (or maybe too much data) flush
1850 : * the current contents into the index first.
1851 : */
1852 0 : if (!GinBufferCanAddKey(buffer, tup))
1853 : {
1854 : GinTuple *ntup;
1855 : Size ntuplen;
1856 :
1857 : /*
1858 : * Buffer is not empty and it's storing a different key - flush
1859 : * the data into the insert, and start a new entry for current
1860 : * GinTuple.
1861 : */
1862 0 : AssertCheckItemPointers(buffer);
1863 :
1864 0 : ntup = _gin_build_tuple(buffer->attnum, buffer->category,
1865 0 : buffer->key, buffer->typlen, buffer->typbyval,
1866 0 : buffer->items, buffer->nitems, &ntuplen);
1867 :
1868 0 : tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
1869 0 : state->bs_numtuples++;
1870 :
1871 0 : pfree(ntup);
1872 :
1873 : /* discard the existing data */
1874 0 : GinBufferReset(buffer);
1875 : }
1876 :
1877 : /*
1878 : * We're about to add a GIN tuple to the buffer - check the memory
1879 : * limit first, and maybe write out some of the data into the index
1880 : * first, if needed (and possible). We only flush the part of the TID
1881 : * list that we know won't change, and only if there's enough data for
1882 : * compression to work well.
1883 : */
1884 0 : if (GinBufferShouldTrim(buffer, tup))
1885 : {
1886 : GinTuple *ntup;
1887 : Size ntuplen;
1888 :
1889 : Assert(buffer->nfrozen > 0);
1890 :
1891 : /*
1892 : * Buffer is not empty and it's storing a different key - flush
1893 : * the data into the insert, and start a new entry for current
1894 : * GinTuple.
1895 : */
1896 0 : AssertCheckItemPointers(buffer);
1897 :
1898 0 : ntup = _gin_build_tuple(buffer->attnum, buffer->category,
1899 0 : buffer->key, buffer->typlen, buffer->typbyval,
1900 0 : buffer->items, buffer->nfrozen, &ntuplen);
1901 :
1902 0 : tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
1903 :
1904 0 : pfree(ntup);
1905 :
1906 : /* truncate the data we've just discarded */
1907 0 : GinBufferTrim(buffer);
1908 : }
1909 :
1910 : /*
1911 : * Remember data for the current tuple (either remember the new key,
1912 : * or append if to the existing data).
1913 : */
1914 0 : GinBufferStoreTuple(buffer, tup);
1915 : }
1916 :
1917 : /* flush data remaining in the buffer (for the last key) */
1918 0 : if (!GinBufferIsEmpty(buffer))
1919 : {
1920 : GinTuple *ntup;
1921 : Size ntuplen;
1922 :
1923 0 : AssertCheckItemPointers(buffer);
1924 :
1925 0 : ntup = _gin_build_tuple(buffer->attnum, buffer->category,
1926 0 : buffer->key, buffer->typlen, buffer->typbyval,
1927 0 : buffer->items, buffer->nitems, &ntuplen);
1928 :
1929 0 : tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
1930 0 : state->bs_numtuples++;
1931 :
1932 0 : pfree(ntup);
1933 :
1934 : /* discard the existing data */
1935 0 : GinBufferReset(buffer);
1936 : }
1937 :
1938 : /* relase all the memory */
1939 0 : GinBufferFree(buffer);
1940 :
1941 0 : tuplesort_end(worker_sort);
1942 0 : }
1943 :
1944 : /*
1945 : * Perform a worker's portion of a parallel GIN index build sort.
1946 : *
1947 : * This generates a tuplesort for the worker portion of the table.
1948 : *
1949 : * sortmem is the amount of working memory to use within each worker,
1950 : * expressed in KBs.
1951 : *
1952 : * When this returns, workers are done, and need only release resources.
1953 : *
1954 : * Before feeding data into a shared tuplesort (for the leader process),
1955 : * the workers process data in two phases.
1956 : *
1957 : * 1) A worker reads a portion of rows from the table, accumulates entries
1958 : * in memory, and flushes them into a private tuplesort (e.g. because of
1959 : * using too much memory).
1960 : *
1961 : * 2) The private tuplesort gets sorted (by key and TID), the worker reads
1962 : * the data again, and combines the entries as much as possible. This has
1963 : * to happen eventually, and this way it's done in workers in parallel.
1964 : *
1965 : * Finally, the combined entries are written into the shared tuplesort, so
1966 : * that the leader can process them.
1967 : *
1968 : * How well this works (compared to just writing entries into the shared
1969 : * tuplesort) depends on the data set. For large tables with many distinct
1970 : * keys this helps a lot. With many distinct keys it's likely the buffers has
1971 : * to be flushed often, generating many entries with the same key and short
1972 : * TID lists. These entries need to be sorted and merged at some point,
1973 : * before writing them to the index. The merging is quite expensive, it can
1974 : * easily be ~50% of a serial build, and doing as much of it in the workers
1975 : * means it's parallelized. The leader still has to merge results from the
1976 : * workers, but it's much more efficient to merge few large entries than
1977 : * many tiny ones.
1978 : *
1979 : * This also reduces the amount of data the workers pass to the leader through
1980 : * the shared tuplesort. OTOH the workers need more space for the private sort,
1981 : * possibly up to 2x of the data, if no entries be merged in a worker. But this
1982 : * is very unlikely, and the only consequence is inefficiency, so we ignore it.
1983 : */
1984 : static void
1985 0 : _gin_parallel_scan_and_build(GinBuildState *state,
1986 : GinBuildShared *ginshared, Sharedsort *sharedsort,
1987 : Relation heap, Relation index,
1988 : int sortmem, bool progress)
1989 : {
1990 : SortCoordinate coordinate;
1991 : TableScanDesc scan;
1992 : double reltuples;
1993 : IndexInfo *indexInfo;
1994 :
1995 : /* Initialize local tuplesort coordination state */
1996 0 : coordinate = palloc0(sizeof(SortCoordinateData));
1997 0 : coordinate->isWorker = true;
1998 0 : coordinate->nParticipants = -1;
1999 0 : coordinate->sharedsort = sharedsort;
2000 :
2001 : /* remember how much space is allowed for the accumulated entries */
2002 0 : state->work_mem = (sortmem / 2);
2003 :
2004 : /* Begin "partial" tuplesort */
2005 0 : state->bs_sortstate = tuplesort_begin_index_gin(heap, index,
2006 : state->work_mem,
2007 : coordinate,
2008 : TUPLESORT_NONE);
2009 :
2010 : /* Local per-worker sort of raw-data */
2011 0 : state->bs_worker_sort = tuplesort_begin_index_gin(heap, index,
2012 : state->work_mem,
2013 : NULL,
2014 : TUPLESORT_NONE);
2015 :
2016 : /* Join parallel scan */
2017 0 : indexInfo = BuildIndexInfo(index);
2018 0 : indexInfo->ii_Concurrent = ginshared->isconcurrent;
2019 :
2020 0 : scan = table_beginscan_parallel(heap,
2021 : ParallelTableScanFromGinBuildShared(ginshared));
2022 :
2023 0 : reltuples = table_index_build_scan(heap, index, indexInfo, true, progress,
2024 : ginBuildCallbackParallel, state, scan);
2025 :
2026 : /* write remaining accumulated entries */
2027 0 : ginFlushBuildState(state, index);
2028 :
2029 : /*
2030 : * Do the first phase of in-worker processing - sort the data produced by
2031 : * the callback, and combine them into much larger chunks and place that
2032 : * into the shared tuplestore for leader to process.
2033 : */
2034 0 : _gin_process_worker_data(state, state->bs_worker_sort, progress);
2035 :
2036 : /* sort the GIN tuples built by this worker */
2037 0 : tuplesort_performsort(state->bs_sortstate);
2038 :
2039 0 : state->bs_reltuples += reltuples;
2040 :
2041 : /*
2042 : * Done. Record ambuild statistics.
2043 : */
2044 0 : SpinLockAcquire(&ginshared->mutex);
2045 0 : ginshared->nparticipantsdone++;
2046 0 : ginshared->reltuples += state->bs_reltuples;
2047 0 : ginshared->indtuples += state->bs_numtuples;
2048 0 : SpinLockRelease(&ginshared->mutex);
2049 :
2050 : /* Notify leader */
2051 0 : ConditionVariableSignal(&ginshared->workersdonecv);
2052 :
2053 0 : tuplesort_end(state->bs_sortstate);
2054 0 : }
2055 :
2056 : /*
2057 : * Perform work within a launched parallel process.
2058 : */
2059 : void
2060 0 : _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
2061 : {
2062 : char *sharedquery;
2063 : GinBuildShared *ginshared;
2064 : Sharedsort *sharedsort;
2065 : GinBuildState buildstate;
2066 : Relation heapRel;
2067 : Relation indexRel;
2068 : LOCKMODE heapLockmode;
2069 : LOCKMODE indexLockmode;
2070 : WalUsage *walusage;
2071 : BufferUsage *bufferusage;
2072 : int sortmem;
2073 :
2074 : /*
2075 : * The only possible status flag that can be set to the parallel worker is
2076 : * PROC_IN_SAFE_IC.
2077 : */
2078 : Assert((MyProc->statusFlags == 0) ||
2079 : (MyProc->statusFlags == PROC_IN_SAFE_IC));
2080 :
2081 : /* Set debug_query_string for individual workers first */
2082 0 : sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
2083 0 : debug_query_string = sharedquery;
2084 :
2085 : /* Report the query string from leader */
2086 0 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
2087 :
2088 : /* Look up gin shared state */
2089 0 : ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false);
2090 :
2091 : /* Open relations using lock modes known to be obtained by index.c */
2092 0 : if (!ginshared->isconcurrent)
2093 : {
2094 0 : heapLockmode = ShareLock;
2095 0 : indexLockmode = AccessExclusiveLock;
2096 : }
2097 : else
2098 : {
2099 0 : heapLockmode = ShareUpdateExclusiveLock;
2100 0 : indexLockmode = RowExclusiveLock;
2101 : }
2102 :
2103 : /* Open relations within worker */
2104 0 : heapRel = table_open(ginshared->heaprelid, heapLockmode);
2105 0 : indexRel = index_open(ginshared->indexrelid, indexLockmode);
2106 :
2107 : /* initialize the GIN build state */
2108 0 : initGinState(&buildstate.ginstate, indexRel);
2109 0 : buildstate.indtuples = 0;
2110 0 : memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
2111 0 : memset(&buildstate.tid, 0, sizeof(ItemPointerData));
2112 :
2113 : /*
2114 : * create a temporary memory context that is used to hold data not yet
2115 : * dumped out to the index
2116 : */
2117 0 : buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
2118 : "Gin build temporary context",
2119 : ALLOCSET_DEFAULT_SIZES);
2120 :
2121 : /*
2122 : * create a temporary memory context that is used for calling
2123 : * ginExtractEntries(), and can be reset after each tuple
2124 : */
2125 0 : buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
2126 : "Gin build temporary context for user-defined function",
2127 : ALLOCSET_DEFAULT_SIZES);
2128 :
2129 0 : buildstate.accum.ginstate = &buildstate.ginstate;
2130 0 : ginInitBA(&buildstate.accum);
2131 :
2132 :
2133 : /* Look up shared state private to tuplesort.c */
2134 0 : sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
2135 0 : tuplesort_attach_shared(sharedsort, seg);
2136 :
2137 : /* Prepare to track buffer usage during parallel execution */
2138 0 : InstrStartParallelQuery();
2139 :
2140 : /*
2141 : * Might as well use reliable figure when doling out maintenance_work_mem
2142 : * (when requested number of workers were not launched, this will be
2143 : * somewhat higher than it is for other workers).
2144 : */
2145 0 : sortmem = maintenance_work_mem / ginshared->scantuplesortstates;
2146 :
2147 0 : _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
2148 : heapRel, indexRel, sortmem, false);
2149 :
2150 : /* Report WAL/buffer usage during parallel execution */
2151 0 : bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2152 0 : walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
2153 0 : InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
2154 0 : &walusage[ParallelWorkerNumber]);
2155 :
2156 0 : index_close(indexRel, indexLockmode);
2157 0 : table_close(heapRel, heapLockmode);
2158 0 : }
2159 :
2160 : /*
2161 : * Used to keep track of compressed TID lists when building a GIN tuple.
2162 : */
2163 : typedef struct
2164 : {
2165 : dlist_node node; /* linked list pointers */
2166 : GinPostingList *seg;
2167 : } GinSegmentInfo;
2168 :
2169 : /*
2170 : * _gin_build_tuple
2171 : * Serialize the state for an index key into a tuple for tuplesort.
2172 : *
2173 : * The tuple has a number of scalar fields (mostly matching the build state),
2174 : * and then a data array that stores the key first, and then the TID list.
2175 : *
2176 : * For by-reference data types, we store the actual data. For by-val types
2177 : * we simply copy the whole Datum, so that we don't have to care about stuff
2178 : * like endianess etc. We could make it a little bit smaller, but it's not
2179 : * worth it - it's a tiny fraction of the data, and we need to MAXALIGN the
2180 : * start of the TID list anyway. So we wouldn't save anything.
2181 : *
2182 : * The TID list is serialized as compressed - it's highly compressible, and
2183 : * we already have ginCompressPostingList for this purpose. The list may be
2184 : * pretty long, so we compress it into multiple segments and then copy all
2185 : * of that into the GIN tuple.
2186 : */
2187 : static GinTuple *
2188 0 : _gin_build_tuple(OffsetNumber attrnum, unsigned char category,
2189 : Datum key, int16 typlen, bool typbyval,
2190 : ItemPointerData *items, uint32 nitems,
2191 : Size *len)
2192 : {
2193 : GinTuple *tuple;
2194 : char *ptr;
2195 :
2196 : Size tuplen;
2197 : int keylen;
2198 :
2199 : dlist_mutable_iter iter;
2200 : dlist_head segments;
2201 : int ncompressed;
2202 : Size compresslen;
2203 :
2204 : /*
2205 : * Calculate how long is the key value. Only keys with GIN_CAT_NORM_KEY
2206 : * have actual non-empty key. We include varlena headers and \0 bytes for
2207 : * strings, to make it easier to access the data in-line.
2208 : *
2209 : * For byval types we simply copy the whole Datum. We could store just the
2210 : * necessary bytes, but this is simpler to work with and not worth the
2211 : * extra complexity. Moreover we still need to do the MAXALIGN to allow
2212 : * direct access to items pointers.
2213 : *
2214 : * XXX Note that for byval types we store the whole datum, no matter what
2215 : * the typlen value is.
2216 : */
2217 0 : if (category != GIN_CAT_NORM_KEY)
2218 0 : keylen = 0;
2219 0 : else if (typbyval)
2220 0 : keylen = sizeof(Datum);
2221 0 : else if (typlen > 0)
2222 0 : keylen = typlen;
2223 0 : else if (typlen == -1)
2224 0 : keylen = VARSIZE_ANY(key);
2225 0 : else if (typlen == -2)
2226 0 : keylen = strlen(DatumGetPointer(key)) + 1;
2227 : else
2228 0 : elog(ERROR, "unexpected typlen value (%d)", typlen);
2229 :
2230 : /* compress the item pointers */
2231 0 : ncompressed = 0;
2232 0 : compresslen = 0;
2233 0 : dlist_init(&segments);
2234 :
2235 : /* generate compressed segments of TID list chunks */
2236 0 : while (ncompressed < nitems)
2237 : {
2238 : int cnt;
2239 0 : GinSegmentInfo *seginfo = palloc(sizeof(GinSegmentInfo));
2240 :
2241 0 : seginfo->seg = ginCompressPostingList(&items[ncompressed],
2242 0 : (nitems - ncompressed),
2243 : UINT16_MAX,
2244 : &cnt);
2245 :
2246 0 : ncompressed += cnt;
2247 0 : compresslen += SizeOfGinPostingList(seginfo->seg);
2248 :
2249 0 : dlist_push_tail(&segments, &seginfo->node);
2250 : }
2251 :
2252 : /*
2253 : * Determine GIN tuple length with all the data included. Be careful about
2254 : * alignment, to allow direct access to compressed segments (those require
2255 : * only SHORTALIGN).
2256 : */
2257 0 : tuplen = SHORTALIGN(offsetof(GinTuple, data) + keylen) + compresslen;
2258 :
2259 0 : *len = tuplen;
2260 :
2261 : /*
2262 : * Allocate space for the whole GIN tuple.
2263 : *
2264 : * The palloc0 is needed - writetup_index_gin will write the whole tuple
2265 : * to disk, so we need to make sure the padding bytes are defined
2266 : * (otherwise valgrind would report this).
2267 : */
2268 0 : tuple = palloc0(tuplen);
2269 :
2270 0 : tuple->tuplen = tuplen;
2271 0 : tuple->attrnum = attrnum;
2272 0 : tuple->category = category;
2273 0 : tuple->keylen = keylen;
2274 0 : tuple->nitems = nitems;
2275 :
2276 : /* key type info */
2277 0 : tuple->typlen = typlen;
2278 0 : tuple->typbyval = typbyval;
2279 :
2280 : /*
2281 : * Copy the key and items into the tuple. First the key value, which we
2282 : * can simply copy right at the beginning of the data array.
2283 : */
2284 0 : if (category == GIN_CAT_NORM_KEY)
2285 : {
2286 0 : if (typbyval)
2287 : {
2288 0 : memcpy(tuple->data, &key, sizeof(Datum));
2289 : }
2290 0 : else if (typlen > 0) /* byref, fixed length */
2291 : {
2292 0 : memcpy(tuple->data, DatumGetPointer(key), typlen);
2293 : }
2294 0 : else if (typlen == -1)
2295 : {
2296 0 : memcpy(tuple->data, DatumGetPointer(key), keylen);
2297 : }
2298 0 : else if (typlen == -2)
2299 : {
2300 0 : memcpy(tuple->data, DatumGetPointer(key), keylen);
2301 : }
2302 : }
2303 :
2304 : /* finally, copy the TIDs into the array */
2305 0 : ptr = (char *) tuple + SHORTALIGN(offsetof(GinTuple, data) + keylen);
2306 :
2307 : /* copy in the compressed data, and free the segments */
2308 0 : dlist_foreach_modify(iter, &segments)
2309 : {
2310 0 : GinSegmentInfo *seginfo = dlist_container(GinSegmentInfo, node, iter.cur);
2311 :
2312 0 : memcpy(ptr, seginfo->seg, SizeOfGinPostingList(seginfo->seg));
2313 :
2314 0 : ptr += SizeOfGinPostingList(seginfo->seg);
2315 :
2316 0 : dlist_delete(&seginfo->node);
2317 :
2318 0 : pfree(seginfo->seg);
2319 0 : pfree(seginfo);
2320 : }
2321 :
2322 0 : return tuple;
2323 : }
2324 :
2325 : /*
2326 : * _gin_parse_tuple_key
2327 : * Return a Datum representing the key stored in the tuple.
2328 : *
2329 : * Most of the tuple fields are directly accessible, the only thing that
2330 : * needs more care is the key and the TID list.
2331 : *
2332 : * For the key, this returns a regular Datum representing it. It's either the
2333 : * actual key value, or a pointer to the beginning of the data array (which is
2334 : * where the data was copied by _gin_build_tuple).
2335 : */
2336 : static Datum
2337 0 : _gin_parse_tuple_key(GinTuple *a)
2338 : {
2339 : Datum key;
2340 :
2341 0 : if (a->category != GIN_CAT_NORM_KEY)
2342 0 : return (Datum) 0;
2343 :
2344 0 : if (a->typbyval)
2345 : {
2346 0 : memcpy(&key, a->data, a->keylen);
2347 0 : return key;
2348 : }
2349 :
2350 0 : return PointerGetDatum(a->data);
2351 : }
2352 :
2353 : /*
2354 : * _gin_parse_tuple_items
2355 : * Return a pointer to a palloc'd array of decompressed TID array.
2356 : */
2357 : static ItemPointer
2358 0 : _gin_parse_tuple_items(GinTuple *a)
2359 : {
2360 : int len;
2361 : char *ptr;
2362 : int ndecoded;
2363 : ItemPointer items;
2364 :
2365 0 : len = a->tuplen - SHORTALIGN(offsetof(GinTuple, data) + a->keylen);
2366 0 : ptr = (char *) a + SHORTALIGN(offsetof(GinTuple, data) + a->keylen);
2367 :
2368 0 : items = ginPostingListDecodeAllSegments((GinPostingList *) ptr, len, &ndecoded);
2369 :
2370 : Assert(ndecoded == a->nitems);
2371 :
2372 0 : return (ItemPointer) items;
2373 : }
2374 :
2375 : /*
2376 : * _gin_compare_tuples
2377 : * Compare GIN tuples, used by tuplesort during parallel index build.
2378 : *
2379 : * The scalar fields (attrnum, category) are compared first, the key value is
2380 : * compared last. The comparisons are done using type-specific sort support
2381 : * functions.
2382 : *
2383 : * If the key value matches, we compare the first TID value in the TID list,
2384 : * which means the tuples are merged in an order in which they are most
2385 : * likely to be simply concatenated. (This "first" TID will also allow us
2386 : * to determine a point up to which the list is fully determined and can be
2387 : * written into the index to enforce a memory limit etc.)
2388 : */
2389 : int
2390 0 : _gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup)
2391 : {
2392 : int r;
2393 : Datum keya,
2394 : keyb;
2395 :
2396 0 : if (a->attrnum < b->attrnum)
2397 0 : return -1;
2398 :
2399 0 : if (a->attrnum > b->attrnum)
2400 0 : return 1;
2401 :
2402 0 : if (a->category < b->category)
2403 0 : return -1;
2404 :
2405 0 : if (a->category > b->category)
2406 0 : return 1;
2407 :
2408 0 : if (a->category == GIN_CAT_NORM_KEY)
2409 : {
2410 0 : keya = _gin_parse_tuple_key(a);
2411 0 : keyb = _gin_parse_tuple_key(b);
2412 :
2413 0 : r = ApplySortComparator(keya, false,
2414 : keyb, false,
2415 0 : &ssup[a->attrnum - 1]);
2416 :
2417 : /* if the key is the same, consider the first TID in the array */
2418 0 : return (r != 0) ? r : ItemPointerCompare(GinTupleGetFirst(a),
2419 : GinTupleGetFirst(b));
2420 : }
2421 :
2422 0 : return ItemPointerCompare(GinTupleGetFirst(a),
2423 : GinTupleGetFirst(b));
2424 : }
|