LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 323 358 90.2 %
Date: 2023-05-30 23:12:14 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * ginfast.c
       4             :  *    Fast insert routines for the Postgres inverted index access method.
       5             :  *    Pending entries are stored in a linear list of pages.  Later on
       6             :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7             :  *    transfer pending entries into the regular index structure.  This
       8             :  *    wins because bulk insertion is much more efficient than retail.
       9             :  *
      10             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      11             :  * Portions Copyright (c) 1994, Regents of the University of California
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *          src/backend/access/gin/ginfast.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/gin_private.h"
      22             : #include "access/ginxlog.h"
      23             : #include "access/xlog.h"
      24             : #include "access/xloginsert.h"
      25             : #include "catalog/pg_am.h"
      26             : #include "commands/vacuum.h"
      27             : #include "miscadmin.h"
      28             : #include "port/pg_bitutils.h"
      29             : #include "postmaster/autovacuum.h"
      30             : #include "storage/indexfsm.h"
      31             : #include "storage/lmgr.h"
      32             : #include "storage/predicate.h"
      33             : #include "utils/acl.h"
      34             : #include "utils/builtins.h"
      35             : #include "utils/memutils.h"
      36             : #include "utils/rel.h"
      37             : 
      38             : /* GUC parameter (initialized to 0 here; not static, so it has external linkage) */
      39             : int         gin_pending_list_limit = 0;
      40             : /* Block size less the MAXALIGNed page header and the MAXALIGNed GIN opaque area */
      41             : #define GIN_PAGE_FREESIZE \
      42             :     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
      43             : /* Growable key and null-category arrays together with their used/allocated counts */
      44             : typedef struct KeyArray
      45             : {
      46             :     Datum      *keys;           /* expansible array of key datums */
      47             :     GinNullCategory *categories;    /* another expansible array, of GinNullCategory values */
      48             :     int32       nvalues;        /* current number of valid entries */
      49             :     int32       maxvalues;      /* allocated size of the arrays above */
      50             : } KeyArray;
      51             : 
      52             : 
      53             : /*
      54             :  * Build a pending-list page from the given array of tuples, and write it out.
                     :  *
                     :  * index:     relation the page belongs to; used for the error message and
                     :  *            for the RelationNeedsWAL() test.
                     :  * buffer:    buffer of the page to fill.  It is initialized as a GIN_LIST
                     :  *            page here, and is unlocked and released before returning
                     :  *            (presumably the caller passes it pinned and exclusively
                     :  *            locked -- TODO confirm against GinNewBuffer).
                     :  * tuples:    array of ntuples index tuples; they are added to the page in
                     :  *            array order, starting at FirstOffsetNumber.
                     :  * rightlink: block number stored as the page's rightlink.  Passing
                     :  *            InvalidBlockNumber makes this a tail page.
                     :  *
                     :  * All page modifications, and the WAL record if the relation needs WAL,
                     :  * happen inside one critical section.
      55             :  *
      56             :  * Returns amount of free space left on the page.
      57             :  */
      58             : static int32
      59        2864 : writeListPage(Relation index, Buffer buffer,
      60             :               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      61             : {
      62        2864 :     Page        page = BufferGetPage(buffer);
      63             :     int32       i,
      64             :                 freesize,
      65        2864 :                 size = 0;       /* bytes copied into workspace so far */
      66             :     OffsetNumber l,
      67             :                 off;
      68             :     PGAlignedBlock workspace;   /* contiguous copy of the tuples, used as WAL payload */
      69             :     char       *ptr;            /* next free byte in workspace */
      70             : 
      71        2864 :     START_CRIT_SECTION();
      72             : 
      73        2864 :     GinInitBuffer(buffer, GIN_LIST);
      74             : 
      75        2864 :     off = FirstOffsetNumber;
      76        2864 :     ptr = workspace.data;
      77             : 
      78       16724 :     for (i = 0; i < ntuples; i++)
      79             :     {
      80       13860 :         int         this_size = IndexTupleSize(tuples[i]);
      81             : 
      82       13860 :         memcpy(ptr, tuples[i], this_size);  /* copy for the WAL record */
      83       13860 :         ptr += this_size;
      84       13860 :         size += this_size;
      85             : 
      86       13860 :         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
      87             : 
      88       13860 :         if (l == InvalidOffsetNumber)
      89           0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      90             :                  RelationGetRelationName(index));
      91             : 
      92       13860 :         off++;
      93             :     }
      94             : 
      95             :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      96             : 
      97        2864 :     GinPageGetOpaque(page)->rightlink = rightlink;
      98             : 
      99             :     /*
     100             :      * A tail page (one without a rightlink) may contain only whole row(s) or
     101             :      * the final part of a row begun on previous pages (a "row" here meaning
     102             :      * all the index tuples generated for one heap tuple).  Such a page is
                     :      * flagged full-row with maxoff = 1; any other page gets maxoff = 0.
     103             :      */
     104        2864 :     if (rightlink == InvalidBlockNumber)
     105             :     {
     106        2864 :         GinPageSetFullRow(page);
     107        2864 :         GinPageGetOpaque(page)->maxoff = 1;
     108             :     }
     109             :     else
     110             :     {
     111           0 :         GinPageGetOpaque(page)->maxoff = 0;
     112             :     }
     113             : 
     114        2864 :     MarkBufferDirty(buffer);
     115             : 
     116        2864 :     if (RelationNeedsWAL(index))
     117             :     {
     118             :         ginxlogInsertListPage data;
     119             :         XLogRecPtr  recptr;
     120             : 
     121        1080 :         data.rightlink = rightlink;
     122        1080 :         data.ntuples = ntuples;
     123             : 
     124        1080 :         XLogBeginInsert();
     125        1080 :         XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
     126             : 
     127        1080 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     128        1080 :         XLogRegisterBufData(0, workspace.data, size);
     129             : 
     130        1080 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     131        1080 :         PageSetLSN(page, recptr);
     132             :     }
     133             : 
     134             :     /* measure the free space while we still hold the buffer; it is released next */
     135        2864 :     freesize = PageGetExactFreeSpace(page);
     136             : 
     137        2864 :     UnlockReleaseBuffer(buffer);
     138             : 
     139        2864 :     END_CRIT_SECTION();
     140             : 
     141        2864 :     return freesize;
     142             : }
     143             : /*
                     :  * Write the given tuples out to one or more newly allocated pending-list
                     :  * pages, chained by rightlink, and describe the resulting list in *res.
                     :  *
                     :  * index:   relation in which the pages are allocated (via GinNewBuffer).
                     :  * tuples:  array of ntuples index tuples; ntuples must be greater than zero.
                     :  * res:     receives head, tail and tailFreeSize of the new list.
                     :  *          nPendingPages is incremented once per page written, so the
                     :  *          caller is expected to have zeroed it; nPendingHeapTuples is
                     :  *          set to 1.
                     :  */
     144             : static void
     145        2864 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     146             :             GinMetaPageData *res)
     147             : {
     148        2864 :     Buffer      curBuffer = InvalidBuffer;  /* page currently being filled */
     149        2864 :     Buffer      prevBuffer = InvalidBuffer; /* page waiting to be written */
     150             :     int         i,
     151        2864 :                 size = 0,       /* space used on the current page */
     152             :                 tupsize;
     153        2864 :     int         startTuple = 0; /* first tuple belonging to the current page */
     154             : 
     155             :     Assert(ntuples > 0);
     156             : 
     157             :     /*
     158             :      * Split tuples into pages.
                     :      *
                     :      * A page is written only once its successor has been allocated, because
                     :      * writeListPage() needs the successor's block number as the rightlink;
                     :      * that is why prevBuffer trails curBuffer.
                     :      *
                     :      * NOTE(review): a single tuple larger than GinListPageSize would never
                     :      * fit on an empty page, and this loop would keep allocating pages.
                     :      * Presumably the tuple size is bounded where the tuples are formed --
                     :      * confirm.
     159             :      */
     160       16724 :     for (i = 0; i < ntuples; i++)
     161             :     {
     162       13860 :         if (curBuffer == InvalidBuffer)
     163             :         {
     164        2864 :             curBuffer = GinNewBuffer(index);
     165             : 
     166        2864 :             if (prevBuffer != InvalidBuffer)
     167             :             {
     168           0 :                 res->nPendingPages++;
     169           0 :                 writeListPage(index, prevBuffer,
     170           0 :                               tuples + startTuple,
     171             :                               i - startTuple,
     172             :                               BufferGetBlockNumber(curBuffer));
     173             :             }
     174             :             else
     175             :             {
     176        2864 :                 res->head = BufferGetBlockNumber(curBuffer);
     177             :             }
     178             : 
     179        2864 :             prevBuffer = curBuffer;
     180        2864 :             startTuple = i;
     181        2864 :             size = 0;
     182             :         }
     183             : 
     184       13860 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
     185             : 
     186       13860 :         if (size + tupsize > GinListPageSize)
     187             :         {
     188             :             /* won't fit: force a new page, and step back so this tuple is reprocessed */
     189           0 :             i--;
     190           0 :             curBuffer = InvalidBuffer;
     191             :         }
     192             :         else
     193             :         {
     194       13860 :             size += tupsize;
     195             :         }
     196             :     }
     197             : 
     198             :     /*
     199             :      * Write the last page.  It has no successor, so its rightlink is
                     :      * InvalidBlockNumber; its remaining free space becomes tailFreeSize.
     200             :      */
     201        2864 :     res->tail = BufferGetBlockNumber(curBuffer);
     202        5728 :     res->tailFreeSize = writeListPage(index, curBuffer,
     203        2864 :                                       tuples + startTuple,
     204             :                                       ntuples - startTuple,
     205             :                                       InvalidBlockNumber);
     206        2864 :     res->nPendingPages++;
     207             :     /* that was only one heap tuple */
     208        2864 :     res->nPendingHeapTuples = 1;
     209        2864 : }
     210             : 
     211             : /*
     212             :  * Write the index tuples contained in *collector into the index's
     213             :  * pending list.
     214             :  *
     215             :  * This function guarantees that all these tuples are inserted consecutively,
     216             :  * preserving their order.
                     :  *
                     :  * ginstate:  supplies the index relation (ginstate->index) and is passed on
                     :  *            to ginInsertCleanup() if a cleanup is triggered.
                     :  * collector: the tuples to insert (tuples, ntuples) and their total size
                     :  *            (sumsize).  Nothing is done when ntuples is zero.
                     :  *
                     :  * The tuples are either appended to the current tail page, when they fit
                     :  * into the free space recorded in the metapage, or written to freshly
                     :  * built pages (see makeSublist) that are then linked to the end of the
                     :  * pending list.  In both cases the metapage is updated and, if the relation
                     :  * needs WAL, an XLOG_GIN_UPDATE_META_PAGE record is emitted.
     217             :  */
     218             : void
     219      263950 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     220             : {
     221      263950 :     Relation    index = ginstate->index;
     222             :     Buffer      metabuffer;
     223             :     Page        metapage;
     224      263950 :     GinMetaPageData *metadata = NULL;
     225      263950 :     Buffer      buffer = InvalidBuffer; /* old tail page, when one is modified */
     226      263950 :     Page        page = NULL;
     227             :     ginxlogUpdateMeta data;     /* body of the WAL record */
     228      263950 :     bool        separateList = false;   /* build new page(s) instead of appending */
     229      263950 :     bool        needCleanup = false;
     230             :     int         cleanupSize;
     231             :     bool        needWal;
     232             : 
     233      263950 :     if (collector->ntuples == 0)
     234           0 :         return;
     235             : 
     236      263950 :     needWal = RelationNeedsWAL(index);
     237             : 
     238      263950 :     data.locator = index->rd_locator;
     239      263950 :     data.ntuples = 0;
     240      263950 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     241             : 
     242      263950 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     243      263950 :     metapage = BufferGetPage(metabuffer);
     244             : 
     245             :     /*
     246             :      * An insertion to the pending list could logically belong anywhere in the
     247             :      * tree, so it conflicts with all serializable scans.  All scans acquire a
     248             :      * predicate lock on the metabuffer to represent that.
     249             :      */
     250      263950 :     CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     251             : 
     252      263944 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     253             :     {
     254             :         /*
     255             :          * Total size is greater than one page => make a sublist.  The
                     :          * metapage has not been locked on this path.
     256             :          */
     257           0 :         separateList = true;
     258             :     }
     259             :     else
     260             :     {
     261      263944 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     262      263944 :         metadata = GinPageGetMeta(metapage);
     263             : 
     264      263944 :         if (metadata->head == InvalidBlockNumber ||
     265      263894 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     266             :         {
     267             :             /*
     268             :              * The pending list is empty, or the total size is greater than
     269             :              * the free space on the tail page => make a sublist.
     270             :              *
     271             :              * We unlock the metabuffer meanwhile, to keep concurrency high.
     272             :              */
     273        2864 :             separateList = true;
     274        2864 :             LockBuffer(metabuffer, GIN_UNLOCK);
     275             :         }
     276             :     }
     277             : 
     278      263944 :     if (separateList)
     279             :     {
     280             :         /*
     281             :          * Build the sublist on separate pages, then append it to the tail.
     282             :          */
     283             :         GinMetaPageData sublist;
     284             : 
     285        2864 :         memset(&sublist, 0, sizeof(GinMetaPageData));
     286        2864 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     287             : 
     288             :         /*
     289             :          * The metapage is not locked here (it was either never locked or
                     :          * was unlocked above), so lock it now and re-read the metadata.
     290             :          */
     291        2864 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     292        2864 :         metadata = GinPageGetMeta(metapage);
     293             : 
     294        2864 :         if (metadata->head == InvalidBlockNumber)
     295             :         {
     296             :             /*
     297             :              * Main list is empty, so just install the sublist as the main list
     298             :              */
     299          50 :             START_CRIT_SECTION();
     300             : 
     301          50 :             metadata->head = sublist.head;
     302          50 :             metadata->tail = sublist.tail;
     303          50 :             metadata->tailFreeSize = sublist.tailFreeSize;
     304             : 
     305          50 :             metadata->nPendingPages = sublist.nPendingPages;
     306          50 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     307             : 
     308          50 :             if (needWal)
     309          30 :                 XLogBeginInsert();
     310             :         }
     311             :         else
     312             :         {
     313             :             /*
     314             :              * Merge lists: link the sublist after the current tail page
     315             :              */
     316        2814 :             data.prevTail = metadata->tail;
     317        2814 :             data.newRightlink = sublist.head;
     318             : 
     319        2814 :             buffer = ReadBuffer(index, metadata->tail);
     320        2814 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     321        2814 :             page = BufferGetPage(buffer);
     322             : 
     323             :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     324             : 
     325        2814 :             START_CRIT_SECTION();
     326             : 
     327        2814 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     328             : 
     329        2814 :             MarkBufferDirty(buffer);
     330             : 
     331        2814 :             metadata->tail = sublist.tail;
     332        2814 :             metadata->tailFreeSize = sublist.tailFreeSize;
     333             : 
     334        2814 :             metadata->nPendingPages += sublist.nPendingPages;
     335        2814 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     336             : 
     337        2814 :             if (needWal)
     338             :             {
     339        1050 :                 XLogBeginInsert();
     340        1050 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     341             :             }
     342             :         }
     343             :     }
     344             :     else
     345             :     {
     346             :         /*
     347             :          * Insert into the tail page.  The metapage is already locked.
                     :          *
                     :          * collectordata receives a contiguous copy of the tuples, which is
                     :          * registered as the WAL payload for the tail page.  NOTE(review): it
                     :          * is allocated even when no WAL is needed and is not freed in this
                     :          * function; presumably the memory context reclaims it -- confirm.
     348             :          */
     349             :         OffsetNumber l,
     350             :                     off;
     351             :         int         i,
     352             :                     tupsize;
     353             :         char       *ptr;
     354             :         char       *collectordata;
     355             : 
     356      261080 :         buffer = ReadBuffer(index, metadata->tail);
     357      261080 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     358      261080 :         page = BufferGetPage(buffer);
     359             : 
     360      261080 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     361      261080 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     362             : 
     363      261080 :         collectordata = ptr = (char *) palloc(collector->sumsize);
     364             : 
     365      261080 :         data.ntuples = collector->ntuples;
     366             : 
     367      261080 :         START_CRIT_SECTION();
     368             : 
     369      261080 :         if (needWal)
     370      142770 :             XLogBeginInsert();
     371             : 
     372             :         /*
     373             :          * Count one more heap tuple, on the tail page and in the metapage
     374             :          */
     375             :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     376      261080 :         GinPageGetOpaque(page)->maxoff++;
     377      261080 :         metadata->nPendingHeapTuples++;
     378             : 
     379     1398812 :         for (i = 0; i < collector->ntuples; i++)
     380             :         {
     381     1137732 :             tupsize = IndexTupleSize(collector->tuples[i]);
     382     1137732 :             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
     383             : 
     384     1137732 :             if (l == InvalidOffsetNumber)
     385           0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     386             :                      RelationGetRelationName(index));
     387             : 
     388     1137732 :             memcpy(ptr, collector->tuples[i], tupsize);
     389     1137732 :             ptr += tupsize;
     390             : 
     391     1137732 :             off++;
     392             :         }
     393             : 
     394             :         Assert((ptr - collectordata) <= collector->sumsize);
     395      261080 :         if (needWal)
     396             :         {
     397      142770 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     398      142770 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     399             :         }
     400             : 
     401      261080 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     402             : 
     403      261080 :         MarkBufferDirty(buffer);
     404             :     }
     405             : 
     406             :     /*
     407             :      * Set pd_lower just past the end of the metadata.  This is essential,
     408             :      * because without doing so, metadata will be lost if xlog.c compresses
     409             :      * the page.  (We must do this here because pre-v11 versions of PG did not
     410             :      * set the metapage's pd_lower correctly, so a pg_upgraded index might
     411             :      * contain the wrong value.)
     412             :      */
     413      263944 :     ((PageHeader) metapage)->pd_lower =
     414      263944 :         ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     415             : 
     416             :     /*
     417             :      * Mark the metabuffer dirty and make the xlog entry
     418             :      */
     419      263944 :     MarkBufferDirty(metabuffer);
     420             : 
     421      263944 :     if (needWal)
     422             :     {
     423             :         XLogRecPtr  recptr;
     424             : 
     425      143850 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     426             : 
     427      143850 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
     428      143850 :         XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
     429             : 
     430      143850 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     431      143850 :         PageSetLSN(metapage, recptr);
     432             : 
     433      143850 :         if (buffer != InvalidBuffer)
     434             :         {
     435      143820 :             PageSetLSN(page, recptr);
     436             :         }
     437             :     }
     438             : 
     439      263944 :     if (buffer != InvalidBuffer)
     440      263894 :         UnlockReleaseBuffer(buffer);
     441             : 
     442             :     /*
     443             :      * Force pending list cleanup when the list becomes too long.  Because
     444             :      * ginInsertCleanup could take a significant amount of time, we prefer to
     445             :      * call it when it can do all the work in a single collection cycle. In
     446             :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     447             :      * while pending list is still small enough to fit into
     448             :      * gin_pending_list_limit.
     449             :      *
     450             :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
                     :      *
                     :      * NOTE(review): cleanupSize * 1024L is computed in type long, whose
                     :      * width is platform-dependent (32 bits on some platforms); confirm that
                     :      * the allowed range of the cleanup size keeps this product from
                     :      * overflowing.
     451             :      */
     452      263944 :     cleanupSize = GinGetPendingListCleanupSize(index);
     453      263944 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
     454           0 :         needCleanup = true;
     455             : 
     456      263944 :     UnlockReleaseBuffer(metabuffer);
     457             : 
     458      263944 :     END_CRIT_SECTION();
     459             : 
     460             :     /*
     461             :      * Since this could contend with a concurrent cleanup process, we clean
     462             :      * up the pending list non-forcibly.
     463             :      */
     464      263944 :     if (needCleanup)
     465           0 :         ginInsertCleanup(ginstate, false, true, false, NULL);
     466             : }
     467             : 
     468             : /*
     469             :  * Create temporary index tuples for a single indexable item (one index column
     470             :  * for the heap tuple specified by ht_ctid), and append them to the array
     471             :  * in *collector.  They will subsequently be written out using
     472             :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
     473             :  * temp tuples for a given heap tuple must be written in one call to
     474             :  * ginHeapTupleFastInsert.
                     :  *
                     :  * ginstate:  passed to ginExtractEntries() and GinFormTuple().
                     :  * collector: its tuples array is allocated on first use and enlarged when
                     :  *            needed; ntuples and sumsize are advanced for every tuple
                     :  *            appended.
                     :  * attnum, value, isNull: the index column and its datum, handed to
                     :  *            ginExtractEntries() to obtain the keys.
                     :  * ht_ctid:   heap TID, copied into t_tid of every tuple formed here.
                     :  *
                     :  * Raises an error if the number of extracted entries is negative or the
                     :  * resulting tuple count would exceed MaxAllocSize / sizeof(IndexTuple).
     475             :  */
     476             : void
     477      384028 : ginHeapTupleFastCollect(GinState *ginstate,
     478             :                         GinTupleCollector *collector,
     479             :                         OffsetNumber attnum, Datum value, bool isNull,
     480             :                         ItemPointer ht_ctid)
     481             : {
     482             :     Datum      *entries;
     483             :     GinNullCategory *categories;
     484             :     int32       i,
     485             :                 nentries;
     486             : 
     487             :     /*
     488             :      * Extract the key values that need to be inserted in the index
     489             :      */
     490      384028 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     491             :                                 &nentries, &categories);
     492             : 
     493             :     /*
     494             :      * Protect against integer overflow in the allocation calculations below
     495             :      */
     496      384028 :     if (nentries < 0 ||
     497      384028 :         collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
     498           0 :         elog(ERROR, "too many entries for GIN index");
     499             : 
     500             :     /*
     501             :      * Allocate or enlarge the array that stores the collected tuples
     502             :      */
     503      384028 :     if (collector->tuples == NULL)
     504             :     {
     505             :         /*
     506             :          * Determine the number of elements to allocate in the tuples array
     507             :          * initially.  Make it a power of 2 to avoid wasting memory when
     508             :          * resizing (since palloc likes powers of 2).  At least 16 elements
                     :          * are allocated.
     509             :          */
     510      263950 :         collector->lentuples = pg_nextpower2_32(Max(16, nentries));
     511      263950 :         collector->tuples = palloc_array(IndexTuple, collector->lentuples);
     512             :     }
     513      120078 :     else if (collector->lentuples < collector->ntuples + nentries)
     514             :     {
     515             :         /*
     516             :          * Advance lentuples to the next suitable power of 2.  This won't
     517             :          * overflow, though we could get to a value that exceeds
     518             :          * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
     519             :          */
     520           0 :         collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
     521           0 :         collector->tuples = repalloc_array(collector->tuples,
     522             :                                            IndexTuple, collector->lentuples);
     523             :     }
     524             : 
     525             :     /*
     526             :      * Build an index tuple for each key value, and add it to the array.  In
     527             :      * pending tuples we just stick the heap TID into t_tid.
     528             :      */
     529     1535626 :     for (i = 0; i < nentries; i++)
     530             :     {
     531             :         IndexTuple  itup;
     532             : 
     533     1151598 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     534             :                             NULL, 0, 0, true);
     535     1151598 :         itup->t_tid = *ht_ctid;
     536     1151598 :         collector->tuples[collector->ntuples++] = itup;
     537     1151598 :         collector->sumsize += IndexTupleSize(itup);
     538             :     }
     539      384028 : }
     540             : 
     541             : /*
     542             :  * Deletes pending list pages up to (not including) newHead page.
     543             :  * If newHead == InvalidBlockNumber then function drops the whole list.
     544             :  *
     545             :  * metapage is pinned and exclusive-locked throughout this function.
     546             :  */
     547             : static void
     548          30 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     549             :           bool fill_fsm, IndexBulkDeleteResult *stats)
     550             : {
     551             :     Page        metapage;
     552             :     GinMetaPageData *metadata;
     553             :     BlockNumber blknoToDelete;
     554             : 
     555          30 :     metapage = BufferGetPage(metabuffer);
     556          30 :     metadata = GinPageGetMeta(metapage);
     557          30 :     blknoToDelete = metadata->head;
     558             : 
     559             :     do
     560             :     {
     561             :         Page        page;
     562             :         int         i;
     563         192 :         int64       nDeletedHeapTuples = 0;
     564             :         ginxlogDeleteListPages data;
     565             :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     566             :         BlockNumber freespace[GIN_NDELETE_AT_ONCE];
     567             : 
     568         192 :         data.ndeleted = 0;
     569        3036 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     570             :         {
     571        2844 :             freespace[data.ndeleted] = blknoToDelete;
     572        2844 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     573        2844 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     574        2844 :             page = BufferGetPage(buffers[data.ndeleted]);
     575             : 
     576        2844 :             data.ndeleted++;
     577             : 
     578             :             Assert(!GinPageIsDeleted(page));
     579             : 
     580        2844 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     581        2844 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     582             :         }
     583             : 
     584         192 :         if (stats)
     585         192 :             stats->pages_deleted += data.ndeleted;
     586             : 
     587             :         /*
     588             :          * This operation touches an unusually large number of pages, so
     589             :          * prepare the XLogInsert machinery for that before entering the
     590             :          * critical section.
     591             :          */
     592         192 :         if (RelationNeedsWAL(index))
     593          78 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     594             : 
     595         192 :         START_CRIT_SECTION();
     596             : 
     597         192 :         metadata->head = blknoToDelete;
     598             : 
     599             :         Assert(metadata->nPendingPages >= data.ndeleted);
     600         192 :         metadata->nPendingPages -= data.ndeleted;
     601             :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     602         192 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     603             : 
     604         192 :         if (blknoToDelete == InvalidBlockNumber)
     605             :         {
     606          30 :             metadata->tail = InvalidBlockNumber;
     607          30 :             metadata->tailFreeSize = 0;
     608          30 :             metadata->nPendingPages = 0;
     609          30 :             metadata->nPendingHeapTuples = 0;
     610             :         }
     611             : 
     612             :         /*
     613             :          * Set pd_lower just past the end of the metadata.  This is essential,
     614             :          * because without doing so, metadata will be lost if xlog.c
     615             :          * compresses the page.  (We must do this here because pre-v11
     616             :          * versions of PG did not set the metapage's pd_lower correctly, so a
     617             :          * pg_upgraded index might contain the wrong value.)
     618             :          */
     619         192 :         ((PageHeader) metapage)->pd_lower =
     620         192 :             ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     621             : 
     622         192 :         MarkBufferDirty(metabuffer);
     623             : 
     624        3036 :         for (i = 0; i < data.ndeleted; i++)
     625             :         {
     626        2844 :             page = BufferGetPage(buffers[i]);
     627        2844 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     628        2844 :             MarkBufferDirty(buffers[i]);
     629             :         }
     630             : 
     631         192 :         if (RelationNeedsWAL(index))
     632             :         {
     633             :             XLogRecPtr  recptr;
     634             : 
     635          78 :             XLogBeginInsert();
     636          78 :             XLogRegisterBuffer(0, metabuffer,
     637             :                                REGBUF_WILL_INIT | REGBUF_STANDARD);
     638        1152 :             for (i = 0; i < data.ndeleted; i++)
     639        1074 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     640             : 
     641          78 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     642             : 
     643          78 :             XLogRegisterData((char *) &data,
     644             :                              sizeof(ginxlogDeleteListPages));
     645             : 
     646          78 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     647          78 :             PageSetLSN(metapage, recptr);
     648             : 
     649        1152 :             for (i = 0; i < data.ndeleted; i++)
     650             :             {
     651        1074 :                 page = BufferGetPage(buffers[i]);
     652        1074 :                 PageSetLSN(page, recptr);
     653             :             }
     654             :         }
     655             : 
     656        3036 :         for (i = 0; i < data.ndeleted; i++)
     657        2844 :             UnlockReleaseBuffer(buffers[i]);
     658             : 
     659         192 :         END_CRIT_SECTION();
     660             : 
     661        2892 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     662        2700 :             RecordFreeIndexPage(index, freespace[i]);
     663             : 
     664         192 :     } while (blknoToDelete != newHead);
     665          30 : }
     666             : 
     667             : /* Initialize an empty KeyArray with room for maxvalues keys and categories */
     668             : static void
     669          30 : initKeyArray(KeyArray *keys, int32 maxvalues)
     670             : {
     671          30 :     keys->keys = palloc_array(Datum, maxvalues);
     672          30 :     keys->categories = palloc_array(GinNullCategory, maxvalues);
     673          30 :     keys->nvalues = 0;      /* nothing stored yet */
     674          30 :     keys->maxvalues = maxvalues;    /* current capacity of both arrays */
     675          30 : }
     676             : 
     677             : /* Append (datum, category) to the KeyArray, doubling both arrays first if they are full */
     678             : static void
     679     1151448 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     680             : {
     681     1151448 :     if (keys->nvalues >= keys->maxvalues)
     682             :     {
     683           0 :         keys->maxvalues *= 2;   /* double the capacity; only grows if maxvalues > 0 */
     684           0 :         keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
     685           0 :         keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
     686             :     }
     687             : 
     688     1151448 :     keys->keys[keys->nvalues] = datum;
     689     1151448 :     keys->categories[keys->nvalues] = category;
     690     1151448 :     keys->nvalues++;
     691     1151448 : }
     692             : 
     693             : /*
     694             :  * Collect data from a pending-list page in preparation for insertion into
     695             :  * the main index.
     696             :  *
     697             :  * Go through all tuples at offsets >= startoff on the page and collect their values in accum.
     698             :  * Keys of consecutive tuples sharing one heap TID and attribute number are batched in ka and passed to ginInsertBAEntries() in a single call.
     699             :  * Note that ka is just workspace --- it does not carry any state across
     700             :  * calls.
     701             :  */
     702             : static void
     703        2844 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     704             :                    Page page, OffsetNumber startoff)
     705             : {
     706             :     ItemPointerData heapptr;
     707             :     OffsetNumber i,
     708             :                 maxoff;
     709             :     OffsetNumber attrnum;
     710             : 
     711             :     /* reset *ka to empty */
     712        2844 :     ka->nvalues = 0;
     713             : 
     714        2844 :     maxoff = PageGetMaxOffsetNumber(page);
     715             :     Assert(maxoff >= FirstOffsetNumber);
     716        2844 :     ItemPointerSetInvalid(&heapptr);    /* invalid TID marks "no batch started yet" */
     717        2844 :     attrnum = 0;
     718             : 
     719     1154292 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     720             :     {
     721     1151448 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     722             :         OffsetNumber curattnum;
     723             :         Datum       curkey;
     724             :         GinNullCategory curcategory;
     725             : 
     726             :         /* Check for change of heap TID or attnum */
     727     1151448 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     728             : 
     729     1151448 :         if (!ItemPointerIsValid(&heapptr))
     730             :         {
     731        2844 :             heapptr = itup->t_tid;  /* first tuple examined: start the first batch */
     732        2844 :             attrnum = curattnum;
     733             :         }
     734     1148604 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     735             :                    curattnum == attrnum))
     736             :         {
     737             :             /*
     738             :              * ginInsertBAEntries can insert several datums per call, but only
     739             :              * for one heap tuple and one column.  So call it at a boundary,
     740             :              * and reset ka.
     741             :              */
     742      381102 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     743             :                                ka->keys, ka->categories, ka->nvalues);
     744      381102 :             ka->nvalues = 0;
     745      381102 :             heapptr = itup->t_tid;
     746      381102 :             attrnum = curattnum;
     747             :         }
     748             : 
     749             :         /* Add key to KeyArray */
     750     1151448 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     751     1151448 :         addDatum(ka, curkey, curcategory);
     752             :     }
     753             : 
     754             :     /* Dump out all remaining keys.  NOTE(review): if the loop visited no tuple (startoff > maxoff), heapptr is still invalid and nvalues is 0 here; confirm callers never do that. */
     755        2844 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     756             :                        ka->keys, ka->categories, ka->nvalues);
     757        2844 : }
     758             : 
     759             : /*
     760             :  * Move tuples from pending pages into the regular GIN structure.
     761             :  *
     762             :  * At first glance this looks completely crash-unsafe.  But if we crash
     763             :  * after posting entries to the main index and before removing them from the
     764             :  * pending list, it's okay because when we redo the posting later on, nothing
     765             :  * bad will happen.
     766             :  *
     767             :  * fill_fsm indicates that ginInsertCleanup should add deleted pages
     768             :  * to the FSM; otherwise the caller is responsible for putting deleted pages
     769             :  * into the FSM.
     770             :  * forceCleanup selects blocking (true) vs. conditional (false) acquisition of the cleanup lock; full_clean == false lets us stop once the page that was the tail at start has been flushed.
     771             :  * If stats isn't null, the number of deleted pending pages is added to stats->pages_deleted (done in shiftList).
     772             :  */
     773             : void
     774          76 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     775             :                  bool fill_fsm, bool forceCleanup,
     776             :                  IndexBulkDeleteResult *stats)
     777             : {
     778          76 :     Relation    index = ginstate->index;
     779             :     Buffer      metabuffer,
     780             :                 buffer;
     781             :     Page        metapage,
     782             :                 page;
     783             :     GinMetaPageData *metadata;
     784             :     MemoryContext opCtx,
     785             :                 oldCtx;
     786             :     BuildAccumulator accum;
     787             :     KeyArray    datums;
     788             :     BlockNumber blkno,
     789             :                 blknoFinish;    /* tail of the pending list when we started */
     790          76 :     bool        cleanupFinish = false;  /* set once we reach blknoFinish (unless full_clean) */
     791          76 :     bool        fsm_vac = false;    /* set once shiftList has freed some pages */
     792             :     Size        workMemory;
     793             : 
     794             :     /*
     795             :      * We would like to prevent concurrent cleanup processes.  For that we
     796             :      * lock the metapage in exclusive mode using a LockPage() call.  Nobody else
     797             :      * uses that lock for the metapage, so we keep the possibility of concurrent
     798             :      * insertion into the pending list.
     799             :      */
     800             : 
     801          76 :     if (forceCleanup)
     802             :     {
     803             :         /*
     804             :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
     805             :          * and we would like to wait for any concurrent cleanup to finish.
     806             :          */
     807          76 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     808          76 :         workMemory =
     809          80 :             (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     810          80 :             autovacuum_work_mem : maintenance_work_mem;
     811             :     }
     812             :     else
     813             :     {
     814             :         /*
     815             :          * We are called from a regular insert; if we see a concurrent cleanup,
     816             :          * just exit in the hope that the concurrent process will clean up the
     817             :          * pending list.
     818             :          */
     819           0 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     820          46 :             return;
     821           0 :         workMemory = work_mem;
     822             :     }
     823             : 
     824          76 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     825          76 :     LockBuffer(metabuffer, GIN_SHARE);
     826          76 :     metapage = BufferGetPage(metabuffer);
     827          76 :     metadata = GinPageGetMeta(metapage);
     828             : 
     829          76 :     if (metadata->head == InvalidBlockNumber)
     830             :     {
     831             :         /* Nothing to do: the pending list is empty */
     832          46 :         UnlockReleaseBuffer(metabuffer);
     833          46 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     834          46 :         return;
     835             :     }
     836             : 
     837             :     /*
     838             :      * Remember the tail page to prevent infinite cleanup if other backends add
     839             :      * new tuples faster than we can clean up.
     840             :      */
     841          30 :     blknoFinish = metadata->tail;
     842             : 
     843             :     /*
     844             :      * Read and lock the head of the pending list
     845             :      */
     846          30 :     blkno = metadata->head;
     847          30 :     buffer = ReadBuffer(index, blkno);
     848          30 :     LockBuffer(buffer, GIN_SHARE);
     849          30 :     page = BufferGetPage(buffer);
     850             : 
     851          30 :     LockBuffer(metabuffer, GIN_UNLOCK);     /* keep the pin, drop the content lock */
     852             : 
     853             :     /*
     854             :      * Initialize.  All temporary space will be in opCtx
     855             :      */
     856          30 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     857             :                                   "GIN insert cleanup temporary context",
     858             :                                   ALLOCSET_DEFAULT_SIZES);
     859             : 
     860          30 :     oldCtx = MemoryContextSwitchTo(opCtx);
     861             : 
     862          30 :     initKeyArray(&datums, 128);
     863          30 :     ginInitBA(&accum);
     864          30 :     accum.ginstate = ginstate;
     865             : 
     866             :     /*
     867             :      * At the top of this loop, we have pin and lock on the current page of
     868             :      * the pending list.  However, we'll release that before exiting the loop.
     869             :      * Note we also have pin but not lock on the metapage.
     870             :      */
     871             :     for (;;)
     872             :     {
     873        2814 :         Assert(!GinPageIsDeleted(page));
     874             : 
     875             :         /*
     876             :          * Are we processing the page that we remembered as the tail when
     877             :          * we started our cleanup?  But if the caller asked us to clean up the whole
     878             :          * pending list, then ignore the old tail; we will work until the list becomes
     879             :          * empty.
     880             :          */
     881        2844 :         if (blkno == blknoFinish && full_clean == false)
     882           0 :             cleanupFinish = true;
     883             : 
     884             :         /*
     885             :          * read page's datums into accum
     886             :          */
     887        2844 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     888             : 
     889        2844 :         vacuum_delay_point();
     890             : 
     891             :         /*
     892             :          * Is it time to flush memory to disk?  Flush if we are at the end of
     893             :          * the pending list, or if we have a full row and memory is getting
     894             :          * full.
     895             :          */
     896        2844 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     897        2814 :             (GinPageHasFullRow(page) &&
     898        2814 :              (accum.allocatedMemory >= workMemory * 1024L)))
     899           0 :         {
     900             :             ItemPointerData *list;
     901             :             uint32      nlist;
     902             :             Datum       key;
     903             :             GinNullCategory category;
     904             :             OffsetNumber maxoff,
     905             :                         attnum;
     906             : 
     907             :             /*
     908             :              * Unlock the current page to increase performance.  Changes to the page
     909             :              * will be detected later by comparing maxoff after completion of the
     910             :              * memory flush.
     911             :              */
     912          30 :             maxoff = PageGetMaxOffsetNumber(page);
     913          30 :             LockBuffer(buffer, GIN_UNLOCK);
     914             : 
     915             :             /*
     916             :              * Moving collected data into the regular structure can take a
     917             :              * significant amount of time - so, run it without locking the pending
     918             :              * list.
     919             :              */
     920          30 :             ginBeginBAScan(&accum);
     921      366102 :             while ((list = ginGetBAEntry(&accum,
     922             :                                          &attnum, &key, &category, &nlist)) != NULL)
     923             :             {
     924      366072 :                 ginEntryInsert(ginstate, attnum, key, category,
     925             :                                list, nlist, NULL);
     926      366072 :                 vacuum_delay_point();
     927             :             }
     928             : 
     929             :             /*
     930             :              * Lock the whole list to remove pages
     931             :              */
     932          30 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     933          30 :             LockBuffer(buffer, GIN_SHARE);
     934             : 
     935             :             Assert(!GinPageIsDeleted(page));
     936             : 
     937             :             /*
     938             :              * While we left the page unlocked, more stuff might have gotten
     939             :              * added to it.  If so, process those entries immediately.  There
     940             :              * shouldn't be very many, so we don't worry about the fact that
     941             :              * we're doing this with the metapage exclusively locked.  The insertion algorithm
     942             :              * guarantees that inserted row(s) will not continue on next page.
     943             :              * NOTE: intentionally no vacuum_delay_point in this loop.
     944             :              */
     945          30 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     946             :             {
     947           0 :                 ginInitBA(&accum);
     948           0 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     949             : 
     950           0 :                 ginBeginBAScan(&accum);
     951           0 :                 while ((list = ginGetBAEntry(&accum,
     952             :                                              &attnum, &key, &category, &nlist)) != NULL)
     953           0 :                     ginEntryInsert(ginstate, attnum, key, category,
     954             :                                    list, nlist, NULL);
     955             :             }
     956             : 
     957             :             /*
     958             :              * Remember next page - it will become the new list head
     959             :              */
     960          30 :             blkno = GinPageGetOpaque(page)->rightlink;
     961          30 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     962             :                                              * locking */
     963             : 
     964             :             /*
     965             :              * Remove the pages we have read from the pending list; at this point all
     966             :              * of their content is in the regular structure
     967             :              */
     968          30 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     969             : 
     970             :             /* At this point, some pending pages have been freed up */
     971          30 :             fsm_vac = true;
     972             : 
     973             :             Assert(blkno == metadata->head);
     974          30 :             LockBuffer(metabuffer, GIN_UNLOCK);
     975             : 
     976             :             /*
     977             :              * If we removed the whole pending list, or we have cleaned up the tail (which
     978             :              * we remembered at the start of our cleanup process), then just exit
     979             :              */
     980          30 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     981             :                 break;
     982             : 
     983             :             /*
     984             :              * release memory used so far and reinit state
     985             :              */
     986           0 :             MemoryContextReset(opCtx);
     987           0 :             initKeyArray(&datums, datums.maxvalues);
     988           0 :             ginInitBA(&accum);
     989             :         }
     990             :         else
     991             :         {
     992        2814 :             blkno = GinPageGetOpaque(page)->rightlink;
     993        2814 :             UnlockReleaseBuffer(buffer);
     994             :         }
     995             : 
     996             :         /*
     997             :          * Read the next page in the pending list
     998             :          */
     999        2814 :         vacuum_delay_point();
    1000        2814 :         buffer = ReadBuffer(index, blkno);
    1001        2814 :         LockBuffer(buffer, GIN_SHARE);
    1002        2814 :         page = BufferGetPage(buffer);
    1003             :     }
    1004             : 
    1005          30 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    1006          30 :     ReleaseBuffer(metabuffer);
    1007             : 
    1008             :     /*
    1009             :      * As pending list pages can have a high churn rate, it is desirable to
    1010             :      * recycle them immediately to the FreeSpaceMap when ordinary backends
    1011             :      * clean the list.
    1012             :      */
    1013          30 :     if (fsm_vac && fill_fsm)
    1014          12 :         IndexFreeSpaceMapVacuum(index);
    1015             : 
    1016             :     /* Clean up temporary space */
    1017          30 :     MemoryContextSwitchTo(oldCtx);
    1018          30 :     MemoryContextDelete(opCtx);
    1019             : }
    1020             : 
    1021             : /*
    1022             :  * SQL-callable function to clean the insert pending list of the index whose OID is argument 0; returns stats.pages_deleted as int64
    1023             :  */
    1024             : Datum
    1025          18 : gin_clean_pending_list(PG_FUNCTION_ARGS)
    1026             : {
    1027          18 :     Oid         indexoid = PG_GETARG_OID(0);
    1028          18 :     Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    1029             :     IndexBulkDeleteResult stats;
    1030             :     GinState    ginstate;
    1031             : 
    1032          18 :     if (RecoveryInProgress())
    1033           0 :         ereport(ERROR,
    1034             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1035             :                  errmsg("recovery is in progress"),
    1036             :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
    1037             : 
    1038             :     /* Must be a GIN index */
    1039          18 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1040          18 :         indexRel->rd_rel->relam != GIN_AM_OID)
    1041           0 :         ereport(ERROR,
    1042             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1043             :                  errmsg("\"%s\" is not a GIN index",
    1044             :                         RelationGetRelationName(indexRel))));
    1045             : 
    1046             :     /*
    1047             :      * Reject attempts to read non-local temporary relations; we would be
    1048             :      * likely to get wrong data since we have no visibility into the owning
    1049             :      * session's local buffers.
    1050             :      */
    1051          18 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1052           0 :         ereport(ERROR,
    1053             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1054             :                  errmsg("cannot access temporary indexes of other sessions")));
    1055             : 
    1056             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1057          18 :     if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
    1058           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1059           0 :                        RelationGetRelationName(indexRel));
    1060             : 
    1061          18 :     memset(&stats, 0, sizeof(stats));   /* start the page counters from zero */
    1062          18 :     initGinState(&ginstate, indexRel);
    1063          18 :     ginInsertCleanup(&ginstate, true, true, true, &stats);  /* full_clean, fill_fsm, forceCleanup */
    1064             : 
    1065          18 :     index_close(indexRel, RowExclusiveLock);
    1066             : 
    1067          18 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1068             : }

Generated by: LCOV version 1.14