Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * ginfast.c
4 : * Fast insert routines for the Postgres inverted index access method.
5 : * Pending entries are stored in a linear list of pages. Later on
6 : * (typically during VACUUM), ginInsertCleanup() will be invoked to
7 : * transfer pending entries into the regular index structure. This
8 : * wins because bulk insertion is much more efficient than retail.
9 : *
10 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : * IDENTIFICATION
14 : * src/backend/access/gin/ginfast.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
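
The strategy described above can be pictured in miniature with a standalone sketch (illustrative C, not PostgreSQL code; all names below are invented for the sketch): inserts append cheaply to an unordered pending buffer, and a later cleanup pass moves everything into the ordered main structure at once, amortizing the expensive ordered-insertion work.

    #include <stdlib.h>

    #define PENDING_CAP 128

    static int  main_keys[4096];        /* stands in for the ordered GIN tree */
    static int  nmain;
    static int  pending[PENDING_CAP];   /* stands in for the pending pages */
    static int  npending;

    static int
    cmp_int(const void *a, const void *b)
    {
        int     x = *(const int *) a;
        int     y = *(const int *) b;

        return (x > y) - (x < y);
    }

    /* Fast path: O(1) append, no search of the main structure. */
    static void
    fast_insert(int key)
    {
        pending[npending++] = key;
    }

    /* Bulk path: transfer all pending keys, then restore order once. */
    static void
    insert_cleanup(void)
    {
        for (int i = 0; i < npending; i++)
            main_keys[nmain++] = pending[i];
        npending = 0;
        qsort(main_keys, nmain, sizeof(int), cmp_int);
    }
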
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/gin_private.h"
22 : #include "access/ginxlog.h"
23 : #include "access/xlog.h"
24 : #include "access/xloginsert.h"
25 : #include "catalog/pg_am.h"
26 : #include "commands/vacuum.h"
27 : #include "miscadmin.h"
28 : #include "port/pg_bitutils.h"
29 : #include "postmaster/autovacuum.h"
30 : #include "storage/indexfsm.h"
31 : #include "storage/lmgr.h"
32 : #include "storage/predicate.h"
33 : #include "utils/acl.h"
34 : #include "utils/builtins.h"
35 : #include "utils/memutils.h"
36 : #include "utils/rel.h"
37 :
38 : /* GUC parameter */
39 : int gin_pending_list_limit = 0;
40 :
41 : #define GIN_PAGE_FREESIZE \
42 : ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
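
For a concrete sense of scale (a back-of-the-envelope figure, not taken from this file): on a typical 64-bit build with the default 8 kB BLCKSZ, SizeOfPageHeaderData is 24 bytes and sizeof(GinPageOpaqueData) is 8 bytes, both already MAXALIGN'd, so

    GIN_PAGE_FREESIZE = 8192 - 24 - 8 = 8160 bytes per pending-list page.
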
43 :
44 : typedef struct KeyArray
45 : {
46 : Datum *keys; /* expansible array */
47 : GinNullCategory *categories; /* another expansible array */
48 : int32 nvalues; /* current number of valid entries */
49 : int32 maxvalues; /* allocated size of arrays */
50 : } KeyArray;
51 :
52 :
53 : /*
54 : * Build a pending-list page from the given array of tuples, and write it out.
55 : *
56 : * Returns the amount of free space left on the page.
57 : */
58 : static int32
59 2864 : writeListPage(Relation index, Buffer buffer,
60 : IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
61 : {
62 2864 : Page page = BufferGetPage(buffer);
63 : int32 i,
64 : freesize,
65 2864 : size = 0;
66 : OffsetNumber l,
67 : off;
68 : PGAlignedBlock workspace;
69 : char *ptr;
70 :
71 2864 : START_CRIT_SECTION();
72 :
73 2864 : GinInitBuffer(buffer, GIN_LIST);
74 :
75 2864 : off = FirstOffsetNumber;
76 2864 : ptr = workspace.data;
77 :
78 16724 : for (i = 0; i < ntuples; i++)
79 : {
80 13860 : int this_size = IndexTupleSize(tuples[i]);
81 :
82 13860 : memcpy(ptr, tuples[i], this_size);
83 13860 : ptr += this_size;
84 13860 : size += this_size;
85 :
86 13860 : l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
87 :
88 13860 : if (l == InvalidOffsetNumber)
89 0 : elog(ERROR, "failed to add item to index page in \"%s\"",
90 : RelationGetRelationName(index));
91 :
92 13860 : off++;
93 : }
94 :
95 : Assert(size <= BLCKSZ); /* else we overran workspace */
96 :
97 2864 : GinPageGetOpaque(page)->rightlink = rightlink;
98 :
99 : /*
100 : * The tail page may contain only whole row(s), or the final part of a
101 : * row that began on previous pages (a "row" here meaning all the index
102 : * tuples generated for one heap tuple).
103 : */
104 2864 : if (rightlink == InvalidBlockNumber)
105 : {
106 2864 : GinPageSetFullRow(page);
107 2864 : GinPageGetOpaque(page)->maxoff = 1;
108 : }
109 : else
110 : {
111 0 : GinPageGetOpaque(page)->maxoff = 0;
112 : }
113 :
114 2864 : MarkBufferDirty(buffer);
115 :
116 2864 : if (RelationNeedsWAL(index))
117 : {
118 : ginxlogInsertListPage data;
119 : XLogRecPtr recptr;
120 :
121 1080 : data.rightlink = rightlink;
122 1080 : data.ntuples = ntuples;
123 :
124 1080 : XLogBeginInsert();
125 1080 : XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
126 :
127 1080 : XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
128 1080 : XLogRegisterBufData(0, workspace.data, size);
129 :
130 1080 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
131 1080 : PageSetLSN(page, recptr);
132 : }
133 :
134 : /* get free space before releasing buffer */
135 2864 : freesize = PageGetExactFreeSpace(page);
136 :
137 2864 : UnlockReleaseBuffer(buffer);
138 :
139 2864 : END_CRIT_SECTION();
140 :
141 2864 : return freesize;
142 : }
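
Schematically, writeListPage() follows the standard pattern for WAL-logging a page that is rebuilt from scratch on redo; the skeleton below is a condensed restatement of the calls above, not new API:

    START_CRIT_SECTION();
    /* ... initialize and fill the page ... */
    MarkBufferDirty(buffer);
    if (RelationNeedsWAL(index))
    {
        XLogBeginInsert();
        XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);  /* page re-created on replay */
        XLogRegisterBufData(0, workspace.data, size);     /* tuple payload for replay */
        PageSetLSN(page, XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE));
    }
    END_CRIT_SECTION();

The REGBUF_WILL_INIT flag tells recovery that the page will be reinitialized from the WAL record itself, so no full-page image of its old contents is required.
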
143 :
144 : static void
145 2864 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
146 : GinMetaPageData *res)
147 : {
148 2864 : Buffer curBuffer = InvalidBuffer;
149 2864 : Buffer prevBuffer = InvalidBuffer;
150 : int i,
151 2864 : size = 0,
152 : tupsize;
153 2864 : int startTuple = 0;
154 :
155 : Assert(ntuples > 0);
156 :
157 : /*
158 : * Split tuples into pages
159 : */
160 16724 : for (i = 0; i < ntuples; i++)
161 : {
162 13860 : if (curBuffer == InvalidBuffer)
163 : {
164 2864 : curBuffer = GinNewBuffer(index);
165 :
166 2864 : if (prevBuffer != InvalidBuffer)
167 : {
168 0 : res->nPendingPages++;
169 0 : writeListPage(index, prevBuffer,
170 0 : tuples + startTuple,
171 : i - startTuple,
172 : BufferGetBlockNumber(curBuffer));
173 : }
174 : else
175 : {
176 2864 : res->head = BufferGetBlockNumber(curBuffer);
177 : }
178 :
179 2864 : prevBuffer = curBuffer;
180 2864 : startTuple = i;
181 2864 : size = 0;
182 : }
183 :
184 13860 : tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185 :
186 13860 : if (size + tupsize > GinListPageSize)
187 : {
188 : /* won't fit, force a new page and reprocess */
189 0 : i--;
190 0 : curBuffer = InvalidBuffer;
191 : }
192 : else
193 : {
194 13860 : size += tupsize;
195 : }
196 : }
197 :
198 : /*
199 : * Write last page
200 : */
201 2864 : res->tail = BufferGetBlockNumber(curBuffer);
202 5728 : res->tailFreeSize = writeListPage(index, curBuffer,
203 2864 : tuples + startTuple,
204 : ntuples - startTuple,
205 : InvalidBlockNumber);
206 2864 : res->nPendingPages++;
207 : /* that was only one heap tuple */
208 2864 : res->nPendingHeapTuples = 1;
209 2864 : }
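
The splitting loop above can be read in isolation as a greedy bin-packing pass; here is a standalone sketch of the same control flow (illustrative only, assuming no single tuple exceeds the page capacity; PAGE_CAPACITY is a made-up stand-in for GinListPageSize):

    #include <stdbool.h>

    #define PAGE_CAPACITY 8160      /* stand-in for GinListPageSize */

    /* Mirrors the loop above: open a page lazily, and when a tuple does
     * not fit, step the index back so the same tuple is reprocessed on a
     * fresh page (the i-- trick). Returns the number of pages used. */
    static int
    pack_tuples(const int *tupsize, int ntuples)
    {
        int     size = 0;
        int     npages = 0;
        bool    have_page = false;

        for (int i = 0; i < ntuples; i++)
        {
            if (!have_page)
            {
                have_page = true;
                npages++;
                size = 0;
            }

            if (size + tupsize[i] > PAGE_CAPACITY)
            {
                i--;                /* reprocess this tuple on a new page */
                have_page = false;
            }
            else
                size += tupsize[i];
        }
        return npages;
    }
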
210 :
211 : /*
212 : * Write the index tuples contained in *collector into the index's
213 : * pending list.
214 : *
215 : * This function guarantees that all these tuples are inserted
216 : * consecutively, preserving their order.
217 : */
218 : void
219 263950 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
220 : {
221 263950 : Relation index = ginstate->index;
222 : Buffer metabuffer;
223 : Page metapage;
224 263950 : GinMetaPageData *metadata = NULL;
225 263950 : Buffer buffer = InvalidBuffer;
226 263950 : Page page = NULL;
227 : ginxlogUpdateMeta data;
228 263950 : bool separateList = false;
229 263950 : bool needCleanup = false;
230 : int cleanupSize;
231 : bool needWal;
232 :
233 263950 : if (collector->ntuples == 0)
234 0 : return;
235 :
236 263950 : needWal = RelationNeedsWAL(index);
237 :
238 263950 : data.locator = index->rd_locator;
239 263950 : data.ntuples = 0;
240 263950 : data.newRightlink = data.prevTail = InvalidBlockNumber;
241 :
242 263950 : metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
243 263950 : metapage = BufferGetPage(metabuffer);
244 :
245 : /*
246 : * An insertion into the pending list could logically belong anywhere in the
247 : * tree, so it conflicts with all serializable scans. All scans acquire a
248 : * predicate lock on the metabuffer to represent that.
249 : */
250 263950 : CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
251 :
252 263944 : if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
253 : {
254 : /*
255 : * Total size is greater than one page => make sublist
256 : */
257 0 : separateList = true;
258 : }
259 : else
260 : {
261 263944 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
262 263944 : metadata = GinPageGetMeta(metapage);
263 :
264 263944 : if (metadata->head == InvalidBlockNumber ||
265 263894 : collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
266 : {
267 : /*
268 : * Pending list is empty, or total size is greater than the free
269 : * space on the tail page => make a sublist
270 : *
271 : * We unlock the metabuffer to preserve concurrency
272 : */
273 2864 : separateList = true;
274 2864 : LockBuffer(metabuffer, GIN_UNLOCK);
275 : }
276 : }
277 :
278 263944 : if (separateList)
279 : {
280 : /*
281 : * Build the sublist separately, then append it to the tail
282 : */
283 : GinMetaPageData sublist;
284 :
285 2864 : memset(&sublist, 0, sizeof(GinMetaPageData));
286 2864 : makeSublist(index, collector->tuples, collector->ntuples, &sublist);
287 :
288 : /*
289 : * metapage was unlocked, see above
290 : */
291 2864 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
292 2864 : metadata = GinPageGetMeta(metapage);
293 :
294 2864 : if (metadata->head == InvalidBlockNumber)
295 : {
296 : /*
297 : * Main list is empty, so just insert sublist as main list
298 : */
299 50 : START_CRIT_SECTION();
300 :
301 50 : metadata->head = sublist.head;
302 50 : metadata->tail = sublist.tail;
303 50 : metadata->tailFreeSize = sublist.tailFreeSize;
304 :
305 50 : metadata->nPendingPages = sublist.nPendingPages;
306 50 : metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
307 :
308 50 : if (needWal)
309 30 : XLogBeginInsert();
310 : }
311 : else
312 : {
313 : /*
314 : * Merge lists
315 : */
316 2814 : data.prevTail = metadata->tail;
317 2814 : data.newRightlink = sublist.head;
318 :
319 2814 : buffer = ReadBuffer(index, metadata->tail);
320 2814 : LockBuffer(buffer, GIN_EXCLUSIVE);
321 2814 : page = BufferGetPage(buffer);
322 :
323 : Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
324 :
325 2814 : START_CRIT_SECTION();
326 :
327 2814 : GinPageGetOpaque(page)->rightlink = sublist.head;
328 :
329 2814 : MarkBufferDirty(buffer);
330 :
331 2814 : metadata->tail = sublist.tail;
332 2814 : metadata->tailFreeSize = sublist.tailFreeSize;
333 :
334 2814 : metadata->nPendingPages += sublist.nPendingPages;
335 2814 : metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
336 :
337 2814 : if (needWal)
338 : {
339 1050 : XLogBeginInsert();
340 1050 : XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
341 : }
342 : }
343 : }
344 : else
345 : {
346 : /*
347 : * Insert into tail page. Metapage is already locked
348 : */
349 : OffsetNumber l,
350 : off;
351 : int i,
352 : tupsize;
353 : char *ptr;
354 : char *collectordata;
355 :
356 261080 : buffer = ReadBuffer(index, metadata->tail);
357 261080 : LockBuffer(buffer, GIN_EXCLUSIVE);
358 261080 : page = BufferGetPage(buffer);
359 :
360 261080 : off = (PageIsEmpty(page)) ? FirstOffsetNumber :
361 261080 : OffsetNumberNext(PageGetMaxOffsetNumber(page));
362 :
363 261080 : collectordata = ptr = (char *) palloc(collector->sumsize);
364 :
365 261080 : data.ntuples = collector->ntuples;
366 :
367 261080 : START_CRIT_SECTION();
368 :
369 261080 : if (needWal)
370 142770 : XLogBeginInsert();
371 :
372 : /*
373 : * Increment the counter of heap tuples
374 : */
375 : Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
376 261080 : GinPageGetOpaque(page)->maxoff++;
377 261080 : metadata->nPendingHeapTuples++;
378 :
379 1398812 : for (i = 0; i < collector->ntuples; i++)
380 : {
381 1137732 : tupsize = IndexTupleSize(collector->tuples[i]);
382 1137732 : l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
383 :
384 1137732 : if (l == InvalidOffsetNumber)
385 0 : elog(ERROR, "failed to add item to index page in \"%s\"",
386 : RelationGetRelationName(index));
387 :
388 1137732 : memcpy(ptr, collector->tuples[i], tupsize);
389 1137732 : ptr += tupsize;
390 :
391 1137732 : off++;
392 : }
393 :
394 : Assert((ptr - collectordata) <= collector->sumsize);
395 261080 : if (needWal)
396 : {
397 142770 : XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
398 142770 : XLogRegisterBufData(1, collectordata, collector->sumsize);
399 : }
400 :
401 261080 : metadata->tailFreeSize = PageGetExactFreeSpace(page);
402 :
403 261080 : MarkBufferDirty(buffer);
404 : }
405 :
406 : /*
407 : * Set pd_lower just past the end of the metadata. This is essential,
408 : * because without doing so, metadata will be lost if xlog.c compresses
409 : * the page. (We must do this here because pre-v11 versions of PG did not
410 : * set the metapage's pd_lower correctly, so a pg_upgraded index might
411 : * contain the wrong value.)
412 : */
413 263944 : ((PageHeader) metapage)->pd_lower =
414 263944 : ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
415 :
416 : /*
417 : * Write metabuffer, make xlog entry
418 : */
419 263944 : MarkBufferDirty(metabuffer);
420 :
421 263944 : if (needWal)
422 : {
423 : XLogRecPtr recptr;
424 :
425 143850 : memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
426 :
427 143850 : XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
428 143850 : XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
429 :
430 143850 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
431 143850 : PageSetLSN(metapage, recptr);
432 :
433 143850 : if (buffer != InvalidBuffer)
434 : {
435 143820 : PageSetLSN(page, recptr);
436 : }
437 : }
438 :
439 263944 : if (buffer != InvalidBuffer)
440 263894 : UnlockReleaseBuffer(buffer);
441 :
442 : /*
443 : * Force pending-list cleanup when the list becomes too long.
444 : * ginInsertCleanup() can take a significant amount of time, so we prefer
445 : * to call it when it can do all the work in a single collection cycle.
446 : * In non-vacuum mode it shouldn't require maintenance_work_mem, so fire
447 : * it while the pending list is still small enough to fit within
448 : * gin_pending_list_limit.
449 : *
450 : * ginInsertCleanup() should not be called inside our CRIT_SECTION.
451 : */
452 263944 : cleanupSize = GinGetPendingListCleanupSize(index);
453 263944 : if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
454 0 : needCleanup = true;
455 :
456 263944 : UnlockReleaseBuffer(metabuffer);
457 :
458 263944 : END_CRIT_SECTION();
459 :
460 : /*
461 : * Since this could contend with a concurrent cleanup process, we
462 : * clean the pending list non-forcibly rather than wait for the lock.
463 : */
464 263944 : if (needCleanup)
465 0 : ginInsertCleanup(ginstate, false, true, false, NULL);
466 : }
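
To make the cleanup trigger above concrete (assuming the default gin_pending_list_limit of 4 MB and the roughly 8160-byte GIN_PAGE_FREESIZE worked out earlier; both numbers are illustrative):

    nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024
    nPendingPages * 8160              > 4096 * 1024
    nPendingPages                     > ~514

so with default settings, an insert that leaves more than about 514 pending pages behind schedules a non-forced ginInsertCleanup().
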
467 :
468 : /*
469 : * Create temporary index tuples for a single indexable item (one index column
470 : * for the heap tuple specified by ht_ctid), and append them to the array
471 : * in *collector. They will subsequently be written out using
472 : * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
473 : * temp tuples for a given heap tuple must be written in one call to
474 : * ginHeapTupleFastInsert.
475 : */
476 : void
477 384028 : ginHeapTupleFastCollect(GinState *ginstate,
478 : GinTupleCollector *collector,
479 : OffsetNumber attnum, Datum value, bool isNull,
480 : ItemPointer ht_ctid)
481 : {
482 : Datum *entries;
483 : GinNullCategory *categories;
484 : int32 i,
485 : nentries;
486 :
487 : /*
488 : * Extract the key values that need to be inserted in the index
489 : */
490 384028 : entries = ginExtractEntries(ginstate, attnum, value, isNull,
491 : &nentries, &categories);
492 :
493 : /*
494 : * Protect against integer overflow in allocation calculations
495 : */
496 384028 : if (nentries < 0 ||
497 384028 : collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
498 0 : elog(ERROR, "too many entries for GIN index");
499 :
500 : /*
501 : * Allocate/reallocate memory for storing collected tuples
502 : */
503 384028 : if (collector->tuples == NULL)
504 : {
505 : /*
506 : * Determine the number of elements to allocate in the tuples array
507 : * initially. Make it a power of 2 to avoid wasting memory when
508 : * resizing (since palloc likes powers of 2).
509 : */
510 263950 : collector->lentuples = pg_nextpower2_32(Max(16, nentries));
511 263950 : collector->tuples = palloc_array(IndexTuple, collector->lentuples);
512 : }
513 120078 : else if (collector->lentuples < collector->ntuples + nentries)
514 : {
515 : /*
516 : * Advance lentuples to the next suitable power of 2. This won't
517 : * overflow, though we could get to a value that exceeds
518 : * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
519 : */
520 0 : collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
521 0 : collector->tuples = repalloc_array(collector->tuples,
522 : IndexTuple, collector->lentuples);
523 : }
524 :
525 : /*
526 : * Build an index tuple for each key value, and add to array. In pending
527 : * tuples we just stick the heap TID into t_tid.
528 : */
529 1535626 : for (i = 0; i < nentries; i++)
530 : {
531 : IndexTuple itup;
532 :
533 1151598 : itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
534 : NULL, 0, 0, true);
535 1151598 : itup->t_tid = *ht_ctid;
536 1151598 : collector->tuples[collector->ntuples++] = itup;
537 1151598 : collector->sumsize += IndexTupleSize(itup);
538 : }
539 384028 : }
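
The growth policy above always sizes the tuples array to a power of two. pg_nextpower2_32() is PostgreSQL's helper from pg_bitutils.h; a portable standalone stand-in looks like this:

    #include <stdint.h>

    /* Round v up to the nearest power of 2; returns v unchanged if it is
     * already a power of 2.  Assumes 0 < v <= 2^31, which the caller's
     * MaxAllocSize check guarantees in the real code. */
    static uint32_t
    next_power_of_2(uint32_t v)
    {
        v--;
        v |= v >> 1;
        v |= v >> 2;
        v |= v >> 4;
        v |= v >> 8;
        v |= v >> 16;
        return v + 1;
    }

For example, the initial allocation for a 3-entry collection is next_power_of_2(Max(16, 3)) = 16 slots, while 100 entries would get 128.
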
540 :
541 : /*
542 : * Delete pending-list pages up to (but not including) the newHead page.
543 : * If newHead == InvalidBlockNumber, the function drops the whole list.
544 : *
545 : * metapage is pinned and exclusive-locked throughout this function.
546 : */
547 : static void
548 30 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
549 : bool fill_fsm, IndexBulkDeleteResult *stats)
550 : {
551 : Page metapage;
552 : GinMetaPageData *metadata;
553 : BlockNumber blknoToDelete;
554 :
555 30 : metapage = BufferGetPage(metabuffer);
556 30 : metadata = GinPageGetMeta(metapage);
557 30 : blknoToDelete = metadata->head;
558 :
559 : do
560 : {
561 : Page page;
562 : int i;
563 192 : int64 nDeletedHeapTuples = 0;
564 : ginxlogDeleteListPages data;
565 : Buffer buffers[GIN_NDELETE_AT_ONCE];
566 : BlockNumber freespace[GIN_NDELETE_AT_ONCE];
567 :
568 192 : data.ndeleted = 0;
569 3036 : while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
570 : {
571 2844 : freespace[data.ndeleted] = blknoToDelete;
572 2844 : buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
573 2844 : LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
574 2844 : page = BufferGetPage(buffers[data.ndeleted]);
575 :
576 2844 : data.ndeleted++;
577 :
578 : Assert(!GinPageIsDeleted(page));
579 :
580 2844 : nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
581 2844 : blknoToDelete = GinPageGetOpaque(page)->rightlink;
582 : }
583 :
584 192 : if (stats)
585 192 : stats->pages_deleted += data.ndeleted;
586 :
587 : /*
588 : * This operation touches an unusually large number of pages, so
589 : * prepare the XLogInsert machinery for that before entering the
590 : * critical section.
591 : */
592 192 : if (RelationNeedsWAL(index))
593 78 : XLogEnsureRecordSpace(data.ndeleted, 0);
594 :
595 192 : START_CRIT_SECTION();
596 :
597 192 : metadata->head = blknoToDelete;
598 :
599 : Assert(metadata->nPendingPages >= data.ndeleted);
600 192 : metadata->nPendingPages -= data.ndeleted;
601 : Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
602 192 : metadata->nPendingHeapTuples -= nDeletedHeapTuples;
603 :
604 192 : if (blknoToDelete == InvalidBlockNumber)
605 : {
606 30 : metadata->tail = InvalidBlockNumber;
607 30 : metadata->tailFreeSize = 0;
608 30 : metadata->nPendingPages = 0;
609 30 : metadata->nPendingHeapTuples = 0;
610 : }
611 :
612 : /*
613 : * Set pd_lower just past the end of the metadata. This is essential,
614 : * because without doing so, metadata will be lost if xlog.c
615 : * compresses the page. (We must do this here because pre-v11
616 : * versions of PG did not set the metapage's pd_lower correctly, so a
617 : * pg_upgraded index might contain the wrong value.)
618 : */
619 192 : ((PageHeader) metapage)->pd_lower =
620 192 : ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
621 :
622 192 : MarkBufferDirty(metabuffer);
623 :
624 3036 : for (i = 0; i < data.ndeleted; i++)
625 : {
626 2844 : page = BufferGetPage(buffers[i]);
627 2844 : GinPageGetOpaque(page)->flags = GIN_DELETED;
628 2844 : MarkBufferDirty(buffers[i]);
629 : }
630 :
631 192 : if (RelationNeedsWAL(index))
632 : {
633 : XLogRecPtr recptr;
634 :
635 78 : XLogBeginInsert();
636 78 : XLogRegisterBuffer(0, metabuffer,
637 : REGBUF_WILL_INIT | REGBUF_STANDARD);
638 1152 : for (i = 0; i < data.ndeleted; i++)
639 1074 : XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
640 :
641 78 : memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
642 :
643 78 : XLogRegisterData((char *) &data,
644 : sizeof(ginxlogDeleteListPages));
645 :
646 78 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
647 78 : PageSetLSN(metapage, recptr);
648 :
649 1152 : for (i = 0; i < data.ndeleted; i++)
650 : {
651 1074 : page = BufferGetPage(buffers[i]);
652 1074 : PageSetLSN(page, recptr);
653 : }
654 : }
655 :
656 3036 : for (i = 0; i < data.ndeleted; i++)
657 2844 : UnlockReleaseBuffer(buffers[i]);
658 :
659 192 : END_CRIT_SECTION();
660 :
661 2892 : for (i = 0; fill_fsm && i < data.ndeleted; i++)
662 2700 : RecordFreeIndexPage(index, freespace[i]);
663 :
664 192 : } while (blknoToDelete != newHead);
665 30 : }
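
shiftList() detaches pages in batches of GIN_NDELETE_AT_ONCE so that each WAL record touches a bounded number of buffers. The same batching shape, shown on a plain linked list (an illustrative sketch, not PostgreSQL code):

    #include <stdlib.h>

    #define NDELETE_AT_ONCE 16      /* stand-in for GIN_NDELETE_AT_ONCE */

    typedef struct Node
    {
        struct Node *next;
    } Node;

    /* Delete nodes from *head up to (not including) stop, at most
     * NDELETE_AT_ONCE per batch; each batch would correspond to one WAL
     * record in the real code.  Pass stop = NULL to drop the whole list. */
    static void
    shift_list(Node **head, Node *stop)
    {
        while (*head != stop)
        {
            Node   *batch[NDELETE_AT_ONCE];
            int     n = 0;

            while (n < NDELETE_AT_ONCE && *head != stop)
            {
                batch[n++] = *head;
                *head = (*head)->next;
            }
            /* point of atomicity: the real code updates the metapage and
             * marks the whole batch deleted inside one critical section */
            for (int i = 0; i < n; i++)
                free(batch[i]);
        }
    }
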
666 :
667 : /* Initialize empty KeyArray */
668 : static void
669 30 : initKeyArray(KeyArray *keys, int32 maxvalues)
670 : {
671 30 : keys->keys = palloc_array(Datum, maxvalues);
672 30 : keys->categories = palloc_array(GinNullCategory, maxvalues);
673 30 : keys->nvalues = 0;
674 30 : keys->maxvalues = maxvalues;
675 30 : }
676 :
677 : /* Add datum to KeyArray, resizing if needed */
678 : static void
679 1151448 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
680 : {
681 1151448 : if (keys->nvalues >= keys->maxvalues)
682 : {
683 0 : keys->maxvalues *= 2;
684 0 : keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
685 0 : keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
686 : }
687 :
688 1151448 : keys->keys[keys->nvalues] = datum;
689 1151448 : keys->categories[keys->nvalues] = category;
690 1151448 : keys->nvalues++;
691 1151448 : }
692 :
693 : /*
694 : * Collect data from a pending-list page in preparation for insertion into
695 : * the main index.
696 : *
697 : * Go through all tuples >= startoff on the page and collect values in accum
698 : *
699 : * Note that ka is just workspace --- it does not carry any state across
700 : * calls.
701 : */
702 : static void
703 2844 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
704 : Page page, OffsetNumber startoff)
705 : {
706 : ItemPointerData heapptr;
707 : OffsetNumber i,
708 : maxoff;
709 : OffsetNumber attrnum;
710 :
711 : /* reset *ka to empty */
712 2844 : ka->nvalues = 0;
713 :
714 2844 : maxoff = PageGetMaxOffsetNumber(page);
715 : Assert(maxoff >= FirstOffsetNumber);
716 2844 : ItemPointerSetInvalid(&heapptr);
717 2844 : attrnum = 0;
718 :
719 1154292 : for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
720 : {
721 1151448 : IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
722 : OffsetNumber curattnum;
723 : Datum curkey;
724 : GinNullCategory curcategory;
725 :
726 : /* Check for change of heap TID or attnum */
727 1151448 : curattnum = gintuple_get_attrnum(accum->ginstate, itup);
728 :
729 1151448 : if (!ItemPointerIsValid(&heapptr))
730 : {
731 2844 : heapptr = itup->t_tid;
732 2844 : attrnum = curattnum;
733 : }
734 1148604 : else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
735 : curattnum == attrnum))
736 : {
737 : /*
738 : * ginInsertBAEntries can insert several datums per call, but only
739 : * for one heap tuple and one column. So call it at a boundary,
740 : * and reset ka.
741 : */
742 381102 : ginInsertBAEntries(accum, &heapptr, attrnum,
743 : ka->keys, ka->categories, ka->nvalues);
744 381102 : ka->nvalues = 0;
745 381102 : heapptr = itup->t_tid;
746 381102 : attrnum = curattnum;
747 : }
748 :
749 : /* Add key to KeyArray */
750 1151448 : curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
751 1151448 : addDatum(ka, curkey, curcategory);
752 : }
753 :
754 : /* Dump out all remaining keys */
755 2844 : ginInsertBAEntries(accum, &heapptr, attrnum,
756 : ka->keys, ka->categories, ka->nvalues);
757 2844 : }
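
The loop above is a classic run-grouping pass: consecutive tuples that share a (heap TID, attnum) pair are batched into one ginInsertBAEntries() call. The same shape on plain integers (a standalone sketch; flush() is an invented stand-in for ginInsertBAEntries):

    #include <stdio.h>

    static void
    flush(int key, int count)
    {
        printf("key %d: %d values\n", key, count);
    }

    /* Group consecutive equal keys and emit one flush per run, mirroring
     * the boundary detection above. */
    static void
    process_runs(const int *keys, int n)
    {
        int     runkey = 0;
        int     count = 0;

        for (int i = 0; i < n; i++)
        {
            if (count == 0)
                runkey = keys[i];       /* first item of first run */
            else if (keys[i] != runkey)
            {
                flush(runkey, count);   /* boundary: dump the run */
                runkey = keys[i];
                count = 0;
            }
            count++;
        }
        if (count > 0)
            flush(runkey, count);       /* dump the final run */
    }
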
758 :
759 : /*
760 : * Move tuples from pending pages into regular GIN structure.
761 : *
762 : * At first glance this looks completely non-crash-safe. But if we crash
763 : * after posting entries to the main index and before removing them from
764 : * the pending list, it's okay: when we redo the posting later on, nothing
765 : * bad will happen.
766 : *
767 : * fill_fsm indicates whether ginInsertCleanup should add deleted pages
768 : * to the FSM; otherwise the caller is responsible for putting deleted
769 : * pages into the FSM.
770 : *
771 : * If stats isn't null, we count deleted pending pages into the counts.
772 : */
773 : void
774 76 : ginInsertCleanup(GinState *ginstate, bool full_clean,
775 : bool fill_fsm, bool forceCleanup,
776 : IndexBulkDeleteResult *stats)
777 : {
778 76 : Relation index = ginstate->index;
779 : Buffer metabuffer,
780 : buffer;
781 : Page metapage,
782 : page;
783 : GinMetaPageData *metadata;
784 : MemoryContext opCtx,
785 : oldCtx;
786 : BuildAccumulator accum;
787 : KeyArray datums;
788 : BlockNumber blkno,
789 : blknoFinish;
790 76 : bool cleanupFinish = false;
791 76 : bool fsm_vac = false;
792 : Size workMemory;
793 :
794 : /*
795 : * We would like to prevent concurrent cleanup processes. For that, we
796 : * lock the metapage in exclusive mode using a LockPage() call. Nobody
797 : * else uses that lock on the metapage, so concurrent insertion into the
798 : * pending list remains possible.
799 : */
800 :
801 76 : if (forceCleanup)
802 : {
803 : /*
804 : * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
805 : * and we would like to wait for any concurrent cleanup to finish.
806 : */
807 76 : LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
808 76 : workMemory =
809 80 : (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
810 80 : autovacuum_work_mem : maintenance_work_mem;
811 : }
812 : else
813 : {
814 : /*
815 : * We are called from a regular insert; if we see a concurrent
816 : * cleanup, just exit in the hope that the concurrent process will
817 : * clean up the pending list.
818 : */
819 0 : if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
820 46 : return;
821 0 : workMemory = work_mem;
822 : }
823 :
824 76 : metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
825 76 : LockBuffer(metabuffer, GIN_SHARE);
826 76 : metapage = BufferGetPage(metabuffer);
827 76 : metadata = GinPageGetMeta(metapage);
828 :
829 76 : if (metadata->head == InvalidBlockNumber)
830 : {
831 : /* Nothing to do */
832 46 : UnlockReleaseBuffer(metabuffer);
833 46 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
834 46 : return;
835 : }
836 :
837 : /*
838 : * Remember a tail page to prevent infinite cleanup if other backends add
839 : * new tuples faster than we can clean up.
840 : */
841 30 : blknoFinish = metadata->tail;
842 :
843 : /*
844 : * Read and lock head of pending list
845 : */
846 30 : blkno = metadata->head;
847 30 : buffer = ReadBuffer(index, blkno);
848 30 : LockBuffer(buffer, GIN_SHARE);
849 30 : page = BufferGetPage(buffer);
850 :
851 30 : LockBuffer(metabuffer, GIN_UNLOCK);
852 :
853 : /*
854 : * Initialize. All temporary space will be in opCtx
855 : */
856 30 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
857 : "GIN insert cleanup temporary context",
858 : ALLOCSET_DEFAULT_SIZES);
859 :
860 30 : oldCtx = MemoryContextSwitchTo(opCtx);
861 :
862 30 : initKeyArray(&datums, 128);
863 30 : ginInitBA(&accum);
864 30 : accum.ginstate = ginstate;
865 :
866 : /*
867 : * At the top of this loop, we have pin and lock on the current page of
868 : * the pending list. However, we'll release that before exiting the loop.
869 : * Note we also have pin but not lock on the metapage.
870 : */
871 : for (;;)
872 : {
873 2814 : Assert(!GinPageIsDeleted(page));
874 :
875 : /*
876 : * Have we reached the page that we remember was the tail when we
877 : * started our cleanup? If the caller asked us to clean up the whole
878 : * pending list, ignore the old tail; we work until the list becomes
879 : * empty.
880 : */
881 2844 : if (blkno == blknoFinish && full_clean == false)
882 0 : cleanupFinish = true;
883 :
884 : /*
885 : * read page's datums into accum
886 : */
887 2844 : processPendingPage(&accum, &datums, page, FirstOffsetNumber);
888 :
889 2844 : vacuum_delay_point();
890 :
891 : /*
892 : * Is it time to flush memory to disk? Flush if we are at the end of
893 : * the pending list, or if we have a full row and memory is getting
894 : * full.
895 : */
896 2844 : if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
897 2814 : (GinPageHasFullRow(page) &&
898 2814 : (accum.allocatedMemory >= workMemory * 1024L)))
899 0 : {
900 : ItemPointerData *list;
901 : uint32 nlist;
902 : Datum key;
903 : GinNullCategory category;
904 : OffsetNumber maxoff,
905 : attnum;
906 :
907 : /*
908 : * Unlock the current page for better concurrency. Changes to the
909 : * page will be detected later by comparing maxoff after the
910 : * memory flush completes.
911 : */
912 30 : maxoff = PageGetMaxOffsetNumber(page);
913 30 : LockBuffer(buffer, GIN_UNLOCK);
914 :
915 : /*
916 : * Moving the collected data into the regular structure can take a
917 : * significant amount of time, so run it without locking the pending
918 : * list.
919 : */
920 30 : ginBeginBAScan(&accum);
921 366102 : while ((list = ginGetBAEntry(&accum,
922 : &attnum, &key, &category, &nlist)) != NULL)
923 : {
924 366072 : ginEntryInsert(ginstate, attnum, key, category,
925 : list, nlist, NULL);
926 366072 : vacuum_delay_point();
927 : }
928 :
929 : /*
930 : * Lock the whole list to remove pages
931 : */
932 30 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
933 30 : LockBuffer(buffer, GIN_SHARE);
934 :
935 : Assert(!GinPageIsDeleted(page));
936 :
937 : /*
938 : * While we left the page unlocked, more stuff might have gotten
939 : * added to it. If so, process those entries immediately. There
940 : * shouldn't be very many, so we don't worry about the fact that
941 : * we're doing this with exclusive lock. The insertion algorithm
942 : * guarantees that inserted row(s) will not continue onto the next page.
943 : * NOTE: intentionally no vacuum_delay_point in this loop.
944 : */
945 30 : if (PageGetMaxOffsetNumber(page) != maxoff)
946 : {
947 0 : ginInitBA(&accum);
948 0 : processPendingPage(&accum, &datums, page, maxoff + 1);
949 :
950 0 : ginBeginBAScan(&accum);
951 0 : while ((list = ginGetBAEntry(&accum,
952 : &attnum, &key, &category, &nlist)) != NULL)
953 0 : ginEntryInsert(ginstate, attnum, key, category,
954 : list, nlist, NULL);
955 : }
956 :
957 : /*
958 : * Remember next page - it will become the new list head
959 : */
960 30 : blkno = GinPageGetOpaque(page)->rightlink;
961 30 : UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
962 : * locking */
963 :
964 : /*
965 : * Remove the pages we have read from the pending list; at this point
966 : * all of their content has been moved into the regular structure.
967 : */
968 30 : shiftList(index, metabuffer, blkno, fill_fsm, stats);
969 :
970 : /* At this point, some pending pages have been freed up */
971 30 : fsm_vac = true;
972 :
973 : Assert(blkno == metadata->head);
974 30 : LockBuffer(metabuffer, GIN_UNLOCK);
975 :
976 : /*
977 : * If we removed the whole pending list, or have cleaned up to the
978 : * tail we remembered when starting our cleanup, just exit.
979 : */
980 30 : if (blkno == InvalidBlockNumber || cleanupFinish)
981 : break;
982 :
983 : /*
984 : * release memory used so far and reinit state
985 : */
986 0 : MemoryContextReset(opCtx);
987 0 : initKeyArray(&datums, datums.maxvalues);
988 0 : ginInitBA(&accum);
989 : }
990 : else
991 : {
992 2814 : blkno = GinPageGetOpaque(page)->rightlink;
993 2814 : UnlockReleaseBuffer(buffer);
994 : }
995 :
996 : /*
997 : * Read next page in pending list
998 : */
999 2814 : vacuum_delay_point();
1000 2814 : buffer = ReadBuffer(index, blkno);
1001 2814 : LockBuffer(buffer, GIN_SHARE);
1002 2814 : page = BufferGetPage(buffer);
1003 : }
1004 :
1005 30 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
1006 30 : ReleaseBuffer(metabuffer);
1007 :
1008 : /*
1009 : * As pending list pages can have a high churn rate, it is desirable to
1010 : * recycle them immediately to the FreeSpaceMap when ordinary backends
1011 : * clean the list.
1012 : */
1013 30 : if (fsm_vac && fill_fsm)
1014 12 : IndexFreeSpaceMapVacuum(index);
1015 :
1016 : /* Clean up temporary space */
1017 30 : MemoryContextSwitchTo(oldCtx);
1018 30 : MemoryContextDelete(opCtx);
1019 : }
1020 :
1021 : /*
1022 : * SQL-callable function to clean the insert pending list
1023 : */
1024 : Datum
1025 18 : gin_clean_pending_list(PG_FUNCTION_ARGS)
1026 : {
1027 18 : Oid indexoid = PG_GETARG_OID(0);
1028 18 : Relation indexRel = index_open(indexoid, RowExclusiveLock);
1029 : IndexBulkDeleteResult stats;
1030 : GinState ginstate;
1031 :
1032 18 : if (RecoveryInProgress())
1033 0 : ereport(ERROR,
1034 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1035 : errmsg("recovery is in progress"),
1036 : errhint("GIN pending list cannot be cleaned up during recovery.")));
1037 :
1038 : /* Must be a GIN index */
1039 18 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
1040 18 : indexRel->rd_rel->relam != GIN_AM_OID)
1041 0 : ereport(ERROR,
1042 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1043 : errmsg("\"%s\" is not a GIN index",
1044 : RelationGetRelationName(indexRel))));
1045 :
1046 : /*
1047 : * Reject attempts to read non-local temporary relations; we would be
1048 : * likely to get wrong data since we have no visibility into the owning
1049 : * session's local buffers.
1050 : */
1051 18 : if (RELATION_IS_OTHER_TEMP(indexRel))
1052 0 : ereport(ERROR,
1053 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1054 : errmsg("cannot access temporary indexes of other sessions")));
1055 :
1056 : /* User must own the index (comparable to privileges needed for VACUUM) */
1057 18 : if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
1058 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
1059 0 : RelationGetRelationName(indexRel));
1060 :
1061 18 : memset(&stats, 0, sizeof(stats));
1062 18 : initGinState(&ginstate, indexRel);
1063 18 : ginInsertCleanup(&ginstate, true, true, true, &stats);
1064 :
1065 18 : index_close(indexRel, RowExclusiveLock);
1066 :
1067 18 : PG_RETURN_INT64((int64) stats.pages_deleted);
1068 : }