LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 180 188 95.7 %
Date: 2019-09-19 02:07:14 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sinvaladt.c
       4             :  *    POSTGRES shared cache invalidation data manager.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/storage/ipc/sinvaladt.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include <signal.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "miscadmin.h"
      21             : #include "storage/backendid.h"
      22             : #include "storage/ipc.h"
      23             : #include "storage/proc.h"
      24             : #include "storage/procsignal.h"
      25             : #include "storage/shmem.h"
      26             : #include "storage/sinvaladt.h"
      27             : #include "storage/spin.h"
      28             : #include "access/transam.h"
      29             : 
      30             : 
      31             : /*
      32             :  * Conceptually, the shared cache invalidation messages are stored in an
      33             :  * infinite array, where maxMsgNum is the next array subscript to store a
      34             :  * submitted message in, minMsgNum is the smallest array subscript containing
      35             :  * a message not yet read by all backends, and we always have maxMsgNum >=
      36             :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      37             :  * active backend, there is a nextMsgNum pointer indicating the next message it
      38             :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      39             :  * backend.
      40             :  *
      41             :  * (In the current implementation, minMsgNum is a lower bound for the
      42             :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      43             :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      44             :  * SICleanupQueue is called, and we try not to do that often.)
      45             :  *
      46             :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      47             :  * entries.  We translate MsgNum values into circular-buffer indexes by
      48             :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      49             :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      50             :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      51             :  * in the buffer.  If the buffer does overflow, we recover by setting the
      52             :  * "reset" flag for each backend that has fallen too far behind.  A backend
      53             :  * that is in "reset" state is ignored while determining minMsgNum.  When
      54             :  * it does finally attempt to receive inval messages, it must discard all
      55             :  * its invalidatable state, since it won't know what it missed.
      56             :  *
      57             :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      58             :  * to any backend that seems to be falling unreasonably far behind.  The
      59             :  * normal behavior is that at most one such interrupt is in flight at a time;
      60             :  * when a backend completes processing a catchup interrupt, it executes
      61             :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      62             :  * needed.  This avoids undue contention from multiple backends all trying
      63             :  * to catch up at once.  However, the furthest-back backend might be stuck
      64             :  * in a state where it can't catch up.  Eventually it will get reset, so it
      65             :  * won't cause any more problems for anyone but itself.  But we don't want
      66             :  * to find that a bunch of other backends are now too close to the reset
      67             :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      68             :  * send extra catchup interrupts as the queue gets fuller, to backends that
      69             :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      70             :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      71             :  * that aren't stuck will propagate their interrupts to the next guy.
      72             :  *
      73             :  * We would have problems if the MsgNum values overflow an integer, so
      74             :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      75             :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      76             :  * large so that we don't need to do this often.  It must be a multiple of
      77             :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      78             :  * to be moved when we do it.
      79             :  *
      80             :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      81             :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      82             :  * authorizes them to modify their own ProcState but not to modify or even
      83             :  * look at anyone else's.  When we need to perform array-wide updates,
      84             :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      85             :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      86             :  * mode) to serialize adding messages to the queue.  Note that a writer
      87             :  * can operate in parallel with one or more readers, because the writer
      88             :  * has no need to touch anyone's ProcState, except in the infrequent cases
      89             :  * when SICleanupQueue is needed.  The only point of overlap is that
      90             :  * the writer wants to change maxMsgNum while readers need to read it.
      91             :  * We deal with that by having a spinlock that readers must take for just
      92             :  * long enough to read maxMsgNum, while writers take it for just long enough
      93             :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      94             :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      95             :  * spinlock to write maxMsgNum unless you are holding both locks.)
      96             :  *
      97             :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      98             :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      99             :  * is to provide a memory barrier: we need to be sure that messages written
     100             :  * to the array are actually there before maxMsgNum is increased, and that
     101             :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     102             :  * that have weak memory-ordering guarantees can fail without the memory
     103             :  * barrier instructions that are included in the spinlock sequences.
     104             :  */
     105             : 
     106             : 
     107             : /*
     108             :  * Configurable parameters.
     109             :  *
     110             :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     111             :  * Must be a power of 2 for speed.
     112             :  *
     113             :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     114             :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     115             :  *
     116             :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     117             :  * before we bother to call SICleanupQueue.
     118             :  *
     119             :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     120             :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     121             :  *
     122             :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     123             :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     124             :  *
     125             :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     126             :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     127             :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     128             :  * per iteration.
     129             :  */
     130             : 
      131             : #define MAXNUMMESSAGES 4096 /* must be a power of 2 (cheap % indexing) */
      132             : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) /* 2^30; a multiple of MAXNUMMESSAGES */
      133             : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
      134             : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) /* must be a power of 2 */
      135             : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
      136             : #define WRITE_QUANTUM 64 /* noncritical, but keep < CLEANUP_QUANTUM (see above) */
     137             : 
      138             : /* Per-backend state in shared invalidation structure */
      139             : typedef struct ProcState
      140             : {
      141             :     /* procPid is zero in an inactive ProcState array entry. */
      142             :     pid_t       procPid;        /* PID of backend, for signaling */
      143             :     PGPROC     *proc;           /* PGPROC of backend */
      144             :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
      145             :     int         nextMsgNum;     /* next message number to read */
      146             :     bool        resetState;     /* backend needs to reset its state */
      147             :     bool        signaled;       /* backend has been sent catchup signal */
      148             :     bool        hasMessages;    /* backend has unread messages */
      149             : 
      150             :     /*
      151             :      * Backend only sends invalidations, never receives them. This only makes
      152             :      * sense for Startup process during recovery because it doesn't maintain a
      153             :      * relcache, yet it fires inval messages to allow query backends to see
      154             :      * schema changes.
      155             :      */
      156             :     bool        sendOnly;       /* backend only sends, never receives */
      157             : 
      158             :     /*
      159             :      * Next LocalTransactionId to use for each idle backend slot.  We keep
      160             :      * this here because it is indexed by BackendId and it is convenient to
      161             :      * copy the value to and from local memory when MyBackendId is set. It's
      162             :      * meaningless in an active ProcState entry.
      163             :      */
      164             :     LocalTransactionId nextLXID;
      165             : } ProcState;
      166             : 
      167             : /* Shared cache invalidation memory segment */
      168             : typedef struct SISeg
      169             : {
      170             :     /*
      171             :      * General state information
      172             :      */
      173             :     int         minMsgNum;      /* oldest message still needed */
      174             :     int         maxMsgNum;      /* next message number to be assigned; read/write rules at top of file */
      175             :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
      176             :     int         lastBackend;    /* index of last active procState entry, +1 */
      177             :     int         maxBackends;    /* size of procState array */
      178             : 
      179             :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
      180             : 
      181             :     /*
      182             :      * Circular buffer holding shared-inval messages
      183             :      */
      184             :     SharedInvalidationMessage buffer[MAXNUMMESSAGES]; /* indexed by MsgNum % MAXNUMMESSAGES */
      185             : 
      186             :     /*
      187             :      * Per-backend invalidation state info (has MaxBackends entries).
      188             :      */
      189             :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
      190             : } SISeg;
     191             : 
      192             : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
      193             : 
      194             : 
      195             : static LocalTransactionId nextLocalTransactionId; /* local copy of my slot's nextLXID */
      196             : 
      197             : static void CleanupInvalidationState(int status, Datum arg); /* on_shmem_exit callback */
     198             : 
     199             : 
     200             : /*
     201             :  * SInvalShmemSize --- return shared-memory space needed
     202             :  */
     203             : Size
     204        3716 : SInvalShmemSize(void)
     205             : {
     206             :     Size        size;
     207             : 
     208        3716 :     size = offsetof(SISeg, procState);
     209        3716 :     size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
     210             : 
     211        3716 :     return size;
     212             : }
     213             : 
      214             : /*
      215             :  * CreateSharedInvalidationState
      216             :  *      Create and initialize the SI message buffer
      217             :  */
      218             : void
      219        1856 : CreateSharedInvalidationState(void)
      220             : {
      221             :     int         i;
      222             :     bool        found;
      223             : 
      224             :     /* Allocate space in shared memory */
      225        1856 :     shmInvalBuffer = (SISeg *)
      226        1856 :         ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
      227        1856 :     if (found)
      228           0 :         return;                 /* attached to an existing, already-initialized segment */
      229             : 
      230             :     /* Clear message counters, save size of procState array, init spinlock */
      231        1856 :     shmInvalBuffer->minMsgNum = 0;
      232        1856 :     shmInvalBuffer->maxMsgNum = 0;
      233        1856 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
      234        1856 :     shmInvalBuffer->lastBackend = 0;
      235        1856 :     shmInvalBuffer->maxBackends = MaxBackends;
      236        1856 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
      237             : 
      238             :     /* The buffer[] array is initially all unused, so we need not fill it */
      239             : 
      240             :     /* Mark all backends inactive, and initialize nextLXID */
      241      200124 :     for (i = 0; i < shmInvalBuffer->maxBackends; i++)
      242             :     {
      243      198268 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
      244      198268 :         shmInvalBuffer->procState[i].proc = NULL;
      245      198268 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
      246      198268 :         shmInvalBuffer->procState[i].resetState = false;
      247      198268 :         shmInvalBuffer->procState[i].signaled = false;
      248      198268 :         shmInvalBuffer->procState[i].hasMessages = false;
      249      198268 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
      250             :     }
      251             : }
     252             : 
      253             : /*
      254             :  * SharedInvalBackendInit
      255             :  *      Initialize a new backend to operate on the sinval buffer
      256             :  *
      257             :  * Claims a free procState slot, sets MyBackendId, and registers a shmem-exit
      258             :  * callback to release the slot.  Reports FATAL if all slots are taken.
      259             :  */
      260             : void
      261        9932 : SharedInvalBackendInit(bool sendOnly)
      262             : {
      263             :     int         index;
      264        9932 :     ProcState  *stateP = NULL;
      265        9932 :     SISeg      *segP = shmInvalBuffer;
      266             : 
      267             :     /*
      268             :      * This can run in parallel with read operations, but not with write
      269             :      * operations, since SIInsertDataEntries relies on lastBackend to set
      270             :      * hasMessages appropriately.
      271             :      */
      272        9932 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
      273             : 
      274             :     /* Look for a free entry in the procState array */
      275       35364 :     for (index = 0; index < segP->lastBackend; index++)
      276             :     {
      277       26888 :         if (segP->procState[index].procPid == 0) /* inactive slot? */
      278             :         {
      279        1456 :             stateP = &segP->procState[index];
      280        1456 :             break;
      281             :         }
      282             :     }
      283             : 
      284        9932 :     if (stateP == NULL)
      285             :     {
      286             :         /* No hole found; try to extend the active region of the array */
      287        8476 :         if (segP->lastBackend < segP->maxBackends)
      288             :         {
      289        8476 :             stateP = &segP->procState[segP->lastBackend];
      290             :             Assert(stateP->procPid == 0);
      291        8476 :             segP->lastBackend++;
      292             :         }
      293             :         else
      294             :         {
      295             :             /*
      296             :              * out of procState slots: MaxBackends exceeded -- report normally
      297             :              */
      298           0 :             MyBackendId = InvalidBackendId;
      299           0 :             LWLockRelease(SInvalWriteLock);
      300           0 :             ereport(FATAL,
      301             :                     (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
      302             :                      errmsg("sorry, too many clients already")));
      303             :         }
      304             :     }
      305             : 
      306        9932 :     MyBackendId = (stateP - &segP->procState[0]) + 1; /* backend IDs are 1-based */
      307             : 
      308             :     /* Advertise assigned backend ID in MyProc */
      309        9932 :     MyProc->backendId = MyBackendId;
      310             : 
      311             :     /* Fetch next local transaction ID into local memory */
      312        9932 :     nextLocalTransactionId = stateP->nextLXID;
      313             : 
      314             :     /* mark myself active, with all extant messages already read */
      315        9932 :     stateP->procPid = MyProcPid;
      316        9932 :     stateP->proc = MyProc;
      317        9932 :     stateP->nextMsgNum = segP->maxMsgNum;
      318        9932 :     stateP->resetState = false;
      319        9932 :     stateP->signaled = false;
      320        9932 :     stateP->hasMessages = false;
      321        9932 :     stateP->sendOnly = sendOnly;
      322             : 
      323        9932 :     LWLockRelease(SInvalWriteLock);
      324             : 
      325             :     /* register exit routine to mark my entry inactive at exit */
      326        9932 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
      327             : 
      328        9932 :     elog(DEBUG4, "my backend ID is %d", MyBackendId);
      329        9932 : }
     326             : 
      327             : /*
      328             :  * CleanupInvalidationState
      329             :  *      Mark the current backend as no longer active.
      330             :  *
      331             :  * This function is called via on_shmem_exit() during backend shutdown.
      332             :  *
      333             :  * arg is really of type "SISeg*".
      334             :  */
      335             : static void
      336        9932 : CleanupInvalidationState(int status, Datum arg)
      337             : {
      338        9932 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
      339             :     ProcState  *stateP;
      340             :     int         i;
      341             : 
      342             :     Assert(PointerIsValid(segP));
      343             : 
      344        9932 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
      345             : 
      346        9932 :     stateP = &segP->procState[MyBackendId - 1];
      347             : 
      348             :     /* Update next local transaction ID for next holder of this backendID */
      349        9932 :     stateP->nextLXID = nextLocalTransactionId;
      350             : 
      351             :     /* Mark myself inactive */
      352        9932 :     stateP->procPid = 0;
      353        9932 :     stateP->proc = NULL;
      354        9932 :     stateP->nextMsgNum = 0;
      355        9932 :     stateP->resetState = false;
      356        9932 :     stateP->signaled = false;
      357             : 
      358             :     /* Recompute index of last active backend */
      359       18388 :     for (i = segP->lastBackend; i > 0; i--)
      360             :     {
      361       17316 :         if (segP->procState[i - 1].procPid != 0)
      362        8860 :             break;
      363             :     }
      364        9932 :     segP->lastBackend = i;      /* trailing inactive slots trimmed off */
      365             : 
      366        9932 :     LWLockRelease(SInvalWriteLock);
      367        9932 : }
     368             : 
     369             : /*
     370             :  * BackendIdGetProc
     371             :  *      Get the PGPROC structure for a backend, given the backend ID.
     372             :  *      The result may be out of date arbitrarily quickly, so the caller
     373             :  *      must be careful about how this information is used.  NULL is
     374             :  *      returned if the backend is not active.
     375             :  */
     376             : PGPROC *
     377          96 : BackendIdGetProc(int backendID)
     378             : {
     379          96 :     PGPROC     *result = NULL;
     380          96 :     SISeg      *segP = shmInvalBuffer;
     381             : 
     382             :     /* Need to lock out additions/removals of backends */
     383          96 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
     384             : 
     385          96 :     if (backendID > 0 && backendID <= segP->lastBackend)
     386             :     {
     387          96 :         ProcState  *stateP = &segP->procState[backendID - 1];
     388             : 
     389          96 :         result = stateP->proc;
     390             :     }
     391             : 
     392          96 :     LWLockRelease(SInvalWriteLock);
     393             : 
     394          96 :     return result;
     395             : }
     396             : 
      397             : /*
      398             :  * BackendIdGetTransactionIds
      399             :  *      Get the xid and xmin of the backend. The result may be out of date
      400             :  *      arbitrarily quickly, so the caller must be careful about how this
      401             :  *      information is used.
      402             :  *
      403             :  * Both outputs are set to InvalidTransactionId if the backend is not active.
      404             :  */
      405             : void
      406        1574 : BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
      407             : {
      408        1574 :     SISeg      *segP = shmInvalBuffer;
      409             : 
      410        1574 :     *xid = InvalidTransactionId;
      411        1574 :     *xmin = InvalidTransactionId;
      412             : 
      413             :     /* Need to lock out additions/removals of backends */
      414        1574 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
      415             : 
      416        1574 :     if (backendID > 0 && backendID <= segP->lastBackend)
      417             :     {
      418         924 :         ProcState  *stateP = &segP->procState[backendID - 1];
      419         924 :         PGPROC     *proc = stateP->proc;
      420             : 
      421         924 :         if (proc != NULL)
      422             :         {
      423         924 :             PGXACT     *xact = &ProcGlobal->allPgXact[proc->pgprocno];
      424             : 
      425         924 :             *xid = xact->xid;   /* snapshot copy; may be stale immediately */
      426         924 :             *xmin = xact->xmin;
      427             :         }
      428             :     }
      429             : 
      430        1574 :     LWLockRelease(SInvalWriteLock);
      431        1574 : }
     430             : 
      431             : /*
      432             :  * SIInsertDataEntries
      433             :  *      Add new invalidation message(s) to the buffer.
      434             :  */
      435             : void
      436     1049016 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
      437             : {
      438     1049016 :     SISeg      *segP = shmInvalBuffer;
      439             : 
      440             :     /*
      441             :      * N can be arbitrarily large.  We divide the work into groups of no more
      442             :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
      443             :      * an unreasonably long time.  (This is not so much because we care about
      444             :      * letting in other writers, as that some just-caught-up backend might be
      445             :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
      446             :      * to have to wait a long time.)  Also, we need to consider calling
      447             :      * SICleanupQueue every so often.
      448             :      */
      449     3160910 :     while (n > 0)
      450             :     {
      451     1062878 :         int         nthistime = Min(n, WRITE_QUANTUM);
      452             :         int         numMsgs;
      453             :         int         max;
      454             :         int         i;
      455             : 
      456     1062878 :         n -= nthistime;         /* messages remaining after this batch */
      457             : 
      458     1062878 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
      459             : 
      460             :         /*
      461             :          * If the buffer is full, we *must* acquire some space.  Clean the
      462             :          * queue and reset anyone who is preventing space from being freed.
      463             :          * Otherwise, clean the queue only when it's exceeded the next
      464             :          * fullness threshold.  We have to loop and recheck the buffer state
      465             :          * after any call of SICleanupQueue.
      466             :          */
      467             :         for (;;)
      468             :         {
      469     1075630 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
      470     2138310 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
      471     1069056 :                 numMsgs >= segP->nextThreshold)
      472        6376 :                 SICleanupQueue(true, nthistime); /* reclaim space; loop re-checks */
      473             :             else
      474             :                 break;
      475             :         }
      476             : 
      477             :         /*
      478             :          * Insert new message(s) into proper slot of circular buffer
      479             :          */
      480     1062878 :         max = segP->maxMsgNum;
      481     7457564 :         while (nthistime-- > 0)
      482             :         {
      483     5331808 :             segP->buffer[max % MAXNUMMESSAGES] = *data++; /* MAXNUMMESSAGES is a power of 2, so % is cheap */
      484     5331808 :             max++;
      485             :         }
      486             : 
      487             :         /* Update current value of maxMsgNum using spinlock */
      488     1062878 :         SpinLockAcquire(&segP->msgnumLock);
      489     1062878 :         segP->maxMsgNum = max;
      490     1062878 :         SpinLockRelease(&segP->msgnumLock);
      491             : 
      492             :         /*
      493             :          * Now that the maxMsgNum change is globally visible, we give everyone
      494             :          * a swift kick to make sure they read the newly added messages.
      495             :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
      496             :          * these (unlocked) changes will be committed to memory before we exit
      497             :          * the function.
      498             :          */
      499     4745188 :         for (i = 0; i < segP->lastBackend; i++)
      500             :         {
      501     3682310 :             ProcState  *stateP = &segP->procState[i];
      502             : 
      503     3682310 :             stateP->hasMessages = true;
      504             :         }
      505             : 
      506     1062878 :         LWLockRelease(SInvalWriteLock);
      507             :     }
      508     1049016 : }
     509             : 
     510             : /*
     511             :  * SIGetDataEntries
     512             :  *      get next SI message(s) for current backend, if there are any
     513             :  *
     514             :  * Possible return values:
     515             :  *  0:   no SI message available
     516             :  *  n>0: next n SI messages have been extracted into data[]
     517             :  * -1:   SI reset message extracted
     518             :  *
     519             :  * If the return value is less than the array size "datasize", the caller
     520             :  * can assume that there are no more SI messages after the one(s) returned.
     521             :  * Otherwise, another call is needed to collect more messages.
     522             :  *
     523             :  * NB: this can run in parallel with other instances of SIGetDataEntries
     524             :  * executing on behalf of other backends, since each instance will modify only
     525             :  * fields of its own backend's ProcState, and no instance will look at fields
     526             :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     527             :  * in shared mode.  Note that this is not exactly the normal (read-only)
     528             :  * interpretation of a shared lock! Look closely at the interactions before
     529             :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     530             :  *
     531             :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     532             :  * guaranteed that we will return any messages added after the routine is
     533             :  * entered.
     534             :  *
     535             :  * Note: we assume that "datasize" is not so large that it might be important
     536             :  * to break our hold on SInvalReadLock into segments.
     537             :  */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    /* Our own per-backend slot; nobody else reads or writes it concurrently */
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing against further relevant
     * invalidations, any such occurrence is not much different than if the
     * invalidation had arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /*
     * Fetch current value of maxMsgNum using spinlock; writers update it
     * under the same spinlock, so this gives us a consistent snapshot.
     */
    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        /* The buffer is circular: wrap the logical message number into it */
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
     625             : 
     626             : /*
     627             :  * SICleanupQueue
     628             :  *      Remove messages that have been consumed by all active backends
     629             :  *
     630             :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     631             :  * minFree is the minimum number of message slots to make free.
     632             :  *
     633             :  * Possible side effects of this routine include marking one or more
     634             :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     635             :  * to some backend that seems to be getting too far behind.  We signal at
     636             :  * most one backend at a time, for reasons explained at the top of the file.
     637             :  *
     638             :  * Caution: because we transiently release write lock when we have to signal
     639             :  * some other backend, it is NOT guaranteed that there are still minFree
     640             :  * free message slots at exit.  Caller must recheck and perhaps retry.
     641             :  */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    /* Signal threshold: backends more than SIG_THRESHOLD behind get nudged */
    minsig = min - SIG_THRESHOLD;
    /* Any backend below lowbound blocks freeing minFree slots; reset it */
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive or already in reset state */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        /* Derive 1-based BackendId from the slot's offset in procState[] */
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        /* Mark him signaled while still under the locks, then drop both */
        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        /*
         * Restore the caller's lock state: if he came in holding
         * SInvalWriteLock we must re-acquire it before returning.
         */
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}
     753             : 
     754             : 
     755             : /*
     756             :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     757             :  *
     758             :  * We split VirtualTransactionIds into two parts so that it is possible
     759             :  * to allocate a new one without any contention for shared memory, except
     760             :  * for a bit of additional overhead during backend startup/shutdown.
     761             :  * The high-order part of a VirtualTransactionId is a BackendId, and the
     762             :  * low-order part is a LocalTransactionId, which we assign from a local
     763             :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     764             :  * within a short interval, successive procs occupying the same backend ID
     765             :  * slot should use a consecutive sequence of local IDs, which is implemented
     766             :  * by copying nextLocalTransactionId as seen above.
     767             :  */
     768             : LocalTransactionId
     769      462228 : GetNextLocalTransactionId(void)
     770             : {
     771             :     LocalTransactionId result;
     772             : 
     773             :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     774             :     do
     775             :     {
     776      462228 :         result = nextLocalTransactionId++;
     777      462228 :     } while (!LocalTransactionIdIsValid(result));
     778             : 
     779      459946 :     return result;
     780             : }

Generated by: LCOV version 1.13