Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinvaladt.c
4 : * POSTGRES shared cache invalidation data manager.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinvaladt.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <signal.h>
18 : #include <unistd.h>
19 :
20 : #include "miscadmin.h"
21 : #include "storage/ipc.h"
22 : #include "storage/proc.h"
23 : #include "storage/procnumber.h"
24 : #include "storage/procsignal.h"
25 : #include "storage/shmem.h"
26 : #include "storage/sinvaladt.h"
27 : #include "storage/spin.h"
28 : #include "storage/subsystems.h"
29 :
30 : /*
31 : * Conceptually, the shared cache invalidation messages are stored in an
32 : * infinite array, where maxMsgNum is the next array subscript to store a
33 : * submitted message in, minMsgNum is the smallest array subscript containing
34 : * a message not yet read by all backends, and we always have maxMsgNum >=
35 : * minMsgNum. (They are equal when there are no messages pending.) For each
36 : * active backend, there is a nextMsgNum pointer indicating the next message it
37 : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38 : * backend.
39 : *
40 : * (In the current implementation, minMsgNum is a lower bound for the
41 : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42 : * smallest nextMsgNum --- it may lag behind. We only update it when
43 : * SICleanupQueue is called, and we try not to do that often.)
44 : *
45 : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46 : * entries. We translate MsgNum values into circular-buffer indexes by
47 : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48 : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49 : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50 : * in the buffer. If the buffer does overflow, we recover by setting the
51 : * "reset" flag for each backend that has fallen too far behind. A backend
52 : * that is in "reset" state is ignored while determining minMsgNum. When
53 : * it does finally attempt to receive inval messages, it must discard all
54 : * its invalidatable state, since it won't know what it missed.
55 : *
56 : * To reduce the probability of needing resets, we send a "catchup" interrupt
57 : * to any backend that seems to be falling unreasonably far behind. The
58 : * normal behavior is that at most one such interrupt is in flight at a time;
59 : * when a backend completes processing a catchup interrupt, it executes
60 : * SICleanupQueue, which will signal the next-furthest-behind backend if
61 : * needed. This avoids undue contention from multiple backends all trying
62 : * to catch up at once. However, the furthest-back backend might be stuck
63 : * in a state where it can't catch up. Eventually it will get reset, so it
64 : * won't cause any more problems for anyone but itself. But we don't want
65 : * to find that a bunch of other backends are now too close to the reset
66 : * threshold to be saved. So SICleanupQueue is designed to occasionally
67 : * send extra catchup interrupts as the queue gets fuller, to backends that
68 : * are far behind and haven't gotten one yet. As long as there aren't a lot
69 : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70 : * that aren't stuck will propagate their interrupts to the next guy.
71 : *
72 : * We would have problems if the MsgNum values overflow an integer, so
73 : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74 : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75 : * large so that we don't need to do this often. It must be a multiple of
76 : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77 : * to be moved when we do it.
78 : *
79 : * Access to the shared sinval array is protected by two locks, SInvalReadLock
80 : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81 : * authorizes them to modify their own ProcState but not to modify or even
82 : * look at anyone else's. When we need to perform array-wide updates,
83 : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84 : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85 : * mode) to serialize adding messages to the queue. Note that a writer
86 : * can operate in parallel with one or more readers, because the writer
87 : * has no need to touch anyone's ProcState, except in the infrequent cases
88 : * when SICleanupQueue is needed. The only point of overlap is that
89 : * the writer wants to change maxMsgNum while readers need to read it.
90 : * We deal with that by having a spinlock that readers must take for just
91 : * long enough to read maxMsgNum, while writers take it for just long enough
92 : * to write maxMsgNum. (The exact rule is that you need the spinlock to
93 : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94 : * spinlock to write maxMsgNum unless you are holding both locks.)
95 : *
96 : * Note: since maxMsgNum is an int and hence presumably atomically readable/
97 : * writable, the spinlock might seem unnecessary. The reason it is needed
98 : * is to provide a memory barrier: we need to be sure that messages written
99 : * to the array are actually there before maxMsgNum is increased, and that
100 : * readers will see that data after fetching maxMsgNum. Multiprocessors
101 : * that have weak memory-ordering guarantees can fail without the memory
102 : * barrier instructions that are included in the spinlock sequences.
103 : */
104 :
105 :
106 : /*
107 : * Configurable parameters.
108 : *
109 : * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
110 : * Must be a power of 2 for speed.
111 : *
112 : * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
113 : * Must be a multiple of MAXNUMMESSAGES. Should be large.
114 : *
115 : * CLEANUP_MIN: the minimum number of messages that must be in the buffer
116 : * before we bother to call SICleanupQueue.
117 : *
118 : * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
119 : * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
120 : *
121 : * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
122 : * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
123 : *
124 : * WRITE_QUANTUM: the max number of messages to push into the buffer per
125 : * iteration of SIInsertDataEntries. Noncritical but should be less than
126 : * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
127 : * per iteration.
128 : */
129 :
130 : #define MAXNUMMESSAGES 4096
131 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
132 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
133 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
134 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
135 : #define WRITE_QUANTUM 64
136 :
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them.  This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by ProcNumber and it is convenient to
	 * copy the value to and from local memory when MyProcNumber is set.  It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;
