LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions)
Test:      PostgreSQL 19devel
Test Date: 2026-04-06 14:16:21
Coverage:  Lines: 94.4 % (151 of 160)    Functions: 100.0 % (8 of 8)
Legend: Lines: hit / not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * sinvaladt.c
       4              :  *    POSTGRES shared cache invalidation data manager.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/storage/ipc/sinvaladt.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : #include "postgres.h"
      16              : 
      17              : #include <signal.h>
      18              : #include <unistd.h>
      19              : 
      20              : #include "miscadmin.h"
      21              : #include "storage/ipc.h"
      22              : #include "storage/proc.h"
      23              : #include "storage/procnumber.h"
      24              : #include "storage/procsignal.h"
      25              : #include "storage/shmem.h"
      26              : #include "storage/sinvaladt.h"
      27              : #include "storage/spin.h"
      28              : #include "storage/subsystems.h"
      29              : 
      30              : /*
      31              :  * Conceptually, the shared cache invalidation messages are stored in an
      32              :  * infinite array, where maxMsgNum is the next array subscript to store a
      33              :  * submitted message in, minMsgNum is the smallest array subscript containing
      34              :  * a message not yet read by all backends, and we always have maxMsgNum >=
      35              :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      36              :  * active backend, there is a nextMsgNum pointer indicating the next message it
      37              :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      38              :  * backend.
      39              :  *
      40              :  * (In the current implementation, minMsgNum is a lower bound for the
      41              :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      42              :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      43              :  * SICleanupQueue is called, and we try not to do that often.)
      44              :  *
      45              :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      46              :  * entries.  We translate MsgNum values into circular-buffer indexes by
      47              :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      48              :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      49              :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      50              :  * in the buffer.  If the buffer does overflow, we recover by setting the
      51              :  * "reset" flag for each backend that has fallen too far behind.  A backend
      52              :  * that is in "reset" state is ignored while determining minMsgNum.  When
      53              :  * it does finally attempt to receive inval messages, it must discard all
      54              :  * its invalidatable state, since it won't know what it missed.
      55              :  *
      56              :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      57              :  * to any backend that seems to be falling unreasonably far behind.  The
      58              :  * normal behavior is that at most one such interrupt is in flight at a time;
      59              :  * when a backend completes processing a catchup interrupt, it executes
      60              :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      61              :  * needed.  This avoids undue contention from multiple backends all trying
      62              :  * to catch up at once.  However, the furthest-back backend might be stuck
      63              :  * in a state where it can't catch up.  Eventually it will get reset, so it
      64              :  * won't cause any more problems for anyone but itself.  But we don't want
      65              :  * to find that a bunch of other backends are now too close to the reset
      66              :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      67              :  * send extra catchup interrupts as the queue gets fuller, to backends that
      68              :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      69              :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      70              :  * that aren't stuck will propagate their interrupts to the next guy.
      71              :  *
      72              :  * We would have problems if the MsgNum values overflow an integer, so
      73              :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
       74              :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND is made
       75              :  * large so that we don't need to do this often.  It must be a multiple of
      76              :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      77              :  * to be moved when we do it.
      78              :  *
      79              :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      80              :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      81              :  * authorizes them to modify their own ProcState but not to modify or even
      82              :  * look at anyone else's.  When we need to perform array-wide updates,
      83              :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      84              :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      85              :  * mode) to serialize adding messages to the queue.  Note that a writer
      86              :  * can operate in parallel with one or more readers, because the writer
      87              :  * has no need to touch anyone's ProcState, except in the infrequent cases
      88              :  * when SICleanupQueue is needed.  The only point of overlap is that
      89              :  * the writer wants to change maxMsgNum while readers need to read it.
      90              :  * We deal with that by having a spinlock that readers must take for just
      91              :  * long enough to read maxMsgNum, while writers take it for just long enough
      92              :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      93              :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      94              :  * spinlock to write maxMsgNum unless you are holding both locks.)
      95              :  *
      96              :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      97              :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      98              :  * is to provide a memory barrier: we need to be sure that messages written
      99              :  * to the array are actually there before maxMsgNum is increased, and that
     100              :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     101              :  * that have weak memory-ordering guarantees can fail without the memory
     102              :  * barrier instructions that are included in the spinlock sequences.
     103              :  */
     104              : 
     105              : 
     106              : /*
     107              :  * Configurable parameters.
     108              :  *
     109              :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     110              :  * Must be a power of 2 for speed.
     111              :  *
     112              :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     113              :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     114              :  *
     115              :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     116              :  * before we bother to call SICleanupQueue.
     117              :  *
     118              :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     119              :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     120              :  *
     121              :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     122              :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     123              :  *
     124              :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     125              :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     126              :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     127              :  * per iteration.
     128              :  */
     129              : 
     130              : #define MAXNUMMESSAGES 4096
     131              : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     132              : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     133              : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     134              : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     135              : #define WRITE_QUANTUM 64
     136              : 
     137              : /* Per-backend state in shared invalidation structure */
     138              : typedef struct ProcState
     139              : {
     140              :     /* procPid is zero in an inactive ProcState array entry. */
     141              :     pid_t       procPid;        /* PID of backend, for signaling */
     142              :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     143              :     int         nextMsgNum;     /* next message number to read */
     144              :     bool        resetState;     /* backend needs to reset its state */
     145              :     bool        signaled;       /* backend has been sent catchup signal */
     146              :     bool        hasMessages;    /* backend has unread messages */
     147              : 
       148              :     /*
       149              :      * This backend only sends invalidations and never receives them.  That
       150              :      * only makes sense for the startup process during recovery: it doesn't
       151              :      * maintain a relcache, yet it fires inval messages so that query
       152              :      * backends can see schema changes.
       153              :      */
     154              :     bool        sendOnly;       /* backend only sends, never receives */
     155              : 
     156              :     /*
     157              :      * Next LocalTransactionId to use for each idle backend slot.  We keep
     158              :      * this here because it is indexed by ProcNumber and it is convenient to
     159              :      * copy the value to and from local memory when MyProcNumber is set. It's
     160              :      * meaningless in an active ProcState entry.
     161              :      */
     162              :     LocalTransactionId nextLXID;
     163              : } ProcState;
     164              : 
     165              : /* Shared cache invalidation memory segment */
     166              : typedef struct SISeg
     167              : {
     168              :     /*
     169              :      * General state information
     170              :      */
     171              :     int         minMsgNum;      /* oldest message still needed */
     172              :     int         maxMsgNum;      /* next message number to be assigned */
     173              :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
     174              : 
     175              :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
     176              : 
     177              :     /*
     178              :      * Circular buffer holding shared-inval messages
     179              :      */
     180              :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     181              : 
     182              :     /*
     183              :      * Per-backend invalidation state info.
     184              :      *
     185              :      * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
     186              :      * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
     187              :      * a dense array of their indexes, to speed up scanning all in-use slots.
     188              :      *
     189              :      * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
     190              :      * having our separate copy avoids contention on ProcArrayLock, and allows
     191              :      * us to track only the processes that participate in shared cache
     192              :      * invalidations.
     193              :      */
     194              :     int         numProcs;
     195              :     int        *pgprocnos;
     196              :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
     197              : } SISeg;
     198              : 
     199              : /*
     200              :  * We reserve a slot for each possible ProcNumber, plus one for each
     201              :  * possible auxiliary process type.  (This scheme assumes there is not
     202              :  * more than one of any auxiliary process type at a time, except for
     203              :  * IO workers.)
     204              :  */
     205              : #define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)
     206              : 
     207              : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     208              : 
     209              : static void SharedInvalShmemRequest(void *arg);
     210              : static void SharedInvalShmemInit(void *arg);
     211              : 
     212              : const ShmemCallbacks SharedInvalShmemCallbacks = {
     213              :     .request_fn = SharedInvalShmemRequest,
     214              :     .init_fn = SharedInvalShmemInit,
     215              : };
     216              : 
     217              : 
     218              : static LocalTransactionId nextLocalTransactionId;
     219              : 
     220              : static void CleanupInvalidationState(int status, Datum arg);
     221              : 
     222              : 
     223              : /*
     224              :  * SharedInvalShmemRequest
     225              :  *      Register shared memory needs for the SI message buffer
     226              :  */
     227              : static void
     228         1233 : SharedInvalShmemRequest(void *arg)
     229              : {
     230              :     Size        size;
     231              : 
     232         1233 :     size = offsetof(SISeg, procState);
     233         1233 :     size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots));  /* procState */
     234         1233 :     size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));    /* pgprocnos */
     235              : 
     236         1233 :     ShmemRequestStruct(.name = "shmInvalBuffer",
     237              :                        .size = size,
     238              :                        .ptr = (void **) &shmInvalBuffer,
     239              :         );
     240         1233 : }
     241              : 
     242              : static void
     243         1230 : SharedInvalShmemInit(void *arg)
     244              : {
     245              :     int         i;
     246              : 
     247              :     /* Clear message counters, init spinlock */
     248         1230 :     shmInvalBuffer->minMsgNum = 0;
     249         1230 :     shmInvalBuffer->maxMsgNum = 0;
     250         1230 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     251         1230 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
     252              : 
     253              :     /* The buffer[] array is initially all unused, so we need not fill it */
     254              : 
     255              :     /* Mark all backends inactive, and initialize nextLXID */
     256       163161 :     for (i = 0; i < NumProcStateSlots; i++)
     257              :     {
     258       161931 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     259       161931 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     260       161931 :         shmInvalBuffer->procState[i].resetState = false;
     261       161931 :         shmInvalBuffer->procState[i].signaled = false;
     262       161931 :         shmInvalBuffer->procState[i].hasMessages = false;
     263       161931 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     264              :     }
     265         1230 :     shmInvalBuffer->numProcs = 0;
     266         1230 :     shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
     267         1230 : }
     268              : 
     269              : /*
     270              :  * SharedInvalBackendInit
     271              :  *      Initialize a new backend to operate on the sinval buffer
     272              :  */
     273              : void
     274        19953 : SharedInvalBackendInit(bool sendOnly)
     275              : {
     276              :     ProcState  *stateP;
     277              :     pid_t       oldPid;
     278        19953 :     SISeg      *segP = shmInvalBuffer;
     279              : 
     280        19953 :     if (MyProcNumber < 0)
     281            0 :         elog(ERROR, "MyProcNumber not set");
     282        19953 :     if (MyProcNumber >= NumProcStateSlots)
     283            0 :         elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
     284              :              MyProcNumber, NumProcStateSlots);
     285        19953 :     stateP = &segP->procState[MyProcNumber];
     286              : 
     287              :     /*
     288              :      * This can run in parallel with read operations, but not with write
     289              :      * operations, since SIInsertDataEntries relies on the pgprocnos array to
     290              :      * set hasMessages appropriately.
     291              :      */
     292        19953 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     293              : 
     294        19953 :     oldPid = stateP->procPid;
     295        19953 :     if (oldPid != 0)
     296              :     {
     297            0 :         LWLockRelease(SInvalWriteLock);
     298            0 :         elog(ERROR, "sinval slot for backend %d is already in use by process %d",
     299              :              MyProcNumber, (int) oldPid);
     300              :     }
     301              : 
     302        19953 :     shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
     303              : 
     304              :     /* Fetch next local transaction ID into local memory */
     305        19953 :     nextLocalTransactionId = stateP->nextLXID;
     306              : 
     307              :     /* mark myself active, with all extant messages already read */
     308        19953 :     stateP->procPid = MyProcPid;
     309        19953 :     stateP->nextMsgNum = segP->maxMsgNum;
     310        19953 :     stateP->resetState = false;
     311        19953 :     stateP->signaled = false;
     312        19953 :     stateP->hasMessages = false;
     313        19953 :     stateP->sendOnly = sendOnly;
     314              : 
     315        19953 :     LWLockRelease(SInvalWriteLock);
     316              : 
     317              :     /* register exit routine to mark my entry inactive at exit */
     318        19953 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     319        19953 : }
     320              : 
     321              : /*
     322              :  * CleanupInvalidationState
     323              :  *      Mark the current backend as no longer active.
     324              :  *
     325              :  * This function is called via on_shmem_exit() during backend shutdown.
     326              :  *
     327              :  * arg is really of type "SISeg*".
     328              :  */
     329              : static void
     330        19953 : CleanupInvalidationState(int status, Datum arg)
     331              : {
     332        19953 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     333              :     ProcState  *stateP;
     334              :     int         i;
     335              : 
     336              :     Assert(segP);
     337              : 
     338        19953 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     339              : 
     340        19953 :     stateP = &segP->procState[MyProcNumber];
     341              : 
     342              :     /* Update next local transaction ID for next holder of this proc number */
     343        19953 :     stateP->nextLXID = nextLocalTransactionId;
     344              : 
     345              :     /* Mark myself inactive */
     346        19953 :     stateP->procPid = 0;
     347        19953 :     stateP->nextMsgNum = 0;
     348        19953 :     stateP->resetState = false;
     349        19953 :     stateP->signaled = false;
     350              : 
     351        28017 :     for (i = segP->numProcs - 1; i >= 0; i--)
     352              :     {
     353        28017 :         if (segP->pgprocnos[i] == MyProcNumber)
     354              :         {
     355        19953 :             if (i != segP->numProcs - 1)
     356         3685 :                 segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
     357        19953 :             break;
     358              :         }
     359              :     }
     360        19953 :     if (i < 0)
     361            0 :         elog(PANIC, "could not find entry in sinval array");
     362        19953 :     segP->numProcs--;
     363              : 
     364        19953 :     LWLockRelease(SInvalWriteLock);
     365        19953 : }
     366              : 
     367              : /*
     368              :  * SIInsertDataEntries
     369              :  *      Add new invalidation message(s) to the buffer.
     370              :  */
     371              : void
     372       534323 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     373              : {
     374       534323 :     SISeg      *segP = shmInvalBuffer;
     375              : 
     376              :     /*
     377              :      * N can be arbitrarily large.  We divide the work into groups of no more
     378              :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     379              :      * an unreasonably long time.  (This is not so much because we care about
     380              :      * letting in other writers, as that some just-caught-up backend might be
     381              :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
     382              :      * to have to wait a long time.)  Also, we need to consider calling
     383              :      * SICleanupQueue every so often.
     384              :      */
     385      1097767 :     while (n > 0)
     386              :     {
     387       563444 :         int         nthistime = Min(n, WRITE_QUANTUM);
     388              :         int         numMsgs;
     389              :         int         max;
     390              :         int         i;
     391              : 
     392       563444 :         n -= nthistime;
     393              : 
     394       563444 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     395              : 
     396              :         /*
     397              :          * If the buffer is full, we *must* acquire some space.  Clean the
     398              :          * queue and reset anyone who is preventing space from being freed.
     399              :          * Otherwise, clean the queue only when it's exceeded the next
     400              :          * fullness threshold.  We have to loop and recheck the buffer state
     401              :          * after any call of SICleanupQueue.
     402              :          */
     403              :         for (;;)
     404              :         {
     405       569921 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
     406       569921 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
     407       569680 :                 numMsgs >= segP->nextThreshold)
     408         6477 :                 SICleanupQueue(true, nthistime);
     409              :             else
     410              :                 break;
     411              :         }
     412              : 
     413              :         /*
     414              :          * Insert new message(s) into proper slot of circular buffer
     415              :          */
     416       563444 :         max = segP->maxMsgNum;
     417      5400569 :         while (nthistime-- > 0)
     418              :         {
     419      4837125 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
     420      4837125 :             max++;
     421              :         }
     422              : 
     423              :         /* Update current value of maxMsgNum using spinlock */
     424       563444 :         SpinLockAcquire(&segP->msgnumLock);
     425       563444 :         segP->maxMsgNum = max;
     426       563444 :         SpinLockRelease(&segP->msgnumLock);
     427              : 
     428              :         /*
     429              :          * Now that the maxMsgNum change is globally visible, we give everyone
     430              :          * a swift kick to make sure they read the newly added messages.
     431              :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
     432              :          * these (unlocked) changes will be committed to memory before we exit
     433              :          * the function.
     434              :          */
     435      3384998 :         for (i = 0; i < segP->numProcs; i++)
     436              :         {
     437      2821554 :             ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     438              : 
     439      2821554 :             stateP->hasMessages = true;
     440              :         }
     441              : 
     442       563444 :         LWLockRelease(SInvalWriteLock);
     443              :     }
     444       534323 : }
     445              : 
     446              : /*
     447              :  * SIGetDataEntries
     448              :  *      get next SI message(s) for current backend, if there are any
     449              :  *
     450              :  * Possible return values:
     451              :  *  0:   no SI message available
     452              :  *  n>0: next n SI messages have been extracted into data[]
     453              :  * -1:   SI reset message extracted
     454              :  *
     455              :  * If the return value is less than the array size "datasize", the caller
     456              :  * can assume that there are no more SI messages after the one(s) returned.
     457              :  * Otherwise, another call is needed to collect more messages.
     458              :  *
     459              :  * NB: this can run in parallel with other instances of SIGetDataEntries
     460              :  * executing on behalf of other backends, since each instance will modify only
     461              :  * fields of its own backend's ProcState, and no instance will look at fields
     462              :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     463              :  * in shared mode.  Note that this is not exactly the normal (read-only)
     464              :  * interpretation of a shared lock! Look closely at the interactions before
     465              :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     466              :  *
     467              :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     468              :  * guaranteed that we will return any messages added after the routine is
     469              :  * entered.
     470              :  *
     471              :  * Note: we assume that "datasize" is not so large that it might be important
     472              :  * to break our hold on SInvalReadLock into segments.
     473              :  */
     474              : int
     475     24803429 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     476              : {
     477              :     SISeg      *segP;
     478              :     ProcState  *stateP;
     479              :     int         max;
     480              :     int         n;
     481              : 
     482     24803429 :     segP = shmInvalBuffer;
     483     24803429 :     stateP = &segP->procState[MyProcNumber];
     484              : 
     485              :     /*
     486              :      * Before starting to take locks, do a quick, unlocked test to see whether
     487              :      * there can possibly be anything to read.  On a multiprocessor system,
     488              :      * it's possible that this load could migrate backwards and occur before
     489              :      * we actually enter this function, so we might miss a sinval message that
     490              :      * was just added by some other processor.  But they can't migrate
     491              :      * backwards over a preceding lock acquisition, so it should be OK.  If we
     492              :      * haven't acquired a lock preventing against further relevant
     493              :      * invalidations, any such occurrence is not much different than if the
     494              :      * invalidation had arrived slightly later in the first place.
     495              :      */
     496     24803429 :     if (!stateP->hasMessages)
     497     23856922 :         return 0;
     498              : 
     499       946507 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
     500              : 
     501              :     /*
     502              :      * We must reset hasMessages before determining how many messages we're
     503              :      * going to read.  That way, if new messages arrive after we have
     504              :      * determined how many we're reading, the flag will get reset and we'll
     505              :      * notice those messages part-way through.
     506              :      *
     507              :      * Note that, if we don't end up reading all of the messages, we had
     508              :      * better be certain to reset this flag before exiting!
     509              :      */
     510       946507 :     stateP->hasMessages = false;
     511              : 
     512              :     /* Fetch current value of maxMsgNum using spinlock */
     513       946507 :     SpinLockAcquire(&segP->msgnumLock);
     514       946507 :     max = segP->maxMsgNum;
     515       946507 :     SpinLockRelease(&segP->msgnumLock);
     516              : 
     517       946507 :     if (stateP->resetState)
     518              :     {
     519              :         /*
     520              :          * Force reset.  We can say we have dealt with any messages added
     521              :          * since the reset, as well; and that means we should clear the
     522              :          * signaled flag, too.
     523              :          */
     524          296 :         stateP->nextMsgNum = max;
     525          296 :         stateP->resetState = false;
     526          296 :         stateP->signaled = false;
     527          296 :         LWLockRelease(SInvalReadLock);
     528          296 :         return -1;
     529              :     }
     530              : 
     531              :     /*
     532              :      * Retrieve messages and advance backend's counter, until data array is
     533              :      * full or there are no more messages.
     534              :      *
     535              :      * There may be other backends that haven't read the message(s), so we
     536              :      * cannot delete them here.  SICleanupQueue() will eventually remove them
     537              :      * from the queue.
     538              :      */
     539       946211 :     n = 0;
     540     22357844 :     while (n < datasize && stateP->nextMsgNum < max)
     541              :     {
     542     21411633 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     543     21411633 :         stateP->nextMsgNum++;
     544              :     }
     545              : 
     546              :     /*
     547              :      * If we have caught up completely, reset our "signaled" flag so that
     548              :      * we'll get another signal if we fall behind again.
     549              :      *
     550              :      * If we haven't caught up completely, reset the hasMessages flag so that
     551              :      * we see the remaining messages next time.
     552              :      */
     553       946211 :     if (stateP->nextMsgNum >= max)
     554       377712 :         stateP->signaled = false;
     555              :     else
     556       568499 :         stateP->hasMessages = true;
     557              : 
     558       946211 :     LWLockRelease(SInvalReadLock);
     559       946211 :     return n;
     560              : }
     561              : 
     562              : /*
     563              :  * SICleanupQueue
     564              :  *      Remove messages that have been consumed by all active backends
     565              :  *
     566              :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     567              :  * minFree is the minimum number of message slots to make free.
     568              :  *
     569              :  * Possible side effects of this routine include marking one or more
     570              :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     571              :  * to some backend that seems to be getting too far behind.  We signal at
     572              :  * most one backend at a time, for reasons explained at the top of the file.
     573              :  *
     574              :  * Caution: because we transiently release write lock when we have to signal
     575              :  * some other backend, it is NOT guaranteed that there are still minFree
     576              :  * free message slots at exit.  Caller must recheck and perhaps retry.
     577              :  */
     578              : void
     579         9614 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     580              : {
     581         9614 :     SISeg      *segP = shmInvalBuffer;
     582              :     int         min,
     583              :                 minsig,
     584              :                 lowbound,
     585              :                 numMsgs,
     586              :                 i;
     587         9614 :     ProcState  *needSig = NULL;
     588              : 
     589              :     /* Lock out all writers and readers */
     590         9614 :     if (!callerHasWriteLock)
     591         3137 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     592         9614 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     593              : 
     594              :     /*
     595              :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     596              :      * furthest-back backend that needs signaling (if any), and reset any
     597              :      * backends that are too far back.  Note that because we ignore sendOnly
     598              :      * backends here it is possible for them to keep sending messages without
     599              :      * a problem even when they are the only active backend.
     600              :      */
     601         9614 :     min = segP->maxMsgNum;
     602         9614 :     minsig = min - SIG_THRESHOLD;
     603         9614 :     lowbound = min - MAXNUMMESSAGES + minFree;
     604              : 
     605        81710 :     for (i = 0; i < segP->numProcs; i++)
     606              :     {
     607        72096 :         ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
     608        72096 :         int         n = stateP->nextMsgNum;
     609              : 
     610              :         /* Ignore if already in reset state */
     611              :         Assert(stateP->procPid != 0);
     612        72096 :         if (stateP->resetState || stateP->sendOnly)
     613         5601 :             continue;
     614              : 
     615              :         /*
     616              :          * If we must free some space and this backend is preventing it, force
     617              :          * him into reset state and then ignore until he catches up.
     618              :          */
     619        66495 :         if (n < lowbound)
     620              :         {
     621          299 :             stateP->resetState = true;
     622              :             /* no point in signaling him ... */
     623          299 :             continue;
     624              :         }
     625              : 
     626              :         /* Track the global minimum nextMsgNum */
     627        66196 :         if (n < min)
     628        14722 :             min = n;
     629              : 
     630              :         /* Also see who's furthest back of the unsignaled backends */
     631        66196 :         if (n < minsig && !stateP->signaled)
     632              :         {
     633         3638 :             minsig = n;
     634         3638 :             needSig = stateP;
     635              :         }
     636              :     }
     637         9614 :     segP->minMsgNum = min;
     638              : 
     639              :     /*
     640              :      * When minMsgNum gets really large, decrement all message counters so as
     641              :      * to forestall overflow of the counters.  This happens seldom enough that
     642              :      * folding it into the previous loop would be a loser.
     643              :      */
     644         9614 :     if (min >= MSGNUMWRAPAROUND)
     645              :     {
     646            0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
     647            0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
     648            0 :         for (i = 0; i < segP->numProcs; i++)
     649            0 :             segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
     650              :     }
     651              : 
     652              :     /*
     653              :      * Determine how many messages are still in the queue, and set the
     654              :      * threshold at which we should repeat SICleanupQueue().
     655              :      */
     656         9614 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
     657         9614 :     if (numMsgs < CLEANUP_MIN)
     658         3142 :         segP->nextThreshold = CLEANUP_MIN;
     659              :     else
     660         6472 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     661              : 
     662              :     /*
     663              :      * Lastly, signal anyone who needs a catchup interrupt.  Since
     664              :      * SendProcSignal() might not be fast, we don't want to hold locks while
     665              :      * executing it.
     666              :      */
     667         9614 :     if (needSig)
     668              :     {
     669         3510 :         pid_t       his_pid = needSig->procPid;
     670         3510 :         ProcNumber  his_procNumber = (needSig - &segP->procState[0]);
     671              : 
     672         3510 :         needSig->signaled = true;
     673         3510 :         LWLockRelease(SInvalReadLock);
     674         3510 :         LWLockRelease(SInvalWriteLock);
     675         3510 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     676         3510 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
     677         3510 :         if (callerHasWriteLock)
     678         2704 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     679              :     }
     680              :     else
     681              :     {
     682         6104 :         LWLockRelease(SInvalReadLock);
     683         6104 :         if (!callerHasWriteLock)
     684         2331 :             LWLockRelease(SInvalWriteLock);
     685              :     }
     686         9614 : }
     687              : 
     688              : 
     689              : /*
     690              :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     691              :  *
     692              :  * We split VirtualTransactionIds into two parts so that it is possible
     693              :  * to allocate a new one without any contention for shared memory, except
     694              :  * for a bit of additional overhead during backend startup/shutdown.
     695              :  * The high-order part of a VirtualTransactionId is a ProcNumber, and the
     696              :  * low-order part is a LocalTransactionId, which we assign from a local
     697              :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     698              :  * within a short interval, successive procs occupying the same PGPROC slot
     699              :  * should use a consecutive sequence of local IDs, which is implemented
     700              :  * by copying nextLocalTransactionId as seen above.
     701              :  */
     702              : LocalTransactionId
     703       627105 : GetNextLocalTransactionId(void)
     704              : {
     705              :     LocalTransactionId result;
     706              : 
     707              :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     708              :     do
     709              :     {
     710       638293 :         result = nextLocalTransactionId++;
     711       638293 :     } while (!LocalTransactionIdIsValid(result));
     712              : 
     713       627105 :     return result;
     714              : }
        

Generated by: LCOV version 2.0-1