Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * sinval.c 4 : * POSTGRES shared cache invalidation communication code. 5 : * 6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group 7 : * Portions Copyright (c) 1994, Regents of the University of California 8 : * 9 : * 10 : * IDENTIFICATION 11 : * src/backend/storage/ipc/sinval.c 12 : * 13 : *------------------------------------------------------------------------- 14 : */ 15 : #include "postgres.h" 16 : 17 : #include "access/xact.h" 18 : #include "miscadmin.h" 19 : #include "storage/latch.h" 20 : #include "storage/sinvaladt.h" 21 : #include "utils/inval.h" 22 : 23 : 24 : uint64 SharedInvalidMessageCounter; 25 : 26 : 27 : /* 28 : * Because backends sitting idle will not be reading sinval events, we 29 : * need a way to give an idle backend a swift kick in the rear and make 30 : * it catch up before the sinval queue overflows and forces it to go 31 : * through a cache reset exercise. This is done by sending 32 : * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind. 33 : * 34 : * The signal handler will set an interrupt pending flag and will set the 35 : * processes latch. Whenever starting to read from the client, or when 36 : * interrupted while doing so, ProcessClientReadInterrupt() will call 37 : * ProcessCatchupEvent(). 38 : */ 39 : volatile sig_atomic_t catchupInterruptPending = false; 40 : 41 : 42 : /* 43 : * SendSharedInvalidMessages 44 : * Add shared-cache-invalidation message(s) to the global SI message queue. 45 : */ 46 : void 47 783312 : SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) 48 : { 49 783312 : SIInsertDataEntries(msgs, n); 50 783312 : } 51 : 52 : /* 53 : * ReceiveSharedInvalidMessages 54 : * Process shared-cache-invalidation messages waiting for this backend 55 : * 56 : * We guarantee to process all messages that had been queued before the 57 : * routine was entered. It is of course possible for more messages to get 58 : * queued right after our last SIGetDataEntries call. 59 : * 60 : * NOTE: it is entirely possible for this routine to be invoked recursively 61 : * as a consequence of processing inside the invalFunction or resetFunction. 62 : * Furthermore, such a recursive call must guarantee that all outstanding 63 : * inval messages have been processed before it exits. This is the reason 64 : * for the strange-looking choice to use a statically allocated buffer array 65 : * and counters; it's so that a recursive call can process messages already 66 : * sucked out of sinvaladt.c. 67 : */ 68 : void 69 31514800 : ReceiveSharedInvalidMessages(void (*invalFunction) (SharedInvalidationMessage *msg), 70 : void (*resetFunction) (void)) 71 : { 72 : #define MAXINVALMSGS 32 73 : static SharedInvalidationMessage messages[MAXINVALMSGS]; 74 : 75 : /* 76 : * We use volatile here to prevent bugs if a compiler doesn't realize that 77 : * recursion is a possibility ... 78 : */ 79 : static volatile int nextmsg = 0; 80 : static volatile int nummsgs = 0; 81 : 82 : /* Deal with any messages still pending from an outer recursion */ 83 31516722 : while (nextmsg < nummsgs) 84 : { 85 1990 : SharedInvalidationMessage msg = messages[nextmsg++]; 86 : 87 1990 : SharedInvalidMessageCounter++; 88 1990 : invalFunction(&msg); 89 : } 90 : 91 : do 92 : { 93 : int getResult; 94 : 95 32240978 : nextmsg = nummsgs = 0; 96 : 97 : /* Try to get some more messages */ 98 32240978 : getResult = SIGetDataEntries(messages, MAXINVALMSGS); 99 : 100 32240978 : if (getResult < 0) 101 : { 102 : /* got a reset message */ 103 520 : elog(DEBUG4, "cache state reset"); 104 520 : SharedInvalidMessageCounter++; 105 520 : resetFunction(); 106 520 : break; /* nothing more to do */ 107 : } 108 : 109 : /* Process them, being wary that a recursive call might eat some */ 110 32240458 : nextmsg = 0; 111 32240458 : nummsgs = getResult; 112 : 113 59459626 : while (nextmsg < nummsgs) 114 : { 115 27219172 : SharedInvalidationMessage msg = messages[nextmsg++]; 116 : 117 27219172 : SharedInvalidMessageCounter++; 118 27219172 : invalFunction(&msg); 119 : } 120 : 121 : /* 122 : * We only need to loop if the last SIGetDataEntries call (which might 123 : * have been within a recursive call) returned a full buffer. 124 : */ 125 32240454 : } while (nummsgs == MAXINVALMSGS); 126 : 127 : /* 128 : * We are now caught up. If we received a catchup signal, reset that 129 : * flag, and call SICleanupQueue(). This is not so much because we need 130 : * to flush dead messages right now, as that we want to pass on the 131 : * catchup signal to the next slowest backend. "Daisy chaining" the 132 : * catchup signal this way avoids creating spikes in system load for what 133 : * should be just a background maintenance activity. 134 : */ 135 31514728 : if (catchupInterruptPending) 136 : { 137 5304 : catchupInterruptPending = false; 138 5304 : elog(DEBUG4, "sinval catchup complete, cleaning queue"); 139 5304 : SICleanupQueue(false, 0); 140 : } 141 31514728 : } 142 : 143 : 144 : /* 145 : * HandleCatchupInterrupt 146 : * 147 : * This is called when PROCSIG_CATCHUP_INTERRUPT is received. 148 : * 149 : * We used to directly call ProcessCatchupEvent directly when idle. These days 150 : * we just set a flag to do it later and notify the process of that fact by 151 : * setting the process's latch. 152 : */ 153 : void 154 5306 : HandleCatchupInterrupt(void) 155 : { 156 : /* 157 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what 158 : * you do here. 159 : */ 160 : 161 5306 : catchupInterruptPending = true; 162 : 163 : /* make sure the event is processed in due course */ 164 5306 : SetLatch(MyLatch); 165 5306 : } 166 : 167 : /* 168 : * ProcessCatchupInterrupt 169 : * 170 : * The portion of catchup interrupt handling that runs outside of the signal 171 : * handler, which allows it to actually process pending invalidations. 172 : */ 173 : void 174 6762 : ProcessCatchupInterrupt(void) 175 : { 176 9508 : while (catchupInterruptPending) 177 : { 178 : /* 179 : * What we need to do here is cause ReceiveSharedInvalidMessages() to 180 : * run, which will do the necessary work and also reset the 181 : * catchupInterruptPending flag. If we are inside a transaction we 182 : * can just call AcceptInvalidationMessages() to do this. If we 183 : * aren't, we start and immediately end a transaction; the call to 184 : * AcceptInvalidationMessages() happens down inside transaction start. 185 : * 186 : * It is awfully tempting to just call AcceptInvalidationMessages() 187 : * without the rest of the xact start/stop overhead, and I think that 188 : * would actually work in the normal case; but I am not sure that 189 : * things would clean up nicely if we got an error partway through. 190 : */ 191 2746 : if (IsTransactionOrTransactionBlock()) 192 : { 193 40 : elog(DEBUG4, "ProcessCatchupEvent inside transaction"); 194 40 : AcceptInvalidationMessages(); 195 : } 196 : else 197 : { 198 2706 : elog(DEBUG4, "ProcessCatchupEvent outside transaction"); 199 2706 : StartTransactionCommand(); 200 2706 : CommitTransactionCommand(); 201 : } 202 : } 203 6762 : }