164 :
/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information.  See the file header comment for the
	 * invariants relating minMsgNum, maxMsgNum, and per-backend nextMsgNum.
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages; indexed by
	 * MsgNum % MAXNUMMESSAGES.
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info.
	 *
	 * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
	 * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
	 * a dense array of their indexes, to speed up scanning all in-use slots.
	 *
	 * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
	 * having our separate copy avoids contention on ProcArrayLock, and allows
	 * us to track only the processes that participate in shared cache
	 * invalidations.
	 */
	int			numProcs;
	int		   *pgprocnos;		/* points just past procState[]; see
								 * SharedInvalShmemInit */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
198 :
/*
 * We reserve a slot for each possible ProcNumber, plus one for each
 * possible auxiliary process type.  (This scheme assumes there is not
 * more than one of any auxiliary process type at a time, except for
 * IO workers.)
 */
#define NumProcStateSlots	(MaxBackends + NUM_AUXILIARY_PROCS)

static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */

static void SharedInvalShmemRequest(void *arg);
static void SharedInvalShmemInit(void *arg);

/* Callbacks by which the shmem machinery sizes and initializes our segment */
const ShmemCallbacks SharedInvalShmemCallbacks = {
	.request_fn = SharedInvalShmemRequest,
	.init_fn = SharedInvalShmemInit,
};

/*
 * Backend-local counter used to assign LocalTransactionIds; it is loaded
 * from our ProcState slot in SharedInvalBackendInit and saved back in
 * CleanupInvalidationState.
 */
