LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 152 162 93.8 %
Date: 2024-04-20 08:11:12 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sinvaladt.c
       4             :  *    POSTGRES shared cache invalidation data manager.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/storage/ipc/sinvaladt.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include <signal.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/transam.h"
      21             : #include "miscadmin.h"
      22             : #include "storage/ipc.h"
      23             : #include "storage/proc.h"
      24             : #include "storage/procnumber.h"
      25             : #include "storage/procsignal.h"
      26             : #include "storage/shmem.h"
      27             : #include "storage/sinvaladt.h"
      28             : #include "storage/spin.h"
      29             : 
      30             : /*
      31             :  * Conceptually, the shared cache invalidation messages are stored in an
      32             :  * infinite array, where maxMsgNum is the next array subscript to store a
      33             :  * submitted message in, minMsgNum is the smallest array subscript containing
      34             :  * a message not yet read by all backends, and we always have maxMsgNum >=
      35             :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      36             :  * active backend, there is a nextMsgNum pointer indicating the next message it
      37             :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      38             :  * backend.
      39             :  *
      40             :  * (In the current implementation, minMsgNum is a lower bound for the
      41             :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      42             :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      43             :  * SICleanupQueue is called, and we try not to do that often.)
      44             :  *
      45             :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      46             :  * entries.  We translate MsgNum values into circular-buffer indexes by
      47             :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      48             :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      49             :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      50             :  * in the buffer.  If the buffer does overflow, we recover by setting the
      51             :  * "reset" flag for each backend that has fallen too far behind.  A backend
      52             :  * that is in "reset" state is ignored while determining minMsgNum.  When
      53             :  * it does finally attempt to receive inval messages, it must discard all
      54             :  * its invalidatable state, since it won't know what it missed.
      55             :  *
      56             :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      57             :  * to any backend that seems to be falling unreasonably far behind.  The
      58             :  * normal behavior is that at most one such interrupt is in flight at a time;
      59             :  * when a backend completes processing a catchup interrupt, it executes
      60             :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      61             :  * needed.  This avoids undue contention from multiple backends all trying
      62             :  * to catch up at once.  However, the furthest-back backend might be stuck
      63             :  * in a state where it can't catch up.  Eventually it will get reset, so it
      64             :  * won't cause any more problems for anyone but itself.  But we don't want
      65             :  * to find that a bunch of other backends are now too close to the reset
      66             :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      67             :  * send extra catchup interrupts as the queue gets fuller, to backends that
      68             :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      69             :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      70             :  * that aren't stuck will propagate their interrupts to the next guy.
      71             :  *
      72             :  * We would have problems if the MsgNum values overflow an integer, so
      73             :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      74             :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      75             :  * large so that we don't need to do this often.  It must be a multiple of
      76             :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      77             :  * to be moved when we do it.
      78             :  *
      79             :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      80             :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      81             :  * authorizes them to modify their own ProcState but not to modify or even
      82             :  * look at anyone else's.  When we need to perform array-wide updates,
      83             :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      84             :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      85             :  * mode) to serialize adding messages to the queue.  Note that a writer
      86             :  * can operate in parallel with one or more readers, because the writer
      87             :  * has no need to touch anyone's ProcState, except in the infrequent cases
      88             :  * when SICleanupQueue is needed.  The only point of overlap is that
      89             :  * the writer wants to change maxMsgNum while readers need to read it.
      90             :  * We deal with that by having a spinlock that readers must take for just
      91             :  * long enough to read maxMsgNum, while writers take it for just long enough
      92             :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      93             :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      94             :  * spinlock to write maxMsgNum unless you are holding both locks.)
      95             :  *
      96             :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      97             :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      98             :  * is to provide a memory barrier: we need to be sure that messages written
      99             :  * to the array are actually there before maxMsgNum is increased, and that
     100             :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     101             :  * that have weak memory-ordering guarantees can fail without the memory
     102             :  * barrier instructions that are included in the spinlock sequences.
     103             :  */
     104             : 
     105             : 
     106             : /*
     107             :  * Configurable parameters.
     108             :  *
     109             :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     110             :  * Must be a power of 2 for speed.
     111             :  *
     112             :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     113             :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     114             :  *
     115             :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     116             :  * before we bother to call SICleanupQueue.
     117             :  *
     118             :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     119             :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     120             :  *
     121             :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     122             :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     123             :  *
     124             :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     125             :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     126             :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     127             :  * per iteration.
     128             :  */
     129             : 
     130             : #define MAXNUMMESSAGES 4096
     131             : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     132             : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     133             : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     134             : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     135             : #define WRITE_QUANTUM 64
     136             : 
     137             : /* Per-backend state in shared invalidation structure */
     138             : typedef struct ProcState
     139             : {
     140             :     /* procPid is zero in an inactive ProcState array entry. */
     141             :     pid_t       procPid;        /* PID of backend, for signaling */
     142             :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     143             :     int         nextMsgNum;     /* next message number to read */
     144             :     bool        resetState;     /* backend needs to reset its state */
     145             :     bool        signaled;       /* backend has been sent catchup signal */
     146             :     bool        hasMessages;    /* backend has unread messages */
     147             : 
     148             :     /*
     149             :      * Backend only sends invalidations, never receives them. This only makes
     150             :      * sense for Startup process during recovery because it doesn't maintain a
     151             :      * relcache, yet it fires inval messages to allow query backends to see
     152             :      * schema changes.
     153             :      */
     154             :     bool        sendOnly;       /* backend only sends, never receives */
     155             : 
     156             :     /*
     157             :      * Next LocalTransactionId to use for each idle backend slot.  We keep
     158             :      * this here because it is indexed by ProcNumber and it is convenient to
     159             :      * copy the value to and from local memory when MyProcNumber is set. It's
     160             :      * meaningless in an active ProcState entry.
     161             :      */
     162             :     LocalTransactionId nextLXID;
     163             : } ProcState;
     164             : 
     165             : /* Shared cache invalidation memory segment */
     166             : typedef struct SISeg
     167             : {
     168             :     /*
     169             :      * General state information
     170             :      */
     171             :     int         minMsgNum;      /* oldest message still needed */
     172             :     int         maxMsgNum;      /* next message number to be assigned */
     173             :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
     174             : 
     175             :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
     176             : 
     177             :     /*
     178             :      * Circular buffer holding shared-inval messages
     179             :      */
     180             :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     181             : 
     182             :     /*
     183             :      * Per-backend invalidation state info.
     184             :      *
     185             :      * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
     186             :      * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
     187             :      * a dense array of their indexes, to speed up scanning all in-use slots.
     188             :      *
     189             :      * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
     190             :      * having our separate copy avoids contention on ProcArrayLock, and allows
     191             :      * us to track only the processes that participate in shared cache
     192             :      * invalidations.
     193             :      */
     194             :     int         numProcs;
     195             :     int        *pgprocnos;
     196             :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
     197             : } SISeg;
     198             : 
     199             : /*
     200             :  * We reserve a slot for each possible ProcNumber, plus one for each
     201             :  * possible auxiliary process type.  (This scheme assumes there is not
     202             :  * more than one of any auxiliary process type at a time.)
     203             :  */
     204             : #define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)
     205             : 
     206             : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     207             : 
     208             : 
     209             : static LocalTransactionId nextLocalTransactionId;
     210             : 
     211             : static void CleanupInvalidationState(int status, Datum arg);
     212             : 
     213             : 
     214             : /*
     215             :  * SInvalShmemSize --- return shared-memory space needed
     216             :  */
     217             : Size
     218        5066 : SInvalShmemSize(void)
     219             : {
     220             :     Size        size;
     221             : 
     222        5066 :     size = offsetof(SISeg, procState);
     223        5066 :     size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots));  /* procState */
     224        5066 :     size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));    /* pgprocnos */
     225             : 
     226        5066 :     return size;
     227             : }
     228             : 
     229             : /*
     230             :  * CreateSharedInvalidationState
     231             :  *      Create and initialize the SI message buffer
     232             :  */
     233             : void
     234        1768 : CreateSharedInvalidationState(void)
     235             : {
     236             :     int         i;
     237             :     bool        found;
     238             : 
     239             :     /* Allocate space in shared memory */
     240        1768 :     shmInvalBuffer = (SISeg *)
     241        1768 :         ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
     242        1768 :     if (found)
     243           0 :         return;
     244             : 
     245             :     /* Clear message counters, save size of procState array, init spinlock */
     246        1768 :     shmInvalBuffer->minMsgNum = 0;
     247        1768 :     shmInvalBuffer->maxMsgNum = 0;
     248        1768 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     249        1768 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
     250             : 
     251             :     /* The buffer[] array is initially all unused, so we need not fill it */
     252             : 
     253             :     /* Mark all backends inactive, and initialize nextLXID */
     254      158636 :     for (i = 0; i < NumProcStateSlots; i++)
     255             :     {
     256      156868 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     257      156868 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     258      156868 :         shmInvalBuffer->procState[i].resetState = false;
     259      156868 :         shmInvalBuffer->procState[i].signaled = false;
     260      156868 :         shmInvalBuffer->procState[i].hasMessages = false;
     261      156868 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     262             :     }
     263        1768 :     shmInvalBuffer->numProcs = 0;
     264        1768 :     shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
     265             : }
     266             : 
     267             : /*
     268             :  * SharedInvalBackendInit
     269             :  *      Initialize a new backend to operate on the sinval buffer
     270             :  */
     271             : void
     272       25894 : SharedInvalBackendInit(bool sendOnly)
     273             : {
     274             :     ProcState  *stateP;
     275             :     pid_t       oldPid;
     276       25894 :     SISeg      *segP = shmInvalBuffer;
     277             : 
     278       25894 :     if (MyProcNumber < 0)
     279           0 :         elog(ERROR, "MyProcNumber not set");
     280       25894 :     if (MyProcNumber >= NumProcStateSlots)
     281           0 :         elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
     282             :              MyProcNumber, NumProcStateSlots);
     283       25894 :     stateP = &segP->procState[MyProcNumber];
     284             : 
     285             :     /*
     286             :      * This can run in parallel with read operations, but not with write
     287             :      * operations, since SIInsertDataEntries relies on the pgprocnos array to
     288             :      * set hasMessages appropriately.
     289             :      */
     290       25894 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     291             : 
     292       25894 :     oldPid = stateP->procPid;
     293       25894 :     if (oldPid != 0)
     294             :     {
     295           0 :         LWLockRelease(SInvalWriteLock);
     296           0 :         elog(ERROR, "sinval slot for backend %d is already in use by process %d",
     297             :              MyProcNumber, (int) oldPid);
     298             :     }
     299             : 
     300       25894 :     shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
     301             : 
     302             :     /* Fetch next local transaction ID into local memory */
     303       25894 :     nextLocalTransactionId = stateP->nextLXID;
     304             : 
     305             :     /* mark myself active, with all extant messages already read */
     306       25894 :     stateP->procPid = MyProcPid;
     307       25894 :     stateP->nextMsgNum = segP->maxMsgNum;
     308       25894 :     stateP->resetState = false;
     309       25894 :     stateP->signaled = false;
     310       25894 :     stateP->hasMessages = false;
     311       25894 :     stateP->sendOnly = sendOnly;
     312             : 
     313       25894 :     LWLockRelease(SInvalWriteLock);
     314             : 
     315             :     /* register exit routine to mark my entry inactive at exit */
     316       25894 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     317       25894 : }
     318             : 
     319             : /*
     320             :  * CleanupInvalidationState
     321             :  *      Mark the current backend as no longer active.
     322             :  *
     323             :  * This function is called via on_shmem_exit() during backend shutdown.
     324             :  *
     325             :  * arg is really of type "SISeg*".
     326             :  */
     327             : static void
     328       25894 : CleanupInvalidationState(int status, Datum arg)
     329             : {
     330       25894 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     331             :     ProcState  *stateP;
     332             :     int         i;
     333             : 
     334             :     Assert(PointerIsValid(segP));
     335             : 
     336       25894 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     337             : 
     338       25894 :     stateP = &segP->procState[MyProcNumber];
     339             : 
     340             :     /* Update next local transaction ID for next holder of this proc number */
     341       25894 :     stateP->nextLXID = nextLocalTransactionId;
     342             : 
     343             :     /* Mark myself inactive */
     344       25894 :     stateP->procPid = 0;
     345       25894 :     stateP->nextMsgNum = 0;
     346       25894 :     stateP->resetState = false;
     347       25894 :     stateP->signaled = false;
     348             : 
     349       33046 :     for (i = segP->numProcs - 1; i >= 0; i--)
     350             :     {
     351       33046 :         if (segP->pgprocnos[i] == MyProcNumber)
     352             :         {
     353       25894 :             if (i != segP->numProcs - 1)
     354        3830 :                 segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
     355       25894 :             break;
     356             :         }
     357             :     }
     358       25894 :     if (i < 0)
     359           0 :         elog(PANIC, "could not find entry in sinval array");
     360       25894 :     segP->numProcs--;
     361             : 
     362       25894 :     LWLockRelease(SInvalWriteLock);
     363       25894 : }
     364             : 
     365             : /*
     366             :  * SIInsertDataEntries
     367             :  *      Add new invalidation message(s) to the buffer.
     368             :  */
     369             : void
     370      442052 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     371             : {
     372      442052 :     SISeg      *segP = shmInvalBuffer;
     373             : 
     374             :     /*
     375             :      * N can be arbitrarily large.  We divide the work into groups of no more
     376             :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     377             :      * an unreasonably long time.  (This is not so much because we care about
     378             :      * letting in other writers, as that some just-caught-up backend might be
     379             :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
     380             :      * to have to wait a long time.)  Also, we need to consider calling
     381             :      * SICleanupQueue every so often.
     382             :      */
     383      928732 :     while (n > 0)
     384             :     {
     385      486680 :         int         nthistime = Min(n, WRITE_QUANTUM);
     386             :         int         numMsgs;
     387             :         int         max;
     388             :         int         i;
     389             : 
     390      486680 :         n -= nthistime;
     391             : 
     392      486680 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     393             : 
     394             :         /*
     395             :          * If the buffer is full, we *must* acquire some space.  Clean the
     396             :          * queue and reset anyone who is preventing space from being freed.
     397             :          * Otherwise, clean the queue only when it's exceeded the next
     398             :          * fullness threshold.  We have to loop and recheck the buffer state
     399             :          * after any call of SICleanupQueue.
     400             :          */
     401             :         for (;;)
     402             :         {
     403      495846 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
     404      495846 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
     405      495378 :                 numMsgs >= segP->nextThreshold)
     406        9166 :                 SICleanupQueue(true, nthistime);
     407             :             else
     408             :                 break;
     409             :         }
     410             : 
     411             :         /*
     412             :          * Insert new message(s) into proper slot of circular buffer
     413             :          */
     414      486680 :         max = segP->maxMsgNum;
     415     6963052 :         while (nthistime-- > 0)
     416             :         {
     417     6476372 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
     418     6476372 :             max++;
     419             :         }
     420             : 
     421             :         /* Update current value of maxMsgNum using spinlock */
     422      486680 :         SpinLockAcquire(&segP->msgnumLock);
     423      486680 :         segP->maxMsgNum = max;
     424      486680 :         SpinLockRelease(&segP->msgnumLock);
     425             : 
     426             :         /*
     427             :          * Now that the maxMsgNum change is globally visible, we give everyone
     428             :          * a swift kick to make sure they read the newly added messages.
     429             :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
     430             :          * these (unlocked) changes will be committed to memory before we exit
     431             :          * the function.
     432             :          */
     433     2716526 :         for (i = 0; i < segP->numProcs; i++)
     434             :         {
     435     2229846 :             ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     436             : 
     437     2229846 :             stateP->hasMessages = true;
     438             :         }
     439             : 
     440      486680 :         LWLockRelease(SInvalWriteLock);
     441             :     }
     442      442052 : }
     443             : 
     444             : /*
     445             :  * SIGetDataEntries
     446             :  *      get next SI message(s) for current backend, if there are any
     447             :  *
     448             :  * Possible return values:
     449             :  *  0:   no SI message available
     450             :  *  n>0: next n SI messages have been extracted into data[]
     451             :  * -1:   SI reset message extracted
     452             :  *
     453             :  * If the return value is less than the array size "datasize", the caller
     454             :  * can assume that there are no more SI messages after the one(s) returned.
     455             :  * Otherwise, another call is needed to collect more messages.
     456             :  *
     457             :  * NB: this can run in parallel with other instances of SIGetDataEntries
     458             :  * executing on behalf of other backends, since each instance will modify only
     459             :  * fields of its own backend's ProcState, and no instance will look at fields
     460             :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     461             :  * in shared mode.  Note that this is not exactly the normal (read-only)
     462             :  * interpretation of a shared lock! Look closely at the interactions before
     463             :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     464             :  *
     465             :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     466             :  * guaranteed that we will return any messages added after the routine is
     467             :  * entered.
     468             :  *
     469             :  * Note: we assume that "datasize" is not so large that it might be important
     470             :  * to break our hold on SInvalReadLock into segments.
     471             :  */
     472             : int
     473    28280906 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     474             : {
     475             :     SISeg      *segP;
     476             :     ProcState  *stateP;
     477             :     int         max;
     478             :     int         n;
     479             : 
     480    28280906 :     segP = shmInvalBuffer;
     481    28280906 :     stateP = &segP->procState[MyProcNumber];
     482             : 
     483             :     /*
     484             :      * Before starting to take locks, do a quick, unlocked test to see whether
     485             :      * there can possibly be anything to read.  On a multiprocessor system,
     486             :      * it's possible that this load could migrate backwards and occur before
     487             :      * we actually enter this function, so we might miss a sinval message that
     488             :      * was just added by some other processor.  But they can't migrate
     489             :      * backwards over a preceding lock acquisition, so it should be OK.  If we
     490             :      * haven't acquired a lock preventing against further relevant
     491             :      * invalidations, any such occurrence is not much different than if the
     492             :      * invalidation had arrived slightly later in the first place.
     493             :      */
     494    28280906 :     if (!stateP->hasMessages)
     495    27266600 :         return 0;
     496             : 
     497     1014306 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
     498             : 
     499             :     /*
     500             :      * We must reset hasMessages before determining how many messages we're
     501             :      * going to read.  That way, if new messages arrive after we have
     502             :      * determined how many we're reading, the flag will get reset and we'll
     503             :      * notice those messages part-way through.
     504             :      *
     505             :      * Note that, if we don't end up reading all of the messages, we had
     506             :      * better be certain to reset this flag before exiting!
     507             :      */
     508     1014306 :     stateP->hasMessages = false;
     509             : 
     510             :     /* Fetch current value of maxMsgNum using spinlock */
     511     1014306 :     SpinLockAcquire(&segP->msgnumLock);
     512     1014306 :     max = segP->maxMsgNum;
     513     1014306 :     SpinLockRelease(&segP->msgnumLock);
     514             : 
     515     1014306 :     if (stateP->resetState)
     516             :     {
     517             :         /*
     518             :          * Force reset.  We can say we have dealt with any messages added
     519             :          * since the reset, as well; and that means we should clear the
     520             :          * signaled flag, too.
     521             :          */
     522         562 :         stateP->nextMsgNum = max;
     523         562 :         stateP->resetState = false;
     524         562 :         stateP->signaled = false;
     525         562 :         LWLockRelease(SInvalReadLock);
     526         562 :         return -1;
     527             :     }
     528             : 
     529             :     /*
     530             :      * Retrieve messages and advance backend's counter, until data array is
     531             :      * full or there are no more messages.
     532             :      *
     533             :      * There may be other backends that haven't read the message(s), so we
     534             :      * cannot delete them here.  SICleanupQueue() will eventually remove them
     535             :      * from the queue.
     536             :      */
     537     1013744 :     n = 0;
     538    26685634 :     while (n < datasize && stateP->nextMsgNum < max)
     539             :     {
     540    25671890 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     541    25671890 :         stateP->nextMsgNum++;
     542             :     }
     543             : 
     544             :     /*
     545             :      * If we have caught up completely, reset our "signaled" flag so that
     546             :      * we'll get another signal if we fall behind again.
     547             :      *
     548             :      * If we haven't caught up completely, reset the hasMessages flag so that
     549             :      * we see the remaining messages next time.
     550             :      */
     551     1013744 :     if (stateP->nextMsgNum >= max)
     552      312692 :         stateP->signaled = false;
     553             :     else
     554      701052 :         stateP->hasMessages = true;
     555             : 
     556     1013744 :     LWLockRelease(SInvalReadLock);
     557     1013744 :     return n;
     558             : }
     559             : 
     560             : /*
     561             :  * SICleanupQueue
     562             :  *      Remove messages that have been consumed by all active backends
     563             :  *
     564             :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     565             :  * minFree is the minimum number of message slots to make free.
     566             :  *
     567             :  * Possible side effects of this routine include marking one or more
     568             :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     569             :  * to some backend that seems to be getting too far behind.  We signal at
     570             :  * most one backend at a time, for reasons explained at the top of the file.
     571             :  *
     572             :  * Caution: because we transiently release write lock when we have to signal
     573             :  * some other backend, it is NOT guaranteed that there are still minFree
     574             :  * free message slots at exit.  Caller must recheck and perhaps retry.
     575             :  */
     576             : void
     577       14392 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     578             : {
     579       14392 :     SISeg      *segP = shmInvalBuffer;
     580             :     int         min,
     581             :                 minsig,
     582             :                 lowbound,
     583             :                 numMsgs,
     584             :                 i;
     585       14392 :     ProcState  *needSig = NULL;
     586             : 
     587             :     /* Lock out all writers and readers */
     588       14392 :     if (!callerHasWriteLock)
     589        5226 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     590       14392 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     591             : 
     592             :     /*
     593             :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     594             :      * furthest-back backend that needs signaling (if any), and reset any
     595             :      * backends that are too far back.  Note that because we ignore sendOnly
     596             :      * backends here it is possible for them to keep sending messages without
     597             :      * a problem even when they are the only active backend.
     598             :      */
     599       14392 :     min = segP->maxMsgNum;
     600       14392 :     minsig = min - SIG_THRESHOLD;
     601       14392 :     lowbound = min - MAXNUMMESSAGES + minFree;
     602             : 
     603      115698 :     for (i = 0; i < segP->numProcs; i++)
     604             :     {
     605      101306 :         ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     606      101306 :         int         n = stateP->nextMsgNum;
     607             : 
     608             :         /* Ignore if already in reset state */
     609             :         Assert(stateP->procPid != 0);
     610      101306 :         if (stateP->resetState || stateP->sendOnly)
     611       12088 :             continue;
     612             : 
     613             :         /*
     614             :          * If we must free some space and this backend is preventing it, force
     615             :          * him into reset state and then ignore until he catches up.
     616             :          */
     617       89218 :         if (n < lowbound)
     618             :         {
     619         564 :             stateP->resetState = true;
     620             :             /* no point in signaling him ... */
     621         564 :             continue;
     622             :         }
     623             : 
     624             :         /* Track the global minimum nextMsgNum */
     625       88654 :         if (n < min)
     626       22762 :             min = n;
     627             : 
     628             :         /* Also see who's furthest back of the unsignaled backends */
     629       88654 :         if (n < minsig && !stateP->signaled)
     630             :         {
     631        5352 :             minsig = n;
     632        5352 :             needSig = stateP;
     633             :         }
     634             :     }
     635       14392 :     segP->minMsgNum = min;
     636             : 
     637             :     /*
     638             :      * When minMsgNum gets really large, decrement all message counters so as
     639             :      * to forestall overflow of the counters.  This happens seldom enough that
     640             :      * folding it into the previous loop would be a loser.
     641             :      */
     642       14392 :     if (min >= MSGNUMWRAPAROUND)
     643             :     {
     644           0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
     645           0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
     646           0 :         for (i = 0; i < segP->numProcs; i++)
     647           0 :             segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
     648             :     }
     649             : 
     650             :     /*
     651             :      * Determine how many messages are still in the queue, and set the
     652             :      * threshold at which we should repeat SICleanupQueue().
     653             :      */
     654       14392 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
     655       14392 :     if (numMsgs < CLEANUP_MIN)
     656        4320 :         segP->nextThreshold = CLEANUP_MIN;
     657             :     else
     658       10072 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     659             : 
     660             :     /*
     661             :      * Lastly, signal anyone who needs a catchup interrupt.  Since
     662             :      * SendProcSignal() might not be fast, we don't want to hold locks while
     663             :      * executing it.
     664             :      */
     665       14392 :     if (needSig)
     666             :     {
     667        5250 :         pid_t       his_pid = needSig->procPid;
     668        5250 :         ProcNumber  his_procNumber = (needSig - &segP->procState[0]);
     669             : 
     670        5250 :         needSig->signaled = true;
     671        5250 :         LWLockRelease(SInvalReadLock);
     672        5250 :         LWLockRelease(SInvalWriteLock);
     673        5250 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     674        5250 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
     675        5250 :         if (callerHasWriteLock)
     676        3494 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     677             :     }
     678             :     else
     679             :     {
     680        9142 :         LWLockRelease(SInvalReadLock);
     681        9142 :         if (!callerHasWriteLock)
     682        3470 :             LWLockRelease(SInvalWriteLock);
     683             :     }
     684       14392 : }
     685             : 
     686             : 
     687             : /*
     688             :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     689             :  *
     690             :  * We split VirtualTransactionIds into two parts so that it is possible
     691             :  * to allocate a new one without any contention for shared memory, except
     692             :  * for a bit of additional overhead during backend startup/shutdown.
     693             :  * The high-order part of a VirtualTransactionId is a ProcNumber, and the
     694             :  * low-order part is a LocalTransactionId, which we assign from a local
     695             :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     696             :  * within a short interval, successive procs occupying the same PGPROC slot
     697             :  * should use a consecutive sequence of local IDs, which is implemented
     698             :  * by copying nextLocalTransactionId as seen above.
     699             :  */
     700             : LocalTransactionId
     701      581574 : GetNextLocalTransactionId(void)
     702             : {
     703             :     LocalTransactionId result;
     704             : 
     705             :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     706             :     do
     707             :     {
     708      581574 :         result = nextLocalTransactionId++;
     709      581574 :     } while (!LocalTransactionIdIsValid(result));
     710             : 
     711      566048 :     return result;
     712             : }

Generated by: LCOV version 1.14