LCOV - code coverage report
Current view: src/backend/storage/ipc/sinvaladt.c
Test: PostgreSQL 19devel
Test Date: 2026-02-17 17:20:33

               Coverage   Total   Hit
  Lines:         93.8 %     162   152
  Functions:    100.0 %       8     8

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * sinvaladt.c
       4              :  *    POSTGRES shared cache invalidation data manager.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/storage/ipc/sinvaladt.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : #include "postgres.h"
      16              : 
      17              : #include <signal.h>
      18              : #include <unistd.h>
      19              : 
      20              : #include "miscadmin.h"
      21              : #include "storage/ipc.h"
      22              : #include "storage/proc.h"
      23              : #include "storage/procnumber.h"
      24              : #include "storage/procsignal.h"
      25              : #include "storage/shmem.h"
      26              : #include "storage/sinvaladt.h"
      27              : #include "storage/spin.h"
      28              : 
      29              : /*
      30              :  * Conceptually, the shared cache invalidation messages are stored in an
      31              :  * infinite array, where maxMsgNum is the next array subscript to store a
      32              :  * submitted message in, minMsgNum is the smallest array subscript containing
      33              :  * a message not yet read by all backends, and we always have maxMsgNum >=
      34              :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      35              :  * active backend, there is a nextMsgNum pointer indicating the next message it
      36              :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      37              :  * backend.
      38              :  *
      39              :  * (In the current implementation, minMsgNum is a lower bound for the
      40              :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      41              :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      42              :  * SICleanupQueue is called, and we try not to do that often.)
      43              :  *
      44              :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      45              :  * entries.  We translate MsgNum values into circular-buffer indexes by
      46              :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      47              :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      48              :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      49              :  * in the buffer.  If the buffer does overflow, we recover by setting the
      50              :  * "reset" flag for each backend that has fallen too far behind.  A backend
      51              :  * that is in "reset" state is ignored while determining minMsgNum.  When
      52              :  * it does finally attempt to receive inval messages, it must discard all
      53              :  * its invalidatable state, since it won't know what it missed.
      54              :  *
      55              :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      56              :  * to any backend that seems to be falling unreasonably far behind.  The
      57              :  * normal behavior is that at most one such interrupt is in flight at a time;
      58              :  * when a backend completes processing a catchup interrupt, it executes
      59              :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      60              :  * needed.  This avoids undue contention from multiple backends all trying
      61              :  * to catch up at once.  However, the furthest-back backend might be stuck
      62              :  * in a state where it can't catch up.  Eventually it will get reset, so it
      63              :  * won't cause any more problems for anyone but itself.  But we don't want
      64              :  * to find that a bunch of other backends are now too close to the reset
      65              :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      66              :  * send extra catchup interrupts as the queue gets fuller, to backends that
      67              :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      68              :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      69              :  * that aren't stuck will propagate their interrupts to the next guy.
      70              :  *
      71              :  * We would have problems if the MsgNum values overflow an integer, so
      72              :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      73              :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      74              :  * large so that we don't need to do this often.  It must be a multiple of
      75              :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      76              :  * to be moved when we do it.
      77              :  *
      78              :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      79              :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      80              :  * authorizes them to modify their own ProcState but not to modify or even
      81              :  * look at anyone else's.  When we need to perform array-wide updates,
      82              :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      83              :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      84              :  * mode) to serialize adding messages to the queue.  Note that a writer
      85              :  * can operate in parallel with one or more readers, because the writer
      86              :  * has no need to touch anyone's ProcState, except in the infrequent cases
      87              :  * when SICleanupQueue is needed.  The only point of overlap is that
      88              :  * the writer wants to change maxMsgNum while readers need to read it.
      89              :  * We deal with that by having a spinlock that readers must take for just
      90              :  * long enough to read maxMsgNum, while writers take it for just long enough
      91              :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      92              :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      93              :  * spinlock to write maxMsgNum unless you are holding both locks.)
      94              :  *
      95              :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      96              :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      97              :  * is to provide a memory barrier: we need to be sure that messages written
      98              :  * to the array are actually there before maxMsgNum is increased, and that
      99              :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     100              :  * that have weak memory-ordering guarantees can fail without the memory
     101              :  * barrier instructions that are included in the spinlock sequences.
     102              :  */
     103              : 
     104              : 
     105              : /*
     106              :  * Configurable parameters.
     107              :  *
     108              :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     109              :  * Must be a power of 2 for speed.
     110              :  *
     111              :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     112              :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     113              :  *
     114              :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     115              :  * before we bother to call SICleanupQueue.
     116              :  *
     117              :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     118              :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     119              :  *
     120              :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     121              :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     122              :  *
     123              :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     124              :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     125              :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     126              :  * per iteration.
     127              :  */
     128              : 
     129              : #define MAXNUMMESSAGES 4096
     130              : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     131              : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     132              : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     133              : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     134              : #define WRITE_QUANTUM 64
     135              : 
     136              : /* Per-backend state in shared invalidation structure */
     137              : typedef struct ProcState
     138              : {
     139              :     /* procPid is zero in an inactive ProcState array entry. */
     140              :     pid_t       procPid;        /* PID of backend, for signaling */
     141              :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     142              :     int         nextMsgNum;     /* next message number to read */
     143              :     bool        resetState;     /* backend needs to reset its state */
     144              :     bool        signaled;       /* backend has been sent catchup signal */
     145              :     bool        hasMessages;    /* backend has unread messages */
     146              : 
     147              :     /*
     148              :      * Backend only sends invalidations, never receives them. This only makes
     149              :      * sense for Startup process during recovery because it doesn't maintain a
     150              :      * relcache, yet it fires inval messages to allow query backends to see
     151              :      * schema changes.
     152              :      */
     153              :     bool        sendOnly;       /* backend only sends, never receives */
     154              : 
     155              :     /*
     156              :      * Next LocalTransactionId to use for each idle backend slot.  We keep
     157              :      * this here because it is indexed by ProcNumber and it is convenient to
     158              :      * copy the value to and from local memory when MyProcNumber is set. It's
     159              :      * meaningless in an active ProcState entry.
     160              :      */
     161              :     LocalTransactionId nextLXID;
     162              : } ProcState;
     163              : 
     164              : /* Shared cache invalidation memory segment */
     165              : typedef struct SISeg
     166              : {
     167              :     /*
     168              :      * General state information
     169              :      */
     170              :     int         minMsgNum;      /* oldest message still needed */
     171              :     int         maxMsgNum;      /* next message number to be assigned */
     172              :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
     173              : 
     174              :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
     175              : 
     176              :     /*
     177              :      * Circular buffer holding shared-inval messages
     178              :      */
     179              :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     180              : 
     181              :     /*
     182              :      * Per-backend invalidation state info.
     183              :      *
     184              :      * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
     185              :      * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
     186              :      * a dense array of their indexes, to speed up scanning all in-use slots.
     187              :      *
     188              :      * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
     189              :      * having our separate copy avoids contention on ProcArrayLock, and allows
     190              :      * us to track only the processes that participate in shared cache
     191              :      * invalidations.
     192              :      */
     193              :     int         numProcs;
     194              :     int        *pgprocnos;
     195              :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
     196              : } SISeg;
     197              : 
     198              : /*
     199              :  * We reserve a slot for each possible ProcNumber, plus one for each
     200              :  * possible auxiliary process type.  (This scheme assumes there is not
     201              :  * more than one of any auxiliary process type at a time, except for
     202              :  * IO workers.)
     203              :  */
     204              : #define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)
     205              : 
     206              : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     207              : 
     208              : 
     209              : static LocalTransactionId nextLocalTransactionId;
     210              : 
     211              : static void CleanupInvalidationState(int status, Datum arg);
     212              : 
     213              : 
     214              : /*
     215              :  * SharedInvalShmemSize --- return shared-memory space needed
     216              :  */
     217              : Size
     218         3267 : SharedInvalShmemSize(void)
     219              : {
     220              :     Size        size;
     221              : 
     222         3267 :     size = offsetof(SISeg, procState);
     223         3267 :     size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots));  /* procState */
     224         3267 :     size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));    /* pgprocnos */
     225              : 
     226         3267 :     return size;
     227              : }
     228              : 
     229              : /*
     230              :  * SharedInvalShmemInit
     231              :  *      Create and initialize the SI message buffer
     232              :  */
     233              : void
     234         1140 : SharedInvalShmemInit(void)
     235              : {
     236              :     int         i;
     237              :     bool        found;
     238              : 
     239              :     /* Allocate space in shared memory */
     240         1140 :     shmInvalBuffer = (SISeg *)
     241         1140 :         ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
     242         1140 :     if (found)
     243            0 :         return;
     244              : 
      245              :     /* Clear message counters, set cleanup threshold, init spinlock */
     246         1140 :     shmInvalBuffer->minMsgNum = 0;
     247         1140 :     shmInvalBuffer->maxMsgNum = 0;
     248         1140 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     249         1140 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
     250              : 
     251              :     /* The buffer[] array is initially all unused, so we need not fill it */
     252              : 
     253              :     /* Mark all backends inactive, and initialize nextLXID */
     254       149060 :     for (i = 0; i < NumProcStateSlots; i++)
     255              :     {
     256       147920 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     257       147920 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     258       147920 :         shmInvalBuffer->procState[i].resetState = false;
     259       147920 :         shmInvalBuffer->procState[i].signaled = false;
     260       147920 :         shmInvalBuffer->procState[i].hasMessages = false;
     261       147920 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     262              :     }
     263         1140 :     shmInvalBuffer->numProcs = 0;
     264         1140 :     shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
     265              : }
     266              : 
     267              : /*
     268              :  * SharedInvalBackendInit
     269              :  *      Initialize a new backend to operate on the sinval buffer
     270              :  */
     271              : void
     272        18555 : SharedInvalBackendInit(bool sendOnly)
     273              : {
     274              :     ProcState  *stateP;
     275              :     pid_t       oldPid;
     276        18555 :     SISeg      *segP = shmInvalBuffer;
     277              : 
     278        18555 :     if (MyProcNumber < 0)
     279            0 :         elog(ERROR, "MyProcNumber not set");
     280        18555 :     if (MyProcNumber >= NumProcStateSlots)
     281            0 :         elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
     282              :              MyProcNumber, NumProcStateSlots);
     283        18555 :     stateP = &segP->procState[MyProcNumber];
     284              : 
     285              :     /*
     286              :      * This can run in parallel with read operations, but not with write
     287              :      * operations, since SIInsertDataEntries relies on the pgprocnos array to
     288              :      * set hasMessages appropriately.
     289              :      */
     290        18555 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     291              : 
     292        18555 :     oldPid = stateP->procPid;
     293        18555 :     if (oldPid != 0)
     294              :     {
     295            0 :         LWLockRelease(SInvalWriteLock);
     296            0 :         elog(ERROR, "sinval slot for backend %d is already in use by process %d",
     297              :              MyProcNumber, (int) oldPid);
     298              :     }
     299              : 
     300        18555 :     shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
     301              : 
     302              :     /* Fetch next local transaction ID into local memory */
     303        18555 :     nextLocalTransactionId = stateP->nextLXID;
     304              : 
     305              :     /* mark myself active, with all extant messages already read */
     306        18555 :     stateP->procPid = MyProcPid;
     307        18555 :     stateP->nextMsgNum = segP->maxMsgNum;
     308        18555 :     stateP->resetState = false;
     309        18555 :     stateP->signaled = false;
     310        18555 :     stateP->hasMessages = false;
     311        18555 :     stateP->sendOnly = sendOnly;
     312              : 
     313        18555 :     LWLockRelease(SInvalWriteLock);
     314              : 
     315              :     /* register exit routine to mark my entry inactive at exit */
     316        18555 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     317        18555 : }
     318              : 
     319              : /*
     320              :  * CleanupInvalidationState
     321              :  *      Mark the current backend as no longer active.
     322              :  *
     323              :  * This function is called via on_shmem_exit() during backend shutdown.
     324              :  *
     325              :  * arg is really of type "SISeg*".
     326              :  */
     327              : static void
     328        18555 : CleanupInvalidationState(int status, Datum arg)
     329              : {
     330        18555 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     331              :     ProcState  *stateP;
     332              :     int         i;
     333              : 
     334              :     Assert(segP);
     335              : 
     336        18555 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     337              : 
     338        18555 :     stateP = &segP->procState[MyProcNumber];
     339              : 
     340              :     /* Update next local transaction ID for next holder of this proc number */
     341        18555 :     stateP->nextLXID = nextLocalTransactionId;
     342              : 
     343              :     /* Mark myself inactive */
     344        18555 :     stateP->procPid = 0;
     345        18555 :     stateP->nextMsgNum = 0;
     346        18555 :     stateP->resetState = false;
     347        18555 :     stateP->signaled = false;
     348              : 
     349        26027 :     for (i = segP->numProcs - 1; i >= 0; i--)
     350              :     {
     351        26027 :         if (segP->pgprocnos[i] == MyProcNumber)
     352              :         {
     353        18555 :             if (i != segP->numProcs - 1)
     354         3378 :                 segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
     355        18555 :             break;
     356              :         }
     357              :     }
     358        18555 :     if (i < 0)
     359            0 :         elog(PANIC, "could not find entry in sinval array");
     360        18555 :     segP->numProcs--;
     361              : 
     362        18555 :     LWLockRelease(SInvalWriteLock);
     363        18555 : }
     364              : 
     365              : /*
     366              :  * SIInsertDataEntries
     367              :  *      Add new invalidation message(s) to the buffer.
     368              :  */
     369              : void
     370       447917 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     371              : {
     372       447917 :     SISeg      *segP = shmInvalBuffer;
     373              : 
     374              :     /*
     375              :      * N can be arbitrarily large.  We divide the work into groups of no more
     376              :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     377              :      * an unreasonably long time.  (This is not so much because we care about
     378              :      * letting in other writers, as that some just-caught-up backend might be
     379              :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
     380              :      * to have to wait a long time.)  Also, we need to consider calling
     381              :      * SICleanupQueue every so often.
     382              :      */
     383       921598 :     while (n > 0)
     384              :     {
     385       473681 :         int         nthistime = Min(n, WRITE_QUANTUM);
     386              :         int         numMsgs;
     387              :         int         max;
     388              :         int         i;
     389              : 
     390       473681 :         n -= nthistime;
     391              : 
     392       473681 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     393              : 
     394              :         /*
     395              :          * If the buffer is full, we *must* acquire some space.  Clean the
     396              :          * queue and reset anyone who is preventing space from being freed.
     397              :          * Otherwise, clean the queue only when it's exceeded the next
     398              :          * fullness threshold.  We have to loop and recheck the buffer state
     399              :          * after any call of SICleanupQueue.
     400              :          */
     401              :         for (;;)
     402              :         {
     403       479333 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
     404       479333 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
     405       479128 :                 numMsgs >= segP->nextThreshold)
     406         5652 :                 SICleanupQueue(true, nthistime);
     407              :             else
     408              :                 break;
     409              :         }
     410              : 
     411              :         /*
     412              :          * Insert new message(s) into proper slot of circular buffer
     413              :          */
     414       473681 :         max = segP->maxMsgNum;
     415      4546771 :         while (nthistime-- > 0)
     416              :         {
     417      4073090 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
     418      4073090 :             max++;
     419              :         }
     420              : 
     421              :         /* Update current value of maxMsgNum using spinlock */
     422       473681 :         SpinLockAcquire(&segP->msgnumLock);
     423       473681 :         segP->maxMsgNum = max;
     424       473681 :         SpinLockRelease(&segP->msgnumLock);
     425              : 
     426              :         /*
     427              :          * Now that the maxMsgNum change is globally visible, we give everyone
     428              :          * a swift kick to make sure they read the newly added messages.
     429              :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
     430              :          * these (unlocked) changes will be committed to memory before we exit
     431              :          * the function.
     432              :          */
     433      2737589 :         for (i = 0; i < segP->numProcs; i++)
     434              :         {
     435      2263908 :             ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     436              : 
     437      2263908 :             stateP->hasMessages = true;
     438              :         }
     439              : 
     440       473681 :         LWLockRelease(SInvalWriteLock);
     441              :     }
     442       447917 : }
     443              : 
     444              : /*
     445              :  * SIGetDataEntries
     446              :  *      get next SI message(s) for current backend, if there are any
     447              :  *
     448              :  * Possible return values:
     449              :  *  0:   no SI message available
     450              :  *  n>0: next n SI messages have been extracted into data[]
     451              :  * -1:   SI reset message extracted
     452              :  *
     453              :  * If the return value is less than the array size "datasize", the caller
     454              :  * can assume that there are no more SI messages after the one(s) returned.
     455              :  * Otherwise, another call is needed to collect more messages.
     456              :  *
     457              :  * NB: this can run in parallel with other instances of SIGetDataEntries
     458              :  * executing on behalf of other backends, since each instance will modify only
     459              :  * fields of its own backend's ProcState, and no instance will look at fields
     460              :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     461              :  * in shared mode.  Note that this is not exactly the normal (read-only)
     462              :  * interpretation of a shared lock! Look closely at the interactions before
     463              :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     464              :  *
     465              :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     466              :  * guaranteed that we will return any messages added after the routine is
     467              :  * entered.
     468              :  *
     469              :  * Note: we assume that "datasize" is not so large that it might be important
     470              :  * to break our hold on SInvalReadLock into segments.
     471              :  */
     472              : int
     473     20837954 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     474              : {
     475              :     SISeg      *segP;
     476              :     ProcState  *stateP;
     477              :     int         max;
     478              :     int         n;
     479              : 
     480     20837954 :     segP = shmInvalBuffer;
     481     20837954 :     stateP = &segP->procState[MyProcNumber];
     482              : 
     483              :     /*
     484              :      * Before starting to take locks, do a quick, unlocked test to see whether
     485              :      * there can possibly be anything to read.  On a multiprocessor system,
     486              :      * it's possible that this load could migrate backwards and occur before
     487              :      * we actually enter this function, so we might miss a sinval message that
     488              :      * was just added by some other processor.  But they can't migrate
     489              :      * backwards over a preceding lock acquisition, so it should be OK.  If we
     490              :      * haven't acquired a lock preventing against further relevant
     491              :      * invalidations, any such occurrence is not much different than if the
     492              :      * invalidation had arrived slightly later in the first place.
     493              :      */
     494     20837954 :     if (!stateP->hasMessages)
     495     20063230 :         return 0;
     496              : 
     497       774724 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
     498              : 
     499              :     /*
     500              :      * We must reset hasMessages before determining how many messages we're
     501              :      * going to read.  That way, if new messages arrive after we have
      502              :          * determined how many we're reading, the flag will get set again and we'll
     503              :      * notice those messages part-way through.
     504              :      *
     505              :      * Note that, if we don't end up reading all of the messages, we had
     506              :      * better be certain to reset this flag before exiting!
     507              :      */
     508       774724 :     stateP->hasMessages = false;
     509              : 
     510              :     /* Fetch current value of maxMsgNum using spinlock */
     511       774724 :     SpinLockAcquire(&segP->msgnumLock);
     512       774724 :     max = segP->maxMsgNum;
     513       774724 :     SpinLockRelease(&segP->msgnumLock);
     514              : 
     515       774724 :     if (stateP->resetState)
     516              :     {
     517              :         /*
     518              :          * Force reset.  We can say we have dealt with any messages added
     519              :          * since the reset, as well; and that means we should clear the
     520              :          * signaled flag, too.
     521              :          */
     522          249 :         stateP->nextMsgNum = max;
     523          249 :         stateP->resetState = false;
     524          249 :         stateP->signaled = false;
     525          249 :         LWLockRelease(SInvalReadLock);
     526          249 :         return -1;
     527              :     }
     528              : 
     529              :     /*
     530              :      * Retrieve messages and advance backend's counter, until data array is
     531              :      * full or there are no more messages.
     532              :      *
     533              :      * There may be other backends that haven't read the message(s), so we
     534              :      * cannot delete them here.  SICleanupQueue() will eventually remove them
     535              :      * from the queue.
     536              :      */
     537       774475 :     n = 0;
     538     18351719 :     while (n < datasize && stateP->nextMsgNum < max)
     539              :     {
     540     17577244 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     541     17577244 :         stateP->nextMsgNum++;
     542              :     }
     543              : 
     544              :     /*
     545              :      * If we have caught up completely, reset our "signaled" flag so that
     546              :      * we'll get another signal if we fall behind again.
     547              :      *
     548              :      * If we haven't caught up completely, reset the hasMessages flag so that
     549              :      * we see the remaining messages next time.
     550              :      */
     551       774475 :     if (stateP->nextMsgNum >= max)
     552       303554 :         stateP->signaled = false;
     553              :     else
     554       470921 :         stateP->hasMessages = true;
     555              : 
     556       774475 :     LWLockRelease(SInvalReadLock);
     557       774475 :     return n;
     558              : }
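SIGetDataEntries reads from a fixed-size buffer, but its message counters keep growing. Each counter is mapped to a slot with `% MAXNUMMESSAGES`. The reader clears hasMessages before sampling maxMsgNum and sets it again if it stops early.

The following is a minimal single-process sketch of that reader loop. It rests on these assumptions:
- Locks and the msgnumLock spinlock are omitted.
- resetState and signaled are omitted.
- Messages are plain `int`s rather than SharedInvalidationMessage.
- All names (DemoQueue, DemoReader, demo_write, demo_read, DEMO_MAXNUMMESSAGES) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, deliberately tiny size; the real queue uses MAXNUMMESSAGES. */
#define DEMO_MAXNUMMESSAGES 8

typedef struct DemoQueue
{
    int         buffer[DEMO_MAXNUMMESSAGES];
    int         minMsgNum;      /* oldest message number still stored */
    int         maxMsgNum;      /* next message number to assign */
} DemoQueue;

typedef struct DemoReader
{
    int         nextMsgNum;     /* next message number this reader needs */
    bool        hasMessages;    /* set by writers, cleared by the reader */
} DemoReader;

/* Append one message and flag the (single, hypothetical) reader. */
static void
demo_write(DemoQueue *q, DemoReader *r, int msg)
{
    /* Room must already exist; upstream, SICleanupQueue() frees slots. */
    assert(q->maxMsgNum - q->minMsgNum < DEMO_MAXNUMMESSAGES);
    q->buffer[q->maxMsgNum % DEMO_MAXNUMMESSAGES] = msg;
    q->maxMsgNum++;
    r->hasMessages = true;
}

/* Copy up to datasize pending messages into data; return how many. */
static int
demo_read(const DemoQueue *q, DemoReader *r, int *data, int datasize)
{
    int         max;
    int         n = 0;

    if (!r->hasMessages)
        return 0;               /* cheap early exit, like the unlocked test */

    r->hasMessages = false;     /* clear before sampling maxMsgNum */
    max = q->maxMsgNum;

    while (n < datasize && r->nextMsgNum < max)
    {
        data[n++] = q->buffer[r->nextMsgNum % DEMO_MAXNUMMESSAGES];
        r->nextMsgNum++;
    }

    /* Not caught up: re-arm the flag so the next call keeps reading. */
    if (r->nextMsgNum < max)
        r->hasMessages = true;
    return n;
}
```

With six queued messages and a four-slot output array, the first call returns 4 and leaves hasMessages set. The second call returns the remaining 2 and clears the flag, and a third call returns 0 at once.

This sketch never advances minMsgNum. A ninth write would therefore trip the assertion, which is the situation SICleanupQueue exists to handle.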
     559              : 
     560              : /*
     561              :  * SICleanupQueue
     562              :  *      Remove messages that have been consumed by all active backends
     563              :  *
     564              :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     565              :  * minFree is the minimum number of message slots to make free.
     566              :  *
     567              :  * Possible side effects of this routine include marking one or more
     568              :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     569              :  * to some backend that seems to be getting too far behind.  We signal at
     570              :  * most one backend at a time, for reasons explained at the top of the file.
     571              :  *
     572              :  * Caution: because we transiently release write lock when we have to signal
     573              :  * some other backend, it is NOT guaranteed that there are still minFree
     574              :  * free message slots at exit.  Caller must recheck and perhaps retry.
     575              :  */
     576              : void
     577         8503 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     578              : {
     579         8503 :     SISeg      *segP = shmInvalBuffer;
     580              :     int         min,
     581              :                 minsig,
     582              :                 lowbound,
     583              :                 numMsgs,
     584              :                 i;
     585         8503 :     ProcState  *needSig = NULL;
     586              : 
     587              :     /* Lock out all writers and readers */
     588         8503 :     if (!callerHasWriteLock)
     589         2851 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     590         8503 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     591              : 
     592              :     /*
     593              :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     594              :      * furthest-back backend that needs signaling (if any), and reset any
     595              :      * backends that are too far back.  Note that because we ignore sendOnly
     596              :      * backends here it is possible for them to keep sending messages without
     597              :      * a problem even when they are the only active backend.
     598              :      */
     599         8503 :     min = segP->maxMsgNum;
     600         8503 :     minsig = min - SIG_THRESHOLD;
     601         8503 :     lowbound = min - MAXNUMMESSAGES + minFree;
     602              : 
     603        71208 :     for (i = 0; i < segP->numProcs; i++)
     604              :     {
     605        62705 :         ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     606        62705 :         int         n = stateP->nextMsgNum;
     607              : 
     608              :         /* Ignore if already in reset state */
     609              :         Assert(stateP->procPid != 0);
     610        62705 :         if (stateP->resetState || stateP->sendOnly)
     611         5867 :             continue;
     612              : 
     613              :         /*
     614              :          * If we must free some space and this backend is preventing it, force
     615              :          * him into reset state and then ignore until he catches up.
     616              :          */
     617        56838 :         if (n < lowbound)
     618              :         {
     619          250 :             stateP->resetState = true;
     620              :             /* no point in signaling him ... */
     621          250 :             continue;
     622              :         }
     623              : 
     624              :         /* Track the global minimum nextMsgNum */
     625        56588 :         if (n < min)
     626        12903 :             min = n;
     627              : 
     628              :         /* Also see who's furthest back of the unsignaled backends */
     629        56588 :         if (n < minsig && !stateP->signaled)
     630              :         {
     631         2919 :             minsig = n;
     632         2919 :             needSig = stateP;
     633              :         }
     634              :     }
     635         8503 :     segP->minMsgNum = min;
     636              : 
     637              :     /*
     638              :      * When minMsgNum gets really large, decrement all message counters so as
     639              :      * to forestall overflow of the counters.  This happens seldom enough that
     640              :      * folding it into the previous loop would be a loser.
     641              :      */
     642         8503 :     if (min >= MSGNUMWRAPAROUND)
     643              :     {
     644            0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
     645            0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
     646            0 :         for (i = 0; i < segP->numProcs; i++)
     647            0 :             segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
     648              :     }
     649              : 
     650              :     /*
     651              :      * Determine how many messages are still in the queue, and set the
     652              :      * threshold at which we should repeat SICleanupQueue().
     653              :      */
     654         8503 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
     655         8503 :     if (numMsgs < CLEANUP_MIN)
     656         2819 :         segP->nextThreshold = CLEANUP_MIN;
     657              :     else
     658         5684 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     659              : 
     660              :     /*
     661              :      * Lastly, signal anyone who needs a catchup interrupt.  Since
     662              :      * SendProcSignal() might not be fast, we don't want to hold locks while
     663              :      * executing it.
     664              :      */
     665         8503 :     if (needSig)
     666              :     {
     667         2870 :         pid_t       his_pid = needSig->procPid;
     668         2870 :         ProcNumber  his_procNumber = (needSig - &segP->procState[0]);
     669              : 
     670         2870 :         needSig->signaled = true;
     671         2870 :         LWLockRelease(SInvalReadLock);
     672         2870 :         LWLockRelease(SInvalWriteLock);
     673         2870 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     674         2870 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
     675         2870 :         if (callerHasWriteLock)
     676         2338 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     677              :     }
     678              :     else
     679              :     {
     680         5633 :         LWLockRelease(SInvalReadLock);
     681         5633 :         if (!callerHasWriteLock)
     682         2319 :             LWLockRelease(SInvalWriteLock);
     683              :     }
     684         8503 : }
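SICleanupQueue makes one pass over the backends and does three things:
- It forces into reset any backend whose nextMsgNum is below `lowbound`.
- It tracks the minimum nextMsgNum among the rest.
- It picks the furthest-back unsignaled backend that lags more than SIG_THRESHOLD.

It then rounds the queue length up to the next CLEANUP_QUANTUM multiple to obtain nextThreshold.

The following is a minimal sketch of that arithmetic. It rests on these assumptions:
- The constant values are assumed to mirror the definitions near the top of sinvaladt.c (buffer of 4096 slots, CLEANUP_MIN and SIG_THRESHOLD at half the buffer, CLEANUP_QUANTUM at one sixteenth). Treat them as illustrative.
- The sketch omits locking, sendOnly backends, the MSGNUMWRAPAROUND adjustment and the actual SendProcSignal() call.
- DemoProc, demo_cleanup and the DEMO_* macros are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed to mirror the real definitions; illustrative only. */
#define DEMO_MAXNUMMESSAGES   4096
#define DEMO_CLEANUP_MIN      (DEMO_MAXNUMMESSAGES / 2)
#define DEMO_CLEANUP_QUANTUM  (DEMO_MAXNUMMESSAGES / 16)
#define DEMO_SIG_THRESHOLD    (DEMO_MAXNUMMESSAGES / 2)

typedef struct DemoProc
{
    int         nextMsgNum;
    bool        resetState;
    bool        signaled;
} DemoProc;

/*
 * One cleanup pass.  Stores the new minMsgNum and nextThreshold through the
 * out-parameters and returns the index of the backend to signal, or -1.
 */
static int
demo_cleanup(DemoProc *procs, int nprocs, int maxMsgNum, int minFree,
             int *minMsgNum, int *nextThreshold)
{
    int         min = maxMsgNum;
    int         minsig = min - DEMO_SIG_THRESHOLD;
    int         lowbound = min - DEMO_MAXNUMMESSAGES + minFree;
    int         needSig = -1;
    int         numMsgs;

    assert(minFree >= 0 && minFree <= DEMO_MAXNUMMESSAGES);

    for (int i = 0; i < nprocs; i++)
    {
        int         n = procs[i].nextMsgNum;

        if (procs[i].resetState)
            continue;           /* already reset: ignore until caught up */
        if (n < lowbound)
        {
            procs[i].resetState = true;     /* blocking needed space */
            continue;
        }
        if (n < min)
            min = n;            /* global minimum of live readers */
        if (n < minsig && !procs[i].signaled)
        {
            minsig = n;         /* furthest-back unsignaled reader */
            needSig = i;
        }
    }
    *minMsgNum = min;

    /* Round the backlog up to the next quantum, with a floor of CLEANUP_MIN. */
    numMsgs = maxMsgNum - min;
    if (numMsgs < DEMO_CLEANUP_MIN)
        *nextThreshold = DEMO_CLEANUP_MIN;
    else
        *nextThreshold = (numMsgs / DEMO_CLEANUP_QUANTUM + 1) * DEMO_CLEANUP_QUANTUM;

    if (needSig >= 0)
        procs[needSig].signaled = true;     /* real code then signals it */
    return needSig;
}
```

Consider a worked case with maxMsgNum = 5000 and minFree = 64. Then lowbound = 968 and minsig starts at 2952.
- A reader at 500 is forced into reset.
- An unsignaled reader at 2000 is chosen for signaling.
- An already-signaled reader at 1500 still sets the minimum, to 1500, but is not signaled again.

The backlog is 3500 messages. 3500 / 256 = 13 in integer division, so nextThreshold = (13 + 1) * 256 = 3584.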
     685              : 
     686              : 
     687              : /*
     688              :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     689              :  *
     690              :  * We split VirtualTransactionIds into two parts so that it is possible
     691              :  * to allocate a new one without any contention for shared memory, except
     692              :  * for a bit of additional overhead during backend startup/shutdown.
     693              :  * The high-order part of a VirtualTransactionId is a ProcNumber, and the
     694              :  * low-order part is a LocalTransactionId, which we assign from a local
     695              :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     696              :  * within a short interval, successive procs occupying the same PGPROC slot
     697              :  * should use a consecutive sequence of local IDs, which is implemented
     698              :  * by copying nextLocalTransactionId as seen above.
     699              :  */
     700              : LocalTransactionId
     701       552656 : GetNextLocalTransactionId(void)
     702              : {
     703              :     LocalTransactionId result;
     704              : 
     705              :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     706              :     do
     707              :     {
     708       563023 :         result = nextLocalTransactionId++;
     709       563023 :     } while (!LocalTransactionIdIsValid(result));
     710              : 
     711       552656 :     return result;
     712              : }
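GetNextLocalTransactionId relies on unsigned wraparound of a 32-bit counter. When the counter wraps to zero, the loop discards that value, because zero is InvalidLocalTransactionId.

The following is a minimal stand-alone sketch of the same loop. It makes two assumptions:
- LocalTransactionId is a 32-bit unsigned integer.
- The invalid value is 0.

The names DemoLocalXid, DEMO_INVALID_LXID, demo_next_lxid and demo_get_next_lxid are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed to match LocalTransactionId / InvalidLocalTransactionId. */
typedef uint32_t DemoLocalXid;
#define DEMO_INVALID_LXID ((DemoLocalXid) 0)

/* Process-local counter; no shared memory is touched. */
static DemoLocalXid demo_next_lxid = 1;

static DemoLocalXid
demo_get_next_lxid(void)
{
    DemoLocalXid result;

    /* Unsigned increment wraps from UINT32_MAX to 0; skip the invalid 0. */
    do
    {
        result = demo_next_lxid++;
    } while (result == DEMO_INVALID_LXID);

    assert(result != DEMO_INVALID_LXID);    /* postcondition of the loop */
    return result;
}
```

Starting the counter at UINT32_MAX, the next three calls return UINT32_MAX, 1 and 2. The value 0 is consumed and skipped inside the second call. This is why the loop, not the caller, carries the validity check.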
        

Generated by: LCOV version 2.0-1