static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);
221 :
222 :
223 : /*
224 : * SharedInvalShmemRequest
225 : * Register shared memory needs for the SI message buffer
226 : */
227 : static void
228 1233 : SharedInvalShmemRequest(void *arg)
229 : {
230 : Size size;
231 :
232 1233 : size = offsetof(SISeg, procState);
233 1233 : size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
234 1233 : size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
235 :
236 1233 : ShmemRequestStruct(.name = "shmInvalBuffer",
237 : .size = size,
238 : .ptr = (void **) &shmInvalBuffer,
239 : );
240 1233 : }
241 :
242 : static void
243 1230 : SharedInvalShmemInit(void *arg)
244 : {
245 : int i;
246 :
247 : /* Clear message counters, init spinlock */
248 1230 : shmInvalBuffer->minMsgNum = 0;
249 1230 : shmInvalBuffer->maxMsgNum = 0;
250 1230 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
251 1230 : SpinLockInit(&shmInvalBuffer->msgnumLock);
252 :
253 : /* The buffer[] array is initially all unused, so we need not fill it */
254 :
255 : /* Mark all backends inactive, and initialize nextLXID */
256 163161 : for (i = 0; i < NumProcStateSlots; i++)
257 : {
258 161931 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
259 161931 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
260 161931 : shmInvalBuffer->procState[i].resetState = false;
261 161931 : shmInvalBuffer->procState[i].signaled = false;
262 161931 : shmInvalBuffer->procState[i].hasMessages = false;
263 161931 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
264 : }
265 1230 : shmInvalBuffer->numProcs = 0;
266 1230 : shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
267 1230 : }
268 :
/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 *
 * sendOnly should be true if this process will only send invalidation
 * messages and never read them (see ProcState.sendOnly).
 *
 * Our slot in the procState array is selected by MyProcNumber, which must
 * already be assigned.  Errors out if the slot is somehow still in use.
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	ProcState  *stateP;
	pid_t		oldPid;
	SISeg	   *segP = shmInvalBuffer;

	if (MyProcNumber < 0)
		elog(ERROR, "MyProcNumber not set");
	if (MyProcNumber >= NumProcStateSlots)
		elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
			 MyProcNumber, NumProcStateSlots);
	stateP = &segP->procState[MyProcNumber];

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on the pgprocnos array to
	 * set hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	oldPid = stateP->procPid;
	if (oldPid != 0)
	{
		LWLockRelease(SInvalWriteLock);
		elog(ERROR, "sinval slot for backend %d is already in use by process %d",
			 MyProcNumber, (int) oldPid);
	}

	/* Add our slot number to the dense array of in-use slots */
	shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
}
320 :
/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(segP);

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyProcNumber];

	/* Update next local transaction ID for next holder of this proc number */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

	/*
	 * Remove our entry from the dense pgprocnos array: scan for it (from the
	 * end), and overwrite it with the last element so the array stays dense.
	 */
	for (i = segP->numProcs - 1; i >= 0; i--)
	{
		if (segP->pgprocnos[i] == MyProcNumber)
		{
			if (i != segP->numProcs - 1)
				segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
			break;
		}
	}
	if (i < 0)
		elog(PANIC, "could not find entry in sinval array");
	segP->numProcs--;

	LWLockRelease(SInvalWriteLock);
}
366 :
/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 *
 * data points to an array of n messages; they are appended to the queue in
 * chunks of at most WRITE_QUANTUM per acquisition of SInvalWriteLock.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/*
		 * Update current value of maxMsgNum using spinlock.  The spinlock
		 * acts as a memory barrier: readers that fetch the new maxMsgNum are
		 * guaranteed to see the buffer entries written above (see file
		 * header comment).
		 */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->numProcs; i++)
		{
			ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
445 :
/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyProcNumber];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock preventing against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/*
	 * Fetch current value of maxMsgNum using spinlock; the spinlock doubles
	 * as the memory barrier that makes the writer's buffer entries visible
	 * (see file header comment).
	 */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so that
	 * we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
561 :
/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,			/* minimum nextMsgNum over live backends */
				minsig,			/* smallest nextMsgNum of an unsignaled
								 * backend that is behind SIG_THRESHOLD */
				lowbound,		/* backends below this must be reset to
								 * guarantee minFree free slots */
				numMsgs,
				i;
	ProcState  *needSig = NULL; /* furthest-back unsignaled backend, if any */

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->numProcs; i++)
	{
		ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
		int			n = stateP->nextMsgNum;

		/* Ignore if already in reset state */
		Assert(stateP->procPid != 0);
		if (stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->numProcs; i++)
			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		ProcNumber	his_procNumber = (needSig - &segP->procState[0]);

		/* Copy out target identity before dropping locks */
		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
		/* Reacquire write lock if caller expects to still hold it */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
687 :
688 :
689 : /*
690 : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
691 : *
692 : * We split VirtualTransactionIds into two parts so that it is possible
693 : * to allocate a new one without any contention for shared memory, except
694 : * for a bit of additional overhead during backend startup/shutdown.
695 : * The high-order part of a VirtualTransactionId is a ProcNumber, and the
696 : * low-order part is a LocalTransactionId, which we assign from a local
697 : * counter. To avoid the risk of a VirtualTransactionId being reused
698 : * within a short interval, successive procs occupying the same PGPROC slot
699 : * should use a consecutive sequence of local IDs, which is implemented
700 : * by copying nextLocalTransactionId as seen above.
701 : */
702 : LocalTransactionId
703 627105 : GetNextLocalTransactionId(void)
704 : {
705 : LocalTransactionId result;
706 :
707 : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
708 : do
709 : {
710 638293 : result = nextLocalTransactionId++;
711 638293 : } while (!LocalTransactionIdIsValid(result));
712 :
713 627105 : return result;
714 : }
|