LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 323 364 88.7 %
Date: 2019-11-21 12:06:29 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * ginfast.c
       4             :  *    Fast insert routines for the Postgres inverted index access method.
       5             :  *    Pending entries are stored in a linear list of pages.  Later on
       6             :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7             :  *    transfer pending entries into the regular index structure.  This
       8             :  *    wins because bulk insertion is much more efficient than retail.
       9             :  *
      10             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      11             :  * Portions Copyright (c) 1994, Regents of the University of California
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *          src/backend/access/gin/ginfast.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/gin_private.h"
      22             : #include "access/ginxlog.h"
      23             : #include "access/xlog.h"
      24             : #include "access/xloginsert.h"
      25             : #include "catalog/pg_am.h"
      26             : #include "commands/vacuum.h"
      27             : #include "miscadmin.h"
      28             : #include "postmaster/autovacuum.h"
      29             : #include "storage/indexfsm.h"
      30             : #include "storage/lmgr.h"
      31             : #include "storage/predicate.h"
      32             : #include "utils/acl.h"
      33             : #include "utils/builtins.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : 
      37             : /*
                     :  * GUC parameter.  NOTE(review): it is not read directly in this part of the
                     :  * file; presumably it is the default, in kilobytes, that
                     :  * GinGetPendingListCleanupSize() falls back to -- confirm against
                     :  * gin_private.h.
                     :  */
      38             : int         gin_pending_list_limit = 0;
      39             : /*
                     :  * Bytes in a block apart from the MAXALIGN'd page header and the MAXALIGN'd
                     :  * GinPageOpaqueData.  ginHeapTupleFastInsert() multiplies this by the number
                     :  * of pending pages to estimate the size of the pending list when deciding
                     :  * whether a cleanup is due.
                     :  */
      40             : #define GIN_PAGE_FREESIZE \
      41             :     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
      42             : /*
                     :  * Two growable arrays that share one fill count (nvalues) and one allocated
                     :  * size (maxvalues): the key datums and their null categories.
                     :  */
      43             : typedef struct KeyArray
      44             : {
      45             :     Datum      *keys;           /* expansible array of key datums */
      46             :     GinNullCategory *categories;    /* expansible array of null categories; presumably one per key -- confirm */
      47             :     int32       nvalues;        /* current number of valid entries */
      48             :     int32       maxvalues;      /* allocated size of arrays */
      49             : } KeyArray;
      50             : 
      51             : 
      52             : /*
      53             :  * Build a pending-list page from the given array of tuples, and write it out.
      54             :  *
                     :  * buffer is initialized as a GIN_LIST page, filled with the ntuples tuples
                     :  * in array order, and given the specified rightlink.  A page without a
                     :  * right neighbor (rightlink == InvalidBlockNumber) is additionally flagged
                     :  * as "full row".  The page is WAL-logged if the index needs WAL.
                     :  *
                     :  * The buffer is unlocked and released before returning, so the caller must
                     :  * not use it afterwards.
                     :  *
      55             :  * Returns amount of free space left on the page.
      56             :  */
      57             : static int32
      58         660 : writeListPage(Relation index, Buffer buffer,
      59             :               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      60             : {
      61         660 :     Page        page = BufferGetPage(buffer);
      62             :     int32       i,
      63             :                 freesize,
      64         660 :                 size = 0;
      65             :     OffsetNumber l,
      66             :                 off;
      67             :     PGAlignedBlock workspace;
      68             :     char       *ptr;
      69             : 
      70         660 :     START_CRIT_SECTION();
      71             : 
      72         660 :     GinInitBuffer(buffer, GIN_LIST);
      73             : 
      74         660 :     off = FirstOffsetNumber;
      75         660 :     ptr = workspace.data;
      76             :     /*
                     :      * Add each tuple to the page at consecutive offsets, and also pack a copy
                     :      * of it into workspace; the packed copies become the buffer data of the
                     :      * WAL record registered below.
                     :      */
      77        2624 :     for (i = 0; i < ntuples; i++)
      78             :     {
      79        1964 :         int         this_size = IndexTupleSize(tuples[i]);
      80             : 
      81        1964 :         memcpy(ptr, tuples[i], this_size);
      82        1964 :         ptr += this_size;
      83        1964 :         size += this_size;
      84             : 
      85        1964 :         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
      86             :         /* NOTE(review): this ERROR is raised inside the critical section -- confirm the resulting severity is intended */
      87        1964 :         if (l == InvalidOffsetNumber)
      88           0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      89             :                  RelationGetRelationName(index));
      90             : 
      91        1964 :         off++;
      92             :     }
      93             : 
      94             :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      95             : 
      96         660 :     GinPageGetOpaque(page)->rightlink = rightlink;
      97             : 
      98             :     /*
      99             :      * tail page may contain only whole row(s) or final part of row placed on
     100             :      * previous pages (a "row" here meaning all the index tuples generated for
     101             :      * one heap tuple)
                     :      *
                     :      * On list pages maxoff serves as a count of heap rows:
                     :      * ginHeapTupleFastInsert() increments it for each row added to the tail
                     :      * page, and shiftList() sums it up as the number of deleted heap tuples.
                     :      * Hence 1 for a page without a right neighbor and 0 for any other page.
     102             :      */
     103         660 :     if (rightlink == InvalidBlockNumber)
     104             :     {
     105         660 :         GinPageSetFullRow(page);
     106         660 :         GinPageGetOpaque(page)->maxoff = 1;
     107             :     }
     108             :     else
     109             :     {
     110           0 :         GinPageGetOpaque(page)->maxoff = 0;
     111             :     }
     112             : 
     113         660 :     MarkBufferDirty(buffer);
     114             : 
     115         660 :     if (RelationNeedsWAL(index))
     116             :     {
     117             :         ginxlogInsertListPage data;
     118             :         XLogRecPtr  recptr;
     119             : 
     120         656 :         data.rightlink = rightlink;
     121         656 :         data.ntuples = ntuples;
     122             : 
     123         656 :         XLogBeginInsert();
     124         656 :         XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
     125             : 
     126         656 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     127         656 :         XLogRegisterBufData(0, workspace.data, size);
     128             : 
     129         656 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     130         656 :         PageSetLSN(page, recptr);
     131             :     }
     132             : 
     133             :     /* get free space before releasing buffer */
     134         660 :     freesize = PageGetExactFreeSpace(page);
     135             : 
     136         660 :     UnlockReleaseBuffer(buffer);
     137             : 
     138         660 :     END_CRIT_SECTION();
     139             : 
     140         660 :     return freesize;
     141             : }
     142             : /*
                     :  * Write the given tuples into one or more newly allocated pending-list
                     :  * pages, chained through their rightlinks, and describe the chain in *res.
                     :  *
                     :  * Pages are filled greedily: a tuple that would push the running size
                     :  * (MAXALIGN'd tuple size plus one ItemIdData per tuple) past
                     :  * GinListPageSize starts a new page.  A page is written out only after its
                     :  * successor has been allocated, because it needs the successor's block
                     :  * number as its rightlink; the last page is written with
                     :  * InvalidBlockNumber.
                     :  *
                     :  * res->head, res->tail and res->tailFreeSize are assigned, but
                     :  * res->nPendingPages is only incremented, so the caller must pass *res
                     :  * zeroed.  res->nPendingHeapTuples is set to 1.  ntuples must be > 0.
                     :  */
     143             : static void
     144         660 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     145             :             GinMetaPageData *res)
     146             : {
     147         660 :     Buffer      curBuffer = InvalidBuffer;
     148         660 :     Buffer      prevBuffer = InvalidBuffer;
     149             :     int         i,
     150         660 :                 size = 0,
     151             :                 tupsize;
     152         660 :     int         startTuple = 0;
     153             : 
     154             :     Assert(ntuples > 0);
     155             : 
     156             :     /*
     157             :      * Split tuples into pages
                     :      *
                     :      * curBuffer == InvalidBuffer means "a new page is needed"; startTuple is
                     :      * the index of the first tuple assigned to the page being filled.
     158             :      */
     159        2624 :     for (i = 0; i < ntuples; i++)
     160             :     {
     161        1964 :         if (curBuffer == InvalidBuffer)
     162             :         {
     163         660 :             curBuffer = GinNewBuffer(index);
     164             : 
     165         660 :             if (prevBuffer != InvalidBuffer)
     166             :             {
     167           0 :                 res->nPendingPages++;
     168           0 :                 writeListPage(index, prevBuffer,
     169           0 :                               tuples + startTuple,
     170             :                               i - startTuple,
     171             :                               BufferGetBlockNumber(curBuffer));
     172             :             }
     173             :             else
     174             :             {
     175         660 :                 res->head = BufferGetBlockNumber(curBuffer);
     176             :             }
     177             : 
     178         660 :             prevBuffer = curBuffer;
     179         660 :             startTuple = i;
     180         660 :             size = 0;
     181             :         }
     182             : 
     183        1964 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
     184             : 
     185        1964 :         if (size + tupsize > GinListPageSize)
     186             :         {
     187             :             /*
                     :              * won't fit, force a new page and reprocess
                     :              *
                     :              * NOTE(review): a single tuple larger than GinListPageSize would
                     :              * not fit on an empty page either, so this would never advance;
                     :              * presumably tuple size is bounded where tuples are formed --
                     :              * confirm.
                     :              */
     188           0 :             i--;
     189           0 :             curBuffer = InvalidBuffer;
     190             :         }
     191             :         else
     192             :         {
     193        1964 :             size += tupsize;
     194             :         }
     195             :     }
     196             : 
     197             :     /*
     198             :      * Write last page
     199             :      */
     200         660 :     res->tail = BufferGetBlockNumber(curBuffer);
     201        1320 :     res->tailFreeSize = writeListPage(index, curBuffer,
     202         660 :                                       tuples + startTuple,
     203             :                                       ntuples - startTuple,
     204             :                                       InvalidBlockNumber);
     205         660 :     res->nPendingPages++;
     206             :     /* that was only one heap tuple */
     207         660 :     res->nPendingHeapTuples = 1;
     208         660 : }
     209             : 
     210             : /*
     211             :  * Write the index tuples contained in *collector into the index's
     212             :  * pending list.
     213             :  *
     214             :  * Function guarantees that all these tuples will be inserted consecutively,
     215             :  * preserving order
                     :  *
                     :  * Two strategies are used.  If the tuples fit into the free space of the
                     :  * current tail page, they are added to that page while the metapage stays
                     :  * exclusively locked.  Otherwise (including when the pending list is empty)
                     :  * they are first written to new pages by makeSublist() with the metapage
                     :  * unlocked, and the resulting sublist is then linked onto the list under
                     :  * the metapage lock.
                     :  *
                     :  * Does nothing if the collector holds no tuples.  After releasing the
                     :  * metapage, calls ginInsertCleanup() if the pending list has grown past the
                     :  * size reported by GinGetPendingListCleanupSize().
     216             :  */
     217             : void
     218       88030 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     219             : {
     220       88030 :     Relation    index = ginstate->index;
     221             :     Buffer      metabuffer;
     222             :     Page        metapage;
     223       88030 :     GinMetaPageData *metadata = NULL;
     224       88030 :     Buffer      buffer = InvalidBuffer;
     225       88030 :     Page        page = NULL;
     226             :     ginxlogUpdateMeta data;
     227       88030 :     bool        separateList = false;
     228       88030 :     bool        needCleanup = false;
     229             :     int         cleanupSize;
     230             :     bool        needWal;
     231             : 
     232       88030 :     if (collector->ntuples == 0)
     233           0 :         return;
     234             : 
     235       88030 :     needWal = RelationNeedsWAL(index);
     236             : 
     237       88030 :     data.node = index->rd_node;
     238       88030 :     data.ntuples = 0;
     239       88030 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     240             : 
     241       88030 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     242       88030 :     metapage = BufferGetPage(metabuffer);
     243             : 
     244             :     /*
     245             :      * An insertion to the pending list could logically belong anywhere in the
     246             :      * tree, so it conflicts with all serializable scans.  All scans acquire a
     247             :      * predicate lock on the metabuffer to represent that.
     248             :      */
     249       88030 :     CheckForSerializableConflictIn(index, NULL, metabuffer);
     250             : 
     251       88024 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     252             :     {
     253             :         /*
     254             :          * Total size is greater than one page => make sublist
                     :          *
                     :          * The metapage has not been locked at all on this path.
     255             :          */
     256           0 :         separateList = true;
     257             :     }
     258             :     else
     259             :     {
     260       88024 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     261       88024 :         metadata = GinPageGetMeta(metapage);
     262             : 
     263      176032 :         if (metadata->head == InvalidBlockNumber ||
     264       88008 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     265             :         {
     266             :             /*
     267             :              * Pending list is empty or total size is greater than freespace
     268             :              * on tail page => make sublist
     269             :              *
     270             :              * We unlock metabuffer to keep high concurrency
     271             :              */
     272         660 :             separateList = true;
     273         660 :             LockBuffer(metabuffer, GIN_UNLOCK);
     274             :         }
     275             :     }
     276             : 
     277       88024 :     if (separateList)
     278             :     {
     279             :         /*
     280             :          * We should make sublist separately and append it to the tail
     281             :          */
     282             :         GinMetaPageData sublist;
     283             : 
     284         660 :         memset(&sublist, 0, sizeof(GinMetaPageData));
     285         660 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     286             : 
     287         660 :         if (needWal)
     288         656 :             XLogBeginInsert();
     289             : 
     290             :         /*
     291             :          * metapage was unlocked, see above
                     :          *
                     :          * The list may have changed in the meantime, so the metadata is
                     :          * re-read and the empty-list test is repeated under the lock.
     292             :          */
     293         660 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     294         660 :         metadata = GinPageGetMeta(metapage);
     295             : 
     296         660 :         if (metadata->head == InvalidBlockNumber)
     297             :         {
     298             :             /*
     299             :              * Main list is empty, so just insert sublist as main list
     300             :              */
     301          16 :             START_CRIT_SECTION();
     302             : 
     303          16 :             metadata->head = sublist.head;
     304          16 :             metadata->tail = sublist.tail;
     305          16 :             metadata->tailFreeSize = sublist.tailFreeSize;
     306             : 
     307          16 :             metadata->nPendingPages = sublist.nPendingPages;
     308          16 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     309             :         }
     310             :         else
     311             :         {
     312             :             /*
     313             :              * Merge lists
                     :              *
                     :              * The old tail page gets the sublist's head as its rightlink, and
                     :              * the metapage takes over the sublist's tail and adds its counts.
     314             :              */
     315         644 :             data.prevTail = metadata->tail;
     316         644 :             data.newRightlink = sublist.head;
     317             : 
     318         644 :             buffer = ReadBuffer(index, metadata->tail);
     319         644 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     320         644 :             page = BufferGetPage(buffer);
     321             : 
     322             :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     323             : 
     324         644 :             START_CRIT_SECTION();
     325             : 
     326         644 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     327             : 
     328         644 :             MarkBufferDirty(buffer);
     329             : 
     330         644 :             metadata->tail = sublist.tail;
     331         644 :             metadata->tailFreeSize = sublist.tailFreeSize;
     332             : 
     333         644 :             metadata->nPendingPages += sublist.nPendingPages;
     334         644 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     335             : 
     336         644 :             if (needWal)
     337         644 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     338             :         }
     339             :     }
     340             :     else
     341             :     {
     342             :         /*
     343             :          * Insert into tail page.  Metapage is already locked
     344             :          */
     345             :         OffsetNumber l,
     346             :                     off;
     347             :         int         i,
     348             :                     tupsize;
     349             :         char       *ptr;
     350             :         char       *collectordata;
     351             : 
     352       87364 :         buffer = ReadBuffer(index, metadata->tail);
     353       87364 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     354       87364 :         page = BufferGetPage(buffer);
     355             : 
     356      174728 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     357       87364 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     358             :         /* scratch copy of the tuples, packed back to back, to be attached to the WAL record */
     359       87364 :         collectordata = ptr = (char *) palloc(collector->sumsize);
     360             : 
     361       87364 :         data.ntuples = collector->ntuples;
     362             : 
     363       87364 :         if (needWal)
     364       87360 :             XLogBeginInsert();
     365             : 
     366       87364 :         START_CRIT_SECTION();
     367             : 
     368             :         /*
     369             :          * Increase counter of heap tuples
     370             :          */
     371             :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     372       87364 :         GinPageGetOpaque(page)->maxoff++;
     373       87364 :         metadata->nPendingHeapTuples++;
     374             : 
     375      349420 :         for (i = 0; i < collector->ntuples; i++)
     376             :         {
     377      262056 :             tupsize = IndexTupleSize(collector->tuples[i]);
     378      262056 :             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
     379             :             /* NOTE(review): this ERROR is raised inside the critical section -- confirm the resulting severity is intended */
     380      262056 :             if (l == InvalidOffsetNumber)
     381           0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     382             :                      RelationGetRelationName(index));
     383             : 
     384      262056 :             memcpy(ptr, collector->tuples[i], tupsize);
     385      262056 :             ptr += tupsize;
     386             : 
     387      262056 :             off++;
     388             :         }
     389             : 
     390             :         Assert((ptr - collectordata) <= collector->sumsize);
     391       87364 :         if (needWal)
     392             :         {
     393       87360 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     394       87360 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     395             :         }
     396             : 
     397       87364 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     398             : 
     399       87364 :         MarkBufferDirty(buffer);
     400             :     }
     401             : 
     402             :     /*
     403             :      * Set pd_lower just past the end of the metadata.  This is essential,
     404             :      * because without doing so, metadata will be lost if xlog.c compresses
     405             :      * the page.  (We must do this here because pre-v11 versions of PG did not
     406             :      * set the metapage's pd_lower correctly, so a pg_upgraded index might
     407             :      * contain the wrong value.)
     408             :      */
     409       88024 :     ((PageHeader) metapage)->pd_lower =
     410       88024 :         ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     411             : 
     412             :     /*
     413             :      * Write metabuffer, make xlog entry
                     :      *
                     :      * One record covers the metapage (block 0) and, when a tail page was
                     :      * modified, that page (block 1, registered above); both pages receive
                     :      * the record's LSN.
     414             :      */
     415       88024 :     MarkBufferDirty(metabuffer);
     416             : 
     417       88024 :     if (needWal)
     418             :     {
     419             :         XLogRecPtr  recptr;
     420             : 
     421       88016 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     422             : 
     423       88016 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
     424       88016 :         XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
     425             : 
     426       88016 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     427       88016 :         PageSetLSN(metapage, recptr);
     428             : 
     429       88016 :         if (buffer != InvalidBuffer)
     430             :         {
     431       88004 :             PageSetLSN(page, recptr);
     432             :         }
     433             :     }
     434             : 
     435       88024 :     if (buffer != InvalidBuffer)
     436       88008 :         UnlockReleaseBuffer(buffer);
     437             : 
     438             :     /*
     439             :      * Force pending list cleanup when it becomes too long. And,
     440             :      * ginInsertCleanup could take significant amount of time, so we prefer to
     441             :      * call it when it can do all the work in a single collection cycle. In
     442             :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     443             :      * while pending list is still small enough to fit into
     444             :      * gin_pending_list_limit.
     445             :      *
     446             :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
                     :      *
                     :      * The decision is therefore only recorded in needCleanup here, while the
                     :      * metapage is still locked, and acted upon after END_CRIT_SECTION.
                     :      *
                     :      * NOTE(review): cleanupSize * 1024L is evaluated in long, which is only
                     :      * 32 bits wide on some platforms; confirm the product cannot overflow
                     :      * for the largest cleanup size the settings allow.
     447             :      */
     448       88024 :     cleanupSize = GinGetPendingListCleanupSize(index);
     449       88024 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
     450           0 :         needCleanup = true;
     451             : 
     452       88024 :     UnlockReleaseBuffer(metabuffer);
     453             : 
     454       88024 :     END_CRIT_SECTION();
     455             : 
     456             :     /*
     457             :      * Since it could contend with concurrent cleanup process we cleanup
     458             :      * pending list not forcibly.
     459             :      */
     460       88024 :     if (needCleanup)
     461           0 :         ginInsertCleanup(ginstate, false, true, false, NULL);
     462             : }
     463             : 
     464             : /*
     465             :  * Create temporary index tuples for a single indexable item (one index column
     466             :  * for the heap tuple specified by ht_ctid), and append them to the array
     467             :  * in *collector.  They will subsequently be written out using
     468             :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
     469             :  * temp tuples for a given heap tuple must be written in one call to
     470             :  * ginHeapTupleFastInsert.
                     :  *
                     :  * The key values come from ginExtractEntries().  One tuple is formed per
                     :  * key, its t_tid is set to *ht_ctid, and collector->ntuples and
                     :  * collector->sumsize are advanced accordingly.  collector->tuples is
                     :  * allocated on first use and afterwards grown by doubling
                     :  * collector->lentuples.
                     :  *
                     :  * Raises an error if the number of entries is negative or the array would
                     :  * exceed what a single allocation can hold.
     471             :  */
     472             : void
     473       88030 : ginHeapTupleFastCollect(GinState *ginstate,
     474             :                         GinTupleCollector *collector,
     475             :                         OffsetNumber attnum, Datum value, bool isNull,
     476             :                         ItemPointer ht_ctid)
     477             : {
     478             :     Datum      *entries;
     479             :     GinNullCategory *categories;
     480             :     int32       i,
     481             :                 nentries;
     482             : 
     483             :     /*
     484             :      * Extract the key values that need to be inserted in the index
     485             :      */
     486       88030 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     487             :                                 &nentries, &categories);
     488             : 
     489             :     /*
     490             :      * Protect against integer overflow in allocation calculations
     491             :      */
     492      176060 :     if (nentries < 0 ||
     493       88030 :         collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
     494           0 :         elog(ERROR, "too many entries for GIN index");
     495             : 
     496             :     /*
     497             :      * Allocate/reallocate memory for storing collected tuples
     498             :      */
     499       88030 :     if (collector->tuples == NULL)
     500             :     {
     501             :         /*
     502             :          * Determine the number of elements to allocate in the tuples array
     503             :          * initially.  Make it a power of 2 to avoid wasting memory when
     504             :          * resizing (since palloc likes powers of 2).
                     :          *
                     :          * NOTE(review): the initial size is derived from nentries alone,
                     :          * which presumes collector->ntuples is 0 whenever collector->tuples
                     :          * is NULL -- confirm against the code that sets up the collector.
     505             :          */
     506       88030 :         collector->lentuples = 16;
     507      176060 :         while (collector->lentuples < nentries)
     508           0 :             collector->lentuples *= 2;
     509             : 
     510       88030 :         collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
     511             :     }
     512           0 :     else if (collector->lentuples < collector->ntuples + nentries)
     513             :     {
     514             :         /*
     515             :          * Advance lentuples to the next suitable power of 2.  This won't
     516             :          * overflow, though we could get to a value that exceeds
     517             :          * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
     518             :          */
     519             :         do
     520             :         {
     521           0 :             collector->lentuples *= 2;
     522           0 :         } while (collector->lentuples < collector->ntuples + nentries);
     523             : 
     524           0 :         collector->tuples = (IndexTuple *) repalloc(collector->tuples,
     525           0 :                                                     sizeof(IndexTuple) * collector->lentuples);
     526             :     }
     527             : 
     528             :     /*
     529             :      * Build an index tuple for each key value, and add to array.  In pending
     530             :      * tuples we just stick the heap TID into t_tid.
     531             :      */
     532      352056 :     for (i = 0; i < nentries; i++)
     533             :     {
     534             :         IndexTuple  itup;
     535             : 
     536      264026 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     537             :                             NULL, 0, 0, true);
     538      264026 :         itup->t_tid = *ht_ctid;
     539      264026 :         collector->tuples[collector->ntuples++] = itup;
     540      264026 :         collector->sumsize += IndexTupleSize(itup);
     541             :     }
     542       88030 : }
     543             : 
     544             : /*
     545             :  * Deletes pending list pages up to (not including) newHead page.
     546             :  * If newHead == InvalidBlockNumber then function drops the whole list.
     547             :  *
     548             :  * metapage is pinned and exclusive-locked throughout this function.
     549             :  */
     550             : static void
     551           8 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     552             :           bool fill_fsm, IndexBulkDeleteResult *stats)
     553             : {
     554             :     Page        metapage;
     555             :     GinMetaPageData *metadata;
     556             :     BlockNumber blknoToDelete;
     557             : 
     558           8 :     metapage = BufferGetPage(metabuffer);
     559           8 :     metadata = GinPageGetMeta(metapage);
     560           8 :     blknoToDelete = metadata->head;
     561             : 
     562             :     do
     563             :     {
     564             :         Page        page;
     565             :         int         i;
     566          44 :         int64       nDeletedHeapTuples = 0;
     567             :         ginxlogDeleteListPages data;
     568             :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     569             :         BlockNumber freespace[GIN_NDELETE_AT_ONCE];
     570             : 
     571          44 :         data.ndeleted = 0;
     572         740 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     573             :         {
     574         652 :             freespace[data.ndeleted] = blknoToDelete;
     575         652 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     576         652 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     577         652 :             page = BufferGetPage(buffers[data.ndeleted]);
     578             : 
     579         652 :             data.ndeleted++;
     580             : 
     581             :             Assert(!GinPageIsDeleted(page));
     582             : 
     583         652 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     584         652 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     585             :         }
     586             : 
     587          44 :         if (stats)
     588          44 :             stats->pages_deleted += data.ndeleted;
     589             : 
     590             :         /*
     591             :          * This operation touches an unusually large number of pages, so
     592             :          * prepare the XLogInsert machinery for that before entering the
     593             :          * critical section.
     594             :          */
     595          44 :         if (RelationNeedsWAL(index))
     596          44 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     597             : 
     598          44 :         START_CRIT_SECTION();
     599             : 
     600          44 :         metadata->head = blknoToDelete;
     601             : 
     602             :         Assert(metadata->nPendingPages >= data.ndeleted);
     603          44 :         metadata->nPendingPages -= data.ndeleted;
     604             :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     605          44 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     606             : 
     607          44 :         if (blknoToDelete == InvalidBlockNumber)
     608             :         {
     609           8 :             metadata->tail = InvalidBlockNumber;
     610           8 :             metadata->tailFreeSize = 0;
     611           8 :             metadata->nPendingPages = 0;
     612           8 :             metadata->nPendingHeapTuples = 0;
     613             :         }
     614             : 
     615             :         /*
     616             :          * Set pd_lower just past the end of the metadata.  This is essential,
     617             :          * because without doing so, metadata will be lost if xlog.c
     618             :          * compresses the page.  (We must do this here because pre-v11
     619             :          * versions of PG did not set the metapage's pd_lower correctly, so a
     620             :          * pg_upgraded index might contain the wrong value.)
     621             :          */
     622          44 :         ((PageHeader) metapage)->pd_lower =
     623          44 :             ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
     624             : 
     625          44 :         MarkBufferDirty(metabuffer);
     626             : 
     627         696 :         for (i = 0; i < data.ndeleted; i++)
     628             :         {
     629         652 :             page = BufferGetPage(buffers[i]);
     630         652 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     631         652 :             MarkBufferDirty(buffers[i]);
     632             :         }
     633             : 
     634          44 :         if (RelationNeedsWAL(index))
     635             :         {
     636             :             XLogRecPtr  recptr;
     637             : 
     638          44 :             XLogBeginInsert();
     639          44 :             XLogRegisterBuffer(0, metabuffer,
     640             :                                REGBUF_WILL_INIT | REGBUF_STANDARD);
     641         696 :             for (i = 0; i < data.ndeleted; i++)
     642         652 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     643             : 
     644          44 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     645             : 
     646          44 :             XLogRegisterData((char *) &data,
     647             :                              sizeof(ginxlogDeleteListPages));
     648             : 
     649          44 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     650          44 :             PageSetLSN(metapage, recptr);
     651             : 
     652         696 :             for (i = 0; i < data.ndeleted; i++)
     653             :             {
     654         652 :                 page = BufferGetPage(buffers[i]);
     655         652 :                 PageSetLSN(page, recptr);
     656             :             }
     657             :         }
     658             : 
     659         696 :         for (i = 0; i < data.ndeleted; i++)
     660         652 :             UnlockReleaseBuffer(buffers[i]);
     661             : 
     662          44 :         END_CRIT_SECTION();
     663             : 
     664         664 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     665         620 :             RecordFreeIndexPage(index, freespace[i]);
     666             : 
     667          44 :     } while (blknoToDelete != newHead);
     668           8 : }
     669             : 
     670             : /* Initialize an empty KeyArray, allocating room for maxvalues keys and categories */
     671             : static void
     672           8 : initKeyArray(KeyArray *keys, int32 maxvalues)
     673             : {
     674           8 :     keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
     675           8 :     keys->categories = (GinNullCategory *)
     676           8 :         palloc(sizeof(GinNullCategory) * maxvalues);
     677           8 :     keys->nvalues = 0;      /* nothing stored yet */
     678           8 :     keys->maxvalues = maxvalues;
     679           8 : }
     680             : 
     681             : /* Append a key and its null category to the KeyArray, doubling both arrays when full */
     682             : static void
     683      263976 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     684             : {
     685      263976 :     if (keys->nvalues >= keys->maxvalues)
     686             :     {
     687           0 :         keys->maxvalues *= 2;   /* double the capacity */
     688           0 :         keys->keys = (Datum *)
     689           0 :             repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
     690           0 :         keys->categories = (GinNullCategory *)
     691           0 :             repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
     692             :     }
     693             : 
     694      263976 :     keys->keys[keys->nvalues] = datum;
     695      263976 :     keys->categories[keys->nvalues] = category;
     696      263976 :     keys->nvalues++;
     697      263976 : }
     698             : 
     699             : /*
     700             :  * Collect data from a pending-list page in preparation for insertion into
     701             :  * the main index.
     702             :  *
     703             :  * Walk all tuples at offsets >= startoff on the page, batching consecutive
     704             :  * keys that share a heap TID and attribute number, and pass each batch to
     705             :  * ginInsertBAEntries() so that it is collected in accum.  ka is just
     706             :  * workspace: it is reset on entry and carries no state across calls.
     707             :  */
     708             : static void
     709         652 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     710             :                    Page page, OffsetNumber startoff)
     711             : {
     712             :     ItemPointerData heapptr;
     713             :     OffsetNumber i,
     714             :                 maxoff;
     715             :     OffsetNumber attrnum;
     716             : 
     717             :     /* reset *ka to empty; its arrays are reused as-is */
     718         652 :     ka->nvalues = 0;
     719             : 
     720         652 :     maxoff = PageGetMaxOffsetNumber(page);
     721             :     Assert(maxoff >= FirstOffsetNumber);
     722         652 :     ItemPointerSetInvalid(&heapptr);    /* invalid means no batch started yet */
     723         652 :     attrnum = 0;
     724             : 
     725      264628 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     726             :     {
     727      263976 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     728             :         OffsetNumber curattnum;
     729             :         Datum       curkey;
     730             :         GinNullCategory curcategory;
     731             : 
     732             :         /* Check for change of heap TID or attnum */
     733      263976 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     734             : 
     735      263976 :         if (!ItemPointerIsValid(&heapptr))
     736             :         {
     737         652 :             heapptr = itup->t_tid;
     738         652 :             attrnum = curattnum;
     739             :         }
     740      263324 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     741             :                    curattnum == attrnum))
     742             :         {
     743             :             /*
     744             :              * ginInsertBAEntries can insert several datums per call, but only
     745             :              * for one heap tuple and one column.  So call it at each boundary
     746             :              * with the batch collected so far, and then reset ka.
     747             :              */
     748       87348 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     749             :                                ka->keys, ka->categories, ka->nvalues);
     750       87348 :             ka->nvalues = 0;
     751       87348 :             heapptr = itup->t_tid;
     752       87348 :             attrnum = curattnum;
     753             :         }
     754             : 
     755             :         /* Add this tuple's key to the current batch in the KeyArray */
     756      263976 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     757      263976 :         addDatum(ka, curkey, curcategory);
     758             :     }
     759             : 
     760             :     /* Dump out all remaining keys (the final batch) */
     761         652 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     762             :                        ka->keys, ka->categories, ka->nvalues);
     763         652 : }
     764             : 
     765             : /*
     766             :  * Move tuples from pending pages into regular GIN structure.
     767             :  *
     768             :  * At first glance this does not look crash-safe.  But if we crash after
     769             :  * posting entries to the main index and before removing them from the
     770             :  * pending list, it's okay because when we redo the posting later on, nothing
     771             :  * bad will happen.
     772             :  *
     773             :  * full_clean: keep going until the list is empty instead of stopping at the
     774             :  * tail page seen at start.  fill_fsm: record deleted pages in the FSM here;
     775             :  * otherwise the caller must do so.  forceCleanup: wait for a concurrent
     776             :  * cleanup rather than returning at once; also selects the memory budget.
     777             :  * If stats isn't null, deleted pending pages are added to its counts.
     778             :  */
     779             : void
     780          32 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     781             :                  bool fill_fsm, bool forceCleanup,
     782             :                  IndexBulkDeleteResult *stats)
     783             : {
     784          32 :     Relation    index = ginstate->index;
     785             :     Buffer      metabuffer,
     786             :                 buffer;
     787             :     Page        metapage,
     788             :                 page;
     789             :     GinMetaPageData *metadata;
     790             :     MemoryContext opCtx,
     791             :                 oldCtx;
     792             :     BuildAccumulator accum;
     793             :     KeyArray    datums;
     794             :     BlockNumber blkno,
     795             :                 blknoFinish;
     796          32 :     bool        cleanupFinish = false;
     797          32 :     bool        fsm_vac = false;
     798             :     Size        workMemory;
     799             : 
     800             :     /*
     801             :      * We want to prevent concurrent cleanup processes.  To do that we lock
     802             :      * the metapage in exclusive mode using LockPage().  Nobody else uses that
     803             :      * lock for the metapage, so concurrent insertion into the pending list
     804             :      * remains possible.
     805             :      */
     806             : 
     807          32 :     if (forceCleanup)
     808             :     {
     809             :         /*
     810             :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
     811             :          * and we would like to wait for any concurrent cleanup to finish.
     812             :          */
     813          32 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     814          32 :         workMemory =
     815          36 :             (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     816          32 :             autovacuum_work_mem : maintenance_work_mem;
     817             :     }
     818             :     else
     819             :     {
     820             :         /*
     821             :          * We are called from a regular insert; if a concurrent cleanup is in
     822             :          * progress, just exit in the hope that the other process will clean
     823             :          * up the pending list.
     824             :          */
     825           0 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     826          24 :             return;
     827           0 :         workMemory = work_mem;
     828             :     }
     829             : 
     830          32 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     831          32 :     LockBuffer(metabuffer, GIN_SHARE);
     832          32 :     metapage = BufferGetPage(metabuffer);
     833          32 :     metadata = GinPageGetMeta(metapage);
     834             : 
     835          32 :     if (metadata->head == InvalidBlockNumber)
     836             :     {
     837             :         /* Nothing to do: the pending list is empty */
     838          24 :         UnlockReleaseBuffer(metabuffer);
     839          24 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     840          24 :         return;
     841             :     }
     842             : 
     843             :     /*
     844             :      * Remember the tail page to prevent endless cleanup if other backends add
     845             :      * new tuples faster than we can clean up.
     846             :      */
     847           8 :     blknoFinish = metadata->tail;
     848             : 
     849             :     /*
     850             :      * Read and lock the head of the pending list
     851             :      */
     852           8 :     blkno = metadata->head;
     853           8 :     buffer = ReadBuffer(index, blkno);
     854           8 :     LockBuffer(buffer, GIN_SHARE);
     855           8 :     page = BufferGetPage(buffer);
     856             : 
     857           8 :     LockBuffer(metabuffer, GIN_UNLOCK);     /* drop the lock, keep the pin */
     858             : 
     859             :     /*
     860             :      * Initialize.  All temporary space will be in opCtx
     861             :      */
     862           8 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     863             :                                   "GIN insert cleanup temporary context",
     864             :                                   ALLOCSET_DEFAULT_SIZES);
     865             : 
     866           8 :     oldCtx = MemoryContextSwitchTo(opCtx);
     867             : 
     868           8 :     initKeyArray(&datums, 128);
     869           8 :     ginInitBA(&accum);
     870           8 :     accum.ginstate = ginstate;
     871             : 
     872             :     /*
     873             :      * At the top of this loop, we have pin and lock on the current page of
     874             :      * the pending list.  However, we'll release that before exiting the loop.
     875             :      * Note we also have pin but not lock on the metapage.
     876             :      */
     877             :     for (;;)
     878             :     {
     879         644 :         Assert(!GinPageIsDeleted(page));
     880             : 
     881             :         /*
     882             :          * Have we reached the page that was the tail when we started our
     883             :          * cleanup?  If so, this is the last batch -- unless the caller asked
     884             :          * us to clean up the whole pending list, in which case we ignore the
     885             :          * old tail and work until the list becomes empty.
     886             :          */
     887         652 :         if (blkno == blknoFinish && full_clean == false)
     888           0 :             cleanupFinish = true;
     889             : 
     890             :         /*
     891             :          * Read the page's datums into accum
     892             :          */
     893         652 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     894             : 
     895         652 :         vacuum_delay_point();
     896             : 
     897             :         /*
     898             :          * Is it time to flush memory to disk?  Flush if we are at the end of
     899             :          * the pending list, or if we have a full row and memory is getting
     900             :          * full (workMemory is multiplied by 1024 before the comparison).
     901             :          */
     902        1296 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     903        1288 :             (GinPageHasFullRow(page) &&
     904         644 :              (accum.allocatedMemory >= workMemory * 1024L)))
     905           0 :         {
     906             :             ItemPointerData *list;
     907             :             uint32      nlist;
     908             :             Datum       key;
     909             :             GinNullCategory category;
     910             :             OffsetNumber maxoff,
     911             :                         attnum;
     912             : 
     913             :             /*
     914             :              * Unlock the current page to increase performance.  Changes to
     915             :              * the page will be detected later by comparing maxoff after the
     916             :              * memory flush completes.
     917             :              */
     918           8 :             maxoff = PageGetMaxOffsetNumber(page);
     919           8 :             LockBuffer(buffer, GIN_UNLOCK);
     920             : 
     921             :             /*
     922             :              * Moving the collected data into the regular structure can take
     923             :              * a significant amount of time, so run it without holding a lock
     924             :              * on the pending list.
     925             :              */
     926           8 :             ginBeginBAScan(&accum);
     927       84016 :             while ((list = ginGetBAEntry(&accum,
     928             :                                          &attnum, &key, &category, &nlist)) != NULL)
     929             :             {
     930       84000 :                 ginEntryInsert(ginstate, attnum, key, category,
     931             :                                list, nlist, NULL);
     932       84000 :                 vacuum_delay_point();
     933             :             }
     934             : 
     935             :             /*
     936             :              * Lock the whole list to remove pages
     937             :              */
     938           8 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     939           8 :             LockBuffer(buffer, GIN_SHARE);
     940             : 
     941             :             Assert(!GinPageIsDeleted(page));
     942             : 
     943             :             /*
     944             :              * While we left the page unlocked, more stuff might have gotten
     945             :              * added to it.  If so, process those entries immediately.  There
     946             :              * shouldn't be very many, so we don't worry about doing this
     947             :              * while holding an exclusive lock.  The insertion algorithm
     948             :              * guarantees that inserted row(s) will not continue on next page.
     949             :              * NOTE: intentionally no vacuum_delay_point in this loop.
     950             :              */
     951           8 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     952             :             {
     953           0 :                 ginInitBA(&accum);
     954           0 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     955             : 
     956           0 :                 ginBeginBAScan(&accum);
     957           0 :                 while ((list = ginGetBAEntry(&accum,
     958             :                                              &attnum, &key, &category, &nlist)) != NULL)
     959           0 :                     ginEntryInsert(ginstate, attnum, key, category,
     960             :                                    list, nlist, NULL);
     961             :             }
     962             : 
     963             :             /*
     964             :              * Remember the next page - it will become the new list head
     965             :              */
     966           8 :             blkno = GinPageGetOpaque(page)->rightlink;
     967           8 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     968             :                                              * locking */
     969             : 
     970             :             /*
     971             :              * Remove the pages we have read from the pending list; at this
     972             :              * point all of their content is in the regular structure.
     973             :              */
     974           8 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     975             : 
     976             :             /* At this point, some pending pages have been freed up */
     977           8 :             fsm_vac = true;
     978             : 
     979             :             Assert(blkno == metadata->head);
     980           8 :             LockBuffer(metabuffer, GIN_UNLOCK);
     981             : 
     982             :             /*
     983             :              * If we removed the whole pending list, or we have cleaned up the
     984             :              * tail page remembered at the start of cleanup, then just exit.
     985             :              */
     986           8 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     987             :                 break;
     988             : 
     989             :             /*
     990             :              * Release memory used so far and reinitialize state
     991             :              */
     992           0 :             MemoryContextReset(opCtx);
     993           0 :             initKeyArray(&datums, datums.maxvalues);
     994           0 :             ginInitBA(&accum);
     995             :         }
     996             :         else
     997             :         {
     998         644 :             blkno = GinPageGetOpaque(page)->rightlink;
     999         644 :             UnlockReleaseBuffer(buffer);
    1000             :         }
    1001             : 
    1002             :         /*
    1003             :          * Read the next page in the pending list
    1004             :          */
    1005         644 :         vacuum_delay_point();
    1006         644 :         buffer = ReadBuffer(index, blkno);
    1007         644 :         LockBuffer(buffer, GIN_SHARE);
    1008         644 :         page = BufferGetPage(buffer);
    1009             :     }
    1010             : 
    1011           8 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
    1012           8 :     ReleaseBuffer(metabuffer);
    1013             : 
    1014             :     /*
    1015             :      * As pending list pages can have a high churn rate, it is desirable to
    1016             :      * recycle them immediately to the FreeSpaceMap when ordinary backends
    1017             :      * clean the list.
    1018             :      */
    1019          16 :     if (fsm_vac && fill_fsm)
    1020           4 :         IndexFreeSpaceMapVacuum(index);
    1021             : 
    1022             :     /* Clean up temporary space */
    1023           8 :     MemoryContextSwitchTo(oldCtx);
    1024           8 :     MemoryContextDelete(opCtx);
    1025             : }
    1026             : 
    1027             : /*
    1028             :  * SQL-callable: clean a GIN index's pending list; returns the count of pages deleted
    1029             :  */
    1030             : Datum
    1031           8 : gin_clean_pending_list(PG_FUNCTION_ARGS)
    1032             : {
    1033           8 :     Oid         indexoid = PG_GETARG_OID(0);
    1034           8 :     Relation    indexRel = index_open(indexoid, RowExclusiveLock);
    1035             :     IndexBulkDeleteResult stats;
    1036             :     GinState    ginstate;
    1037             : 
    1038           8 :     if (RecoveryInProgress())
    1039           0 :         ereport(ERROR,
    1040             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1041             :                  errmsg("recovery is in progress"),
    1042             :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
    1043             : 
    1044             :     /* Must be a GIN index */
    1045          16 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
    1046           8 :         indexRel->rd_rel->relam != GIN_AM_OID)
    1047           0 :         ereport(ERROR,
    1048             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1049             :                  errmsg("\"%s\" is not a GIN index",
    1050             :                         RelationGetRelationName(indexRel))));
    1051             : 
    1052             :     /*
    1053             :      * Reject attempts to read non-local temporary relations; we would be
    1054             :      * likely to get wrong data since we have no visibility into the owning
    1055             :      * session's local buffers.
    1056             :      */
    1057           8 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1058           0 :         ereport(ERROR,
    1059             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1060             :                  errmsg("cannot access temporary indexes of other sessions")));
    1061             : 
    1062             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1063           8 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1064           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
    1065           0 :                        RelationGetRelationName(indexRel));
    1066             : 
    1067           8 :     memset(&stats, 0, sizeof(stats));   /* so pages_deleted starts from zero */
    1068           8 :     initGinState(&ginstate, indexRel);
    1069           8 :     ginInsertCleanup(&ginstate, true, true, true, &stats);  /* full_clean, fill_fsm, forceCleanup */
    1070             : 
    1071           8 :     index_close(indexRel, RowExclusiveLock);
    1072             : 
    1073           8 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1074             : }

Generated by: LCOV version 1.13