LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions)
Test:         PostgreSQL 13beta1
Date:         2020-06-03 09:06:53

              Hit    Total    Coverage
Lines:        180      188      95.7 %
Functions:     10       10     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sinvaladt.c
       4             :  *    POSTGRES shared cache invalidation data manager.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/storage/ipc/sinvaladt.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include <signal.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/transam.h"
      21             : #include "miscadmin.h"
      22             : #include "storage/backendid.h"
      23             : #include "storage/ipc.h"
      24             : #include "storage/proc.h"
      25             : #include "storage/procsignal.h"
      26             : #include "storage/shmem.h"
      27             : #include "storage/sinvaladt.h"
      28             : #include "storage/spin.h"
      29             : 
      30             : /*
      31             :  * Conceptually, the shared cache invalidation messages are stored in an
      32             :  * infinite array, where maxMsgNum is the next array subscript to store a
      33             :  * submitted message in, minMsgNum is the smallest array subscript containing
      34             :  * a message not yet read by all backends, and we always have maxMsgNum >=
      35             :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      36             :  * active backend, there is a nextMsgNum pointer indicating the next message it
      37             :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      38             :  * backend.
      39             :  *
      40             :  * (In the current implementation, minMsgNum is a lower bound for the
      41             :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      42             :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      43             :  * SICleanupQueue is called, and we try not to do that often.)
      44             :  *
      45             :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      46             :  * entries.  We translate MsgNum values into circular-buffer indexes by
      47             :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      48             :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      49             :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      50             :  * in the buffer.  If the buffer does overflow, we recover by setting the
      51             :  * "reset" flag for each backend that has fallen too far behind.  A backend
      52             :  * that is in "reset" state is ignored while determining minMsgNum.  When
      53             :  * it does finally attempt to receive inval messages, it must discard all
      54             :  * its invalidatable state, since it won't know what it missed.
      55             :  *
      56             :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      57             :  * to any backend that seems to be falling unreasonably far behind.  The
      58             :  * normal behavior is that at most one such interrupt is in flight at a time;
      59             :  * when a backend completes processing a catchup interrupt, it executes
      60             :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      61             :  * needed.  This avoids undue contention from multiple backends all trying
      62             :  * to catch up at once.  However, the furthest-back backend might be stuck
      63             :  * in a state where it can't catch up.  Eventually it will get reset, so it
      64             :  * won't cause any more problems for anyone but itself.  But we don't want
      65             :  * to find that a bunch of other backends are now too close to the reset
      66             :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      67             :  * send extra catchup interrupts as the queue gets fuller, to backends that
      68             :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      69             :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      70             :  * that aren't stuck will propagate their interrupts to the next guy.
      71             :  *
      72             :  * We would have problems if the MsgNum values overflow an integer, so
      73             :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      74             :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      75             :  * large so that we don't need to do this often.  It must be a multiple of
      76             :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      77             :  * to be moved when we do it.
      78             :  *
      79             :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      80             :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      81             :  * authorizes them to modify their own ProcState but not to modify or even
      82             :  * look at anyone else's.  When we need to perform array-wide updates,
      83             :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      84             :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      85             :  * mode) to serialize adding messages to the queue.  Note that a writer
      86             :  * can operate in parallel with one or more readers, because the writer
      87             :  * has no need to touch anyone's ProcState, except in the infrequent cases
      88             :  * when SICleanupQueue is needed.  The only point of overlap is that
      89             :  * the writer wants to change maxMsgNum while readers need to read it.
      90             :  * We deal with that by having a spinlock that readers must take for just
      91             :  * long enough to read maxMsgNum, while writers take it for just long enough
      92             :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      93             :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      94             :  * spinlock to write maxMsgNum unless you are holding both locks.)
      95             :  *
      96             :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      97             :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      98             :  * is to provide a memory barrier: we need to be sure that messages written
      99             :  * to the array are actually there before maxMsgNum is increased, and that
     100             :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     101             :  * that have weak memory-ordering guarantees can fail without the memory
     102             :  * barrier instructions that are included in the spinlock sequences.
     103             :  */
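A minimal sketch of the access pattern described above (illustrative only, not part of sinvaladt.c; the helper name is hypothetical and it uses the SISeg fields and constants defined further down in this file): the spinlock is held just long enough to fetch maxMsgNum when SInvalWriteLock is not held, and a message number is mapped onto the circular buffer by a modulo that reduces to a bit mask because MAXNUMMESSAGES is a power of 2.

    static SharedInvalidationMessage
    example_read_one(SISeg *segP, int msgNum)
    {
        int         max;

        /* spinlock gives atomicity plus the required memory barrier */
        SpinLockAcquire(&segP->msgnumLock);
        max = segP->maxMsgNum;
        SpinLockRelease(&segP->msgnumLock);

        /* caller must pass a message number that has already been assigned */
        Assert(msgNum < max);

        /* MsgNum -> circular-buffer slot */
        return segP->buffer[msgNum % MAXNUMMESSAGES];
    }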
     104             : 
     105             : 
     106             : /*
     107             :  * Configurable parameters.
     108             :  *
     109             :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     110             :  * Must be a power of 2 for speed.
     111             :  *
     112             :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     113             :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     114             :  *
     115             :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     116             :  * before we bother to call SICleanupQueue.
     117             :  *
     118             :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     119             :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     120             :  *
     121             :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     122             :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     123             :  *
     124             :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     125             :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     126             :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     127             :  * per iteration.
     128             :  */
     129             : 
     130             : #define MAXNUMMESSAGES 4096
     131             : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     132             : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     133             : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     134             : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     135             : #define WRITE_QUANTUM 64
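With these settings, MAXNUMMESSAGES is 4096 (2^12), MSGNUMWRAPAROUND is 4096 * 262144 = 1073741824 (2^30, comfortably inside a signed 32-bit int), CLEANUP_MIN and SIG_THRESHOLD are 2048, CLEANUP_QUANTUM is 256, and WRITE_QUANTUM (64) is indeed below CLEANUP_QUANTUM. The following static assertions are only an illustrative sketch of those invariants, written with standard C11 _Static_assert; sinvaladt.c itself declares no such checks.

    _Static_assert((MAXNUMMESSAGES & (MAXNUMMESSAGES - 1)) == 0,
                   "MAXNUMMESSAGES must be a power of 2");
    _Static_assert(MSGNUMWRAPAROUND % MAXNUMMESSAGES == 0,
                   "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES");
    _Static_assert(WRITE_QUANTUM < CLEANUP_QUANTUM,
                   "WRITE_QUANTUM should stay below CLEANUP_QUANTUM");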
     136             : 
     137             : /* Per-backend state in shared invalidation structure */
     138             : typedef struct ProcState
     139             : {
     140             :     /* procPid is zero in an inactive ProcState array entry. */
     141             :     pid_t       procPid;        /* PID of backend, for signaling */
     142             :     PGPROC     *proc;           /* PGPROC of backend */
     143             :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     144             :     int         nextMsgNum;     /* next message number to read */
     145             :     bool        resetState;     /* backend needs to reset its state */
     146             :     bool        signaled;       /* backend has been sent catchup signal */
     147             :     bool        hasMessages;    /* backend has unread messages */
     148             : 
     149             :     /*
     150             :      * Backend only sends invalidations, never receives them. This only makes
     151             :      * sense for Startup process during recovery because it doesn't maintain a
     152             :      * relcache, yet it fires inval messages to allow query backends to see
     153             :      * schema changes.
     154             :      */
     155             :     bool        sendOnly;       /* backend only sends, never receives */
     156             : 
     157             :     /*
     158             :      * Next LocalTransactionId to use for each idle backend slot.  We keep
     159             :      * this here because it is indexed by BackendId and it is convenient to
     160             :      * copy the value to and from local memory when MyBackendId is set. It's
     161             :      * meaningless in an active ProcState entry.
     162             :      */
     163             :     LocalTransactionId nextLXID;
     164             : } ProcState;
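The field comments above boil down to a simple validity rule, summarized by this hypothetical predicate (not part of sinvaladt.c): an entry is active only while procPid is nonzero, its nextMsgNum can be trusted only while it is active and not in reset state, and nextLXID is consulted only for inactive slots.

    static inline bool
    example_slot_position_is_valid(const ProcState *stateP)
    {
        /* nextMsgNum is meaningful only for an active, non-reset entry */
        return stateP->procPid != 0 && !stateP->resetState;
    }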
     165             : 
     166             : /* Shared cache invalidation memory segment */
     167             : typedef struct SISeg
     168             : {
     169             :     /*
     170             :      * General state information
     171             :      */
     172             :     int         minMsgNum;      /* oldest message still needed */
     173             :     int         maxMsgNum;      /* next message number to be assigned */
     174             :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
     175             :     int         lastBackend;    /* index of last active procState entry, +1 */
     176             :     int         maxBackends;    /* size of procState array */
     177             : 
     178             :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
     179             : 
     180             :     /*
     181             :      * Circular buffer holding shared-inval messages
     182             :      */
     183             :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     184             : 
     185             :     /*
     186             :      * Per-backend invalidation state info (has MaxBackends entries).
     187             :      */
     188             :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
     189             : } SISeg;
     190             : 
     191             : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     192             : 
     193             : 
     194             : static LocalTransactionId nextLocalTransactionId;
     195             : 
     196             : static void CleanupInvalidationState(int status, Datum arg);
     197             : 
     198             : 
     199             : /*
     200             :  * SInvalShmemSize --- return shared-memory space needed
     201             :  */
     202             : Size
     203        4344 : SInvalShmemSize(void)
     204             : {
     205             :     Size        size;
     206             : 
     207        4344 :     size = offsetof(SISeg, procState);
     208        4344 :     size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
     209             : 
     210        4344 :     return size;
     211             : }
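Spelled out without the overflow-checked helpers, this is the usual flexible-array-member sizing: the fixed part of SISeg up to procState[], plus one ProcState per allowed backend. A sketch of the same formula (not part of sinvaladt.c; the function name is hypothetical) -- the real code goes through add_size()/mul_size() so that an absurd MaxBackends setting fails cleanly instead of silently overflowing:

    static Size
    example_sinval_size(int maxBackends)
    {
        return offsetof(SISeg, procState) + (Size) maxBackends * sizeof(ProcState);
    }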
     212             : 
     213             : /*
     214             :  * CreateSharedInvalidationState
     215             :  *      Create and initialize the SI message buffer
     216             :  */
     217             : void
     218        2170 : CreateSharedInvalidationState(void)
     219             : {
     220             :     int         i;
     221             :     bool        found;
     222             : 
     223             :     /* Allocate space in shared memory */
     224        2170 :     shmInvalBuffer = (SISeg *)
     225        2170 :         ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
     226        2170 :     if (found)
     227           0 :         return;
     228             : 
     229             :     /* Clear message counters, save size of procState array, init spinlock */
     230        2170 :     shmInvalBuffer->minMsgNum = 0;
     231        2170 :     shmInvalBuffer->maxMsgNum = 0;
     232        2170 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     233        2170 :     shmInvalBuffer->lastBackend = 0;
     234        2170 :     shmInvalBuffer->maxBackends = MaxBackends;
     235        2170 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
     236             : 
     237             :     /* The buffer[] array is initially all unused, so we need not fill it */
     238             : 
     239             :     /* Mark all backends inactive, and initialize nextLXID */
     240      230366 :     for (i = 0; i < shmInvalBuffer->maxBackends; i++)
     241             :     {
     242      228196 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     243      228196 :         shmInvalBuffer->procState[i].proc = NULL;
     244      228196 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     245      228196 :         shmInvalBuffer->procState[i].resetState = false;
     246      228196 :         shmInvalBuffer->procState[i].signaled = false;
     247      228196 :         shmInvalBuffer->procState[i].hasMessages = false;
     248      228196 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     249             :     }
     250             : }
     251             : 
     252             : /*
     253             :  * SharedInvalBackendInit
     254             :  *      Initialize a new backend to operate on the sinval buffer
     255             :  */
     256             : void
     257       11134 : SharedInvalBackendInit(bool sendOnly)
     258             : {
     259             :     int         index;
     260       11134 :     ProcState  *stateP = NULL;
     261       11134 :     SISeg      *segP = shmInvalBuffer;
     262             : 
     263             :     /*
     264             :      * This can run in parallel with read operations, but not with write
     265             :      * operations, since SIInsertDataEntries relies on lastBackend to set
     266             :      * hasMessages appropriately.
     267             :      */
     268       11134 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     269             : 
     270             :     /* Look for a free entry in the procState array */
     271       40070 :     for (index = 0; index < segP->lastBackend; index++)
     272             :     {
     273       30338 :         if (segP->procState[index].procPid == 0) /* inactive slot? */
     274             :         {
     275        1402 :             stateP = &segP->procState[index];
     276        1402 :             break;
     277             :         }
     278             :     }
     279             : 
     280       11134 :     if (stateP == NULL)
     281             :     {
     282        9732 :         if (segP->lastBackend < segP->maxBackends)
     283             :         {
     284        9732 :             stateP = &segP->procState[segP->lastBackend];
     285             :             Assert(stateP->procPid == 0);
     286        9732 :             segP->lastBackend++;
     287             :         }
     288             :         else
     289             :         {
     290             :             /*
     291             :              * out of procState slots: MaxBackends exceeded -- report normally
     292             :              */
     293           0 :             MyBackendId = InvalidBackendId;
     294           0 :             LWLockRelease(SInvalWriteLock);
     295           0 :             ereport(FATAL,
     296             :                     (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
     297             :                      errmsg("sorry, too many clients already")));
     298             :         }
     299             :     }
     300             : 
     301       11134 :     MyBackendId = (stateP - &segP->procState[0]) + 1;
     302             : 
     303             :     /* Advertise assigned backend ID in MyProc */
     304       11134 :     MyProc->backendId = MyBackendId;
     305             : 
     306             :     /* Fetch next local transaction ID into local memory */
     307       11134 :     nextLocalTransactionId = stateP->nextLXID;
     308             : 
     309             :     /* mark myself active, with all extant messages already read */
     310       11134 :     stateP->procPid = MyProcPid;
     311       11134 :     stateP->proc = MyProc;
     312       11134 :     stateP->nextMsgNum = segP->maxMsgNum;
     313       11134 :     stateP->resetState = false;
     314       11134 :     stateP->signaled = false;
     315       11134 :     stateP->hasMessages = false;
     316       11134 :     stateP->sendOnly = sendOnly;
     317             : 
     318       11134 :     LWLockRelease(SInvalWriteLock);
     319             : 
     320             :     /* register exit routine to mark my entry inactive at exit */
     321       11134 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     322             : 
     323       11134 :     elog(DEBUG4, "my backend ID is %d", MyBackendId);
     324       11134 : }
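Backend IDs handed out here are 1-based: procState[0] belongs to backend ID 1, which is why the code above computes (stateP - &segP->procState[0]) + 1 and why the functions below subtract 1 again. Two hypothetical helpers (not part of sinvaladt.c) that capture the two directions of that mapping:

    static inline BackendId
    example_slot_to_backend_id(SISeg *segP, ProcState *stateP)
    {
        return (BackendId) (stateP - &segP->procState[0]) + 1;
    }

    static inline ProcState *
    example_backend_id_to_slot(SISeg *segP, BackendId backendId)
    {
        Assert(backendId > 0 && backendId <= segP->lastBackend);
        return &segP->procState[backendId - 1];
    }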
     325             : 
     326             : /*
     327             :  * CleanupInvalidationState
     328             :  *      Mark the current backend as no longer active.
     329             :  *
     330             :  * This function is called via on_shmem_exit() during backend shutdown.
     331             :  *
     332             :  * arg is really of type "SISeg*".
     333             :  */
     334             : static void
     335       11134 : CleanupInvalidationState(int status, Datum arg)
     336             : {
     337       11134 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     338             :     ProcState  *stateP;
     339             :     int         i;
     340             : 
     341             :     Assert(PointerIsValid(segP));
     342             : 
     343       11134 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     344             : 
     345       11134 :     stateP = &segP->procState[MyBackendId - 1];
     346             : 
     347             :     /* Update next local transaction ID for next holder of this backendID */
     348       11134 :     stateP->nextLXID = nextLocalTransactionId;
     349             : 
     350             :     /* Mark myself inactive */
     351       11134 :     stateP->procPid = 0;
     352       11134 :     stateP->proc = NULL;
     353       11134 :     stateP->nextMsgNum = 0;
     354       11134 :     stateP->resetState = false;
     355       11134 :     stateP->signaled = false;
     356             : 
     357             :     /* Recompute index of last active backend */
     358       20848 :     for (i = segP->lastBackend; i > 0; i--)
     359             :     {
     360       19658 :         if (segP->procState[i - 1].procPid != 0)
     361        9944 :             break;
     362             :     }
     363       11134 :     segP->lastBackend = i;
     364             : 
     365       11134 :     LWLockRelease(SInvalWriteLock);
     366       11134 : }
     367             : 
     368             : /*
     369             :  * BackendIdGetProc
     370             :  *      Get the PGPROC structure for a backend, given the backend ID.
     371             :  *      The result may be out of date arbitrarily quickly, so the caller
     372             :  *      must be careful about how this information is used.  NULL is
     373             :  *      returned if the backend is not active.
     374             :  */
     375             : PGPROC *
     376         132 : BackendIdGetProc(int backendID)
     377             : {
     378         132 :     PGPROC     *result = NULL;
     379         132 :     SISeg      *segP = shmInvalBuffer;
     380             : 
     381             :     /* Need to lock out additions/removals of backends */
     382         132 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
     383             : 
     384         132 :     if (backendID > 0 && backendID <= segP->lastBackend)
     385             :     {
     386         132 :         ProcState  *stateP = &segP->procState[backendID - 1];
     387             : 
     388         132 :         result = stateP->proc;
     389             :     }
     390             : 
     391         132 :     LWLockRelease(SInvalWriteLock);
     392             : 
     393         132 :     return result;
     394             : }
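A hypothetical caller (not part of sinvaladt.c) has to treat the result as a momentary snapshot and cope with NULL, since the slot can be vacated the instant the lock is released:

    static void
    example_report_backend(int backendID)
    {
        PGPROC     *proc = BackendIdGetProc(backendID);

        if (proc == NULL)
            elog(DEBUG1, "backend %d is not currently active", backendID);
        else
            elog(DEBUG1, "backend %d maps to pgprocno %d",
                 backendID, proc->pgprocno);
    }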
     395             : 
     396             : /*
     397             :  * BackendIdGetTransactionIds
     398             :  *      Get the xid and xmin of the backend. The result may be out of date
     399             :  *      arbitrarily quickly, so the caller must be careful about how this
     400             :  *      information is used.
     401             :  */
     402             : void
     403        2246 : BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
     404             : {
     405        2246 :     SISeg      *segP = shmInvalBuffer;
     406             : 
     407        2246 :     *xid = InvalidTransactionId;
     408        2246 :     *xmin = InvalidTransactionId;
     409             : 
     410             :     /* Need to lock out additions/removals of backends */
     411        2246 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
     412             : 
     413        2246 :     if (backendID > 0 && backendID <= segP->lastBackend)
     414             :     {
     415        1332 :         ProcState  *stateP = &segP->procState[backendID - 1];
     416        1332 :         PGPROC     *proc = stateP->proc;
     417             : 
     418        1332 :         if (proc != NULL)
     419             :         {
     420        1332 :             PGXACT     *xact = &ProcGlobal->allPgXact[proc->pgprocno];
     421             : 
     422        1332 :             *xid = xact->xid;
     423        1332 :             *xmin = xact->xmin;
     424             :         }
     425             :     }
     426             : 
     427        2246 :     LWLockRelease(SInvalWriteLock);
     428        2246 : }
     429             : 
     430             : /*
     431             :  * SIInsertDataEntries
     432             :  *      Add new invalidation message(s) to the buffer.
     433             :  */
     434             : void
     435     1122450 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     436             : {
     437     1122450 :     SISeg      *segP = shmInvalBuffer;
     438             : 
     439             :     /*
     440             :      * N can be arbitrarily large.  We divide the work into groups of no more
     441             :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     442             :      * an unreasonably long time.  (This is not so much because we care about
     443             :      * letting in other writers, as that some just-caught-up backend might be
     444             :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
     445             :      * to have to wait a long time.)  Also, we need to consider calling
     446             :      * SICleanupQueue every so often.
     447             :      */
     448     2258838 :     while (n > 0)
     449             :     {
     450     1136388 :         int         nthistime = Min(n, WRITE_QUANTUM);
     451             :         int         numMsgs;
     452             :         int         max;
     453             :         int         i;
     454             : 
     455     1136388 :         n -= nthistime;
     456             : 
     457     1136388 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     458             : 
     459             :         /*
     460             :          * If the buffer is full, we *must* acquire some space.  Clean the
     461             :          * queue and reset anyone who is preventing space from being freed.
     462             :          * Otherwise, clean the queue only when it's exceeded the next
     463             :          * fullness threshold.  We have to loop and recheck the buffer state
     464             :          * after any call of SICleanupQueue.
     465             :          */
     466             :         for (;;)
     467             :         {
     468        6818 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
     469     1143206 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
     470     1143028 :                 numMsgs >= segP->nextThreshold)
     471        6818 :                 SICleanupQueue(true, nthistime);
     472             :             else
     473             :                 break;
     474             :         }
     475             : 
     476             :         /*
     477             :          * Insert new message(s) into proper slot of circular buffer
     478             :          */
     479     1136388 :         max = segP->maxMsgNum;
     480     6820254 :         while (nthistime-- > 0)
     481             :         {
     482     5683866 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
     483     5683866 :             max++;
     484             :         }
     485             : 
     486             :         /* Update current value of maxMsgNum using spinlock */
     487     1136388 :         SpinLockAcquire(&segP->msgnumLock);
     488     1136388 :         segP->maxMsgNum = max;
     489     1136388 :         SpinLockRelease(&segP->msgnumLock);
     490             : 
     491             :         /*
     492             :          * Now that the maxMsgNum change is globally visible, we give everyone
     493             :          * a swift kick to make sure they read the newly added messages.
     494             :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
     495             :          * these (unlocked) changes will be committed to memory before we exit
     496             :          * the function.
     497             :          */
     498     5156436 :         for (i = 0; i < segP->lastBackend; i++)
     499             :         {
     500     4020048 :             ProcState  *stateP = &segP->procState[i];
     501             : 
     502     4020048 :             stateP->hasMessages = true;
     503             :         }
     504             : 
     505     1136388 :         LWLockRelease(SInvalWriteLock);
     506             :     }
     507     1122450 : }
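From the caller's side there is nothing to chunk: SIInsertDataEntries splits the work itself, so a batch of, say, 150 messages is written as 64 + 64 + 22, re-acquiring SInvalWriteLock for each group. A minimal sketch of a sender (not part of sinvaladt.c; the wrapper name is hypothetical):

    static void
    example_send_batch(const SharedInvalidationMessage *msgs, int nmsgs)
    {
        /* may be arbitrarily large; chunking into WRITE_QUANTUM happens inside */
        SIInsertDataEntries(msgs, nmsgs);
    }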
     508             : 
     509             : /*
     510             :  * SIGetDataEntries
     511             :  *      get next SI message(s) for current backend, if there are any
     512             :  *
     513             :  * Possible return values:
     514             :  *  0:   no SI message available
     515             :  *  n>0: next n SI messages have been extracted into data[]
     516             :  * -1:   SI reset message extracted
     517             :  *
     518             :  * If the return value is less than the array size "datasize", the caller
     519             :  * can assume that there are no more SI messages after the one(s) returned.
     520             :  * Otherwise, another call is needed to collect more messages.
     521             :  *
     522             :  * NB: this can run in parallel with other instances of SIGetDataEntries
     523             :  * executing on behalf of other backends, since each instance will modify only
     524             :  * fields of its own backend's ProcState, and no instance will look at fields
     525             :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     526             :  * in shared mode.  Note that this is not exactly the normal (read-only)
     527             :  * interpretation of a shared lock! Look closely at the interactions before
     528             :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     529             :  *
     530             :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     531             :  * guaranteed that we will return any messages added after the routine is
     532             :  * entered.
     533             :  *
     534             :  * Note: we assume that "datasize" is not so large that it might be important
     535             :  * to break our hold on SInvalReadLock into segments.
     536             :  */
     537             : int
     538    28383508 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     539             : {
     540             :     SISeg      *segP;
     541             :     ProcState  *stateP;
     542             :     int         max;
     543             :     int         n;
     544             : 
     545    28383508 :     segP = shmInvalBuffer;
     546    28383508 :     stateP = &segP->procState[MyBackendId - 1];
     547             : 
     548             :     /*
     549             :      * Before starting to take locks, do a quick, unlocked test to see whether
     550             :      * there can possibly be anything to read.  On a multiprocessor system,
     551             :      * it's possible that this load could migrate backwards and occur before
     552             :      * we actually enter this function, so we might miss a sinval message that
     553             :      * was just added by some other processor.  But they can't migrate
     554             :      * backwards over a preceding lock acquisition, so it should be OK.  If we
     555             :      * haven't acquired a lock preventing against further relevant
     556             :      * invalidations, any such occurrence is not much different than if the
     557             :      * invalidation had arrived slightly later in the first place.
     558             :      */
     559    28383508 :     if (!stateP->hasMessages)
     560    27540242 :         return 0;
     561             : 
     562      843266 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
     563             : 
     564             :     /*
     565             :      * We must reset hasMessages before determining how many messages we're
     566             :      * going to read.  That way, if new messages arrive after we have
     567             :      * determined how many we're reading, the flag will get reset and we'll
     568             :      * notice those messages part-way through.
     569             :      *
     570             :      * Note that, if we don't end up reading all of the messages, we had
     571             :      * better be certain to reset this flag before exiting!
     572             :      */
     573      843266 :     stateP->hasMessages = false;
     574             : 
     575             :     /* Fetch current value of maxMsgNum using spinlock */
     576      843266 :     SpinLockAcquire(&segP->msgnumLock);
     577      843266 :     max = segP->maxMsgNum;
     578      843266 :     SpinLockRelease(&segP->msgnumLock);
     579             : 
     580      843266 :     if (stateP->resetState)
     581             :     {
     582             :         /*
     583             :          * Force reset.  We can say we have dealt with any messages added
     584             :          * since the reset, as well; and that means we should clear the
     585             :          * signaled flag, too.
     586             :          */
     587         218 :         stateP->nextMsgNum = max;
     588         218 :         stateP->resetState = false;
     589         218 :         stateP->signaled = false;
     590         218 :         LWLockRelease(SInvalReadLock);
     591         218 :         return -1;
     592             :     }
     593             : 
     594             :     /*
     595             :      * Retrieve messages and advance backend's counter, until data array is
     596             :      * full or there are no more messages.
     597             :      *
     598             :      * There may be other backends that haven't read the message(s), so we
     599             :      * cannot delete them here.  SICleanupQueue() will eventually remove them
     600             :      * from the queue.
     601             :      */
     602      843048 :     n = 0;
     603    19855500 :     while (n < datasize && stateP->nextMsgNum < max)
     604             :     {
     605    19012452 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     606    19012452 :         stateP->nextMsgNum++;
     607             :     }
     608             : 
     609             :     /*
     610             :      * If we have caught up completely, reset our "signaled" flag so that
     611             :      * we'll get another signal if we fall behind again.
     612             :      *
     613             :      * If we haven't caught up completely, reset the hasMessages flag so that
     614             :      * we see the remaining messages next time.
     615             :      */
     616      843048 :     if (stateP->nextMsgNum >= max)
     617      347278 :         stateP->signaled = false;
     618             :     else
     619      495770 :         stateP->hasMessages = true;
     620             : 
     621      843048 :     LWLockRelease(SInvalReadLock);
     622      843048 :     return n;
     623             : }
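A sketch of the consumer-side contract (not part of sinvaladt.c; the function, batch size, and message handling here are hypothetical stand-ins): a negative return means all cached state must be discarded, a full batch means another call is needed, and a partial batch means the queue is drained for now.

    #define EXAMPLE_BATCH_SIZE 32

    static void
    example_drain_queue(void)
    {
        SharedInvalidationMessage msgs[EXAMPLE_BATCH_SIZE];
        int         n;

        do
        {
            n = SIGetDataEntries(msgs, EXAMPLE_BATCH_SIZE);

            if (n < 0)
            {
                /* reset: we fell too far behind, discard all cached state here */
            }
            else
            {
                int         i;

                for (i = 0; i < n; i++)
                {
                    /* apply msgs[i] to the local caches here */
                }
            }
        } while (n == EXAMPLE_BATCH_SIZE);  /* partial batch => caught up for now */
    }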
     624             : 
     625             : /*
     626             :  * SICleanupQueue
     627             :  *      Remove messages that have been consumed by all active backends
     628             :  *
     629             :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     630             :  * minFree is the minimum number of message slots to make free.
     631             :  *
     632             :  * Possible side effects of this routine include marking one or more
     633             :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     634             :  * to some backend that seems to be getting too far behind.  We signal at
     635             :  * most one backend at a time, for reasons explained at the top of the file.
     636             :  *
     637             :  * Caution: because we transiently release write lock when we have to signal
     638             :  * some other backend, it is NOT guaranteed that there are still minFree
     639             :  * free message slots at exit.  Caller must recheck and perhaps retry.
     640             :  */
     641             : void
     642        9866 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     643             : {
     644        9866 :     SISeg      *segP = shmInvalBuffer;
     645             :     int         min,
     646             :                 minsig,
     647             :                 lowbound,
     648             :                 numMsgs,
     649             :                 i;
     650        9866 :     ProcState  *needSig = NULL;
     651             : 
     652             :     /* Lock out all writers and readers */
     653        9866 :     if (!callerHasWriteLock)
     654        3048 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     655        9866 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     656             : 
     657             :     /*
     658             :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     659             :      * furthest-back backend that needs signaling (if any), and reset any
     660             :      * backends that are too far back.  Note that because we ignore sendOnly
     661             :      * backends here it is possible for them to keep sending messages without
     662             :      * a problem even when they are the only active backend.
     663             :      */
     664        9866 :     min = segP->maxMsgNum;
     665        9866 :     minsig = min - SIG_THRESHOLD;
     666        9866 :     lowbound = min - MAXNUMMESSAGES + minFree;
     667             : 
     668       74448 :     for (i = 0; i < segP->lastBackend; i++)
     669             :     {
     670       64582 :         ProcState  *stateP = &segP->procState[i];
     671       64582 :         int         n = stateP->nextMsgNum;
     672             : 
     673             :         /* Ignore if inactive or already in reset state */
     674       64582 :         if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
     675        7624 :             continue;
     676             : 
     677             :         /*
     678             :          * If we must free some space and this backend is preventing it, force
     679             :          * him into reset state and then ignore until he catches up.
     680             :          */
     681       56958 :         if (n < lowbound)
     682             :         {
     683         218 :             stateP->resetState = true;
     684             :             /* no point in signaling him ... */
     685         218 :             continue;
     686             :         }
     687             : 
     688             :         /* Track the global minimum nextMsgNum */
     689       56740 :         if (n < min)
     690       14294 :             min = n;
     691             : 
     692             :         /* Also see who's furthest back of the unsignaled backends */
     693       56740 :         if (n < minsig && !stateP->signaled)
     694             :         {
     695        3122 :             minsig = n;
     696        3122 :             needSig = stateP;
     697             :         }
     698             :     }
     699        9866 :     segP->minMsgNum = min;
     700             : 
     701             :     /*
     702             :      * When minMsgNum gets really large, decrement all message counters so as
     703             :      * to forestall overflow of the counters.  This happens seldom enough that
     704             :      * folding it into the previous loop would be a loser.
     705             :      */
     706        9866 :     if (min >= MSGNUMWRAPAROUND)
     707             :     {
     708           0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
     709           0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
     710           0 :         for (i = 0; i < segP->lastBackend; i++)
     711             :         {
     712             :             /* we don't bother skipping inactive entries here */
     713           0 :             segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
     714             :         }
     715             :     }
     716             : 
     717             :     /*
     718             :      * Determine how many messages are still in the queue, and set the
     719             :      * threshold at which we should repeat SICleanupQueue().
     720             :      */
     721        9866 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
     722        9866 :     if (numMsgs < CLEANUP_MIN)
     723        3386 :         segP->nextThreshold = CLEANUP_MIN;
     724             :     else
     725        6480 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     726             : 
     727             :     /*
     728             :      * Lastly, signal anyone who needs a catchup interrupt.  Since
     729             :      * SendProcSignal() might not be fast, we don't want to hold locks while
     730             :      * executing it.
     731             :      */
     732        9866 :     if (needSig)
     733             :     {
     734        3076 :         pid_t       his_pid = needSig->procPid;
     735        3076 :         BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;
     736             : 
     737        3076 :         needSig->signaled = true;
     738        3076 :         LWLockRelease(SInvalReadLock);
     739        3076 :         LWLockRelease(SInvalWriteLock);
     740        3076 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     741        3076 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
     742        3076 :         if (callerHasWriteLock)
     743        2352 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     744             :     }
     745             :     else
     746             :     {
     747        6790 :         LWLockRelease(SInvalReadLock);
     748        6790 :         if (!callerHasWriteLock)
     749        2324 :             LWLockRelease(SInvalWriteLock);
     750             :     }
     751        9866 : }
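The threshold computation above rounds the current queue length up to the next CLEANUP_QUANTUM boundary strictly beyond it, so once the queue is at least half full a cleanup is considered roughly every 256 insertions; for example, numMsgs = 2500 yields (2500 / 256 + 1) * 256 = 2560, while anything below CLEANUP_MIN leaves the threshold at 2048. The same rule as a standalone sketch (not part of sinvaladt.c):

    static int
    example_next_threshold(int numMsgs)
    {
        if (numMsgs < CLEANUP_MIN)
            return CLEANUP_MIN;
        return (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
    }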
     752             : 
     753             : 
     754             : /*
     755             :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     756             :  *
     757             :  * We split VirtualTransactionIds into two parts so that it is possible
     758             :  * to allocate a new one without any contention for shared memory, except
     759             :  * for a bit of additional overhead during backend startup/shutdown.
     760             :  * The high-order part of a VirtualTransactionId is a BackendId, and the
     761             :  * low-order part is a LocalTransactionId, which we assign from a local
     762             :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     763             :  * within a short interval, successive procs occupying the same backend ID
     764             :  * slot should use a consecutive sequence of local IDs, which is implemented
     765             :  * by copying nextLocalTransactionId as seen above.
     766             :  */
     767             : LocalTransactionId
     768      497854 : GetNextLocalTransactionId(void)
     769             : {
     770             :     LocalTransactionId result;
     771             : 
     772             :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     773             :     do
     774             :     {
     775      497854 :         result = nextLocalTransactionId++;
     776      497854 :     } while (!LocalTransactionIdIsValid(result));
     777             : 
     778      495302 :     return result;
     779             : }
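A sketch of how the two halves of a virtual transaction ID come together (not part of sinvaladt.c; the struct and function are hypothetical stand-ins rather than the real VirtualTransactionId type): the backend ID assigned in SharedInvalBackendInit supplies the high-order part and the counter advanced above supplies the low-order part, so no two concurrently active backends can produce the same pair.

    typedef struct ExampleVxid
    {
        BackendId   backendId;      /* which procState slot this backend occupies */
        LocalTransactionId lxid;    /* from GetNextLocalTransactionId() */
    } ExampleVxid;

    static ExampleVxid
    example_new_vxid(void)
    {
        ExampleVxid vxid;

        vxid.backendId = MyBackendId;
        vxid.lxid = GetNextLocalTransactionId();
        return vxid;
    }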

Generated by: LCOV version 1.13