LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 323 361 89.5 %
Date: 2020-05-25 04:06:28 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * ginfast.c
       4             :  *    Fast insert routines for the Postgres inverted index access method.
       5             :  *    Pending entries are stored in a linear list of pages.  Later on
       6             :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7             :  *    transfer pending entries into the regular index structure.  This
       8             :  *    wins because bulk insertion is much more efficient than retail.
       9             :  *
      10             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
      11             :  * Portions Copyright (c) 1994, Regents of the University of California
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *          src/backend/access/gin/ginfast.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/gin_private.h"
      22             : #include "access/ginxlog.h"
      23             : #include "access/xlog.h"
      24             : #include "access/xloginsert.h"
      25             : #include "catalog/pg_am.h"
      26             : #include "commands/vacuum.h"
      27             : #include "miscadmin.h"
      28             : #include "port/pg_bitutils.h"
      29             : #include "postmaster/autovacuum.h"
      30             : #include "storage/indexfsm.h"
      31             : #include "storage/lmgr.h"
      32             : #include "storage/predicate.h"
      33             : #include "utils/acl.h"
      34             : #include "utils/builtins.h"
      35             : #include "utils/memutils.h"
      36             : #include "utils/rel.h"
      37             : 
      38             : /*
                     :  * GUC parameter.
                     :  *
                     :  * NOTE(review): ginHeapTupleFastInsert() does not read this variable
                     :  * directly; its cleanup threshold comes from
                     :  * GinGetPendingListCleanupSize(index), which presumably falls back to this
                     :  * value -- confirm against that macro's definition.
                     :  */
      39             : int         gin_pending_list_limit = 0;
      40             : /*
                     :  * Bytes of a block that remain after the MAXALIGN'ed page header and the
                     :  * MAXALIGN'ed GinPageOpaqueData.  ginHeapTupleFastInsert() multiplies this
                     :  * by the pending-page count to estimate the size of the pending list.
                     :  */
      41             : #define GIN_PAGE_FREESIZE \
      42             :     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
      43             : /*
                     :  * Two growable arrays plus their bookkeeping: nvalues entries are in use
                     :  * and maxvalues is the allocated length of both arrays.
                     :  *
                     :  * NOTE(review): presumably keys[i] and categories[i] describe the same
                     :  * key -- confirm at the users of this struct.
                     :  */
      44             : typedef struct KeyArray
      45             : {
      46             :     Datum      *keys;           /* expansible array */
      47             :     GinNullCategory *categories;    /* another expansible array */
      48             :     int32       nvalues;        /* current number of valid entries */
      49             :     int32       maxvalues;      /* allocated size of arrays */
      50             : } KeyArray;
      51             : 
      52             : 
      53             : /*
      54             :  * Build a pending-list page from the given array of tuples, and write it out.
      55             :  *
                     :  * index:     index relation; decides whether WAL is written and supplies
                     :  *            the name used in the error message.
                     :  * buffer:    buffer of the page to build.  It is re-initialized as a
                     :  *            GIN_LIST page, marked dirty, and unlocked and released before
                     :  *            returning, so the caller must not use it afterwards.
                     :  *            NOTE(review): UnlockReleaseBuffer() implies the caller hands
                     :  *            it over pinned and locked -- confirm against GinNewBuffer().
                     :  * tuples, ntuples: index tuples to place on the page, in order, starting
                     :  *            at FirstOffsetNumber.
                     :  * rightlink: block number stored as the page's right link;
                     :  *            InvalidBlockNumber marks the page as a tail page.
                     :  *
                     :  * For an index that needs WAL, an XLOG_GIN_INSERT_LISTPAGE record is
                     :  * emitted that carries rightlink, ntuples and a contiguous copy of the
                     :  * tuples.  All page changes happen inside one critical section.
                     :  *
      56             :  * Returns amount of free space left on the page.
                     :  * (PageGetExactFreeSpace(), read before the buffer is released.)
      57             :  */
      58             : static int32
      59         730 : writeListPage(Relation index, Buffer buffer,
      60             :               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      61             : {
      62         730 :     Page        page = BufferGetPage(buffer);
      63             :     int32       i,
      64             :                 freesize,
      65         730 :                 size = 0;
      66             :     OffsetNumber l,
      67             :                 off;
      68             :     PGAlignedBlock workspace;
      69             :     char       *ptr;
      70             : 
      71         730 :     START_CRIT_SECTION();
      72             : 
      73         730 :     GinInitBuffer(buffer, GIN_LIST);
      74             : 
      75         730 :     off = FirstOffsetNumber;
      76         730 :     ptr = workspace.data;
      77             : 
      78        2912 :     for (i = 0; i < ntuples; i++)
      79             :     {
      80        2182 :         int         this_size = IndexTupleSize(tuples[i]);
      81             :         /* keep a contiguous copy; it becomes the WAL record's buffer data */
      82        2182 :         memcpy(ptr, tuples[i], this_size);
      83        2182 :         ptr += this_size;
      84        2182 :         size += this_size;
      85             : 
      86        2182 :         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
      87             :         /* NOTE(review): the ERROR below is raised inside the critical section */
      88        2182 :         if (l == InvalidOffsetNumber)
      89           0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      90             :                  RelationGetRelationName(index));
      91             : 
      92        2182 :         off++;
      93             :     }
      94             : 
      95             :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      96             : 
      97         730 :     GinPageGetOpaque(page)->rightlink = rightlink;
      98             : 
      99             :     /*
     100             :      * tail page may contain only whole row(s) or final part of row placed on
     101             :      * previous pages (a "row" here meaning all the index tuples generated for
     102             :      * one heap tuple)
                     :      *
                     :      * On pending-list pages maxoff serves as a count of heap rows:
                     :      * ginHeapTupleFastInsert() increments it per appended row and shiftList()
                     :      * sums it up when deleting pages.  A tail page written here accounts for
                     :      * one row, any other page for none.
     103             :      */
     104         730 :     if (rightlink == InvalidBlockNumber)
     105             :     {
     106         730 :         GinPageSetFullRow(page);
     107         730 :         GinPageGetOpaque(page)->maxoff = 1;
     108             :     }
     109             :     else
     110             :     {
     111           0 :         GinPageGetOpaque(page)->maxoff = 0;
     112             :     }
     113             : 
     114         730 :     MarkBufferDirty(buffer);
     115             : 
     116         730 :     if (RelationNeedsWAL(index))
     117             :     {
     118             :         ginxlogInsertListPage data;
     119             :         XLogRecPtr  recptr;
     120             : 
     121         720 :         data.rightlink = rightlink;
     122         720 :         data.ntuples = ntuples;
     123             : 
     124         720 :         XLogBeginInsert();
     125         720 :         XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
     126             :         /* the page is logged as re-initialized, with the tuple copy attached */
     127         720 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     128         720 :         XLogRegisterBufData(0, workspace.data, size);
     129             : 
     130         720 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     131         720 :         PageSetLSN(page, recptr);
     132             :     }
     133             : 
     134             :     /* get free space before releasing buffer */
     135         730 :     freesize = PageGetExactFreeSpace(page);
     136             : 
     137         730 :     UnlockReleaseBuffer(buffer);
     138             : 
     139         730 :     END_CRIT_SECTION();
     140             : 
     141         730 :     return freesize;
     142             : }
     143             : /*
                     :  * Spread the given index tuples over one or more newly allocated
                     :  * pending-list pages, chained through their right links, and describe the
                     :  * resulting list in *res.
                     :  *
                     :  * index:   index relation the pages are allocated in (GinNewBuffer()).
                     :  * tuples, ntuples: tuples to store, kept in their given order; ntuples
                     :  *          must be greater than zero.
                     :  * res:     receives head and tail block numbers, the free space left on
                     :  *          the tail page, and nPendingHeapTuples = 1.  nPendingPages is
                     :  *          only incremented, once per page written, so the caller must
                     :  *          pass a zeroed struct.
                     :  *
                     :  * Every buffer obtained here is handed to writeListPage(), which releases
                     :  * it.
                     :  *
                     :  * NOTE(review): a tuple whose aligned size plus ItemIdData exceeds
                     :  * GinListPageSize would never pass the fit test, and the loop would keep
                     :  * allocating pages; presumably tuple size is bounded where the tuples are
                     :  * formed -- confirm.
                     :  */
     144             : static void
     145         730 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     146             :             GinMetaPageData *res)
     147             : {
     148         730 :     Buffer      curBuffer = InvalidBuffer;
     149         730 :     Buffer      prevBuffer = InvalidBuffer;
     150             :     int         i,
     151         730 :                 size = 0,
     152             :                 tupsize;
     153         730 :     int         startTuple = 0;
     154             : 
     155             :     Assert(ntuples > 0);
     156             : 
     157             :     /*
     158             :      * Split tuples into pages
                     :      *
                     :      * A page is written out only once its successor has been allocated,
                     :      * because writeListPage() needs the successor's block number as the
                     :      * right link.  tuples[startTuple .. i-1] are the ones collected for the
                     :      * page that is currently being filled.
     159             :      */
     160        2912 :     for (i = 0; i < ntuples; i++)
     161             :     {
     162        2182 :         if (curBuffer == InvalidBuffer)
     163             :         {
     164         730 :             curBuffer = GinNewBuffer(index);
     165             : 
     166         730 :             if (prevBuffer != InvalidBuffer)
     167             :             {
     168           0 :                 res->nPendingPages++;
     169           0 :                 writeListPage(index, prevBuffer,
     170           0 :                               tuples + startTuple,
     171             :                               i - startTuple,
     172             :                               BufferGetBlockNumber(curBuffer));
     173             :             }
     174             :             else
     175             :             {
     176         730 :                 res->head = BufferGetBlockNumber(curBuffer);
     177             :             }
     178             : 
     179         730 :             prevBuffer = curBuffer;
     180         730 :             startTuple = i;
     181         730 :             size = 0;
     182             :         }
     183             :         /* space needed on a page: the aligned tuple plus its line pointer */
     184        2182 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
     185             : 
     186        2182 :         if (size + tupsize > GinListPageSize)
     187             :         {
     188             :             /* won't fit, force a new page and reprocess */
     189           0 :             i--;
     190           0 :             curBuffer = InvalidBuffer;
     191             :         }
     192             :         else
     193             :         {
     194        2182 :             size += tupsize;
     195             :         }
     196             :     }
     197             : 
     198             :     /*
     199             :      * Write last page
                     :      *
                     :      * It gets InvalidBlockNumber as right link, which makes writeListPage()
                     :      * treat it as the tail page.
     200             :      */
     201         730 :     res->tail = BufferGetBlockNumber(curBuffer);
     202        2190 :     res->tailFreeSize = writeListPage(index, curBuffer,
     203         730 :                                       tuples + startTuple,
     204             :                                       ntuples - startTuple,
     205             :                                       InvalidBlockNumber);
     206         730 :     res->nPendingPages++;
     207             :     /* that was only one heap tuple */
     208         730 :     res->nPendingHeapTuples = 1;
     209         730 : }
     210             : 
     211             : /*
     212             :  * Write the index tuples contained in *collector into the index's
     213             :  * pending list.
     214             :  *
     215             :  * Function guarantees that all these tuples will be inserted consecutively,
     216             :  * preserving order.
                     :  *
                     :  * Does nothing when collector->ntuples is 0.  Otherwise one of two paths
                     :  * is taken:
                     :  *
                     :  * - If the pending list is non-empty and the tuples (plus one ItemIdData
                     :  *   each) fit into metadata->tailFreeSize, they are added to the current
                     :  *   tail page while the metapage stays exclusively locked.
                     :  * - Otherwise the tuples are first written to separate pages by
                     :  *   makeSublist() while the metapage is not locked; the metapage is then
                     :  *   locked and the sublist either becomes the pending list (if that is
                     :  *   empty) or is linked behind the old tail page.
                     :  *
                     :  * Either way the metapage's list bookkeeping is updated and, for an index
                     :  * that needs WAL, an XLOG_GIN_UPDATE_META_PAGE record is written.  Last,
                     :  * if the estimated pending-list size exceeds the cleanup threshold,
                     :  * ginInsertCleanup() is called after the critical section has ended.
     217             :  */
     218             : void
     219       96012 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     220             : {
     221       96012 :     Relation    index = ginstate->index;
     222             :     Buffer      metabuffer;
     223             :     Page        metapage;
     224       96012 :     GinMetaPageData *metadata = NULL;
     225       96012 :     Buffer      buffer = InvalidBuffer;
     226       96012 :     Page        page = NULL;
     227             :     ginxlogUpdateMeta data;
     228       96012 :     bool        separateList = false;
     229       96012 :     bool        needCleanup = false;
     230             :     int         cleanupSize;
     231             :     bool        needWal;
     232             : 
     233       96012 :     if (collector->ntuples == 0)
     234           0 :         return;
     235             : 
     236       96012 :     needWal = RelationNeedsWAL(index);
     237             : 
     238       96012 :     data.node = index->rd_node;
     239       96012 :     data.ntuples = 0;
     240       96012 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     241             : 
     242       96012 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     243       96012 :     metapage = BufferGetPage(metabuffer);
     244             : 
     245             :     /*
     246             :      * An insertion to the pending list could logically belong anywhere in the
     247             :      * tree, so it conflicts with all serializable scans.  All scans acquire a
     248             :      * predicate lock on the metabuffer to represent that.
     249             :      */
     250       96012 :     CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     251             : 
     252       96006 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     253             :     {
     254             :         /*
     255             :          * Total size is greater than one page => make sublist
                     :          *
                     :          * The metapage has not been locked on this path.
     256             :          */
     257           0 :         separateList = true;
     258             :     }
     259             :     else
     260             :     {
     261       96006 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     262       96006 :         metadata = GinPageGetMeta(metapage);
     263             : 
     264       96006 :         if (metadata->head == InvalidBlockNumber ||
     265       95976 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     266             :         {
     267             :             /*
     268             :              * Pending list is empty or total size is greater than freespace
     269             :              * on tail page => make sublist
     270             :              *
     271             :              * We unlock metabuffer to keep high concurrency
     272             :              */
     273         730 :             separateList = true;
     274         730 :             LockBuffer(metabuffer, GIN_UNLOCK);
     275             :         }
     276             :     }
     277             : 
     278       96006 :     if (separateList)
     279             :     {
     280             :         /*
     281             :          * We should make sublist separately and append it to the tail
     282             :          */
     283             :         GinMetaPageData sublist;
     284             :         /* makeSublist() only increments nPendingPages, so start from zeroes */
     285         730 :         memset(&sublist, 0, sizeof(GinMetaPageData));
     286         730 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     287             : 
     288         730 :         if (needWal)
     289         720 :             XLogBeginInsert();
     290             : 
     291             :         /*
     292             :          * metapage was unlocked, see above
                     :          *
                     :          * metadata is re-read under the lock; the list may be empty or
                     :          * non-empty by now regardless of what was seen earlier.
     293             :          */
     294         730 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     295         730 :         metadata = GinPageGetMeta(metapage);
     296             : 
     297         730 :         if (metadata->head == InvalidBlockNumber)
     298             :         {
     299             :             /*
     300             :              * Main list is empty, so just insert sublist as main list
     301             :              */
     302          30 :             START_CRIT_SECTION();
     303             : 
     304          30 :             metadata->head = sublist.head;
     305          30 :             metadata->tail = sublist.tail;
     306          30 :             metadata->tailFreeSize = sublist.tailFreeSize;
     307             : 
     308          30 :             metadata->nPendingPages = sublist.nPendingPages;
     309          30 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     310             :         }
     311             :         else
     312             :         {
     313             :             /*
     314             :              * Merge lists
                     :              *
                     :              * The old tail page gets the sublist's head as right link, and
                     :              * the metapage's tail fields switch to the sublist's tail.
     315             :              */
     316         700 :             data.prevTail = metadata->tail;
     317         700 :             data.newRightlink = sublist.head;
     318             : 
     319         700 :             buffer = ReadBuffer(index, metadata->tail);
     320         700 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     321         700 :             page = BufferGetPage(buffer);
     322             : 
     323             :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     324             : 
     325         700 :             START_CRIT_SECTION();
     326             : 
     327         700 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     328             : 
     329         700 :             MarkBufferDirty(buffer);
     330             : 
     331         700 :             metadata->tail = sublist.tail;
     332         700 :             metadata->tailFreeSize = sublist.tailFreeSize;
     333             : 
     334         700 :             metadata->nPendingPages += sublist.nPendingPages;
     335         700 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     336             : 
     337         700 :             if (needWal)
     338         700 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     339             :         }
     340             :     }
     341             :     else
     342             :     {
     343             :         /*
     344             :          * Insert into tail page.  Metapage is already locked
     345             :          */
     346             :         OffsetNumber l,
     347             :                     off;
     348             :         int         i,
     349             :                     tupsize;
     350             :         char       *ptr;
     351             :         char       *collectordata;
     352             : 
     353       95276 :         buffer = ReadBuffer(index, metadata->tail);
     354       95276 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     355       95276 :         page = BufferGetPage(buffer);
     356             : 
     357       95276 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     358       95276 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     359             :         /* scratch copy of the tuples for WAL, allocated before the critical section */
     360       95276 :         collectordata = ptr = (char *) palloc(collector->sumsize);
     361             : 
     362       95276 :         data.ntuples = collector->ntuples;
     363             : 
     364       95276 :         if (needWal)
     365       95234 :             XLogBeginInsert();
     366             : 
     367       95276 :         START_CRIT_SECTION();
     368             : 
     369             :         /*
     370             :          * Increase counter of heap tuples
                     :          *
                     :          * Both the tail page's maxoff and the metapage's nPendingHeapTuples
                     :          * go up by one per call, independent of collector->ntuples.
     371             :          */
     372             :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     373       95276 :         GinPageGetOpaque(page)->maxoff++;
     374       95276 :         metadata->nPendingHeapTuples++;
     375             : 
     376      381056 :         for (i = 0; i < collector->ntuples; i++)
     377             :         {
     378      285780 :             tupsize = IndexTupleSize(collector->tuples[i]);
     379      285780 :             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
     380             :             /* NOTE(review): the ERROR below is raised inside the critical section */
     381      285780 :             if (l == InvalidOffsetNumber)
     382           0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     383             :                      RelationGetRelationName(index));
     384             : 
     385      285780 :             memcpy(ptr, collector->tuples[i], tupsize);
     386      285780 :             ptr += tupsize;
     387             : 
     388      285780 :             off++;
     389             :         }
     390             : 
     391             :         Assert((ptr - collectordata) <= collector->sumsize);
     392       95276 :         if (needWal)
     393             :         {
     394       95234 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     395       95234 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     396             :         }
     397             : 
     398       95276 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     399             : 
     400       95276 :         MarkBufferDirty(buffer);
     401             :     }
     402             : 
     403             :     /*
     404             :      * Set pd_lower just past the end of the metadata.  This is essential,
     405             :      * because without doing so, metadata will be lost if xlog.c compresses
     406             :      * the page.  (We must do this here because pre-v11 versions of PG did not
     407             :      * set the metapage's pd_lower correctly, so a pg_upgraded index might
     408             :      * contain the wrong value.)
     409             :      */
     410       96006 :     ((PageHeader) metapage)->pd_lower =
     411       96006 :         ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     412             : 
     413             :     /*
     414             :      * Write metabuffer, make xlog entry
     415             :      */
     416       96006 :     MarkBufferDirty(metabuffer);
     417             : 
     418       96006 :     if (needWal)
     419             :     {
     420             :         XLogRecPtr  recptr;
     421             : 
     422       95954 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     423             : 
     424       95954 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
     425       95954 :         XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
     426             : 
     427       95954 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     428       95954 :         PageSetLSN(metapage, recptr);
     429             :         /* buffer stays InvalidBuffer when the sublist became the whole list */
     430       95954 :         if (buffer != InvalidBuffer)
     431             :         {
     432       95934 :             PageSetLSN(page, recptr);
     433             :         }
     434             :     }
     435             : 
     436       96006 :     if (buffer != InvalidBuffer)
     437       95976 :         UnlockReleaseBuffer(buffer);
     438             : 
     439             :     /*
     440             :      * Force pending list cleanup when it becomes too long. And,
     441             :      * ginInsertCleanup could take significant amount of time, so we prefer to
     442             :      * call it when it can do all the work in a single collection cycle. In
     443             :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     444             :      * while pending list is still small enough to fit into
     445             :      * gin_pending_list_limit.
     446             :      *
     447             :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
                     :      *
                     :      * The test compares pages * GIN_PAGE_FREESIZE (bytes) with cleanupSize
                     :      * multiplied by 1024, i.e. cleanupSize is treated as kilobytes.  It must
                     :      * be evaluated before the metabuffer is released, since metadata points
                     :      * into that page.
     448             :      */
     449       96006 :     cleanupSize = GinGetPendingListCleanupSize(index);
     450       96006 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
     451           0 :         needCleanup = true;
     452             : 
     453       96006 :     UnlockReleaseBuffer(metabuffer);
     454             :     /* ends the critical section started in whichever branch ran above */
     455       96006 :     END_CRIT_SECTION();
     456             : 
     457             :     /*
     458             :      * Since it could contend with a concurrent cleanup process, we do not
     459             :      * force the pending-list cleanup.
     460             :      */
     461       96006 :     if (needCleanup)
     462           0 :         ginInsertCleanup(ginstate, false, true, false, NULL);
     463             : }
     464             : 
     465             : /*
     466             :  * Create temporary index tuples for a single indexable item (one index column
     467             :  * for the heap tuple specified by ht_ctid), and append them to the array
     468             :  * in *collector.  They will subsequently be written out using
     469             :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
     470             :  * temp tuples for a given heap tuple must be written in one call to
     471             :  * ginHeapTupleFastInsert.
                     :  *
                     :  * ginstate:  passed through to ginExtractEntries() and GinFormTuple().
                     :  * collector: accumulator; its tuples, ntuples, lentuples and sumsize
                     :  *            fields are updated here.  collector->tuples == NULL is taken
                     :  *            to mean that no array has been allocated yet.
                     :  *            NOTE(review): callers presumably start from a zeroed
                     :  *            collector -- confirm.
                     :  * attnum, value, isNull: index column number and the column's value.
                     :  * ht_ctid:   heap TID, copied into t_tid of every tuple built.
                     :  *
                     :  * Raises an error if the extracted entry count is negative or the tuple
                     :  * array would exceed MaxAllocSize.
     472             :  */
     473             : void
     474       96052 : ginHeapTupleFastCollect(GinState *ginstate,
     475             :                         GinTupleCollector *collector,
     476             :                         OffsetNumber attnum, Datum value, bool isNull,
     477             :                         ItemPointer ht_ctid)
     478             : {
     479             :     Datum      *entries;
     480             :     GinNullCategory *categories;
     481             :     int32       i,
     482             :                 nentries;
     483             : 
     484             :     /*
     485             :      * Extract the key values that need to be inserted in the index
     486             :      */
     487       96052 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     488             :                                 &nentries, &categories);
     489             : 
     490             :     /*
     491             :      * Protect against integer overflow in allocation calculations
     492             :      */
     493       96052 :     if (nentries < 0 ||
     494       96052 :         collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
     495           0 :         elog(ERROR, "too many entries for GIN index");
     496             : 
     497             :     /*
     498             :      * Allocate/reallocate memory for storing collected tuples
     499             :      */
     500       96052 :     if (collector->tuples == NULL)
     501             :     {
     502             :         /*
     503             :          * Determine the number of elements to allocate in the tuples array
     504             :          * initially.  Make it a power of 2 to avoid wasting memory when
     505             :          * resizing (since palloc likes powers of 2).
     506             :          */
     507       96012 :         collector->lentuples = pg_nextpower2_32(Max(16, nentries));
     508       96012 :         collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
     509             :     }
     510          40 :     else if (collector->lentuples < collector->ntuples + nentries)
     511             :     {
     512             :         /*
     513             :          * Advance lentuples to the next suitable power of 2.  This won't
     514             :          * overflow, though we could get to a value that exceeds
     515             :          * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
     516             :          */
     517           0 :         collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
     518           0 :         collector->tuples = (IndexTuple *) repalloc(collector->tuples,
     519           0 :                                                     sizeof(IndexTuple) * collector->lentuples);
     520             :     }
     521             : 
     522             :     /*
     523             :      * Build an index tuple for each key value, and add to array.  In pending
     524             :      * tuples we just stick the heap TID into t_tid.
                     :      *
                     :      * sumsize accumulates the plain IndexTupleSize() of each tuple;
                     :      * ginHeapTupleFastInsert() adds the per-tuple ItemIdData itself.
     525             :      */
     526      384020 :     for (i = 0; i < nentries; i++)
     527             :     {
     528             :         IndexTuple  itup;
     529             : 
     530      287968 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     531             :                             NULL, 0, 0, true);
     532      287968 :         itup->t_tid = *ht_ctid;
     533      287968 :         collector->tuples[collector->ntuples++] = itup;
     534      287968 :         collector->sumsize += IndexTupleSize(itup);
     535             :     }
     536       96052 : }
     537             : 
     538             : /*
     539             :  * Deletes pending list pages up to (not including) newHead page.
     540             :  * If newHead == InvalidBlockNumber then function drops the whole list.
     541             :  *
     542             :  * metapage is pinned and exclusive-locked throughout this function.
     543             :  */
     544             : static void
     545          16 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     546             :           bool fill_fsm, IndexBulkDeleteResult *stats)
     547             : {
     548             :     Page        metapage;
     549             :     GinMetaPageData *metadata;
     550             :     BlockNumber blknoToDelete;
     551             : 
     552          16 :     metapage = BufferGetPage(metabuffer);
     553          16 :     metadata = GinPageGetMeta(metapage);
     554          16 :     blknoToDelete = metadata->head;
     555             : 
     556             :     do
     557             :     {
     558             :         Page        page;
     559             :         int         i;
     560          52 :         int64       nDeletedHeapTuples = 0;
     561             :         ginxlogDeleteListPages data;
     562             :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     563             :         BlockNumber freespace[GIN_NDELETE_AT_ONCE];
     564             : 
     565          52 :         data.ndeleted = 0;
     566         768 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     567             :         {
     568         716 :             freespace[data.ndeleted] = blknoToDelete;
     569         716 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     570         716 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     571         716 :             page = BufferGetPage(buffers[data.ndeleted]);
     572             : 
     573         716 :             data.ndeleted++;
     574             : 
     575             :             Assert(!GinPageIsDeleted(page));
     576             : 
     577         716 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     578         716 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     579             :         }
     580             : 
     581          52 :         if (stats)
     582          52 :             stats->pages_deleted += data.ndeleted;
     583             : 
     584             :         /*
     585             :          * This operation touches an unusually large number of pages, so
     586             :          * prepare the XLogInsert machinery for that before entering the
     587             :          * critical section.
     588             :          */
     589          52 :         if (RelationNeedsWAL(index))
     590          52 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     591             : 
     592          52 :         START_CRIT_SECTION();
     593             : 
     594          52 :         metadata->head = blknoToDelete;
     595             : 
     596             :         Assert(metadata->nPendingPages >= data.ndeleted);
     597          52 :         metadata->nPendingPages -= data.ndeleted;
     598             :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     599          52 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     600             : 
     601          52 :         if (blknoToDelete == InvalidBlockNumber)
     602             :         {
     603          16 :             metadata->tail = InvalidBlockNumber;
     604          16 :             metadata->tailFreeSize = 0;
     605          16 :             metadata->nPendingPages = 0;
     606          16 :             metadata->nPendingHeapTuples = 0;
     607             :         }
     608             : 
     609             :         /*
     610             :          * Set pd_lower just past the end of the metadata.  This is essential,
     611             :          * because without doing so, metadata will be lost if xlog.c
     612             :          * compresses the page.  (We must do this here because pre-v11
     613             :          * versions of PG did not set the metapage's pd_lower correctly, so a
     614             :          * pg_upgraded index might contain the wrong value.)
     615             :          */
     616          52 :         ((PageHeader) metapage)->pd_lower =
     617          52 :             ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     618             : 
     619          52 :         MarkBufferDirty(metabuffer);
     620             : 
     621         768 :         for (i = 0; i < data.ndeleted; i++)
     622             :         {
     623         716 :             page = BufferGetPage(buffers[i]);
     624         716 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     625         716 :             MarkBufferDirty(buffers[i]);
     626             :         }
     627             : 
     628          52 :         if (RelationNeedsWAL(index))
     629             :         {
     630             :             XLogRecPtr  recptr;
     631             : 
     632          52 :             XLogBeginInsert();
     633          52 :             XLogRegisterBuffer(0, metabuffer,
     634             :                                REGBUF_WILL_INIT | REGBUF_STANDARD);
     635         768 :             for (i = 0; i < data.ndeleted; i++)
     636         716 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     637             : 
     638          52 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     639             : 
     640          52 :             XLogRegisterData((char *) &data,
     641             :                              sizeof(ginxlogDeleteListPages));
     642             : 
     643          52 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     644          52 :             PageSetLSN(metapage, recptr);
     645             : 
     646         768 :             for (i = 0; i < data.ndeleted; i++)
     647             :             {
     648         716 :                 page = BufferGetPage(buffers[i]);
     649         716 :                 PageSetLSN(page, recptr);
     650             :             }
     651             :         }
     652             : 
     653         768 :         for (i = 0; i < data.ndeleted; i++)
     654         716 :             UnlockReleaseBuffer(buffers[i]);
     655             : 
     656          52 :         END_CRIT_SECTION();
     657             : 
     658         672 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     659         620 :             RecordFreeIndexPage(index, freespace[i]);
     660             : 
     661          52 :     } while (blknoToDelete != newHead);
     662          16 : }
     663             : 
     664             : /* Initialize an empty KeyArray, palloc'ing room for maxvalues keys and categories */
     665             : static void
     666          16 : initKeyArray(KeyArray *keys, int32 maxvalues)
     667             : {
     668          16 :     keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
     669          16 :     keys->categories = (GinNullCategory *)
     670          16 :         palloc(sizeof(GinNullCategory) * maxvalues);
     671          16 :     keys->nvalues = 0;
     672          16 :     keys->maxvalues = maxvalues;
     673          16 : }
     674             : 
     675             : /* Append a datum and its null category to KeyArray, doubling both arrays when full */
     676             : static void
     677      287790 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     678             : {
     679      287790 :     if (keys->nvalues >= keys->maxvalues)
     680             :     {
     681           0 :         keys->maxvalues *= 2;
     682           0 :         keys->keys = (Datum *)
     683           0 :             repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
     684           0 :         keys->categories = (GinNullCategory *)
     685           0 :             repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
     686             :     }
     687             : 
     688      287790 :     keys->keys[keys->nvalues] = datum;
     689      287790 :     keys->categories[keys->nvalues] = category;
     690      287790 :     keys->nvalues++;
     691      287790 : }
     692             : 
     693             : /*
     694             :  * Collect data from a pending-list page in preparation for insertion into
     695             :  * the main index.
     696             :  *
     697             :  * Walk every tuple at offset >= startoff on the page, gathering keys into ka
     698             :  * and handing each run that shares a heap TID and attnum to ginInsertBAEntries.
     699             :  * Note that ka is just workspace --- it does not carry any state across
     700             :  * calls (it is reset to empty on entry).
     701             :  */
     702             : static void
     703         716 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     704             :                    Page page, OffsetNumber startoff)
     705             : {
     706             :     ItemPointerData heapptr;
     707             :     OffsetNumber i,
     708             :                 maxoff;
     709             :     OffsetNumber attrnum;
     710             : 
     711             :     /* reset *ka to empty */
     712         716 :     ka->nvalues = 0;
     713             : 
     714         716 :     maxoff = PageGetMaxOffsetNumber(page);
     715             :     Assert(maxoff >= FirstOffsetNumber);
     716         716 :     ItemPointerSetInvalid(&heapptr);
     717         716 :     attrnum = 0;
     718             : 
     719      288506 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     720             :     {
     721      287790 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     722             :         OffsetNumber curattnum;
     723             :         Datum       curkey;
     724             :         GinNullCategory curcategory;
     725             : 
     726             :         /* Check for change of heap TID or attnum */
     727      287790 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     728             : 
     729      287790 :         if (!ItemPointerIsValid(&heapptr))
     730             :         {
     731         716 :             heapptr = itup->t_tid;
     732         716 :             attrnum = curattnum;
     733             :         }
     734      287074 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     735             :                    curattnum == attrnum))
     736             :         {
     737             :             /*
     738             :              * ginInsertBAEntries can insert several datums per call, but only
     739             :              * for one heap tuple and one column.  So call it at each boundary,
     740             :              * then reset ka and start tracking the new TID/attnum.
     741             :              */
     742       95222 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     743             :                                ka->keys, ka->categories, ka->nvalues);
     744       95222 :             ka->nvalues = 0;
     745       95222 :             heapptr = itup->t_tid;
     746       95222 :             attrnum = curattnum;
     747             :         }
     748             : 
     749             :         /* Add key to KeyArray */
     750      287790 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     751      287790 :         addDatum(ka, curkey, curcategory);
     752             :     }
     753             : 
     754             :     /* Flush the keys collected for the final heap TID/attnum run */
     755         716 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     756             :                        ka->keys, ka->categories, ka->nvalues);
     757         716 : }
     758             : 
     759             : /*
     760             :  * Move tuples from pending pages into regular GIN structure.
     761             :  *
     762             :  * At first glance this does not look crash-safe.  But if we crash after
     763             :  * posting entries to the main index and before removing them from the
     764             :  * pending list, it's okay: redoing the posting later on is harmless.
     765             :  * full_clean: keep going until the list is empty, ignoring the initial tail.
     766             :  * forceCleanup: wait for a concurrent cleanup instead of returning at once.
     767             :  * fill_fsm indicates that ginInsertCleanup should add deleted pages to the
     768             :  * FSM; otherwise the caller is responsible for putting deleted pages into
     769             :  * the FSM.
     770             :  *
     771             :  * If stats isn't null, we count deleted pending pages into the counts.
     772             :  */
     773             : void
     774          44 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     775             :                  bool fill_fsm, bool forceCleanup,
     776             :                  IndexBulkDeleteResult *stats)
     777             : {
     778          44 :     Relation    index = ginstate->index;
     779             :     Buffer      metabuffer,
     780             :                 buffer;
     781             :     Page        metapage,
     782             :                 page;
     783             :     GinMetaPageData *metadata;
     784             :     MemoryContext opCtx,
     785             :                 oldCtx;
     786             :     BuildAccumulator accum;
     787             :     KeyArray    datums;
     788             :     BlockNumber blkno,
     789             :                 blknoFinish;
     790          44 :     bool        cleanupFinish = false;
     791          44 :     bool        fsm_vac = false;
     792             :     Size        workMemory;
     793             : 
     794             :     /*
     795             :      * We want to prevent concurrent cleanup processes.  To do so we take an
     796             :      * exclusive LockPage() lock on the metapage.  Nothing else uses that
     797             :      * lock on the metapage, so concurrent insertion into the pending list
     798             :      * remains possible.
     799             :      */
     800             : 
     801          44 :     if (forceCleanup)
     802             :     {
     803             :         /*
     804             :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list(),
     805             :          * so wait for any concurrent cleanup to finish.
     806             :          */
     807          44 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     808          44 :         workMemory =
     809          48 :             (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     810          48 :             autovacuum_work_mem : maintenance_work_mem;
     811             :     }
     812             :     else
     813             :     {
     814             :         /*
     815             :          * We are called from a regular insert; if a concurrent cleanup is
     816             :          * running, just exit in the hope that it will clean up the pending
     817             :          * list.
     818             :          */
     819           0 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     820          28 :             return;
     821           0 :         workMemory = work_mem;
     822             :     }
     823             : 
     824          44 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     825          44 :     LockBuffer(metabuffer, GIN_SHARE);
     826          44 :     metapage = BufferGetPage(metabuffer);
     827          44 :     metadata = GinPageGetMeta(metapage);
     828             : 
     829          44 :     if (metadata->head == InvalidBlockNumber)
     830             :     {
     831             :         /* Nothing to do */
     832          28 :         UnlockReleaseBuffer(metabuffer);
     833          28 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     834          28 :         return;
     835             :     }
     836             : 
     837             :     /*
     838             :      * Remember the current tail page to prevent endless cleanup if other
     839             :      * backends add new tuples faster than we can clean up.
     840             :      */
     841          16 :     blknoFinish = metadata->tail;
     842             : 
     843             :     /*
     844             :      * Read and lock head of pending list
     845             :      */
     846          16 :     blkno = metadata->head;
     847          16 :     buffer = ReadBuffer(index, blkno);
     848          16 :     LockBuffer(buffer, GIN_SHARE);
     849          16 :     page = BufferGetPage(buffer);
     850             : 
     851          16 :     LockBuffer(metabuffer, GIN_UNLOCK);
     852             : 
     853             :     /*
     854             :      * Initialize.  All temporary space will be in opCtx
     855             :      */
     856          16 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     857             :                                   "GIN insert cleanup temporary context",
     858             :                                   ALLOCSET_DEFAULT_SIZES);
     859             : 
     860          16 :     oldCtx = MemoryContextSwitchTo(opCtx);
     861             : 
     862          16 :     initKeyArray(&datums, 128);
     863          16 :     ginInitBA(&accum);
     864          16 :     accum.ginstate = ginstate;
     865             : 
     866             :     /*
     867             :      * At the top of this loop, we have pin and lock on the current page of
     868             :      * the pending list.  However, we'll release that before exiting the loop.
     869             :      * Note we also have pin but not lock on the metapage.
     870             :      */
     871             :     for (;;)
     872             :     {
     873         700 :         Assert(!GinPageIsDeleted(page));
     874             : 
     875             :         /*
     876             :          * Is this the page that we remembered as the tail when we started
     877             :          * the cleanup?  If so, this is the last batch --- unless the caller
     878             :          * asked us to clean the whole pending list, in which case we ignore
     879             :          * the old tail and work until the list becomes empty.
     880             :          */
     881         716 :         if (blkno == blknoFinish && full_clean == false)
     882           0 :             cleanupFinish = true;
     883             : 
     884             :         /*
     885             :          * read page's datums into accum
     886             :          */
     887         716 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     888             : 
     889         716 :         vacuum_delay_point();
     890             : 
     891             :         /*
     892             :          * Is it time to flush memory to disk?  Flush if we are at the end of
     893             :          * the pending list, or if we have a full row and memory is getting
     894             :          * full.
     895             :          */
     896         716 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     897         700 :             (GinPageHasFullRow(page) &&
     898         700 :              (accum.allocatedMemory >= workMemory * 1024L)))
     899           0 :         {
     900             :             ItemPointerData *list;
     901             :             uint32      nlist;
     902             :             Datum       key;
     903             :             GinNullCategory category;
     904             :             OffsetNumber maxoff,
     905             :                         attnum;
     906             : 
     907             :             /*
     908             :              * Unlock the current page to increase performance.  Changes to
     909             :              * the page are detected later by comparing maxoff once the
     910             :              * memory flush has completed.
     911             :              */
     912          16 :             maxoff = PageGetMaxOffsetNumber(page);
     913          16 :             LockBuffer(buffer, GIN_UNLOCK);
     914             : 
     915             :             /*
     916             :              * Moving the collected data into the regular structure can take
     917             :              * a significant amount of time, so run it without locking the
     918             :              * pending list.
     919             :              */
     920          16 :             ginBeginBAScan(&accum);
     921       84040 :             while ((list = ginGetBAEntry(&accum,
     922             :                                          &attnum, &key, &category, &nlist)) != NULL)
     923             :             {
     924       84024 :                 ginEntryInsert(ginstate, attnum, key, category,
     925             :                                list, nlist, NULL);
     926       84024 :                 vacuum_delay_point();
     927             :             }
     928             : 
     929             :             /*
     930             :              * Lock the whole list to remove pages
     931             :              */
     932          16 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     933          16 :             LockBuffer(buffer, GIN_SHARE);
     934             : 
     935             :             Assert(!GinPageIsDeleted(page));
     936             : 
     937             :             /*
     938             :              * While we left the page unlocked, more stuff might have gotten
     939             :              * added to it.  If so, process those entries immediately.  There
     940             :              * shouldn't be very many, so we don't worry about the fact that
     941             :              * we're doing this with exclusive lock. Insertion algorithm
     942             :              * guarantees that inserted row(s) will not continue on next page.
     943             :              * NOTE: intentionally no vacuum_delay_point in this loop.
     944             :              */
     945          16 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     946             :             {
     947           0 :                 ginInitBA(&accum);
     948           0 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     949             : 
     950           0 :                 ginBeginBAScan(&accum);
     951           0 :                 while ((list = ginGetBAEntry(&accum,
     952             :                                              &attnum, &key, &category, &nlist)) != NULL)
     953           0 :                     ginEntryInsert(ginstate, attnum, key, category,
     954             :                                    list, nlist, NULL);
     955             :             }
     956             : 
     957             :             /*
     958             :              * Remember next page - it will become the new list head
     959             :              */
     960          16 :             blkno = GinPageGetOpaque(page)->rightlink;
     961          16 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     962             :                                              * locking */
     963             : 
     964             :             /*
     965             :              * Remove the pages we have read from the pending list; at this
     966             :              * point all of their content is in the regular structure.
     967             :              */
     968          16 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     969             : 
     970             :             /* At this point, some pending pages have been freed up */
     971          16 :             fsm_vac = true;
     972             : 
     973             :             Assert(blkno == metadata->head);
     974          16 :             LockBuffer(metabuffer, GIN_UNLOCK);
     975             : 
     976             :             /*
     977             :              * If we removed the whole pending list, or we have cleaned up the
     978             :              * tail page remembered at the start of cleanup, then just exit.
     979             :              */
     980          16 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     981             :                 break;
     982             : 
     983             :             /*
     984             :              * release memory used so far and reinit state
     985             :              */
     986           0 :             MemoryContextReset(opCtx);
     987           0 :             initKeyArray(&datums, datums.maxvalues);
     988           0 :             ginInitBA(&accum);
     989             :         }
     990             :         else
     991             :         {
     992         700 :             blkno = GinPageGetOpaque(page)->rightlink;
     993         700 :             UnlockReleaseBuffer(buffer);
     994             :         }
     995             : 
     996             :         /*
     997             :          * Read next page in pending list
     998             :          */
     999         700 :         vacuum_delay_point();
    1000         700 :         buffer = ReadBuffer(index, blkno);
    1001         700 :         LockBuffer(buffer, GIN_SHARE);
    1002         700 :         page = BufferGetPage(buffer);
    1003             :     }
    1004             : 
    1005          16 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    1006          16 :     ReleaseBuffer(metabuffer);
    1007             : 
    1008             :     /*
    1009             :      * As pending list pages can have a high churn rate, it is desirable to
    1010             :      * recycle them immediately to the FreeSpaceMap when ordinary backends
    1011             :      * clean the list.
    1012             :      */
    1013          16 :     if (fsm_vac && fill_fsm)
    1014           4 :         IndexFreeSpaceMapVacuum(index);
    1015             : 
    1016             :     /* Clean up temporary space */
    1017          16 :     MemoryContextSwitchTo(oldCtx);
    1018          16 :     MemoryContextDelete(opCtx);
    1019             : }
    1020             : 
    1021             : /*
    1022             :  * SQL-callable: flush the index's pending list, returning pages deleted.
    1023             :  */
    1024             : Datum
    1025           8 : gin_clean_pending_list(PG_FUNCTION_ARGS)
    1026             : {
    1027           8 :     Oid         indexoid = PG_GETARG_OID(0);
    1028           8 :     Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    1029             :     IndexBulkDeleteResult stats;
    1030             :     GinState    ginstate;
    1031             : 
    1032           8 :     if (RecoveryInProgress())
    1033           0 :         ereport(ERROR,
    1034             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1035             :                  errmsg("recovery is in progress"),
    1036             :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
    1037             : 
    1038             :     /* Must be a GIN index */
    1039           8 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1040           8 :         indexRel->rd_rel->relam != GIN_AM_OID)
    1041           0 :         ereport(ERROR,
    1042             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1043             :                  errmsg("\"%s\" is not a GIN index",
    1044             :                         RelationGetRelationName(indexRel))));
    1045             : 
    1046             :     /*
    1047             :      * Reject attempts to read non-local temporary relations; we would be
    1048             :      * likely to get wrong data since we have no visibility into the owning
    1049             :      * session's local buffers.
    1050             :      */
    1051           8 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1052           0 :         ereport(ERROR,
    1053             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1054             :                  errmsg("cannot access temporary indexes of other sessions")));
    1055             : 
    1056             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1057           8 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1058           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1059           0 :                        RelationGetRelationName(indexRel));
    1060             : 
    1061           8 :     memset(&stats, 0, sizeof(stats));
    1062           8 :     initGinState(&ginstate, indexRel);
    1063           8 :     ginInsertCleanup(&ginstate, true, true, true, &stats);
    1064             : 
    1065           8 :     index_close(indexRel, RowExclusiveLock);
    1066             : 
    1067           8 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1068             : }

Generated by: LCOV version 1.13