Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinvaladt.c
4 : * POSTGRES shared cache invalidation data manager.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinvaladt.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <signal.h>
18 : #include <unistd.h>
19 :
20 : #include "miscadmin.h"
21 : #include "storage/ipc.h"
22 : #include "storage/proc.h"
23 : #include "storage/procnumber.h"
24 : #include "storage/procsignal.h"
25 : #include "storage/shmem.h"
26 : #include "storage/sinvaladt.h"
27 : #include "storage/spin.h"
28 :
29 : /*
30 : * Conceptually, the shared cache invalidation messages are stored in an
31 : * infinite array, where maxMsgNum is the next array subscript to store a
32 : * submitted message in, minMsgNum is the smallest array subscript containing
33 : * a message not yet read by all backends, and we always have maxMsgNum >=
34 : * minMsgNum. (They are equal when there are no messages pending.) For each
35 : * active backend, there is a nextMsgNum pointer indicating the next message it
36 : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
37 : * backend.
38 : *
39 : * (In the current implementation, minMsgNum is a lower bound for the
40 : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
41 : * smallest nextMsgNum --- it may lag behind. We only update it when
42 : * SICleanupQueue is called, and we try not to do that often.)
43 : *
44 : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
45 : * entries. We translate MsgNum values into circular-buffer indexes by
46 : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
47 : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
48 : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
49 : * in the buffer. If the buffer does overflow, we recover by setting the
50 : * "reset" flag for each backend that has fallen too far behind. A backend
51 : * that is in "reset" state is ignored while determining minMsgNum. When
52 : * it does finally attempt to receive inval messages, it must discard all
53 : * its invalidatable state, since it won't know what it missed.
54 : *
55 : * To reduce the probability of needing resets, we send a "catchup" interrupt
56 : * to any backend that seems to be falling unreasonably far behind. The
57 : * normal behavior is that at most one such interrupt is in flight at a time;
58 : * when a backend completes processing a catchup interrupt, it executes
59 : * SICleanupQueue, which will signal the next-furthest-behind backend if
60 : * needed. This avoids undue contention from multiple backends all trying
61 : * to catch up at once. However, the furthest-back backend might be stuck
62 : * in a state where it can't catch up. Eventually it will get reset, so it
63 : * won't cause any more problems for anyone but itself. But we don't want
64 : * to find that a bunch of other backends are now too close to the reset
65 : * threshold to be saved. So SICleanupQueue is designed to occasionally
66 : * send extra catchup interrupts as the queue gets fuller, to backends that
67 : * are far behind and haven't gotten one yet. As long as there aren't a lot
68 : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
69 : * that aren't stuck will propagate their interrupts to the next guy.
70 : *
71 : * We would have problems if the MsgNum values overflow an integer, so
72 : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
73 : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
74 : * large so that we don't need to do this often. It must be a multiple of
75 : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
76 : * to be moved when we do it.
77 : *
78 : * Access to the shared sinval array is protected by two locks, SInvalReadLock
79 : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
80 : * authorizes them to modify their own ProcState but not to modify or even
81 : * look at anyone else's. When we need to perform array-wide updates,
82 : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
83 : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
84 : * mode) to serialize adding messages to the queue. Note that a writer
85 : * can operate in parallel with one or more readers, because the writer
86 : * has no need to touch anyone's ProcState, except in the infrequent cases
87 : * when SICleanupQueue is needed. The only point of overlap is that
88 : * the writer wants to change maxMsgNum while readers need to read it.
89 : * We deal with that by having a spinlock that readers must take for just
90 : * long enough to read maxMsgNum, while writers take it for just long enough
91 : * to write maxMsgNum. (The exact rule is that you need the spinlock to
92 : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
93 : * spinlock to write maxMsgNum unless you are holding both locks.)
94 : *
95 : * Note: since maxMsgNum is an int and hence presumably atomically readable/
96 : * writable, the spinlock might seem unnecessary. The reason it is needed
97 : * is to provide a memory barrier: we need to be sure that messages written
98 : * to the array are actually there before maxMsgNum is increased, and that
99 : * readers will see that data after fetching maxMsgNum. Multiprocessors
100 : * that have weak memory-ordering guarantees can fail without the memory
101 : * barrier instructions that are included in the spinlock sequences.
102 : */
103 :
104 :
105 : /*
106 : * Configurable parameters.
107 : *
108 : * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
109 : * Must be a power of 2 for speed.
110 : *
111 : * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
112 : * Must be a multiple of MAXNUMMESSAGES. Should be large.
113 : *
114 : * CLEANUP_MIN: the minimum number of messages that must be in the buffer
115 : * before we bother to call SICleanupQueue.
116 : *
117 : * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
118 : * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
119 : *
120 : * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
121 : * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
122 : *
123 : * WRITE_QUANTUM: the max number of messages to push into the buffer per
124 : * iteration of SIInsertDataEntries. Noncritical but should be less than
125 : * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
126 : * per iteration.
127 : */
128 :
129 : #define MAXNUMMESSAGES 4096
130 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
131 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
132 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
133 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
134 : #define WRITE_QUANTUM 64
135 :
136 : /* Per-backend state in shared invalidation structure */
137 : typedef struct ProcState
138 : {
139 : /* procPid is zero in an inactive ProcState array entry. */
140 : pid_t procPid; /* PID of backend, for signaling */
141 : /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
142 : int nextMsgNum; /* next message number to read */
143 : bool resetState; /* backend needs to reset its state */
144 : bool signaled; /* backend has been sent catchup signal */
145 : bool hasMessages; /* backend has unread messages */
146 :
147 : /*
148 : * Backend only sends invalidations, never receives them. This only makes
149 : * sense for Startup process during recovery because it doesn't maintain a
150 : * relcache, yet it fires inval messages to allow query backends to see
151 : * schema changes.
152 : */
153 : bool sendOnly; /* backend only sends, never receives */
154 :
155 : /*
156 : * Next LocalTransactionId to use for each idle backend slot. We keep
157 : * this here because it is indexed by ProcNumber and it is convenient to
158 : * copy the value to and from local memory when MyProcNumber is set. It's
159 : * meaningless in an active ProcState entry.
160 : */
161 : LocalTransactionId nextLXID;
162 : } ProcState;
163 :
164 : /* Shared cache invalidation memory segment */
165 : typedef struct SISeg
166 : {
167 : /*
168 : * General state information
169 : */
170 : int minMsgNum; /* oldest message still needed */
171 : int maxMsgNum; /* next message number to be assigned */
172 : int nextThreshold; /* # of messages to call SICleanupQueue */
173 :
174 : slock_t msgnumLock; /* spinlock protecting maxMsgNum */
175 :
176 : /*
177 : * Circular buffer holding shared-inval messages
178 : */
179 : SharedInvalidationMessage buffer[MAXNUMMESSAGES];
180 :
181 : /*
182 : * Per-backend invalidation state info.
183 : *
184 : * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
185 : * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
186 : * a dense array of their indexes, to speed up scanning all in-use slots.
187 : *
188 : * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
189 : * having our separate copy avoids contention on ProcArrayLock, and allows
190 : * us to track only the processes that participate in shared cache
191 : * invalidations.
192 : */
193 : int numProcs;
194 : int *pgprocnos;
195 : ProcState procState[FLEXIBLE_ARRAY_MEMBER];
196 : } SISeg;
197 :
198 : /*
199 : * We reserve a slot for each possible ProcNumber, plus one for each
200 : * possible auxiliary process type. (This scheme assumes there is not
201 : * more than one of any auxiliary process type at a time.)
202 : */
203 : #define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
204 :
205 : static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
206 :
207 :
208 : static LocalTransactionId nextLocalTransactionId;
209 :
210 : static void CleanupInvalidationState(int status, Datum arg);
211 :
212 :
213 : /*
214 : * SharedInvalShmemSize --- return shared-memory space needed
215 : */
216 : Size
217 5436 : SharedInvalShmemSize(void)
218 : {
219 : Size size;
220 :
221 5436 : size = offsetof(SISeg, procState);
222 5436 : size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
223 5436 : size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
224 :
225 5436 : return size;
226 : }
227 :
228 : /*
229 : * SharedInvalShmemInit
230 : * Create and initialize the SI message buffer
231 : */
232 : void
233 1902 : SharedInvalShmemInit(void)
234 : {
235 : int i;
236 : bool found;
237 :
238 : /* Allocate space in shared memory */
239 1902 : shmInvalBuffer = (SISeg *)
240 1902 : ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
241 1902 : if (found)
242 0 : return;
243 :
244 : /* Clear message counters, save size of procState array, init spinlock */
245 1902 : shmInvalBuffer->minMsgNum = 0;
246 1902 : shmInvalBuffer->maxMsgNum = 0;
247 1902 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
248 1902 : SpinLockInit(&shmInvalBuffer->msgnumLock);
249 :
250 : /* The buffer[] array is initially all unused, so we need not fill it */
251 :
252 : /* Mark all backends inactive, and initialize nextLXID */
253 171440 : for (i = 0; i < NumProcStateSlots; i++)
254 : {
255 169538 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
256 169538 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
257 169538 : shmInvalBuffer->procState[i].resetState = false;
258 169538 : shmInvalBuffer->procState[i].signaled = false;
259 169538 : shmInvalBuffer->procState[i].hasMessages = false;
260 169538 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
261 : }
262 1902 : shmInvalBuffer->numProcs = 0;
263 1902 : shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
264 : }
265 :
266 : /*
267 : * SharedInvalBackendInit
268 : * Initialize a new backend to operate on the sinval buffer
269 : */
270 : void
271 31284 : SharedInvalBackendInit(bool sendOnly)
272 : {
273 : ProcState *stateP;
274 : pid_t oldPid;
275 31284 : SISeg *segP = shmInvalBuffer;
276 :
277 31284 : if (MyProcNumber < 0)
278 0 : elog(ERROR, "MyProcNumber not set");
279 31284 : if (MyProcNumber >= NumProcStateSlots)
280 0 : elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
281 : MyProcNumber, NumProcStateSlots);
282 31284 : stateP = &segP->procState[MyProcNumber];
283 :
284 : /*
285 : * This can run in parallel with read operations, but not with write
286 : * operations, since SIInsertDataEntries relies on the pgprocnos array to
287 : * set hasMessages appropriately.
288 : */
289 31284 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
290 :
291 31284 : oldPid = stateP->procPid;
292 31284 : if (oldPid != 0)
293 : {
294 0 : LWLockRelease(SInvalWriteLock);
295 0 : elog(ERROR, "sinval slot for backend %d is already in use by process %d",
296 : MyProcNumber, (int) oldPid);
297 : }
298 :
299 31284 : shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
300 :
301 : /* Fetch next local transaction ID into local memory */
302 31284 : nextLocalTransactionId = stateP->nextLXID;
303 :
304 : /* mark myself active, with all extant messages already read */
305 31284 : stateP->procPid = MyProcPid;
306 31284 : stateP->nextMsgNum = segP->maxMsgNum;
307 31284 : stateP->resetState = false;
308 31284 : stateP->signaled = false;
309 31284 : stateP->hasMessages = false;
310 31284 : stateP->sendOnly = sendOnly;
311 :
312 31284 : LWLockRelease(SInvalWriteLock);
313 :
314 : /* register exit routine to mark my entry inactive at exit */
315 31284 : on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
316 31284 : }
317 :
318 : /*
319 : * CleanupInvalidationState
320 : * Mark the current backend as no longer active.
321 : *
322 : * This function is called via on_shmem_exit() during backend shutdown.
323 : *
324 : * arg is really of type "SISeg*".
325 : */
326 : static void
327 31284 : CleanupInvalidationState(int status, Datum arg)
328 : {
329 31284 : SISeg *segP = (SISeg *) DatumGetPointer(arg);
330 : ProcState *stateP;
331 : int i;
332 :
333 : Assert(PointerIsValid(segP));
334 :
335 31284 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
336 :
337 31284 : stateP = &segP->procState[MyProcNumber];
338 :
339 : /* Update next local transaction ID for next holder of this proc number */
340 31284 : stateP->nextLXID = nextLocalTransactionId;
341 :
342 : /* Mark myself inactive */
343 31284 : stateP->procPid = 0;
344 31284 : stateP->nextMsgNum = 0;
345 31284 : stateP->resetState = false;
346 31284 : stateP->signaled = false;
347 :
348 38618 : for (i = segP->numProcs - 1; i >= 0; i--)
349 : {
350 38618 : if (segP->pgprocnos[i] == MyProcNumber)
351 : {
352 31284 : if (i != segP->numProcs - 1)
353 4212 : segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
354 31284 : break;
355 : }
356 : }
357 31284 : if (i < 0)
358 0 : elog(PANIC, "could not find entry in sinval array");
359 31284 : segP->numProcs--;
360 :
361 31284 : LWLockRelease(SInvalWriteLock);
362 31284 : }
363 :
364 : /*
365 : * SIInsertDataEntries
366 : * Add new invalidation message(s) to the buffer.
367 : */
368 : void
369 783598 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
370 : {
371 783598 : SISeg *segP = shmInvalBuffer;
372 :
373 : /*
374 : * N can be arbitrarily large. We divide the work into groups of no more
375 : * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
376 : * an unreasonably long time. (This is not so much because we care about
377 : * letting in other writers, as that some just-caught-up backend might be
378 : * trying to do SICleanupQueue to pass on its signal, and we don't want it
379 : * to have to wait a long time.) Also, we need to consider calling
380 : * SICleanupQueue every so often.
381 : */
382 1612562 : while (n > 0)
383 : {
384 828964 : int nthistime = Min(n, WRITE_QUANTUM);
385 : int numMsgs;
386 : int max;
387 : int i;
388 :
389 828964 : n -= nthistime;
390 :
391 828964 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
392 :
393 : /*
394 : * If the buffer is full, we *must* acquire some space. Clean the
395 : * queue and reset anyone who is preventing space from being freed.
396 : * Otherwise, clean the queue only when it's exceeded the next
397 : * fullness threshold. We have to loop and recheck the buffer state
398 : * after any call of SICleanupQueue.
399 : */
400 : for (;;)
401 : {
402 838552 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
403 838552 : if (numMsgs + nthistime > MAXNUMMESSAGES ||
404 838188 : numMsgs >= segP->nextThreshold)
405 9588 : SICleanupQueue(true, nthistime);
406 : else
407 : break;
408 : }
409 :
410 : /*
411 : * Insert new message(s) into proper slot of circular buffer
412 : */
413 828964 : max = segP->maxMsgNum;
414 7991368 : while (nthistime-- > 0)
415 : {
416 7162404 : segP->buffer[max % MAXNUMMESSAGES] = *data++;
417 7162404 : max++;
418 : }
419 :
420 : /* Update current value of maxMsgNum using spinlock */
421 828964 : SpinLockAcquire(&segP->msgnumLock);
422 828964 : segP->maxMsgNum = max;
423 828964 : SpinLockRelease(&segP->msgnumLock);
424 :
425 : /*
426 : * Now that the maxMsgNum change is globally visible, we give everyone
427 : * a swift kick to make sure they read the newly added messages.
428 : * Releasing SInvalWriteLock will enforce a full memory barrier, so
429 : * these (unlocked) changes will be committed to memory before we exit
430 : * the function.
431 : */
432 4415802 : for (i = 0; i < segP->numProcs; i++)
433 : {
434 3586838 : ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
435 :
436 3586838 : stateP->hasMessages = true;
437 : }
438 :
439 828964 : LWLockRelease(SInvalWriteLock);
440 : }
441 783598 : }
442 :
443 : /*
444 : * SIGetDataEntries
445 : * get next SI message(s) for current backend, if there are any
446 : *
447 : * Possible return values:
448 : * 0: no SI message available
449 : * n>0: next n SI messages have been extracted into data[]
450 : * -1: SI reset message extracted
451 : *
452 : * If the return value is less than the array size "datasize", the caller
453 : * can assume that there are no more SI messages after the one(s) returned.
454 : * Otherwise, another call is needed to collect more messages.
455 : *
456 : * NB: this can run in parallel with other instances of SIGetDataEntries
457 : * executing on behalf of other backends, since each instance will modify only
458 : * fields of its own backend's ProcState, and no instance will look at fields
459 : * of other backends' ProcStates. We express this by grabbing SInvalReadLock
460 : * in shared mode. Note that this is not exactly the normal (read-only)
461 : * interpretation of a shared lock! Look closely at the interactions before
462 : * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
463 : *
464 : * NB: this can also run in parallel with SIInsertDataEntries. It is not
465 : * guaranteed that we will return any messages added after the routine is
466 : * entered.
467 : *
468 : * Note: we assume that "datasize" is not so large that it might be important
469 : * to break our hold on SInvalReadLock into segments.
470 : */
471 : int
472 32748288 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
473 : {
474 : SISeg *segP;
475 : ProcState *stateP;
476 : int max;
477 : int n;
478 :
479 32748288 : segP = shmInvalBuffer;
480 32748288 : stateP = &segP->procState[MyProcNumber];
481 :
482 : /*
483 : * Before starting to take locks, do a quick, unlocked test to see whether
484 : * there can possibly be anything to read. On a multiprocessor system,
485 : * it's possible that this load could migrate backwards and occur before
486 : * we actually enter this function, so we might miss a sinval message that
487 : * was just added by some other processor. But they can't migrate
488 : * backwards over a preceding lock acquisition, so it should be OK. If we
489 : * haven't acquired a lock preventing against further relevant
490 : * invalidations, any such occurrence is not much different than if the
491 : * invalidation had arrived slightly later in the first place.
492 : */
493 32748288 : if (!stateP->hasMessages)
494 31495574 : return 0;
495 :
496 1252714 : LWLockAcquire(SInvalReadLock, LW_SHARED);
497 :
498 : /*
499 : * We must reset hasMessages before determining how many messages we're
500 : * going to read. That way, if new messages arrive after we have
501 : * determined how many we're reading, the flag will get reset and we'll
502 : * notice those messages part-way through.
503 : *
504 : * Note that, if we don't end up reading all of the messages, we had
505 : * better be certain to reset this flag before exiting!
506 : */
507 1252714 : stateP->hasMessages = false;
508 :
509 : /* Fetch current value of maxMsgNum using spinlock */
510 1252714 : SpinLockAcquire(&segP->msgnumLock);
511 1252714 : max = segP->maxMsgNum;
512 1252714 : SpinLockRelease(&segP->msgnumLock);
513 :
514 1252714 : if (stateP->resetState)
515 : {
516 : /*
517 : * Force reset. We can say we have dealt with any messages added
518 : * since the reset, as well; and that means we should clear the
519 : * signaled flag, too.
520 : */
521 446 : stateP->nextMsgNum = max;
522 446 : stateP->resetState = false;
523 446 : stateP->signaled = false;
524 446 : LWLockRelease(SInvalReadLock);
525 446 : return -1;
526 : }
527 :
528 : /*
529 : * Retrieve messages and advance backend's counter, until data array is
530 : * full or there are no more messages.
531 : *
532 : * There may be other backends that haven't read the message(s), so we
533 : * cannot delete them here. SICleanupQueue() will eventually remove them
534 : * from the queue.
535 : */
536 1252268 : n = 0;
537 28805444 : while (n < datasize && stateP->nextMsgNum < max)
538 : {
539 27553176 : data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
540 27553176 : stateP->nextMsgNum++;
541 : }
542 :
543 : /*
544 : * If we have caught up completely, reset our "signaled" flag so that
545 : * we'll get another signal if we fall behind again.
546 : *
547 : * If we haven't caught up completely, reset the hasMessages flag so that
548 : * we see the remaining messages next time.
549 : */
550 1252268 : if (stateP->nextMsgNum >= max)
551 518972 : stateP->signaled = false;
552 : else
553 733296 : stateP->hasMessages = true;
554 :
555 1252268 : LWLockRelease(SInvalReadLock);
556 1252268 : return n;
557 : }
558 :
559 : /*
560 : * SICleanupQueue
561 : * Remove messages that have been consumed by all active backends
562 : *
563 : * callerHasWriteLock is true if caller is holding SInvalWriteLock.
564 : * minFree is the minimum number of message slots to make free.
565 : *
566 : * Possible side effects of this routine include marking one or more
567 : * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
568 : * to some backend that seems to be getting too far behind. We signal at
569 : * most one backend at a time, for reasons explained at the top of the file.
570 : *
571 : * Caution: because we transiently release write lock when we have to signal
572 : * some other backend, it is NOT guaranteed that there are still minFree
573 : * free message slots at exit. Caller must recheck and perhaps retry.
574 : */
575 : void
576 14872 : SICleanupQueue(bool callerHasWriteLock, int minFree)
577 : {
578 14872 : SISeg *segP = shmInvalBuffer;
579 : int min,
580 : minsig,
581 : lowbound,
582 : numMsgs,
583 : i;
584 14872 : ProcState *needSig = NULL;
585 :
586 : /* Lock out all writers and readers */
587 14872 : if (!callerHasWriteLock)
588 5284 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
589 14872 : LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
590 :
591 : /*
592 : * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
593 : * furthest-back backend that needs signaling (if any), and reset any
594 : * backends that are too far back. Note that because we ignore sendOnly
595 : * backends here it is possible for them to keep sending messages without
596 : * a problem even when they are the only active backend.
597 : */
598 14872 : min = segP->maxMsgNum;
599 14872 : minsig = min - SIG_THRESHOLD;
600 14872 : lowbound = min - MAXNUMMESSAGES + minFree;
601 :
602 112440 : for (i = 0; i < segP->numProcs; i++)
603 : {
604 97568 : ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
605 97568 : int n = stateP->nextMsgNum;
606 :
607 : /* Ignore if already in reset state */
608 : Assert(stateP->procPid != 0);
609 97568 : if (stateP->resetState || stateP->sendOnly)
610 10026 : continue;
611 :
612 : /*
613 : * If we must free some space and this backend is preventing it, force
614 : * him into reset state and then ignore until he catches up.
615 : */
616 87542 : if (n < lowbound)
617 : {
618 448 : stateP->resetState = true;
619 : /* no point in signaling him ... */
620 448 : continue;
621 : }
622 :
623 : /* Track the global minimum nextMsgNum */
624 87094 : if (n < min)
625 22512 : min = n;
626 :
627 : /* Also see who's furthest back of the unsignaled backends */
628 87094 : if (n < minsig && !stateP->signaled)
629 : {
630 5416 : minsig = n;
631 5416 : needSig = stateP;
632 : }
633 : }
634 14872 : segP->minMsgNum = min;
635 :
636 : /*
637 : * When minMsgNum gets really large, decrement all message counters so as
638 : * to forestall overflow of the counters. This happens seldom enough that
639 : * folding it into the previous loop would be a loser.
640 : */
641 14872 : if (min >= MSGNUMWRAPAROUND)
642 : {
643 0 : segP->minMsgNum -= MSGNUMWRAPAROUND;
644 0 : segP->maxMsgNum -= MSGNUMWRAPAROUND;
645 0 : for (i = 0; i < segP->numProcs; i++)
646 0 : segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
647 : }
648 :
649 : /*
650 : * Determine how many messages are still in the queue, and set the
651 : * threshold at which we should repeat SICleanupQueue().
652 : */
653 14872 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
654 14872 : if (numMsgs < CLEANUP_MIN)
655 4930 : segP->nextThreshold = CLEANUP_MIN;
656 : else
657 9942 : segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
658 :
659 : /*
660 : * Lastly, signal anyone who needs a catchup interrupt. Since
661 : * SendProcSignal() might not be fast, we don't want to hold locks while
662 : * executing it.
663 : */
664 14872 : if (needSig)
665 : {
666 5318 : pid_t his_pid = needSig->procPid;
667 5318 : ProcNumber his_procNumber = (needSig - &segP->procState[0]);
668 :
669 5318 : needSig->signaled = true;
670 5318 : LWLockRelease(SInvalReadLock);
671 5318 : LWLockRelease(SInvalWriteLock);
672 5318 : elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
673 5318 : SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
674 5318 : if (callerHasWriteLock)
675 4058 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
676 : }
677 : else
678 : {
679 9554 : LWLockRelease(SInvalReadLock);
680 9554 : if (!callerHasWriteLock)
681 4024 : LWLockRelease(SInvalWriteLock);
682 : }
683 14872 : }
684 :
685 :
686 : /*
687 : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
688 : *
689 : * We split VirtualTransactionIds into two parts so that it is possible
690 : * to allocate a new one without any contention for shared memory, except
691 : * for a bit of additional overhead during backend startup/shutdown.
692 : * The high-order part of a VirtualTransactionId is a ProcNumber, and the
693 : * low-order part is a LocalTransactionId, which we assign from a local
694 : * counter. To avoid the risk of a VirtualTransactionId being reused
695 : * within a short interval, successive procs occupying the same PGPROC slot
696 : * should use a consecutive sequence of local IDs, which is implemented
697 : * by copying nextLocalTransactionId as seen above.
698 : */
699 : LocalTransactionId
700 765538 : GetNextLocalTransactionId(void)
701 : {
702 : LocalTransactionId result;
703 :
704 : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
705 : do
706 : {
707 765538 : result = nextLocalTransactionId++;
708 765538 : } while (!LocalTransactionIdIsValid(result));
709 :
710 748708 : return result;
711 : }
|