LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.7 % 363 333
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 10 10
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * ginfast.c
       4              :  *    Fast insert routines for the Postgres inverted index access method.
       5              :  *    Pending entries are stored in a linear list of pages.  Later on
       6              :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7              :  *    transfer pending entries into the regular index structure.  This
       8              :  *    wins because bulk insertion is much more efficient than retail.
       9              :  *
      10              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      11              :  * Portions Copyright (c) 1994, Regents of the University of California
      12              :  *
      13              :  * IDENTIFICATION
      14              :  *          src/backend/access/gin/ginfast.c
      15              :  *
      16              :  *-------------------------------------------------------------------------
      17              :  */
      18              : 
      19              : #include "postgres.h"
      20              : 
      21              : #include "access/gin_private.h"
      22              : #include "access/ginxlog.h"
      23              : #include "access/xlog.h"
      24              : #include "access/xloginsert.h"
      25              : #include "catalog/pg_am.h"
      26              : #include "commands/vacuum.h"
      27              : #include "miscadmin.h"
      28              : #include "port/pg_bitutils.h"
      29              : #include "postmaster/autovacuum.h"
      30              : #include "storage/indexfsm.h"
      31              : #include "storage/lmgr.h"
      32              : #include "storage/predicate.h"
      33              : #include "utils/acl.h"
      34              : #include "utils/fmgrprotos.h"
      35              : #include "utils/memutils.h"
      36              : #include "utils/rel.h"
      37              : 
      38              : /* GUC parameter: pending-list size limit (presumably in kB, cf. the 1024 multiplier in ginHeapTupleFastInsert -- confirm) */
      39              : int         gin_pending_list_limit = 0;
      40              : 
      41              : #define GIN_PAGE_FREESIZE \
      42              :     ( (Size) BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )  /* block bytes left after the aligned page header and aligned GIN opaque data */
      43              : 
      44              : typedef struct KeyArray  /* two growable arrays sharing one fill count and one capacity */
      45              : {
      46              :     Datum      *keys;           /* expansible array of key datums */
      47              :     GinNullCategory *categories;    /* expansible array of null categories; presumably categories[i] belongs to keys[i] -- confirm at use sites */
      48              :     int32       nvalues;        /* current number of valid entries in both arrays */
      49              :     int32       maxvalues;      /* allocated size (in entries) of both arrays */
      50              : } KeyArray;
      51              : 
      52              : 
      53              : /*
      54              :  * Initialize 'buffer' as a GIN_LIST page, add tuples[0..ntuples-1] to it, set
      55              :  * its rightlink, WAL-log the page if the relation needs WAL, and release the buffer.
      56              :  * Returns the exact free space left on the page, measured before the release.
      57              :  */
      58              : static int32
      59         1475 : writeListPage(Relation index, Buffer buffer,
      60              :               const IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      61              : {
      62         1475 :     Page        page = BufferGetPage(buffer);
      63              :     int32       i,
      64              :                 freesize,
      65         1475 :                 size = 0;  /* total bytes copied into workspace */
      66              :     OffsetNumber l,
      67              :                 off;
      68              :     PGAlignedBlock workspace;  /* contiguous copy of the tuples, registered as WAL buffer data below */
      69              :     char       *ptr;  /* write cursor into workspace */
      70              : 
      71         1475 :     START_CRIT_SECTION();
      72              : 
      73         1475 :     GinInitBuffer(buffer, GIN_LIST);
      74              : 
      75         1475 :     off = FirstOffsetNumber;
      76         1475 :     ptr = workspace.data;
      77              : 
      78         8602 :     for (i = 0; i < ntuples; i++)
      79              :     {
      80         7127 :         int         this_size = IndexTupleSize(tuples[i]);
      81              : 
      82         7127 :         memcpy(ptr, tuples[i], this_size);  /* unaligned, back-to-back copy for the WAL record */
      83         7127 :         ptr += this_size;
      84         7127 :         size += this_size;
      85              : 
      86         7127 :         l = PageAddItem(page, tuples[i], this_size, off, false, false);
      87              : 
      88         7127 :         if (l == InvalidOffsetNumber)
      89            0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      90              :                  RelationGetRelationName(index));  /* NOTE(review): raised inside the critical section opened above -- confirm the escalation is intended */
      91              : 
      92         7127 :         off++;
      93              :     }
      94              : 
      95              :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      96              : 
      97         1475 :     GinPageGetOpaque(page)->rightlink = rightlink;
      98              : 
      99              :     /*
     100              :      * A tail page (one without a rightlink) may contain only whole row(s) or
     101              :      * the final part of a row begun on previous pages (a "row" here meaning
     102              :      * all the index tuples generated for one heap tuple).
     103              :      */
     104         1475 :     if (rightlink == InvalidBlockNumber)
     105              :     {
     106         1475 :         GinPageSetFullRow(page);
     107         1475 :         GinPageGetOpaque(page)->maxoff = 1;  /* maxoff serves as a heap-row counter on list pages (it is summed in shiftList) */
     108              :     }
     109              :     else
     110              :     {
     111            0 :         GinPageGetOpaque(page)->maxoff = 0;  /* non-tail page: counts no rows */
     112              :     }
     113              : 
     114         1475 :     MarkBufferDirty(buffer);
     115              : 
     116         1475 :     if (RelationNeedsWAL(index))
     117              :     {
     118              :         ginxlogInsertListPage data;
     119              :         XLogRecPtr  recptr;
     120              : 
     121          583 :         data.rightlink = rightlink;
     122          583 :         data.ntuples = ntuples;
     123              : 
     124          583 :         XLogBeginInsert();
     125          583 :         XLogRegisterData(&data, sizeof(ginxlogInsertListPage));
     126              : 
     127          583 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     128          583 :         XLogRegisterBufData(0, workspace.data, size);  /* the tuple images copied in the loop above */
     129              : 
     130          583 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     131          583 :         PageSetLSN(page, recptr);
     132              :     }
     133              : 
     134              :     /* read the free space while we still hold the buffer */
     135         1475 :     freesize = PageGetExactFreeSpace(page);
     136              : 
     137         1475 :     UnlockReleaseBuffer(buffer);
     138              : 
     139         1475 :     END_CRIT_SECTION();
     140              : 
     141         1475 :     return freesize;
     142              : }
     143              : 
     144              : static void
     145         1475 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,  /* writes tuples[] onto a chain of newly obtained list pages */
     146              :             GinMetaPageData *res)  /* out: head, tail, tailFreeSize set; nPendingPages is incremented, so the caller must pre-zero *res */
     147              : {
     148         1475 :     Buffer      curBuffer = InvalidBuffer;  /* page being filled; InvalidBuffer requests a new one */
     149         1475 :     Buffer      prevBuffer = InvalidBuffer;  /* most recently obtained page */
     150              :     int         i,
     151         1475 :                 size = 0,  /* bytes planned for the current page */
     152              :                 tupsize;
     153         1475 :     int         startTuple = 0;  /* index of the first tuple assigned to the current page */
     154              : 
     155              :     Assert(ntuples > 0);
     156              : 
     157              :     /*
     158              :      * Split tuples into pages; a full page is written only after its successor exists, so that its rightlink is known
     159              :      */
     160         8602 :     for (i = 0; i < ntuples; i++)
     161              :     {
     162         7127 :         if (curBuffer == InvalidBuffer)
     163              :         {
     164         1475 :             curBuffer = GinNewBuffer(index);
     165              : 
     166         1475 :             if (prevBuffer != InvalidBuffer)
     167              :             {
     168            0 :                 res->nPendingPages++;
     169            0 :                 writeListPage(index, prevBuffer,
     170            0 :                               tuples + startTuple,
     171              :                               i - startTuple,
     172              :                               BufferGetBlockNumber(curBuffer));
     173              :             }
     174              :             else
     175              :             {
     176         1475 :                 res->head = BufferGetBlockNumber(curBuffer);  /* first page obtained becomes the sublist head */
     177              :             }
     178              : 
     179         1475 :             prevBuffer = curBuffer;
     180         1475 :             startTuple = i;
     181         1475 :             size = 0;
     182              :         }
     183              : 
     184         7127 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);  /* aligned tuple plus its line pointer */
     185              : 
     186         7127 :         if (size + tupsize > GinListPageSize)
     187              :         {
     188              :             /* won't fit: retry this tuple on a new page (assumes any tuple fits on an empty page, else this never advances -- confirm) */
     189            0 :             i--;
     190            0 :             curBuffer = InvalidBuffer;
     191              :         }
     192              :         else
     193              :         {
     194         7127 :             size += tupsize;
     195              :         }
     196              :     }
     197              : 
     198              :     /*
     199              :      * Write last page (no rightlink), which becomes the sublist tail
     200              :      */
     201         1475 :     res->tail = BufferGetBlockNumber(curBuffer);
     202         2950 :     res->tailFreeSize = writeListPage(index, curBuffer,
     203         1475 :                                       tuples + startTuple,
     204              :                                       ntuples - startTuple,
     205              :                                       InvalidBlockNumber);
     206         1475 :     res->nPendingPages++;
     207              :     /* the whole sublist holds the tuples of exactly one heap tuple */
     208         1475 :     res->nPendingHeapTuples = 1;
     209         1475 : }
     210              : 
     211              : /*
     212              :  * Append the index tuples gathered in *collector to the index's pending
     213              :  * list; returns immediately if the collector is empty.
     214              :  *
     215              :  * All of the tuples are inserted consecutively and in their original order.
     216              :  * May call ginInsertCleanup() afterwards if the pending list has grown too long.
     217              :  */
     218              : void
     219       132889 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     220              : {
     221       132889 :     Relation    index = ginstate->index;
     222              :     Buffer      metabuffer;
     223              :     Page        metapage;
     224       132889 :     GinMetaPageData *metadata = NULL;
     225       132889 :     Buffer      buffer = InvalidBuffer;  /* old tail page, if one gets modified */
     226       132889 :     Page        page = NULL;
     227              :     ginxlogUpdateMeta data;  /* WAL record payload */
     228       132889 :     bool        separateList = false;  /* true: build new page(s) rather than add to the tail page */
     229       132889 :     bool        needCleanup = false;
     230              :     int         cleanupSize;
     231              :     bool        needWal;
     232              : 
     233       132889 :     if (collector->ntuples == 0)
     234            0 :         return;
     235              : 
     236       132889 :     needWal = RelationNeedsWAL(index);
     237              : 
     238       132889 :     data.locator = index->rd_locator;
     239       132889 :     data.ntuples = 0;
     240       132889 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     241              : 
     242       132889 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     243       132889 :     metapage = BufferGetPage(metabuffer);
     244              : 
     245              :     /*
     246              :      * An insertion to the pending list could logically belong anywhere in the
     247              :      * tree, so it conflicts with all serializable scans.  All scans acquire a
     248              :      * predicate lock on the metabuffer to represent that.  Therefore we must
     249              :      * call CheckForSerializableConflictIn(), but not until we have the page
     250              :      * locked and are ready to modify it.
     251              :      */
     252              : 
     253       132889 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     254              :     {
     255              :         /*
     256              :          * Total size is greater than one page => make sublist (the metapage has not been locked on this path)
     257              :          */
     258            0 :         separateList = true;
     259              :     }
     260              :     else
     261              :     {
     262       132889 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     263       132889 :         metadata = GinPageGetMeta(metapage);
     264              : 
     265       132889 :         if (metadata->head == InvalidBlockNumber ||
     266       132828 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     267              :         {
     268              :             /*
     269              :              * Pending list is empty or total size is greater than freespace
     270              :              * on tail page => make sublist
     271              :              *
     272              :              * We unlock the metabuffer meanwhile to keep concurrency high
     273              :              */
     274         1475 :             separateList = true;
     275         1475 :             LockBuffer(metabuffer, GIN_UNLOCK);
     276              :         }
     277              :     }
     278              : 
     279       132889 :     if (separateList)
     280              :     {
     281              :         /*
     282              :          * Build the sublist on its own pages first, then link it into the pending list
     283              :          */
     284              :         GinMetaPageData sublist;
     285              : 
     286         1475 :         memset(&sublist, 0, sizeof(GinMetaPageData));  /* makeSublist increments nPendingPages, so start from zero */
     287         1475 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     288              : 
     289              :         /*
     290              :          * metapage is not locked at this point (see above), so lock it and re-read the metadata
     291              :          */
     292         1475 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     293         1475 :         metadata = GinPageGetMeta(metapage);
     294              : 
     295         1475 :         CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     296              : 
     297         1472 :         if (metadata->head == InvalidBlockNumber)
     298              :         {
     299              :             /*
     300              :              * Main list is empty, so just insert sublist as main list
     301              :              */
     302           58 :             START_CRIT_SECTION();
     303              : 
     304           58 :             metadata->head = sublist.head;
     305           58 :             metadata->tail = sublist.tail;
     306           58 :             metadata->tailFreeSize = sublist.tailFreeSize;
     307              : 
     308           58 :             metadata->nPendingPages = sublist.nPendingPages;
     309           58 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     310              : 
     311           58 :             if (needWal)
     312           48 :                 XLogBeginInsert();
     313              :         }
     314              :         else
     315              :         {
     316              :             /*
     317              :              * Merge lists: point the current tail page's rightlink at the sublist head
     318              :              */
     319         1414 :             data.prevTail = metadata->tail;
     320         1414 :             data.newRightlink = sublist.head;
     321              : 
     322         1414 :             buffer = ReadBuffer(index, metadata->tail);
     323         1414 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     324         1414 :             page = BufferGetPage(buffer);
     325              : 
     326              :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     327              : 
     328         1414 :             START_CRIT_SECTION();
     329              : 
     330         1414 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     331              : 
     332         1414 :             MarkBufferDirty(buffer);
     333              : 
     334         1414 :             metadata->tail = sublist.tail;
     335         1414 :             metadata->tailFreeSize = sublist.tailFreeSize;
     336              : 
     337         1414 :             metadata->nPendingPages += sublist.nPendingPages;
     338         1414 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     339              : 
     340         1414 :             if (needWal)
     341              :             {
     342          532 :                 XLogBeginInsert();
     343          532 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     344              :             }
     345              :         }
     346              :     }
     347              :     else
     348              :     {
     349              :         /*
     350              :          * Insert into tail page.  Metapage is already locked
     351              :          */
     352              :         OffsetNumber l,
     353              :                     off;
     354              :         int         i,
     355              :                     tupsize;
     356              :         char       *ptr;  /* write cursor into collectordata */
     357              :         char       *collectordata;  /* back-to-back copy of the tuples, registered as WAL buffer data */
     358              : 
     359       131414 :         CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
     360              : 
     361       131414 :         buffer = ReadBuffer(index, metadata->tail);
     362       131414 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     363       131414 :         page = BufferGetPage(buffer);
     364              : 
     365       131414 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     366       131414 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));  /* append after the last existing item */
     367              : 
     368       131414 :         collectordata = ptr = (char *) palloc(collector->sumsize);  /* allocated before entering the critical section */
     369              : 
     370       131414 :         data.ntuples = collector->ntuples;
     371              : 
     372       131414 :         START_CRIT_SECTION();
     373              : 
     374       131414 :         if (needWal)
     375        72259 :             XLogBeginInsert();
     376              : 
     377              :         /*
     378              :          * Count one more heap tuple, both on the tail page and in the metapage
     379              :          */
     380              :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     381       131414 :         GinPageGetOpaque(page)->maxoff++;
     382       131414 :         metadata->nPendingHeapTuples++;
     383              : 
     384       704990 :         for (i = 0; i < collector->ntuples; i++)
     385              :         {
     386       573576 :             tupsize = IndexTupleSize(collector->tuples[i]);
     387       573576 :             l = PageAddItem(page, collector->tuples[i], tupsize, off, false, false);
     388              : 
     389       573576 :             if (l == InvalidOffsetNumber)
     390            0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     391              :                      RelationGetRelationName(index));  /* NOTE(review): raised inside the critical section opened above -- confirm the escalation is intended */
     392              : 
     393       573576 :             memcpy(ptr, collector->tuples[i], tupsize);
     394       573576 :             ptr += tupsize;
     395              : 
     396       573576 :             off++;
     397              :         }
     398              : 
     399              :         Assert((ptr - collectordata) <= collector->sumsize);
     400              : 
     401       131414 :         MarkBufferDirty(buffer);
     402              : 
     403       131414 :         if (needWal)
     404              :         {
     405        72259 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     406        72259 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     407              :         }
     408              : 
     409       131414 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     410              :     }
     411              : 
     412              :     /*
     413              :      * Set pd_lower just past the end of the metadata.  This is essential,
     414              :      * because without doing so, metadata will be lost if xlog.c compresses
     415              :      * the page.  (We must do this here because pre-v11 versions of PG did not
     416              :      * set the metapage's pd_lower correctly, so a pg_upgraded index might
     417              :      * contain the wrong value.)
     418              :      */
     419       132886 :     ((PageHeader) metapage)->pd_lower =
     420       132886 :         ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     421              : 
     422              :     /*
     423              :      * Write metabuffer, make xlog entry (every path above has already called XLogBeginInsert when needWal)
     424              :      */
     425       132886 :     MarkBufferDirty(metabuffer);
     426              : 
     427       132886 :     if (needWal)
     428              :     {
     429              :         XLogRecPtr  recptr;
     430              : 
     431        72839 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));  /* record carries the full updated metadata */
     432              : 
     433        72839 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
     434        72839 :         XLogRegisterData(&data, sizeof(ginxlogUpdateMeta));
     435              : 
     436        72839 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     437        72839 :         PageSetLSN(metapage, recptr);
     438              : 
     439        72839 :         if (buffer != InvalidBuffer)  /* still InvalidBuffer when the main list was empty: no old tail page was touched */
     440              :         {
     441        72791 :             PageSetLSN(page, recptr);
     442              :         }
     443              :     }
     444              : 
     445       132886 :     if (buffer != InvalidBuffer)
     446       132828 :         UnlockReleaseBuffer(buffer);
     447              : 
     448              :     /*
     449              :      * Force pending list cleanup when the list becomes too long.
     450              :      * ginInsertCleanup could take a significant amount of time, so we prefer
     451              :      * to call it when it can do all the work in a single collection cycle. In
     452              :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     453              :      * while pending list is still small enough to fit into
     454              :      * gin_pending_list_limit.
     455              :      *
     456              :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     457              :      */
     458       132886 :     cleanupSize = GinGetPendingListCleanupSize(index);
     459       132886 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * (Size) 1024)  /* cleanupSize is scaled by 1024 here, i.e. treated as kB */
     460            0 :         needCleanup = true;
     461              : 
     462       132886 :     UnlockReleaseBuffer(metabuffer);  /* metadata points into this page and is not used past here */
     463              : 
     464       132886 :     END_CRIT_SECTION();
     465              : 
     466              :     /*
     467              :      * Since this could contend with a concurrent cleanup process, we clean
     468              :      * up the pending list without forcing it.
     469              :      */
     470       132886 :     if (needCleanup)
     471            0 :         ginInsertCleanup(ginstate, false, true, false, NULL);
     472              : }
     473              : 
     474              : /*
     475              :  * Create temporary index tuples for a single indexable item (one index column
     476              :  * of the heap tuple identified by ht_ctid) and append them to the array in
     477              :  * *collector, growing that array as needed and adding each tuple's size to
     478              :  * collector->sumsize.  The tuples are written out later by
     479              :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all temp
     480              :  * tuples for a given heap tuple must be written in one call to that function.
     481              :  */
     482              : void
     483       192928 : ginHeapTupleFastCollect(GinState *ginstate,
     484              :                         GinTupleCollector *collector,
     485              :                         OffsetNumber attnum, Datum value, bool isNull,
     486              :                         ItemPointer ht_ctid)
     487              : {
     488              :     Datum      *entries;  /* extracted key values */
     489              :     GinNullCategory *categories;  /* categories[i] is passed along with entries[i] to GinFormTuple */
     490              :     int32       i,
     491              :                 nentries;
     492              : 
     493              :     /*
     494              :      * Extract the key values that need to be inserted in the index
     495              :      */
     496       192928 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     497              :                                 &nentries, &categories);
     498              : 
     499              :     /*
     500              :      * Protect against integer overflow in allocation calculations
     501              :      */
     502       192928 :     if (nentries < 0 ||
     503       192928 :         collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
     504            0 :         elog(ERROR, "too many entries for GIN index");
     505              : 
     506              :     /*
     507              :      * Allocate/reallocate memory for storing collected tuples
     508              :      */
     509       192928 :     if (collector->tuples == NULL)
     510              :     {
     511              :         /*
     512              :          * Determine the number of elements to allocate in the tuples array
     513              :          * initially.  Make it a power of 2 to avoid wasting memory when
     514              :          * resizing (since palloc likes powers of 2).
     515              :          */
     516       132889 :         collector->lentuples = pg_nextpower2_32(Max(16, nentries));  /* at least 16 slots */
     517       132889 :         collector->tuples = palloc_array(IndexTuple, collector->lentuples);
     518              :     }
     519        60039 :     else if (collector->lentuples < collector->ntuples + nentries)
     520              :     {
     521              :         /*
     522              :          * Advance lentuples to the next suitable power of 2.  This won't
     523              :          * overflow, though we could get to a value that exceeds
     524              :          * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
     525              :          */
     526            0 :         collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
     527            0 :         collector->tuples = repalloc_array(collector->tuples,
     528              :                                            IndexTuple, collector->lentuples);
     529              :     }
     530              : 
     531              :     /*
     532              :      * Build an index tuple for each key value, and add to array.  In pending
     533              :      * tuples we just stick the heap TID into t_tid.
     534              :      */
     535       773631 :     for (i = 0; i < nentries; i++)
     536              :     {
     537              :         IndexTuple  itup;
     538              : 
     539       580703 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     540              :                             NULL, 0, 0, true);
     541       580703 :         itup->t_tid = *ht_ctid;
     542       580703 :         collector->tuples[collector->ntuples++] = itup;
     543       580703 :         collector->sumsize += IndexTupleSize(itup);  /* running total that ginHeapTupleFastInsert compares against page space */
     544              :     }
     545       192928 : }
     546              : 
     547              : /*
     548              :  * Deletes pending list pages up to (not including) newHead page.
     549              :  * If newHead == InvalidBlockNumber then function drops the whole list.
     550              :  *
     551              :  * metapage is pinned and exclusive-locked throughout this function.
     552              :  */
     553              : static void
     554           27 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     555              :           bool fill_fsm, IndexBulkDeleteResult *stats)
     556              : {
     557              :     Page        metapage;
     558              :     GinMetaPageData *metadata;
     559              :     BlockNumber blknoToDelete;  /* next pending-list page to delete */
     560              : 
     561           27 :     metapage = BufferGetPage(metabuffer);
     562           27 :     metadata = GinPageGetMeta(metapage);
     563           27 :     blknoToDelete = metadata->head;
     564              : 
     565              :     do                          /* one batch of at most GIN_NDELETE_AT_ONCE pages per iteration */
     566              :     {
     567              :         Page        page;
     568              :         int         i;
     569          108 :         int64       nDeletedHeapTuples = 0; /* sum of the batch pages' opaque maxoff */
     570              :         ginxlogDeleteListPages data;
     571              :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     572              :         BlockNumber freespace[GIN_NDELETE_AT_ONCE]; /* block numbers of the batch, for the FSM */
     573              : 
     574          108 :         data.ndeleted = 0;
     575         1548 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     576              :         {
     577         1440 :             freespace[data.ndeleted] = blknoToDelete;
     578         1440 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     579         1440 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     580         1440 :             page = BufferGetPage(buffers[data.ndeleted]);
     581              : 
     582         1440 :             data.ndeleted++;
     583              : 
     584              :             Assert(!GinPageIsDeleted(page));
     585              : 
     586         1440 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     587         1440 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     588              :         }
     589              : 
     590          108 :         if (stats)
     591          107 :             stats->pages_deleted += data.ndeleted;
     592              : 
     593              :         /*
     594              :          * This operation touches an unusually large number of pages, so
     595              :          * prepare the XLogInsert machinery for that before entering the
     596              :          * critical section.
     597              :          */
     598          108 :         if (RelationNeedsWAL(index))
     599           51 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     600              : 
     601          108 :         START_CRIT_SECTION();
     602              : 
     603          108 :         metadata->head = blknoToDelete; /* first page not taken by this batch */
     604              : 
     605              :         Assert(metadata->nPendingPages >= data.ndeleted);
     606          108 :         metadata->nPendingPages -= data.ndeleted;
     607              :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     608          108 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     609              : 
     610          108 :         if (blknoToDelete == InvalidBlockNumber)    /* ran off the end: list is now empty */
     611              :         {
     612           27 :             metadata->tail = InvalidBlockNumber;
     613           27 :             metadata->tailFreeSize = 0;
     614           27 :             metadata->nPendingPages = 0;
     615           27 :             metadata->nPendingHeapTuples = 0;
     616              :         }
     617              : 
     618              :         /*
     619              :          * Set pd_lower just past the end of the metadata.  This is essential,
     620              :          * because without doing so, metadata will be lost if xlog.c
     621              :          * compresses the page.  (We must do this here because pre-v11
     622              :          * versions of PG did not set the metapage's pd_lower correctly, so a
     623              :          * pg_upgraded index might contain the wrong value.)
     624              :          */
     625          108 :         ((PageHeader) metapage)->pd_lower =
     626          108 :             ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     627              : 
     628          108 :         MarkBufferDirty(metabuffer);
     629              : 
     630         1548 :         for (i = 0; i < data.ndeleted; i++)
     631              :         {
     632         1440 :             page = BufferGetPage(buffers[i]);
     633         1440 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     634         1440 :             MarkBufferDirty(buffers[i]);
     635              :         }
     636              : 
     637          108 :         if (RelationNeedsWAL(index))
     638              :         {
     639              :             XLogRecPtr  recptr;
     640              : 
     641           51 :             XLogBeginInsert();
     642           51 :             XLogRegisterBuffer(0, metabuffer,
     643              :                                REGBUF_WILL_INIT | REGBUF_STANDARD);
     644          606 :             for (i = 0; i < data.ndeleted; i++)
     645          555 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     646              : 
     647           51 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     648              : 
     649           51 :             XLogRegisterData(&data,
     650              :                              sizeof(ginxlogDeleteListPages));
     651              : 
     652           51 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     653           51 :             PageSetLSN(metapage, recptr);
     654              : 
     655          606 :             for (i = 0; i < data.ndeleted; i++)
     656              :             {
     657          555 :                 page = BufferGetPage(buffers[i]);
     658          555 :                 PageSetLSN(page, recptr);
     659              :             }
     660              :         }
     661              : 
     662         1548 :         for (i = 0; i < data.ndeleted; i++)
     663         1440 :             UnlockReleaseBuffer(buffers[i]);
     664              : 
     665          108 :         END_CRIT_SECTION();
     666              : 
     667         1463 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     668         1355 :             RecordFreeIndexPage(index, freespace[i]);   /* only if asked, and outside the critical section */
     669              : 
     670          108 :     } while (blknoToDelete != newHead);
     671           27 : }
     672              : 
     673              : /* Initialize empty KeyArray, allocating room for maxvalues keys and categories */
     674              : static void
     675           27 : initKeyArray(KeyArray *keys, int32 maxvalues)
     676              : {
     677           27 :     keys->keys = palloc_array(Datum, maxvalues);
     678           27 :     keys->categories = palloc_array(GinNullCategory, maxvalues);
     679           27 :     keys->nvalues = 0;
     680           27 :     keys->maxvalues = maxvalues;
     681           27 : }
     682              : 
     683              : /* Add datum and its null category to KeyArray, doubling both arrays when full */
     684              : static void
     685       578743 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     686              : {
     687       578743 :     if (keys->nvalues >= keys->maxvalues)
     688              :     {
     689            0 :         keys->maxvalues *= 2;   /* assumes maxvalues > 0, else doubling never grows */
     690            0 :         keys->keys = repalloc_array(keys->keys, Datum, keys->maxvalues);
     691            0 :         keys->categories = repalloc_array(keys->categories, GinNullCategory, keys->maxvalues);
     692              :     }
     693              : 
     694       578743 :     keys->keys[keys->nvalues] = datum;
     695       578743 :     keys->categories[keys->nvalues] = category;
     696       578743 :     keys->nvalues++;
     697       578743 : }
     698              : 
     699              : /*
     700              :  * Collect data from a pending-list page in preparation for insertion into
     701              :  * the main index.
     702              :  *
     703              :  * Go through all tuples >= startoff on page and collect values in accum.
                      :  * Consecutive tuples that share a heap TID and attribute number are
                      :  * gathered in ka and handed to ginInsertBAEntries() in a single call.
     704              :  *
     705              :  * Note that ka is just workspace --- it does not carry any state across
     706              :  * calls.
     707              :  */
     708              : static void
     709         1441 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     710              :                    Page page, OffsetNumber startoff)
     711              : {
     712              :     ItemPointerData heapptr;
     713              :     OffsetNumber i,
     714              :                 maxoff;
     715              :     OffsetNumber attrnum;
     716              : 
     717              :     /* reset *ka to empty */
     718         1441 :     ka->nvalues = 0;
     719              : 
     720         1441 :     maxoff = PageGetMaxOffsetNumber(page);
     721              :     Assert(maxoff >= FirstOffsetNumber);
     722         1441 :     ItemPointerSetInvalid(&heapptr);    /* invalid TID marks "no group started yet" */
     723         1441 :     attrnum = 0;
     724              : 
     725       580184 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     726              :     {
     727       578743 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     728              :         OffsetNumber curattnum;
     729              :         Datum       curkey;
     730              :         GinNullCategory curcategory;
     731              : 
     732              :         /* Check for change of heap TID or attnum */
     733       578743 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     734              : 
     735       578743 :         if (!ItemPointerIsValid(&heapptr))
     736              :         {
     737         1441 :             heapptr = itup->t_tid;
     738         1441 :             attrnum = curattnum;
     739              :         }
     740       577302 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     741              :                    curattnum == attrnum))
     742              :         {
     743              :             /*
     744              :              * ginInsertBAEntries can insert several datums per call, but only
     745              :              * for one heap tuple and one column.  So call it at a boundary,
     746              :              * and reset ka.
     747              :              */
     748       191028 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     749              :                                ka->keys, ka->categories, ka->nvalues);
     750       191028 :             ka->nvalues = 0;
     751       191028 :             heapptr = itup->t_tid;
     752       191028 :             attrnum = curattnum;
     753              :         }
     754              : 
     755              :         /* Add key to KeyArray */
     756       578743 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     757       578743 :         addDatum(ka, curkey, curcategory);
     758              :     }
     759              : 
     760              :     /* Dump out all remaining keys (the last heap TID/attnum group) */
     761         1441 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     762              :                        ka->keys, ka->categories, ka->nvalues);
     763         1441 : }
     764              : 
     765              : /*
     766              :  * Move tuples from pending pages into regular GIN structure.
     767              :  *
     768              :  * At first glance this does not look crash-safe at all.  But if we crash
     769              :  * after posting entries to the main index and before removing them from the
     770              :  * pending list, it's okay because when we redo the posting later on, nothing
     771              :  * bad will happen.
     772              :  *
     773              :  * fill_fsm indicates that ginInsertCleanup should add deleted pages
     774              :  * to the FSM; otherwise the caller is responsible for putting deleted
     775              :  * pages into the FSM.
     776              :  *
                      :  * If full_clean is false, we stop at the first flush performed on or
                      :  * after the page that was the list's tail when we started; if it is
                      :  * true, we keep going until the pending list is empty.
                      :  *
                      :  * If forceCleanup is true, we wait for the metapage cleanup lock and size
                      :  * our working memory from maintenance_work_mem (or autovacuum_work_mem in
                      :  * an autovacuum worker, when that is not -1).  Otherwise we return at
                      :  * once if the lock is not immediately available, and use work_mem.
                      :  *
     777              :  * If stats isn't null, we count deleted pending pages into the counts.
     778              :  */
     779              : void
     780           76 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     781              :                  bool fill_fsm, bool forceCleanup,
     782              :                  IndexBulkDeleteResult *stats)
     783              : {
     784           76 :     Relation    index = ginstate->index;
     785              :     Buffer      metabuffer,
     786              :                 buffer;
     787              :     Page        metapage,
     788              :                 page;
     789              :     GinMetaPageData *metadata;
     790              :     MemoryContext opCtx,
     791              :                 oldCtx;
     792              :     BuildAccumulator accum;
     793              :     KeyArray    datums;
     794              :     BlockNumber blkno,
     795              :                 blknoFinish;
     796           76 :     bool        cleanupFinish = false;
     797           76 :     bool        fsm_vac = false;
     798              :     int         workMemory;
     799              : 
     800              :     /*
     801              :      * We want to prevent concurrent cleanup processes.  To do that, we lock
     802              :      * the metapage in exclusive mode using LockPage().  Nobody else uses
     803              :      * that lock on the metapage, so concurrent insertion into the pending
     804              :      * list remains possible.
     805              :      */
     806              : 
     807           76 :     if (forceCleanup)
     808              :     {
     809              :         /*
     810              :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
     811              :          * and we would like to wait for concurrent cleanup to finish.
     812              :          */
     813           76 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     814           76 :         workMemory =
     815            5 :             (AmAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     816           81 :             autovacuum_work_mem : maintenance_work_mem;
     817              :     }
     818              :     else
     819              :     {
     820              :         /*
     821              :          * We are called from a regular insert; if we see a concurrent
     822              :          * cleanup, just exit in the hope that the concurrent process will
     823              :          * clean up the pending list.
     824              :          */
     825            0 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     826           49 :             return;
     827            0 :         workMemory = work_mem;
     828              :     }
     829              : 
     830           76 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     831           76 :     LockBuffer(metabuffer, GIN_SHARE);
     832           76 :     metapage = BufferGetPage(metabuffer);
     833           76 :     metadata = GinPageGetMeta(metapage);
     834              : 
     835           76 :     if (metadata->head == InvalidBlockNumber)
     836              :     {
     837              :         /* Nothing to do */
     838           49 :         UnlockReleaseBuffer(metabuffer);
     839           49 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     840           49 :         return;
     841              :     }
     842              : 
     843              :     /*
     844              :      * Remember the tail page to prevent infinite cleanup if other backends
     845              :      * add new tuples faster than we can clean up.
     846              :      */
     847           27 :     blknoFinish = metadata->tail;
     848              : 
     849              :     /*
     850              :      * Read and lock head of pending list
     851              :      */
     852           27 :     blkno = metadata->head;
     853           27 :     buffer = ReadBuffer(index, blkno);
     854           27 :     LockBuffer(buffer, GIN_SHARE);
     855           27 :     page = BufferGetPage(buffer);
     856              : 
     857           27 :     LockBuffer(metabuffer, GIN_UNLOCK);
     858              : 
     859              :     /*
     860              :      * Initialize.  All temporary space will be in opCtx
     861              :      */
     862           27 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     863              :                                   "GIN insert cleanup temporary context",
     864              :                                   ALLOCSET_DEFAULT_SIZES);
     865              : 
     866           27 :     oldCtx = MemoryContextSwitchTo(opCtx);
     867              : 
     868           27 :     initKeyArray(&datums, 128);
     869           27 :     ginInitBA(&accum);
     870           27 :     accum.ginstate = ginstate;
     871              : 
     872              :     /*
     873              :      * At the top of this loop, we have pin and lock on the current page of
     874              :      * the pending list.  However, we'll release that before exiting the loop.
     875              :      * Note we also have pin but not lock on the metapage.
     876              :      */
     877              :     for (;;)
     878              :     {
     879         1413 :         Assert(!GinPageIsDeleted(page));
     880              : 
     881              :         /*
     882              :          * Have we reached the page that we remembered as the tail when we
     883              :          * started our cleanup?  If so, stop after the next flush -- unless
     884              :          * the caller asked us to clean up the whole pending list, in which
     885              :          * case ignore the old tail and work until the list becomes empty.
     886              :          */
     887         1440 :         if (blkno == blknoFinish && full_clean == false)
     888            1 :             cleanupFinish = true;
     889              : 
     890              :         /*
     891              :          * read page's datums into accum
     892              :          */
     893         1440 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     894              : 
     895         1440 :         vacuum_delay_point(false);
     896              : 
     897              :         /*
     898              :          * Is it time to flush memory to disk?  Flush if we are at the end of
     899              :          * the pending list, or if we have a full row and memory is getting
     900              :          * full.
     901              :          */
     902         1440 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     903         1413 :             (GinPageHasFullRow(page) &&
     904         1413 :              accum.allocatedMemory >= workMemory * (Size) 1024))
     905            0 :         {
     906              :             ItemPointerData *list;
     907              :             uint32      nlist;
     908              :             Datum       key;
     909              :             GinNullCategory category;
     910              :             OffsetNumber maxoff,
     911              :                         attnum;
     912              : 
     913              :             /*
     914              :              * Unlock the current page to increase performance.  Changes to
     915              :              * the page will be detected later by comparing maxoff after the
     916              :              * memory flush completes.
     917              :              */
     918           27 :             maxoff = PageGetMaxOffsetNumber(page);
     919           27 :             LockBuffer(buffer, GIN_UNLOCK);
     920              : 
     921              :             /*
     922              :              * Moving collected data into regular structure can take
     923              :              * significant amount of time - so, run it without locking pending
     924              :              * list.
     925              :              */
     926           27 :             ginBeginBAScan(&accum);
     927       183130 :             while ((list = ginGetBAEntry(&accum,
     928       183130 :                                          &attnum, &key, &category, &nlist)) != NULL)
     929              :             {
     930       183103 :                 ginEntryInsert(ginstate, attnum, key, category,
     931              :                                list, nlist, NULL);
     932       183103 :                 vacuum_delay_point(false);
     933              :             }
     934              : 
     935              :             /*
     936              :              * Lock the whole list to remove pages
     937              :              */
     938           27 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     939           27 :             LockBuffer(buffer, GIN_SHARE);
     940              : 
     941              :             Assert(!GinPageIsDeleted(page));
     942              : 
     943              :             /*
     944              :              * While we left the page unlocked, more stuff might have gotten
     945              :              * added to it.  If so, process those entries immediately.  There
     946              :              * shouldn't be very many, so we don't worry about the fact that
     947              :              * we're doing this with exclusive lock. Insertion algorithm
     948              :              * guarantees that inserted row(s) will not continue on next page.
     949              :              * NOTE: intentionally no vacuum_delay_point in this loop.
     950              :              */
     951           27 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     952              :             {
     953            1 :                 ginInitBA(&accum);
     954            1 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     955              : 
     956            1 :                 ginBeginBAScan(&accum);
     957            8 :                 while ((list = ginGetBAEntry(&accum,
     958            8 :                                              &attnum, &key, &category, &nlist)) != NULL)
     959            7 :                     ginEntryInsert(ginstate, attnum, key, category,
     960              :                                    list, nlist, NULL);
     961              :             }
     962              : 
     963              :             /*
     964              :              * Remember next page - it will become the new list head
     965              :              */
     966           27 :             blkno = GinPageGetOpaque(page)->rightlink;
     967           27 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     968              :                                              * locking */
     969              : 
     970              :             /*
     971              :              * remove read pages from pending list, at this point all content
     972              :              * of read pages is in regular structure
     973              :              */
     974           27 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     975              : 
     976              :             /* At this point, some pending pages have been freed up */
     977           27 :             fsm_vac = true;
     978              : 
     979              :             Assert(blkno == metadata->head);
     980           27 :             LockBuffer(metabuffer, GIN_UNLOCK);
     981              : 
     982              :             /*
     983              :              * if we removed the whole pending list, or we have cleaned up
     984              :              * through the tail we remembered at the start of cleanup, then
     985              :              * just exit
     986              :              */
     987           27 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     988              :                 break;
     989              : 
     990              :             /*
     991              :              * release memory used so far and reinit state
     992              :              */
     993            0 :             MemoryContextReset(opCtx);
     994            0 :             initKeyArray(&datums, datums.maxvalues);
     995            0 :             ginInitBA(&accum);
     996              :         }
     997              :         else
     998              :         {
     999         1413 :             blkno = GinPageGetOpaque(page)->rightlink;
    1000         1413 :             UnlockReleaseBuffer(buffer);
    1001              :         }
    1002              : 
    1003              :         /*
    1004              :          * Read next page in pending list
    1005              :          */
    1006         1413 :         vacuum_delay_point(false);
    1007         1413 :         buffer = ReadBuffer(index, blkno);
    1008         1413 :         LockBuffer(buffer, GIN_SHARE);
    1009         1413 :         page = BufferGetPage(buffer);
    1010              :     }
    1011              : 
    1012           27 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    1013           27 :     ReleaseBuffer(metabuffer);
    1014              : 
    1015              :     /*
    1016              :      * As pending list pages can have a high churn rate, it is desirable to
    1017              :      * recycle them immediately to the FreeSpaceMap when ordinary backends
    1018              :      * clean the list.
    1019              :      */
    1020           27 :     if (fsm_vac && fill_fsm)
    1021           11 :         IndexFreeSpaceMapVacuum(index);
    1022              : 
    1023              :     /* Clean up temporary space */
    1024           27 :     MemoryContextSwitchTo(oldCtx);
    1025           27 :     MemoryContextDelete(opCtx);
    1026              : }
    1026              : 
    1027              : /*
    1028              :  * SQL-callable function to clean the insert pending list
                      :  *
                      :  * Argument 0 is the OID of the index.  The result is the number of
                      :  * pending-list pages deleted (stats.pages_deleted), returned as int64;
                      :  * it is zero when the index is skipped because it is not valid.
    1029              :  */
    1030              : Datum
    1031           13 : gin_clean_pending_list(PG_FUNCTION_ARGS)
    1032              : {
    1033           13 :     Oid         indexoid = PG_GETARG_OID(0);
    1034           13 :     Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    1035              :     IndexBulkDeleteResult stats;
    1036              : 
    1037           13 :     if (RecoveryInProgress())
    1038            0 :         ereport(ERROR,
    1039              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1040              :                  errmsg("recovery is in progress"),
    1041              :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
    1042              : 
    1043              :     /* Must be a GIN index */
    1044           13 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1045           13 :         indexRel->rd_rel->relam != GIN_AM_OID)
    1046            0 :         ereport(ERROR,
    1047              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1048              :                  errmsg("\"%s\" is not a GIN index",
    1049              :                         RelationGetRelationName(indexRel))));
    1050              : 
    1051              :     /*
    1052              :      * Reject attempts to read non-local temporary relations; we would be
    1053              :      * likely to get wrong data since we have no visibility into the owning
    1054              :      * session's local buffers.
    1055              :      */
    1056           13 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1057            0 :         ereport(ERROR,
    1058              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1059              :                  errmsg("cannot access temporary indexes of other sessions")));
    1060              : 
    1061              :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1062           13 :     if (!object_ownercheck(RelationRelationId, indexoid, GetUserId()))
    1063            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1064            0 :                        RelationGetRelationName(indexRel));
    1065              : 
    1066           13 :     memset(&stats, 0, sizeof(stats));
    1067              : 
    1068              :     /*
    1069              :      * Can't assume anything about the content of an !indisready index.  Make
    1070              :      * those a no-op, not an error, so users can just run this function on all
    1071              :      * indexes of the access method.  Since an indisready&&!indisvalid index
    1072              :      * is merely awaiting missed aminsert calls, we're capable of processing
    1073              :      * it.  Decline to do so, out of an abundance of caution.
    1074              :      */
    1075           13 :     if (indexRel->rd_index->indisvalid)
    1076              :     {
    1077              :         GinState    ginstate;
    1078              : 
    1079           13 :         initGinState(&ginstate, indexRel);
    1080           13 :         ginInsertCleanup(&ginstate, true, true, true, &stats);  /* full_clean, fill_fsm, forceCleanup */
    1081              :     }
    1082              :     else
    1083            0 :         ereport(DEBUG1,
    1084              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1085              :                  errmsg("index \"%s\" is not valid",
    1086              :                         RelationGetRelationName(indexRel))));
    1087              : 
    1088           13 :     index_close(indexRel, RowExclusiveLock);
    1089              : 
    1090           13 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1091              : }
        

Generated by: LCOV version 2.0-1