Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinvaladt.c
4 : * POSTGRES shared cache invalidation data manager.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinvaladt.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <signal.h>
18 : #include <unistd.h>
19 :
20 : #include "miscadmin.h"
21 : #include "storage/ipc.h"
22 : #include "storage/proc.h"
23 : #include "storage/procnumber.h"
24 : #include "storage/procsignal.h"
25 : #include "storage/shmem.h"
26 : #include "storage/sinvaladt.h"
27 : #include "storage/spin.h"
28 :
29 : /*
30 : * Conceptually, the shared cache invalidation messages are stored in an
31 : * infinite array, where maxMsgNum is the next array subscript to store a
32 : * submitted message in, minMsgNum is the smallest array subscript containing
33 : * a message not yet read by all backends, and we always have maxMsgNum >=
34 : * minMsgNum. (They are equal when there are no messages pending.) For each
35 : * active backend, there is a nextMsgNum pointer indicating the next message it
36 : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
37 : * backend.
38 : *
39 : * (In the current implementation, minMsgNum is a lower bound for the
40 : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
41 : * smallest nextMsgNum --- it may lag behind. We only update it when
42 : * SICleanupQueue is called, and we try not to do that often.)
43 : *
44 : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
45 : * entries. We translate MsgNum values into circular-buffer indexes by
46 : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
47 : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
48 : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
49 : * in the buffer. If the buffer does overflow, we recover by setting the
50 : * "reset" flag for each backend that has fallen too far behind. A backend
51 : * that is in "reset" state is ignored while determining minMsgNum. When
52 : * it does finally attempt to receive inval messages, it must discard all
53 : * its invalidatable state, since it won't know what it missed.
54 : *
55 : * To reduce the probability of needing resets, we send a "catchup" interrupt
56 : * to any backend that seems to be falling unreasonably far behind. The
57 : * normal behavior is that at most one such interrupt is in flight at a time;
58 : * when a backend completes processing a catchup interrupt, it executes
59 : * SICleanupQueue, which will signal the next-furthest-behind backend if
60 : * needed. This avoids undue contention from multiple backends all trying
61 : * to catch up at once. However, the furthest-back backend might be stuck
62 : * in a state where it can't catch up. Eventually it will get reset, so it
63 : * won't cause any more problems for anyone but itself. But we don't want
64 : * to find that a bunch of other backends are now too close to the reset
65 : * threshold to be saved. So SICleanupQueue is designed to occasionally
66 : * send extra catchup interrupts as the queue gets fuller, to backends that
67 : * are far behind and haven't gotten one yet. As long as there aren't a lot
68 : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
69 : * that aren't stuck will propagate their interrupts to the next guy.
70 : *
71 : * We would have problems if the MsgNum values overflow an integer, so
72 : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
73 : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
74 : * large so that we don't need to do this often. It must be a multiple of
75 : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
76 : * to be moved when we do it.
77 : *
78 : * Access to the shared sinval array is protected by two locks, SInvalReadLock
79 : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
80 : * authorizes them to modify their own ProcState but not to modify or even
81 : * look at anyone else's. When we need to perform array-wide updates,
82 : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
83 : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
84 : * mode) to serialize adding messages to the queue. Note that a writer
85 : * can operate in parallel with one or more readers, because the writer
86 : * has no need to touch anyone's ProcState, except in the infrequent cases
87 : * when SICleanupQueue is needed. The only point of overlap is that
88 : * the writer wants to change maxMsgNum while readers need to read it.
89 : * We deal with that by having a spinlock that readers must take for just
90 : * long enough to read maxMsgNum, while writers take it for just long enough
91 : * to write maxMsgNum. (The exact rule is that you need the spinlock to
92 : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
93 : * spinlock to write maxMsgNum unless you are holding both locks.)
94 : *
95 : * Note: since maxMsgNum is an int and hence presumably atomically readable/
96 : * writable, the spinlock might seem unnecessary. The reason it is needed
97 : * is to provide a memory barrier: we need to be sure that messages written
98 : * to the array are actually there before maxMsgNum is increased, and that
99 : * readers will see that data after fetching maxMsgNum. Multiprocessors
100 : * that have weak memory-ordering guarantees can fail without the memory
101 : * barrier instructions that are included in the spinlock sequences.
102 : */
103 :
104 :
105 : /*
106 : * Configurable parameters.
107 : *
108 : * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
109 : * Must be a power of 2 for speed.
110 : *
111 : * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
112 : * Must be a multiple of MAXNUMMESSAGES. Should be large.
113 : *
114 : * CLEANUP_MIN: the minimum number of messages that must be in the buffer
115 : * before we bother to call SICleanupQueue.
116 : *
117 : * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
118 : * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
119 : *
120 : * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
121 : * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
122 : *
123 : * WRITE_QUANTUM: the max number of messages to push into the buffer per
124 : * iteration of SIInsertDataEntries. Noncritical but should be less than
125 : * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
126 : * per iteration.
127 : */
128 :
129 : #define MAXNUMMESSAGES 4096 /* 2^12 slots in the circular buffer */
130 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) /* 2^12 * 2^18 = 2^30 */
131 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2) /* 2048 messages */
132 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) /* 256 messages */
133 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2) /* 2048 messages */
134 : #define WRITE_QUANTUM 64 /* smaller than CLEANUP_QUANTUM (256) */
135 :
136 : /* Per-backend state in shared invalidation structure; one entry per proc number */
137 : typedef struct ProcState
138 : {
139 : /* procPid is zero in an inactive ProcState array entry. */
140 : pid_t procPid; /* PID of backend, for signaling */
141 : /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
142 : int nextMsgNum; /* next message number to read */
143 : bool resetState; /* set by SICleanupQueue when the backend fell too far behind; cleared by SIGetDataEntries when it reports the reset */
144 : bool signaled; /* set by SICleanupQueue when a catchup signal is sent; cleared by SIGetDataEntries on catch-up or reset */
145 : bool hasMessages; /* set by SIInsertDataEntries for every registered backend; cleared by the owner in SIGetDataEntries before reading */
146 :
147 : /*
148 : * Backend only sends invalidations, never receives them. This only makes
149 : * sense for Startup process during recovery because it doesn't maintain a
150 : * relcache, yet it fires inval messages to allow query backends to see
151 : * schema changes.
152 : */
153 : bool sendOnly; /* backend only sends, never receives; assigned in SharedInvalBackendInit (SharedInvalShmemInit does not set it) */
154 :
155 : /*
156 : * Next LocalTransactionId to use for each idle backend slot. We keep
157 : * this here because it is indexed by ProcNumber and it is convenient to
158 : * copy the value to and from local memory when MyProcNumber is set. It's
159 : * meaningless in an active ProcState entry.
160 : */
161 : LocalTransactionId nextLXID; /* read in SharedInvalBackendInit, written back in CleanupInvalidationState */
162 : } ProcState;
163 :
164 : /* Shared cache invalidation memory segment (sized by SharedInvalShmemSize) */
165 : typedef struct SISeg
166 : {
167 : /*
168 : * General state information
169 : */
170 : int minMsgNum; /* oldest message still needed; recomputed only in SICleanupQueue */
171 : int maxMsgNum; /* next message number to be assigned */
172 : int nextThreshold; /* queue length (maxMsgNum - minMsgNum) at which SIInsertDataEntries calls SICleanupQueue */
173 :
174 : slock_t msgnumLock; /* spinlock protecting maxMsgNum */
175 :
176 : /*
177 : * Circular buffer holding shared-inval messages
178 : */
179 : SharedInvalidationMessage buffer[MAXNUMMESSAGES]; /* indexed by MsgNum % MAXNUMMESSAGES */
180 :
181 : /*
182 : * Per-backend invalidation state info.
183 : *
184 : * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
185 : * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
186 : * a dense array of their indexes, to speed up scanning all in-use slots.
187 : *
188 : * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
189 : * having our separate copy avoids contention on ProcArrayLock, and allows
190 : * us to track only the processes that participate in shared cache
191 : * invalidations.
192 : */
193 : int numProcs; /* number of valid entries in pgprocnos[] */
194 : int *pgprocnos; /* set by SharedInvalShmemInit to the space just past procState[] */
195 : ProcState procState[FLEXIBLE_ARRAY_MEMBER];
196 : } SISeg;
197 :
198 : /*
199 : * We reserve a slot for each possible ProcNumber, plus one for each
200 : * possible auxiliary process type. (This scheme assumes there is not
201 : * more than one of any auxiliary process type at a time, except for
202 : * IO workers.)
203 : */
204 : #define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS) /* length of procState[] and capacity of pgprocnos[] */
205 :
206 : static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer; set by SharedInvalShmemInit */
207 :
208 : /* Local counter behind GetNextLocalTransactionId; loaded from our ProcState in SharedInvalBackendInit and saved back in CleanupInvalidationState */
209 : static LocalTransactionId nextLocalTransactionId;
210 :
211 : static void CleanupInvalidationState(int status, Datum arg); /* on_shmem_exit callback registered by SharedInvalBackendInit */
212 :
213 :
214 : /*
215 : * SharedInvalShmemSize --- return shared-memory space needed
216 : */
217 : Size
218 6006 : SharedInvalShmemSize(void)
219 : {
220 : Size size;
221 :
222 6006 : size = offsetof(SISeg, procState);
223 6006 : size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
224 6006 : size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
225 :
226 6006 : return size;
227 : }
228 :
229 : /*
230 : * SharedInvalShmemInit
231 : * Create and initialize the SI message buffer
232 : */
233 : void
234 2100 : SharedInvalShmemInit(void)
235 : {
236 : int i;
237 : bool found;
238 :
239 : /* Allocate space in shared memory */
240 2100 : shmInvalBuffer = (SISeg *)
241 2100 : ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
242 2100 : if (found)
243 0 : return;
244 :
245 : /* Clear message counters, save size of procState array, init spinlock */
246 2100 : shmInvalBuffer->minMsgNum = 0;
247 2100 : shmInvalBuffer->maxMsgNum = 0;
248 2100 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
249 2100 : SpinLockInit(&shmInvalBuffer->msgnumLock);
250 :
251 : /* The buffer[] array is initially all unused, so we need not fill it */
252 :
253 : /* Mark all backends inactive, and initialize nextLXID */
254 279486 : for (i = 0; i < NumProcStateSlots; i++)
255 : {
256 277386 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
257 277386 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
258 277386 : shmInvalBuffer->procState[i].resetState = false;
259 277386 : shmInvalBuffer->procState[i].signaled = false;
260 277386 : shmInvalBuffer->procState[i].hasMessages = false;
261 277386 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
262 : }
263 2100 : shmInvalBuffer->numProcs = 0;
264 2100 : shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
265 : }
266 :
267 : /*
268 : * SharedInvalBackendInit
269 : * Initialize a new backend to operate on the sinval buffer
270 : */
271 : void
272 35484 : SharedInvalBackendInit(bool sendOnly)
273 : {
274 : ProcState *stateP;
275 : pid_t oldPid;
276 35484 : SISeg *segP = shmInvalBuffer;
277 :
278 35484 : if (MyProcNumber < 0)
279 0 : elog(ERROR, "MyProcNumber not set");
280 35484 : if (MyProcNumber >= NumProcStateSlots)
281 0 : elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
282 : MyProcNumber, NumProcStateSlots);
283 35484 : stateP = &segP->procState[MyProcNumber];
284 :
285 : /*
286 : * This can run in parallel with read operations, but not with write
287 : * operations, since SIInsertDataEntries relies on the pgprocnos array to
288 : * set hasMessages appropriately.
289 : */
290 35484 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
291 :
292 35484 : oldPid = stateP->procPid;
293 35484 : if (oldPid != 0)
294 : {
295 0 : LWLockRelease(SInvalWriteLock);
296 0 : elog(ERROR, "sinval slot for backend %d is already in use by process %d",
297 : MyProcNumber, (int) oldPid);
298 : }
299 :
300 35484 : shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
301 :
302 : /* Fetch next local transaction ID into local memory */
303 35484 : nextLocalTransactionId = stateP->nextLXID;
304 :
305 : /* mark myself active, with all extant messages already read */
306 35484 : stateP->procPid = MyProcPid;
307 35484 : stateP->nextMsgNum = segP->maxMsgNum;
308 35484 : stateP->resetState = false;
309 35484 : stateP->signaled = false;
310 35484 : stateP->hasMessages = false;
311 35484 : stateP->sendOnly = sendOnly;
312 :
313 35484 : LWLockRelease(SInvalWriteLock);
314 :
315 : /* register exit routine to mark my entry inactive at exit */
316 35484 : on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
317 35484 : }
318 :
319 : /*
320 : * CleanupInvalidationState
321 : * Mark the current backend as no longer active.
322 : *
323 : * This function is called via on_shmem_exit() during backend shutdown.
324 : *
325 : * arg is really of type "SISeg*".
326 : */
327 : static void
328 35484 : CleanupInvalidationState(int status, Datum arg)
329 : {
330 35484 : SISeg *segP = (SISeg *) DatumGetPointer(arg);
331 : ProcState *stateP;
332 : int i;
333 :
334 : Assert(PointerIsValid(segP));
335 :
336 35484 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
337 :
338 35484 : stateP = &segP->procState[MyProcNumber];
339 :
340 : /* Update next local transaction ID for next holder of this proc number */
341 35484 : stateP->nextLXID = nextLocalTransactionId;
342 :
343 : /* Mark myself inactive */
344 35484 : stateP->procPid = 0;
345 35484 : stateP->nextMsgNum = 0;
346 35484 : stateP->resetState = false;
347 35484 : stateP->signaled = false;
348 :
349 43464 : for (i = segP->numProcs - 1; i >= 0; i--)
350 : {
351 43464 : if (segP->pgprocnos[i] == MyProcNumber)
352 : {
353 35484 : if (i != segP->numProcs - 1)
354 4558 : segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
355 35484 : break;
356 : }
357 : }
358 35484 : if (i < 0)
359 0 : elog(PANIC, "could not find entry in sinval array");
360 35484 : segP->numProcs--;
361 :
362 35484 : LWLockRelease(SInvalWriteLock);
363 35484 : }
364 :
365 : /*
366 : * SIInsertDataEntries
367 : * Add new invalidation message(s) to the buffer.
368 : */
369 : void
370 871394 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
371 : {
372 871394 : SISeg *segP = shmInvalBuffer;
373 :
374 : /*
375 : * N can be arbitrarily large. We divide the work into groups of no more
376 : * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
377 : * an unreasonably long time. (This is not so much because we care about
378 : * letting in other writers, as that some just-caught-up backend might be
379 : * trying to do SICleanupQueue to pass on its signal, and we don't want it
380 : * to have to wait a long time.) Also, we need to consider calling
381 : * SICleanupQueue every so often.
382 : */
383 1790628 : while (n > 0)
384 : {
385 919234 : int nthistime = Min(n, WRITE_QUANTUM);
386 : int numMsgs;
387 : int max;
388 : int i;
389 :
390 919234 : n -= nthistime;
391 :
392 919234 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
393 :
394 : /*
395 : * If the buffer is full, we *must* acquire some space. Clean the
396 : * queue and reset anyone who is preventing space from being freed.
397 : * Otherwise, clean the queue only when it's exceeded the next
398 : * fullness threshold. We have to loop and recheck the buffer state
399 : * after any call of SICleanupQueue.
400 : */
401 : for (;;)
402 : {
403 929084 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
404 929084 : if (numMsgs + nthistime > MAXNUMMESSAGES ||
405 928656 : numMsgs >= segP->nextThreshold)
406 9850 : SICleanupQueue(true, nthistime);
407 : else
408 : break;
409 : }
410 :
411 : /*
412 : * Insert new message(s) into proper slot of circular buffer
413 : */
414 919234 : max = segP->maxMsgNum;
415 8642280 : while (nthistime-- > 0)
416 : {
417 7723046 : segP->buffer[max % MAXNUMMESSAGES] = *data++;
418 7723046 : max++;
419 : }
420 :
421 : /* Update current value of maxMsgNum using spinlock */
422 919234 : SpinLockAcquire(&segP->msgnumLock);
423 919234 : segP->maxMsgNum = max;
424 919234 : SpinLockRelease(&segP->msgnumLock);
425 :
426 : /*
427 : * Now that the maxMsgNum change is globally visible, we give everyone
428 : * a swift kick to make sure they read the newly added messages.
429 : * Releasing SInvalWriteLock will enforce a full memory barrier, so
430 : * these (unlocked) changes will be committed to memory before we exit
431 : * the function.
432 : */
433 4823514 : for (i = 0; i < segP->numProcs; i++)
434 : {
435 3904280 : ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
436 :
437 3904280 : stateP->hasMessages = true;
438 : }
439 :
440 919234 : LWLockRelease(SInvalWriteLock);
441 : }
442 871394 : }
443 :
444 : /*
445 : * SIGetDataEntries
446 : * get next SI message(s) for current backend, if there are any
447 : *
448 : * Possible return values:
449 : * 0: no SI message available
450 : * n>0: next n SI messages have been extracted into data[]
451 : * -1: SI reset message extracted
452 : *
453 : * If the return value is less than the array size "datasize", the caller
454 : * can assume that there are no more SI messages after the one(s) returned.
455 : * Otherwise, another call is needed to collect more messages.
456 : *
457 : * NB: this can run in parallel with other instances of SIGetDataEntries
458 : * executing on behalf of other backends, since each instance will modify only
459 : * fields of its own backend's ProcState, and no instance will look at fields
460 : * of other backends' ProcStates. We express this by grabbing SInvalReadLock
461 : * in shared mode. Note that this is not exactly the normal (read-only)
462 : * interpretation of a shared lock! Look closely at the interactions before
463 : * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
464 : *
465 : * NB: this can also run in parallel with SIInsertDataEntries. It is not
466 : * guaranteed that we will return any messages added after the routine is
467 : * entered.
468 : *
469 : * Note: we assume that "datasize" is not so large that it might be important
470 : * to break our hold on SInvalReadLock into segments.
471 : */
472 : int
473 37474000 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
474 : {
475 : SISeg *segP;
476 : ProcState *stateP;
477 : int max;
478 : int n;
479 :
480 37474000 : segP = shmInvalBuffer;
481 37474000 : stateP = &segP->procState[MyProcNumber];
482 :
483 : /*
484 : * Before starting to take locks, do a quick, unlocked test to see whether
485 : * there can possibly be anything to read. On a multiprocessor system,
486 : * it's possible that this load could migrate backwards and occur before
487 : * we actually enter this function, so we might miss a sinval message that
488 : * was just added by some other processor. But they can't migrate
489 : * backwards over a preceding lock acquisition, so it should be OK. If we
490 : * haven't acquired a lock preventing against further relevant
491 : * invalidations, any such occurrence is not much different than if the
492 : * invalidation had arrived slightly later in the first place.
493 : */
494 37474000 : if (!stateP->hasMessages)
495 36123904 : return 0;
496 :
497 1350096 : LWLockAcquire(SInvalReadLock, LW_SHARED);
498 :
499 : /*
500 : * We must reset hasMessages before determining how many messages we're
501 : * going to read. That way, if new messages arrive after we have
502 : * determined how many we're reading, the flag will get reset and we'll
503 : * notice those messages part-way through.
504 : *
505 : * Note that, if we don't end up reading all of the messages, we had
506 : * better be certain to reset this flag before exiting!
507 : */
508 1350096 : stateP->hasMessages = false;
509 :
510 : /* Fetch current value of maxMsgNum using spinlock */
511 1350096 : SpinLockAcquire(&segP->msgnumLock);
512 1350096 : max = segP->maxMsgNum;
513 1350096 : SpinLockRelease(&segP->msgnumLock);
514 :
515 1350096 : if (stateP->resetState)
516 : {
517 : /*
518 : * Force reset. We can say we have dealt with any messages added
519 : * since the reset, as well; and that means we should clear the
520 : * signaled flag, too.
521 : */
522 530 : stateP->nextMsgNum = max;
523 530 : stateP->resetState = false;
524 530 : stateP->signaled = false;
525 530 : LWLockRelease(SInvalReadLock);
526 530 : return -1;
527 : }
528 :
529 : /*
530 : * Retrieve messages and advance backend's counter, until data array is
531 : * full or there are no more messages.
532 : *
533 : * There may be other backends that haven't read the message(s), so we
534 : * cannot delete them here. SICleanupQueue() will eventually remove them
535 : * from the queue.
536 : */
537 1349566 : n = 0;
538 30596922 : while (n < datasize && stateP->nextMsgNum < max)
539 : {
540 29247356 : data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
541 29247356 : stateP->nextMsgNum++;
542 : }
543 :
544 : /*
545 : * If we have caught up completely, reset our "signaled" flag so that
546 : * we'll get another signal if we fall behind again.
547 : *
548 : * If we haven't caught up completely, reset the hasMessages flag so that
549 : * we see the remaining messages next time.
550 : */
551 1349566 : if (stateP->nextMsgNum >= max)
552 576868 : stateP->signaled = false;
553 : else
554 772698 : stateP->hasMessages = true;
555 :
556 1349566 : LWLockRelease(SInvalReadLock);
557 1349566 : return n;
558 : }
559 :
560 : /*
561 : * SICleanupQueue
562 : * Remove messages that have been consumed by all active backends
563 : *
564 : * callerHasWriteLock is true if caller is holding SInvalWriteLock.
565 : * minFree is the minimum number of message slots to make free.
566 : *
567 : * Possible side effects of this routine include marking one or more
568 : * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
569 : * to some backend that seems to be getting too far behind. We signal at
570 : * most one backend at a time, for reasons explained at the top of the file.
571 : *
572 : * Caution: because we transiently release write lock when we have to signal
573 : * some other backend, it is NOT guaranteed that there are still minFree
574 : * free message slots at exit. Caller must recheck and perhaps retry.
575 : */
576 : void
577 15484 : SICleanupQueue(bool callerHasWriteLock, int minFree)
578 : {
579 15484 : SISeg *segP = shmInvalBuffer;
580 : int min,
581 : minsig,
582 : lowbound,
583 : numMsgs,
584 : i;
585 15484 : ProcState *needSig = NULL;
586 :
587 : /* Lock out all writers and readers */
588 15484 : if (!callerHasWriteLock)
589 5634 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
590 15484 : LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
591 :
592 : /*
593 : * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
594 : * furthest-back backend that needs signaling (if any), and reset any
595 : * backends that are too far back. Note that because we ignore sendOnly
596 : * backends here it is possible for them to keep sending messages without
597 : * a problem even when they are the only active backend.
598 : */
599 15484 : min = segP->maxMsgNum;
600 15484 : minsig = min - SIG_THRESHOLD;
601 15484 : lowbound = min - MAXNUMMESSAGES + minFree;
602 :
603 117832 : for (i = 0; i < segP->numProcs; i++)
604 : {
605 102348 : ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
606 102348 : int n = stateP->nextMsgNum;
607 :
608 : /* Ignore if already in reset state */
609 : Assert(stateP->procPid != 0);
610 102348 : if (stateP->resetState || stateP->sendOnly)
611 10600 : continue;
612 :
613 : /*
614 : * If we must free some space and this backend is preventing it, force
615 : * him into reset state and then ignore until he catches up.
616 : */
617 91748 : if (n < lowbound)
618 : {
619 532 : stateP->resetState = true;
620 : /* no point in signaling him ... */
621 532 : continue;
622 : }
623 :
624 : /* Track the global minimum nextMsgNum */
625 91216 : if (n < min)
626 23638 : min = n;
627 :
628 : /* Also see who's furthest back of the unsignaled backends */
629 91216 : if (n < minsig && !stateP->signaled)
630 : {
631 5838 : minsig = n;
632 5838 : needSig = stateP;
633 : }
634 : }
635 15484 : segP->minMsgNum = min;
636 :
637 : /*
638 : * When minMsgNum gets really large, decrement all message counters so as
639 : * to forestall overflow of the counters. This happens seldom enough that
640 : * folding it into the previous loop would be a loser.
641 : */
642 15484 : if (min >= MSGNUMWRAPAROUND)
643 : {
644 0 : segP->minMsgNum -= MSGNUMWRAPAROUND;
645 0 : segP->maxMsgNum -= MSGNUMWRAPAROUND;
646 0 : for (i = 0; i < segP->numProcs; i++)
647 0 : segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
648 : }
649 :
650 : /*
651 : * Determine how many messages are still in the queue, and set the
652 : * threshold at which we should repeat SICleanupQueue().
653 : */
654 15484 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
655 15484 : if (numMsgs < CLEANUP_MIN)
656 5212 : segP->nextThreshold = CLEANUP_MIN;
657 : else
658 10272 : segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
659 :
660 : /*
661 : * Lastly, signal anyone who needs a catchup interrupt. Since
662 : * SendProcSignal() might not be fast, we don't want to hold locks while
663 : * executing it.
664 : */
665 15484 : if (needSig)
666 : {
667 5716 : pid_t his_pid = needSig->procPid;
668 5716 : ProcNumber his_procNumber = (needSig - &segP->procState[0]);
669 :
670 5716 : needSig->signaled = true;
671 5716 : LWLockRelease(SInvalReadLock);
672 5716 : LWLockRelease(SInvalWriteLock);
673 5716 : elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
674 5716 : SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
675 5716 : if (callerHasWriteLock)
676 4242 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
677 : }
678 : else
679 : {
680 9768 : LWLockRelease(SInvalReadLock);
681 9768 : if (!callerHasWriteLock)
682 4160 : LWLockRelease(SInvalWriteLock);
683 : }
684 15484 : }
685 :
686 :
687 : /*
688 : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
689 : *
690 : * We split VirtualTransactionIds into two parts so that it is possible
691 : * to allocate a new one without any contention for shared memory, except
692 : * for a bit of additional overhead during backend startup/shutdown.
693 : * The high-order part of a VirtualTransactionId is a ProcNumber, and the
694 : * low-order part is a LocalTransactionId, which we assign from a local
695 : * counter. To avoid the risk of a VirtualTransactionId being reused
696 : * within a short interval, successive procs occupying the same PGPROC slot
697 : * should use a consecutive sequence of local IDs, which is implemented
698 : * by copying nextLocalTransactionId as seen above.
699 : */
700 : LocalTransactionId
701 887710 : GetNextLocalTransactionId(void)
702 : {
703 : LocalTransactionId result;
704 :
705 : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
706 : do
707 : {
708 887710 : result = nextLocalTransactionId++;
709 887710 : } while (!LocalTransactionIdIsValid(result));
710 :
711 868584 : return result;
712 : }
|