Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinval.c
4 : * POSTGRES shared cache invalidation communication code.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinval.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/xact.h"
18 : #include "miscadmin.h"
19 : #include "storage/latch.h"
20 : #include "storage/sinvaladt.h"
21 : #include "utils/inval.h"
22 :
23 :
24 : uint64 SharedInvalidMessageCounter; /* bumped once per inval message and once per reset processed by ReceiveSharedInvalidMessages() */
25 :
26 :
27 : /*
28 : * Because backends sitting idle will not be reading sinval events, we
29 : * need a way to give an idle backend a swift kick in the rear and make
30 : * it catch up before the sinval queue overflows and forces it to go
31 : * through a cache reset exercise. This is done by sending
32 : * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
33 : *
34 : * The signal handler will set an interrupt pending flag and will set the
35 : * process's latch. Whenever starting to read from the client, or when
36 : * interrupted while doing so, ProcessClientReadInterrupt() will call
37 : * ProcessCatchupInterrupt().
38 : */
39 : volatile sig_atomic_t catchupInterruptPending = false;
40 :
41 :
42 : /*
43 : * SendSharedInvalidMessages
44 : * Add shared-cache-invalidation message(s) to the global SI message queue.
45 : */
46 : void
47 447158 : SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
48 : {
49 447158 : SIInsertDataEntries(msgs, n);
50 447158 : }
51 :
52 : /*
53 : * ReceiveSharedInvalidMessages
54 : * Process shared-cache-invalidation messages waiting for this backend
55 : *
56 : * We guarantee to process all messages that had been queued before the
57 : * routine was entered. It is of course possible for more messages to get
58 : * queued right after our last SIGetDataEntries call.
59 : *
60 : * NOTE: it is entirely possible for this routine to be invoked recursively
61 : * as a consequence of processing inside the invalFunction or resetFunction.
62 : * Furthermore, such a recursive call must guarantee that all outstanding
63 : * inval messages have been processed before it exits. This is the reason
64 : * for the strange-looking choice to use a statically allocated buffer array
65 : * and counters; it's so that a recursive call can process messages already
66 : * sucked out of sinvaladt.c.
67 : */
68 : void
69 20282102 : ReceiveSharedInvalidMessages(void (*invalFunction) (SharedInvalidationMessage *msg),
70 : void (*resetFunction) (void))
71 : {
72 : #define MAXINVALMSGS 32
73 : static SharedInvalidationMessage messages[MAXINVALMSGS];
74 :
75 : /*
76 : * We use volatile here to prevent bugs if a compiler doesn't realize that
77 : * recursion is a possibility ...
78 : */
79 : static volatile int nextmsg = 0;
80 : static volatile int nummsgs = 0;
81 :
82 : /* Deal with any messages still pending from an outer recursion */
83 20282927 : while (nextmsg < nummsgs)
84 : {
85 825 : SharedInvalidationMessage msg = messages[nextmsg++];
86 :
87 825 : SharedInvalidMessageCounter++;
88 825 : invalFunction(&msg);
89 : }
90 :
91 : do
92 : {
93 : int getResult;
94 :
95 20758866 : nextmsg = nummsgs = 0;
96 :
97 : /* Try to get some more messages */
98 20758866 : getResult = SIGetDataEntries(messages, MAXINVALMSGS);
99 :
100 20758866 : if (getResult < 0)
101 : {
102 : /* got a reset message */
103 217 : elog(DEBUG4, "cache state reset");
104 217 : SharedInvalidMessageCounter++;
105 217 : resetFunction();
106 217 : break; /* nothing more to do */
107 : }
108 :
109 : /* Process them, being wary that a recursive call might eat some */
110 20758649 : nextmsg = 0;
111 20758649 : nummsgs = getResult;
112 :
113 38357793 : while (nextmsg < nummsgs)
114 : {
115 17599144 : SharedInvalidationMessage msg = messages[nextmsg++];
116 :
117 17599144 : SharedInvalidMessageCounter++;
118 17599144 : invalFunction(&msg);
119 : }
120 :
121 : /*
122 : * We only need to loop if the last SIGetDataEntries call (which might
123 : * have been within a recursive call) returned a full buffer.
124 : */
125 20758649 : } while (nummsgs == MAXINVALMSGS);
126 :
127 : /*
128 : * We are now caught up. If we received a catchup signal, reset that
129 : * flag, and call SICleanupQueue(). This is not so much because we need
130 : * to flush dead messages right now, as that we want to pass on the
131 : * catchup signal to the next slowest backend. "Daisy chaining" the
132 : * catchup signal this way avoids creating spikes in system load for what
133 : * should be just a background maintenance activity.
134 : */
135 20282102 : if (catchupInterruptPending)
136 : {
137 2879 : catchupInterruptPending = false;
138 2879 : elog(DEBUG4, "sinval catchup complete, cleaning queue");
139 2879 : SICleanupQueue(false, 0);
140 : }
141 20282102 : }
142 :
143 :
144 : /*
145 : * HandleCatchupInterrupt
146 : *
147 : * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
148 : *
149 : * We used to directly call ProcessCatchupEvent directly when idle. These days
150 : * we just set a flag to do it later and notify the process of that fact by
151 : * setting the process's latch.
152 : */
153 : void
154 2880 : HandleCatchupInterrupt(void)
155 : {
156 : /*
157 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what
158 : * you do here.
159 : */
160 :
161 2880 : catchupInterruptPending = true;
162 :
163 : /* make sure the event is processed in due course */
164 2880 : SetLatch(MyLatch);
165 2880 : }
166 :
167 : /*
168 : * ProcessCatchupInterrupt
169 : *
170 : * The portion of catchup interrupt handling that runs outside of the signal
171 : * handler, which allows it to actually process pending invalidations.
172 : */
173 : void
174 5485 : ProcessCatchupInterrupt(void)
175 : {
176 6981 : while (catchupInterruptPending)
177 : {
178 : /*
179 : * What we need to do here is cause ReceiveSharedInvalidMessages() to
180 : * run, which will do the necessary work and also reset the
181 : * catchupInterruptPending flag. If we are inside a transaction we
182 : * can just call AcceptInvalidationMessages() to do this. If we
183 : * aren't, we start and immediately end a transaction; the call to
184 : * AcceptInvalidationMessages() happens down inside transaction start.
185 : *
186 : * It is awfully tempting to just call AcceptInvalidationMessages()
187 : * without the rest of the xact start/stop overhead, and I think that
188 : * would actually work in the normal case; but I am not sure that
189 : * things would clean up nicely if we got an error partway through.
190 : */
191 1496 : if (IsTransactionOrTransactionBlock())
192 : {
193 36 : elog(DEBUG4, "ProcessCatchupEvent inside transaction");
194 36 : AcceptInvalidationMessages();
195 : }
196 : else
197 : {
198 1460 : elog(DEBUG4, "ProcessCatchupEvent outside transaction");
199 1460 : StartTransactionCommand();
200 1460 : CommitTransactionCommand();
201 : }
202 : }
203 5485 : }
|