LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 324 361 89.8 %
Date: 2024-04-26 05:11:26 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * ginfast.c
       4             :  *    Fast insert routines for the Postgres inverted index access method.
       5             :  *    Pending entries are stored in a linear list of pages.  Later on
       6             :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7             :  *    transfer pending entries into the regular index structure.  This
       8             :  *    wins because bulk insertion is much more efficient than retail.
       9             :  *
      10             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      11             :  * Portions Copyright (c) 1994, Regents of the University of California
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *          src/backend/access/gin/ginfast.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/gin_private.h"
      22             : #include "access/ginxlog.h"
      23             : #include "access/xlog.h"
      24             : #include "access/xloginsert.h"
      25             : #include "catalog/pg_am.h"
      26             : #include "commands/vacuum.h"
      27             : #include "miscadmin.h"
      28             : #include "port/pg_bitutils.h"
      29             : #include "postmaster/autovacuum.h"
      30             : #include "storage/indexfsm.h"
      31             : #include "storage/lmgr.h"
      32             : #include "storage/predicate.h"
      33             : #include "utils/acl.h"
      34             : #include "utils/fmgrprotos.h"
      35             : #include "utils/memutils.h"
      36             : #include "utils/rel.h"
      37             : 
      38             : /* GUC parameter */
      39             : int         gin_pending_list_limit = 0;
      40             : 
      41             : #define GIN_PAGE_FREESIZE \
      42             :     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
      43             : 
      44             : typedef struct KeyArray
      45             : {
      46             :     Datum      *keys;           /* expansible array */
      47             :     GinNullCategory *categories;    /* another expansible array */
      48             :     int32       nvalues;        /* current number of valid entries */
      49             :     int32       maxvalues;      /* allocated size of arrays */
      50             : } KeyArray;
      51             : 
      52             : 
      53             : /*
      54             :  * Build a pending-list page from the given array of tuples, and write it out.
      55             :  *
      56             :  * Returns amount of free space left on the page.
      57             :  */
      58             : static int32
      59        2886 : writeListPage(Relation index, Buffer buffer,
      60             :               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      61             : {
      62        2886 :     Page        page = BufferGetPage(buffer);
      63             :     int32       i,
      64             :                 freesize,
      65        2886 :                 size = 0;
      66             :     OffsetNumber l,
      67             :                 off;
      68             :     PGAlignedBlock workspace;
      69             :     char       *ptr;
      70             : 
      71        2886 :     START_CRIT_SECTION();
      72             : 
      73        2886 :     GinInitBuffer(buffer, GIN_LIST);
      74             : 
      75        2886 :     off = FirstOffsetNumber;
      76        2886 :     ptr = workspace.data;
      77             : 
      78       16848 :     for (i = 0; i < ntuples; i++)
      79             :     {
      80       13962 :         int         this_size = IndexTupleSize(tuples[i]);
      81             : 
      82       13962 :         memcpy(ptr, tuples[i], this_size);
      83       13962 :         ptr += this_size;
      84       13962 :         size += this_size;
      85             : 
      86       13962 :         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
      87             : 
      88       13962 :         if (l == InvalidOffsetNumber)
      89           0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      90             :                  RelationGetRelationName(index));
      91             : 
      92       13962 :         off++;
      93             :     }
      94             : 
      95             :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      96             : 
      97        2886 :     GinPageGetOpaque(page)->rightlink = rightlink;
      98             : 
      99             :     /*
     100             :      * tail page may contain only whole row(s) or final part of row placed on
     101             :      * previous pages (a "row" here meaning all the index tuples generated for
     102             :      * one heap tuple)
     103             :      */
     104        2886 :     if (rightlink == InvalidBlockNumber)
     105             :     {
     106        2886 :         GinPageSetFullRow(page);
     107        2886 :         GinPageGetOpaque(page)->maxoff = 1;
     108             :     }
     109             :     else
     110             :     {
     111           0 :         GinPageGetOpaque(page)->maxoff = 0;
     112             :     }
     113             : 
     114        2886 :     MarkBufferDirty(buffer);
     115             : 
     116        2886 :     if (RelationNeedsWAL(index))
     117             :     {
     118             :         ginxlogInsertListPage data;
     119             :         XLogRecPtr  recptr;
     120             : 
     121        1102 :         data.rightlink = rightlink;
     122        1102 :         data.ntuples = ntuples;
     123             : 
     124        1102 :         XLogBeginInsert();
     125        1102 :         XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
     126             : 
     127        1102 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     128        1102 :         XLogRegisterBufData(0, workspace.data, size);
     129             : 
     130        1102 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     131        1102 :         PageSetLSN(page, recptr);
     132             :     }
     133             : 
     134             :     /* get free space before releasing buffer */
     135        2886 :     freesize = PageGetExactFreeSpace(page);
     136             : 
     137        2886 :     UnlockReleaseBuffer(buffer);
     138             : 
     139        2886 :     END_CRIT_SECTION();
     140             : 
     141        2886 :     return freesize;
     142             : }
     143             : 
     144             : static void
     145        2886 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     146             :             GinMetaPageData *res)
     147             : {
     148        2886 :     Buffer      curBuffer = InvalidBuffer;
     149        2886 :     Buffer      prevBuffer = InvalidBuffer;
     150             :     int         i,
     151        2886 :                 size = 0,
     152             :                 tupsize;
     153        2886 :     int         startTuple = 0;
     154             : 
     155             :     Assert(ntuples > 0);
     156             : 
     157             :     /*
     158             :      * Split tuples into pages
     159             :      */
     160       16848 :     for (i = 0; i < ntuples; i++)
     161             :     {
     162       13962 :         if (curBuffer == InvalidBuffer)
     163             :         {
     164        2886 :             curBuffer = GinNewBuffer(index);
     165             : 
     166        2886 :             if (prevBuffer != InvalidBuffer)
     167             :             {
     168           0 :                 res->nPendingPages++;
     169           0 :                 writeListPage(index, prevBuffer,
     170           0 :                               tuples + startTuple,
     171             :                               i - startTuple,
     172             :                               BufferGetBlockNumber(curBuffer));
     173             :             }
     174             :             else
     175             :             {
     176        2886 :                 res->head = BufferGetBlockNumber(curBuffer);
     177             :             }
     178             : 
     179        2886 :             prevBuffer = curBuffer;
     180        2886 :             startTuple = i;
     181        2886 :             size = 0;
     182             :         }
     183             : 
     184       13962 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
     185             : 
     186       13962 :         if (size + tupsize > GinListPageSize)
     187             :         {
     188             :             /* won't fit, force a new page and reprocess */
     189           0 :             i--;
     190           0 :             curBuffer = InvalidBuffer;
     191             :         }
     192             :         else
     193             :         {
     194       13962 :             size += tupsize;
     195             :         }
     196             :     }
     197             : 
     198             :     /*
     199             :      * Write last page
     200             :      */
     201        2886 :     res->tail = BufferGetBlockNumber(curBuffer);
     202        5772 :     res->tailFreeSize = writeListPage(index, curBuffer,
     203        2886 :                                       tuples + startTuple,
     204             :                                       ntuples - startTuple,
     205             :                                       InvalidBlockNumber);
     206        2886 :     res->nPendingPages++;
     207             :     /* that was only one heap tuple */
     208        2886 :     res->nPendingHeapTuples = 1;
     209        2886 : }
     210             : 
     211             : /*
     212             :  * Write the index tuples contained in *collector into the index's
     213             :  * pending list.
     214             :  *
     215             :  * This function guarantees that all these tuples will be inserted
     216             :  * consecutively, preserving their order.
     217             :  */
     218             : void
     219      264950 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     220             : {
     221      264950 :     Relation    index = ginstate->index;
     222             :     Buffer      metabuffer;
     223             :     Page        metapage;
     224      264950 :     GinMetaPageData *metadata = NULL;
     225      264950 :     Buffer      buffer = InvalidBuffer;
     226      264950 :     Page        page = NULL;
     227             :     ginxlogUpdateMeta data;
     228      264950 :     bool        separateList = false;
     229      264950 :     bool        needCleanup = false;
     230             :     int         cleanupSize;
     231             :     bool        needWal;
     232             : 
     233      264950 :     if (collector->ntuples == 0)
     234           0 :         return;
     235             : 
     236      264950 :     needWal = RelationNeedsWAL(index);
     237             : 
     238      264950 :     data.locator = index->rd_locator;
     239      264950 :     data.ntuples = 0;
     240      264950 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     241             : 
     242      264950 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     243      264950 :     metapage = BufferGetPage(metabuffer);
     244             : 
     245             :     /*
     246             :      * An insertion to the pending list could logically belong anywhere in the
     247             :      * tree, so it conflicts with all serializable scans.  All scans acquire a
     248             :      * predicate lock on the metabuffer to represent that.  Therefore we'll
     249             :      * call CheckForSerializableConflictIn(), but not until we have the page
     250             :      * locked and are ready to modify it.
     251             :      */
     252             : 
     253      264950 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     254             :     {
     255             :         /*
     256             :          * Total size is greater than one page => make sublist
     257             :          */
     258           0 :         separateList = true;
     259             :     }
     260             :     else
     261             :     {
     262      264950 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     263      264950 :         metadata = GinPageGetMeta(metapage);
     264             : 
     265      264950 :         if (metadata->head == InvalidBlockNumber ||
     266      264892 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     267             :         {
     268             :             /*
     269             :              * Pending list is empty or total size is greater than freespace
     270             :              * on tail page => make sublist
     271             :              *
     272             :              * We unlock the metabuffer to keep concurrency high
     273             :              */
     274        2886 :             separateList = true;
     275        2886 :             LockBuffer(metabuffer, GIN_UNLOCK);
     276             :         }
     277             :     }
     278             : 
     279      264950 :     if (separateList)
     280             :     {
     281             :         /*
     282             :          * Make the sublist separately, then append it to the tail
     283             :          */
     284             :         GinMetaPageData sublist;
     285             : 
     286        2886 :         memset(&sublist, 0, sizeof(GinMetaPageData));
     287        2886 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     288             : 
     289             :         /*
     290             :          * metapage was unlocked, see above
     291             :          */
     292        2886 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     293        2886 :         metadata = GinPageGetMeta(metapage);
     294             : 
     295        2886 :         CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     296             : 
     297        2880 :         if (metadata->head == InvalidBlockNumber)
     298             :         {
     299             :             /*
     300             :              * Main list is empty, so just insert sublist as main list
     301             :              */
     302          52 :             START_CRIT_SECTION();
     303             : 
     304          52 :             metadata->head = sublist.head;
     305          52 :             metadata->tail = sublist.tail;
     306          52 :             metadata->tailFreeSize = sublist.tailFreeSize;
     307             : 
     308          52 :             metadata->nPendingPages = sublist.nPendingPages;
     309          52 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     310             : 
     311          52 :             if (needWal)
     312          32 :                 XLogBeginInsert();
     313             :         }
     314             :         else
     315             :         {
     316             :             /*
     317             :              * Merge lists
     318             :              */
     319        2828 :             data.prevTail = metadata->tail;
     320        2828 :             data.newRightlink = sublist.head;
     321             : 
     322        2828 :             buffer = ReadBuffer(index, metadata->tail);
     323        2828 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     324        2828 :             page = BufferGetPage(buffer);
     325             : 
     326             :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     327             : 
     328        2828 :             START_CRIT_SECTION();
     329             : 
     330        2828 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     331             : 
     332        2828 :             MarkBufferDirty(buffer);
     333             : 
     334        2828 :             metadata->tail = sublist.tail;
     335        2828 :             metadata->tailFreeSize = sublist.tailFreeSize;
     336             : 
     337        2828 :             metadata->nPendingPages += sublist.nPendingPages;
     338        2828 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     339             : 
     340        2828 :             if (needWal)
     341             :             {
     342        1064 :                 XLogBeginInsert();
     343        1064 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     344             :             }
     345             :         }
     346             :     }
     347             :     else
     348             :     {
     349             :         /*
     350             :          * Insert into tail page.  Metapage is already locked
     351             :          */
     352             :         OffsetNumber l,
     353             :                     off;
     354             :         int         i,
     355             :                     tupsize;
     356             :         char       *ptr;
     357             :         char       *collectordata;
     358             : 
     359      262064 :         CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     360             : 
     361      262064 :         buffer = ReadBuffer(index, metadata->tail);
     362      262064 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     363      262064 :         page = BufferGetPage(buffer);
     364             : 
     365      262064 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     366      262064 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     367             : 
     368      262064 :         collectordata = ptr = (char *) palloc(collector->sumsize);
     369             : 
     370      262064 :         data.ntuples = collector->ntuples;
     371             : 
     372      262064 :         START_CRIT_SECTION();
     373             : 
     374      262064 :         if (needWal)
     375      143754 :             XLogBeginInsert();
     376             : 
     377             :         /*
     378             :          * Increase counter of heap tuples
     379             :          */
     380             :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     381      262064 :         GinPageGetOpaque(page)->maxoff++;
     382      262064 :         metadata->nPendingHeapTuples++;
     383             : 
     384     1405700 :         for (i = 0; i < collector->ntuples; i++)
     385             :         {
     386     1143636 :             tupsize = IndexTupleSize(collector->tuples[i]);
     387     1143636 :             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
     388             : 
     389     1143636 :             if (l == InvalidOffsetNumber)
     390           0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     391             :                      RelationGetRelationName(index));
     392             : 
     393     1143636 :             memcpy(ptr, collector->tuples[i], tupsize);
     394     1143636 :             ptr += tupsize;
     395             : 
     396     1143636 :             off++;
     397             :         }
     398             : 
     399             :         Assert((ptr - collectordata) <= collector->sumsize);
     400             : 
     401      262064 :         MarkBufferDirty(buffer);
     402             : 
     403      262064 :         if (needWal)
     404             :         {
     405      143754 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     406      143754 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     407             :         }
     408             : 
     409      262064 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     410             :     }
     411             : 
     412             :     /*
     413             :      * Set pd_lower just past the end of the metadata.  This is essential,
     414             :      * because without doing so, metadata will be lost if xlog.c compresses
     415             :      * the page.  (We must do this here because pre-v11 versions of PG did not
     416             :      * set the metapage's pd_lower correctly, so a pg_upgraded index might
     417             :      * contain the wrong value.)
     418             :      */
     419      264944 :     ((PageHeader) metapage)->pd_lower =
     420      264944 :         ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     421             : 
     422             :     /*
     423             :      * Write metabuffer, make xlog entry
     424             :      */
     425      264944 :     MarkBufferDirty(metabuffer);
     426             : 
     427      264944 :     if (needWal)
     428             :     {
     429             :         XLogRecPtr  recptr;
     430             : 
     431      144850 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     432             : 
     433      144850 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
     434      144850 :         XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
     435             : 
     436      144850 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     437      144850 :         PageSetLSN(metapage, recptr);
     438             : 
     439      144850 :         if (buffer != InvalidBuffer)
     440             :         {
     441      144818 :             PageSetLSN(page, recptr);
     442             :         }
     443             :     }
     444             : 
     445      264944 :     if (buffer != InvalidBuffer)
     446      264892 :         UnlockReleaseBuffer(buffer);
     447             : 
     448             :     /*
     449             :      * Force pending list cleanup when the list becomes too long.  Also,
     450             :      * ginInsertCleanup could take a significant amount of time, so we prefer
     451             :      * to call it when it can do all the work in a single collection cycle.  In
     452             :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     453             :      * while pending list is still small enough to fit into
     454             :      * gin_pending_list_limit.
     455             :      *
     456             :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     457             :      */
     458      264944 :     cleanupSize = GinGetPendingListCleanupSize(index);
     459      264944 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
     460           0 :         needCleanup = true;
     461             : 
     462      264944 :     UnlockReleaseBuffer(metabuffer);
     463             : 
     464      264944 :     END_CRIT_SECTION();
     465             : 
     466             :     /*
     467             :      * Since this could contend with a concurrent cleanup process, we clean up
     468             :      * the pending list non-forcibly.
     469             :      */
     470      264944 :     if (needCleanup)
     471           0 :         ginInsertCleanup(ginstate, false, true, false, NULL);
     472             : }
     473             : 
     474             : /*
     475             :  * Create temporary index tuples for a single indexable item (one index column
     476             :  * for the heap tuple specified by ht_ctid), and append them to the array
     477             :  * in *collector.  They will subsequently be written out using
     478             :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
     479             :  * temp tuples for a given heap tuple must be written in one call to
     480             :  * ginHeapTupleFastInsert.
     481             :  */
     482             : void
     483      385028 : ginHeapTupleFastCollect(GinState *ginstate,
     484             :                         GinTupleCollector *collector,
     485             :                         OffsetNumber attnum, Datum value, bool isNull,
     486             :                         ItemPointer ht_ctid)
     487             : {
     488             :     Datum      *entries;
     489             :     GinNullCategory *categories;
     490             :     int32       i,
     491             :                 nentries;
     492             : 
     493             :     /*
     494             :      * Extract the key values that need to be inserted in the index
     495             :      */
     496      385028 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     497             :                                 &nentries, &categories);
     498             : 
     499             :     /*
     500             :      * Protect against integer overflow in allocation calculations
     501             :      */
     502      385028 :     if (nentries < 0 ||
     503      385028 :         collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
     504           0 :         elog(ERROR, "too many entries for GIN index");
     505             : 
     506             :     /*
     507             :      * Allocate/reallocate memory for storing collected tuples
     508             :      */
     509      385028 :     if (collector->tuples == NULL)
     510             :     {
     511             :         /*
     512             :          * Determine the number of elements to allocate in the tuples array
     513             :          * initially.  Make it a power of 2 to avoid wasting memory when
     514             :          * resizing (since palloc likes powers of 2).
     515             :          */
     516      264950 :         collector->lentuples = pg_nextpower2_32(Max(16, nentries));
     517      264950 :         collector->tuples = palloc_array(IndexTuple, collector->lentuples);
     518             :     }
     519      120078 :     else if (collector->lentuples < collector->ntuples + nentries)
     520             :     {
     521             :         /*
     522             :          * Advance lentuples to the next suitable power of 2.  This won't
     523             :          * overflow, though we could get to a value that exceeds
     524             :          * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
     525             :          */
     526           0 :         collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
     527           0 :         collector->tuples = repalloc_array(collector->tuples,
     528             :                                            IndexTuple, collector->lentuples);
     529             :     }
     530             : 
     531             :     /*
     532             :      * Build an index tuple for each key value, and add to array.  In pending
     533             :      * tuples we just stick the heap TID into t_tid.
     534             :      */
     535     1542626 :     for (i = 0; i < nentries; i++)
     536             :     {
     537             :         IndexTuple  itup;
     538             : 
     539     1157598 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     540             :                             NULL, 0, 0, true);
     541     1157598 :         itup->t_tid = *ht_ctid;
     542     1157598 :         collector->tuples[collector->ntuples++] = itup;
     543     1157598 :         collector->sumsize += IndexTupleSize(itup);
     544             :     }
     545      385028 : }
     546             : 
     547             : /*
     548             :  * Deletes pending list pages up to (not including) newHead page.
     549             :  * If newHead == InvalidBlockNumber then function drops the whole list.
     550             :  *
     551             :  * metapage is pinned and exclusive-locked throughout this function.
     552             :  */
     553             : static void
     554          32 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     555             :           bool fill_fsm, IndexBulkDeleteResult *stats)
     556             : {
     557             :     Page        metapage;
     558             :     GinMetaPageData *metadata;
     559             :     BlockNumber blknoToDelete;
     560             : 
     561          32 :     metapage = BufferGetPage(metabuffer);
     562          32 :     metadata = GinPageGetMeta(metapage);
     563          32 :     blknoToDelete = metadata->head;
     564             : 
     565             :     do
     566             :     {
     567             :         Page        page;
     568             :         int         i;
     569         194 :         int64       nDeletedHeapTuples = 0;
     570             :         ginxlogDeleteListPages data;
     571             :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     572             :         BlockNumber freespace[GIN_NDELETE_AT_ONCE];
     573             : 
     574         194 :         data.ndeleted = 0;
     575        3054 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     576             :         {
     577        2860 :             freespace[data.ndeleted] = blknoToDelete;
     578        2860 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     579        2860 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     580        2860 :             page = BufferGetPage(buffers[data.ndeleted]);
     581             : 
     582        2860 :             data.ndeleted++;
     583             : 
     584             :             Assert(!GinPageIsDeleted(page));
     585             : 
     586        2860 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     587        2860 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     588             :         }
     589             : 
     590         194 :         if (stats)
     591         194 :             stats->pages_deleted += data.ndeleted;
     592             : 
     593             :         /*
     594             :          * This operation touches an unusually large number of pages, so
     595             :          * prepare the XLogInsert machinery for that before entering the
     596             :          * critical section.
     597             :          */
     598         194 :         if (RelationNeedsWAL(index))
     599          80 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     600             : 
     601         194 :         START_CRIT_SECTION();
     602             : 
     603         194 :         metadata->head = blknoToDelete;
     604             : 
     605             :         Assert(metadata->nPendingPages >= data.ndeleted);
     606         194 :         metadata->nPendingPages -= data.ndeleted;
     607             :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     608         194 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     609             : 
     610         194 :         if (blknoToDelete == InvalidBlockNumber)
     611             :         {
     612          32 :             metadata->tail = InvalidBlockNumber;
     613          32 :             metadata->tailFreeSize = 0;
     614          32 :             metadata->nPendingPages = 0;
     615          32 :             metadata->nPendingHeapTuples = 0;
     616             :         }
     617             : 
     618             :         /*
     619             :          * Set pd_lower just past the end of the metadata.  This is essential,
     620             :          * because without doing so, metadata will be lost if xlog.c
     621             :          * compresses the page.  (We must do this here because pre-v11
     622             :          * versions of PG did not set the metapage's pd_lower correctly, so a
     623             :          * pg_upgraded index might contain the wrong value.)
     624             :          */
     625         194 :         ((PageHeader) metapage)->pd_lower =
     626         194 :             ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     627             : 
     628         194 :         MarkBufferDirty(metabuffer);
     629             : 
     630        3054 :         for (i = 0; i < data.ndeleted; i++)
     631             :         {
     632        2860 :             page = BufferGetPage(buffers[i]);
     633        2860 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     634        2860 :             MarkBufferDirty(buffers[i]);
     635             :         }
     636             : 
     637         194 :         if (RelationNeedsWAL(index))
     638             :         {
     639             :             XLogRecPtr  recptr;
     640             : 
     641          80 :             XLogBeginInsert();
     642          80 :             XLogRegisterBuffer(0, metabuffer,
     643             :                                REGBUF_WILL_INIT | REGBUF_STANDARD);
     644        1170 :             for (i = 0; i < data.ndeleted; i++)
     645        1090 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     646             : 
     647          80 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     648             : 
     649          80 :             XLogRegisterData((char *) &data,
     650             :                              sizeof(ginxlogDeleteListPages));
     651             : 
     652          80 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     653          80 :             PageSetLSN(metapage, recptr);
     654             : 
     655        1170 :             for (i = 0; i < data.ndeleted; i++)
     656             :             {
     657        1090 :                 page = BufferGetPage(buffers[i]);
     658        1090 :                 PageSetLSN(page, recptr);
     659             :             }
     660             :         }
     661             : 
     662        3054 :         for (i = 0; i < data.ndeleted; i++)
     663        2860 :             UnlockReleaseBuffer(buffers[i]);
     664             : 
     665         194 :         END_CRIT_SECTION();
     666             : 
     667        2894 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     668        2700 :             RecordFreeIndexPage(index, freespace[i]);
     669             : 
     670         194 :     } while (blknoToDelete != newHead);
     671          32 : }
     672             : 
     673             : /* Initialize empty KeyArray */
     674             : static void
     675          32 : initKeyArray(KeyArray *keys, int32 maxvalues)
     676             : {
     677          32 :     keys->keys = palloc_array(Datum, maxvalues);
     678          32 :     keys->categories = palloc_array(GinNullCategory, maxvalues);
     679          32 :     keys->nvalues = 0;
     680          32 :     keys->maxvalues = maxvalues;
     681          32 : }
     682             : 
     683             : /* Add datum to KeyArray, resizing if needed */
     684             : static void
     685     1157448 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     686             : {
     687     1157448 :     if (keys->nvalues >= keys->maxvalues)
     688             :     {
     689           0 :         keys->maxvalues *= 2;
     690           0 :         keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
     691           0 :         keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
     692             :     }
     693             : 
     694     1157448 :     keys->keys[keys->nvalues] = datum;
     695     1157448 :     keys->categories[keys->nvalues] = category;
     696     1157448 :     keys->nvalues++;
     697     1157448 : }
     698             : 
     699             : /*
     700             :  * Collect data from a pending-list page in preparation for insertion into
     701             :  * the main index.
     702             :  *
     703             :  * Go through all tuples >= startoff on page and collect values in accum
     704             :  *
     705             :  * Note that ka is just workspace --- it does not carry any state across
     706             :  * calls.
     707             :  */
     708             : static void
     709        2860 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     710             :                    Page page, OffsetNumber startoff)
     711             : {
     712             :     ItemPointerData heapptr;
     713             :     OffsetNumber i,
     714             :                 maxoff;
     715             :     OffsetNumber attrnum;
     716             : 
     717             :     /* reset *ka to empty */
     718        2860 :     ka->nvalues = 0;
     719             : 
     720        2860 :     maxoff = PageGetMaxOffsetNumber(page);
     721             :     Assert(maxoff >= FirstOffsetNumber);
     722        2860 :     ItemPointerSetInvalid(&heapptr);
     723        2860 :     attrnum = 0;
     724             : 
     725     1160308 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     726             :     {
     727     1157448 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     728             :         OffsetNumber curattnum;
     729             :         Datum       curkey;
     730             :         GinNullCategory curcategory;
     731             : 
     732             :         /* Check for change of heap TID or attnum */
     733     1157448 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     734             : 
     735     1157448 :         if (!ItemPointerIsValid(&heapptr))
     736             :         {
     737        2860 :             heapptr = itup->t_tid;
     738        2860 :             attrnum = curattnum;
     739             :         }
     740     1154588 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     741             :                    curattnum == attrnum))
     742             :         {
     743             :             /*
     744             :              * ginInsertBAEntries can insert several datums per call, but only
     745             :              * for one heap tuple and one column.  So call it at a boundary,
     746             :              * and reset ka.
     747             :              */
     748      382086 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     749             :                                ka->keys, ka->categories, ka->nvalues);
     750      382086 :             ka->nvalues = 0;
     751      382086 :             heapptr = itup->t_tid;
     752      382086 :             attrnum = curattnum;
     753             :         }
     754             : 
     755             :         /* Add key to KeyArray */
     756     1157448 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     757     1157448 :         addDatum(ka, curkey, curcategory);
     758             :     }
     759             : 
     760             :     /* Dump out all remaining keys */
     761        2860 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     762             :                        ka->keys, ka->categories, ka->nvalues);
     763        2860 : }
     764             : 
     765             : /*
     766             :  * Move tuples from pending pages into regular GIN structure.
     767             :  *
     768             :  * At first glance this looks completely crash-unsafe.  But if we crash
     769             :  * after posting entries to the main index and before removing them from the
     770             :  * pending list, it's okay because when we redo the posting later on, nothing
     771             :  * bad will happen.
     772             :  *
     773             :  * fill_fsm indicates that ginInsertCleanup should add deleted pages
     774             :  * to the FSM; otherwise the caller is responsible for putting deleted
     775             :  * pages into the FSM.
     776             :  *
     777             :  * If stats isn't null, we count deleted pending pages into the counts.
     778             :  */
     779             : void
     780          74 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     781             :                  bool fill_fsm, bool forceCleanup,
     782             :                  IndexBulkDeleteResult *stats)
     783             : {
     784          74 :     Relation    index = ginstate->index;
     785             :     Buffer      metabuffer,
     786             :                 buffer;
     787             :     Page        metapage,
     788             :                 page;
     789             :     GinMetaPageData *metadata;
     790             :     MemoryContext opCtx,
     791             :                 oldCtx;
     792             :     BuildAccumulator accum;
     793             :     KeyArray    datums;
     794             :     BlockNumber blkno,
     795             :                 blknoFinish;
     796          74 :     bool        cleanupFinish = false;
     797          74 :     bool        fsm_vac = false;
     798             :     Size        workMemory;
     799             : 
     800             :     /*
     801             :      * We want to prevent concurrent cleanup processes.  To do that, we
     802             :      * lock the metapage in exclusive mode using LockPage().  Nobody else
     803             :      * uses that lock on the metapage, so concurrent insertion into the
     804             :      * pending list remains possible.
     805             :      */
     806             : 
     807          74 :     if (forceCleanup)
     808             :     {
     809             :         /*
     810             :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
     811             :          * and we would like to wait for any concurrent cleanup to finish.
     812             :          */
     813          74 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     814          74 :         workMemory =
     815           0 :             (AmAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     816          74 :             autovacuum_work_mem : maintenance_work_mem;
     817             :     }
     818             :     else
     819             :     {
     820             :         /*
     821             :          * We are called from a regular insert; if we see a concurrent
     822             :          * cleanup, just exit in the hope that the concurrent process will
     823             :          * clean up the pending list.
     824             :          */
     825           0 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     826          42 :             return;
     827           0 :         workMemory = work_mem;
     828             :     }
     829             : 
     830          74 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     831          74 :     LockBuffer(metabuffer, GIN_SHARE);
     832          74 :     metapage = BufferGetPage(metabuffer);
     833          74 :     metadata = GinPageGetMeta(metapage);
     834             : 
     835          74 :     if (metadata->head == InvalidBlockNumber)
     836             :     {
     837             :         /* Nothing to do */
     838          42 :         UnlockReleaseBuffer(metabuffer);
     839          42 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     840          42 :         return;
     841             :     }
     842             : 
     843             :     /*
     844             :      * Remember the tail page to prevent endless cleanup if other backends
     845             :      * add new tuples faster than we can clean up.
     846             :      */
     847          32 :     blknoFinish = metadata->tail;
     848             : 
     849             :     /*
     850             :      * Read and lock head of pending list
     851             :      */
     852          32 :     blkno = metadata->head;
     853          32 :     buffer = ReadBuffer(index, blkno);
     854          32 :     LockBuffer(buffer, GIN_SHARE);
     855          32 :     page = BufferGetPage(buffer);
     856             : 
     857          32 :     LockBuffer(metabuffer, GIN_UNLOCK);
     858             : 
     859             :     /*
     860             :      * Initialize.  All temporary space will be in opCtx
     861             :      */
     862          32 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     863             :                                   "GIN insert cleanup temporary context",
     864             :                                   ALLOCSET_DEFAULT_SIZES);
     865             : 
     866          32 :     oldCtx = MemoryContextSwitchTo(opCtx);
     867             : 
     868          32 :     initKeyArray(&datums, 128);
     869          32 :     ginInitBA(&accum);
     870          32 :     accum.ginstate = ginstate;
     871             : 
     872             :     /*
     873             :      * At the top of this loop, we have pin and lock on the current page of
     874             :      * the pending list.  However, we'll release that before exiting the loop.
     875             :      * Note we also have pin but not lock on the metapage.
     876             :      */
     877             :     for (;;)
     878             :     {
     879        2828 :         Assert(!GinPageIsDeleted(page));
     880             : 
     881             :         /*
     882             :          * Have we reached the page that was the tail when we started our
     883             :          * cleanup?  If the caller asked us to clean up the whole pending
     884             :          * list, ignore the old tail and keep working until the list becomes
     885             :          * empty.
     886             :          */
     887        2860 :         if (blkno == blknoFinish && full_clean == false)
     888           0 :             cleanupFinish = true;
     889             : 
     890             :         /*
     891             :          * read page's datums into accum
     892             :          */
     893        2860 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     894             : 
     895        2860 :         vacuum_delay_point();
     896             : 
     897             :         /*
     898             :          * Is it time to flush memory to disk?  Flush if we are at the end of
     899             :          * the pending list, or if we have a full row and memory is getting
     900             :          * full.
     901             :          */
     902        2860 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     903        2828 :             (GinPageHasFullRow(page) &&
     904        2828 :              (accum.allocatedMemory >= workMemory * 1024L)))
     905           0 :         {
     906             :             ItemPointerData *list;
     907             :             uint32      nlist;
     908             :             Datum       key;
     909             :             GinNullCategory category;
     910             :             OffsetNumber maxoff,
     911             :                         attnum;
     912             : 
     913             :             /*
     914             :              * Unlock the current page to increase performance.  Changes to
     915             :              * the page will be detected later by comparing maxoff once the
     916             :              * memory flush completes.
     917             :              */
     918          32 :             maxoff = PageGetMaxOffsetNumber(page);
     919          32 :             LockBuffer(buffer, GIN_UNLOCK);
     920             : 
     921             :             /*
     922             :              * Moving collected data into regular structure can take
     923             :              * significant amount of time - so, run it without locking pending
     924             :              * list.
     925             :              */
     926          32 :             ginBeginBAScan(&accum);
     927      366116 :             while ((list = ginGetBAEntry(&accum,
     928             :                                          &attnum, &key, &category, &nlist)) != NULL)
     929             :             {
     930      366084 :                 ginEntryInsert(ginstate, attnum, key, category,
     931             :                                list, nlist, NULL);
     932      366084 :                 vacuum_delay_point();
     933             :             }
     934             : 
     935             :             /*
     936             :              * Lock the whole list to remove pages
     937             :              */
     938          32 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     939          32 :             LockBuffer(buffer, GIN_SHARE);
     940             : 
     941             :             Assert(!GinPageIsDeleted(page));
     942             : 
     943             :             /*
     944             :              * While we left the page unlocked, more stuff might have gotten
     945             :              * added to it.  If so, process those entries immediately.  There
     946             :              * shouldn't be very many, so we don't worry about the fact that
     947             :              * we're doing this with exclusive lock. Insertion algorithm
     948             :              * guarantees that inserted row(s) will not continue on next page.
     949             :              * NOTE: intentionally no vacuum_delay_point in this loop.
     950             :              */
     951          32 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     952             :             {
     953           0 :                 ginInitBA(&accum);
     954           0 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     955             : 
     956           0 :                 ginBeginBAScan(&accum);
     957           0 :                 while ((list = ginGetBAEntry(&accum,
     958             :                                              &attnum, &key, &category, &nlist)) != NULL)
     959           0 :                     ginEntryInsert(ginstate, attnum, key, category,
     960             :                                    list, nlist, NULL);
     961             :             }
     962             : 
     963             :             /*
     964             :              * Remember next page - it will become the new list head
     965             :              */
     966          32 :             blkno = GinPageGetOpaque(page)->rightlink;
     967          32 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     968             :                                              * locking */
     969             : 
     970             :             /*
     971             :              * remove read pages from pending list, at this point all content
     972             :              * of read pages is in regular structure
     973             :              */
     974          32 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     975             : 
     976             :             /* At this point, some pending pages have been freed up */
     977          32 :             fsm_vac = true;
     978             : 
     979             :             Assert(blkno == metadata->head);
     980          32 :             LockBuffer(metabuffer, GIN_UNLOCK);
     981             : 
     982             :             /*
     983             :              * If we removed the whole pending list, or we have cleaned up the
     984             :              * tail page we remembered at the start of cleanup, then just exit.
     985             :              */
     986          32 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     987             :                 break;
     988             : 
     989             :             /*
     990             :              * release memory used so far and reinit state
     991             :              */
     992           0 :             MemoryContextReset(opCtx);
     993           0 :             initKeyArray(&datums, datums.maxvalues);
     994           0 :             ginInitBA(&accum);
     995             :         }
     996             :         else
     997             :         {
     998        2828 :             blkno = GinPageGetOpaque(page)->rightlink;
     999        2828 :             UnlockReleaseBuffer(buffer);
    1000             :         }
    1001             : 
    1002             :         /*
    1003             :          * Read next page in pending list
    1004             :          */
    1005        2828 :         vacuum_delay_point();
    1006        2828 :         buffer = ReadBuffer(index, blkno);
    1007        2828 :         LockBuffer(buffer, GIN_SHARE);
    1008        2828 :         page = BufferGetPage(buffer);
    1009             :     }
    1010             : 
    1011          32 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    1012          32 :     ReleaseBuffer(metabuffer);
    1013             : 
    1014             :     /*
    1015             :      * As pending list pages can have a high churn rate, it is desirable to
    1016             :      * recycle them immediately to the FreeSpaceMap when ordinary backends
    1017             :      * clean the list.
    1018             :      */
    1019          32 :     if (fsm_vac && fill_fsm)
    1020          12 :         IndexFreeSpaceMapVacuum(index);
    1021             : 
    1022             :     /* Clean up temporary space */
    1023          32 :     MemoryContextSwitchTo(oldCtx);
    1024          32 :     MemoryContextDelete(opCtx);
    1025             : }
    1026             : 
    1027             : /*
    1028             :  * SQL-callable function to clean the insert pending list
    1029             :  */
    1030             : Datum
    1031          18 : gin_clean_pending_list(PG_FUNCTION_ARGS)
    1032             : {
    1033          18 :     Oid         indexoid = PG_GETARG_OID(0);
    1034          18 :     Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    1035             :     IndexBulkDeleteResult stats;
    1036             : 
    1037          18 :     if (RecoveryInProgress())
    1038           0 :         ereport(ERROR,
    1039             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1040             :                  errmsg("recovery is in progress"),
    1041             :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
    1042             : 
    1043             :     /* Must be a GIN index */
    1044          18 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1045          18 :         indexRel->rd_rel->relam != GIN_AM_OID)
    1046           0 :         ereport(ERROR,
    1047             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1048             :                  errmsg("\"%s\" is not a GIN index",
    1049             :                         RelationGetRelationName(indexRel))));
    1050             : 
    1051             :     /*
    1052             :      * Reject attempts to read non-local temporary relations; we would be
    1053             :      * likely to get wrong data since we have no visibility into the owning
    1054             :      * session's local buffers.
    1055             :      */
    1056          18 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1057           0 :         ereport(ERROR,
    1058             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1059             :                  errmsg("cannot access temporary indexes of other sessions")));
    1060             : 
    1061             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1062          18 :     if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
    1063           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1064           0 :                        RelationGetRelationName(indexRel));
    1065             : 
    1066          18 :     memset(&stats, 0, sizeof(stats));
    1067             : 
    1068             :     /*
    1069             :      * Can't assume anything about the content of an !indisready index.  Make
    1070             :      * those a no-op, not an error, so users can just run this function on all
    1071             :      * indexes of the access method.  Since an indisready&&!indisvalid index
    1072             :      * is merely awaiting missed aminsert calls, we're capable of processing
    1073             :      * it.  Decline to do so, out of an abundance of caution.
    1074             :      */
    1075          18 :     if (indexRel->rd_index->indisvalid)
    1076             :     {
    1077             :         GinState    ginstate;
    1078             : 
    1079          18 :         initGinState(&ginstate, indexRel);
    1080          18 :         ginInsertCleanup(&ginstate, true, true, true, &stats);
    1081             :     }
    1082             :     else
    1083           0 :         ereport(DEBUG1,
    1084             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1085             :                  errmsg("index \"%s\" is not valid",
    1086             :                         RelationGetRelationName(indexRel))));
    1087             : 
    1088          18 :     index_close(indexRel, RowExclusiveLock);
    1089             : 
    1090          18 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1091             : }

Generated by: LCOV version 1.14