Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * async.c
4 : * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/async.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : /*-------------------------------------------------------------------------
16 : * Async Notification Model as of v19:
17 : *
18 : * 1. Multiple backends on same machine. Multiple backends may be listening
19 : * on each of several channels.
20 : *
21 : * 2. There is one central queue in disk-based storage (directory pg_notify/),
22 : * with actively-used pages mapped into shared memory by the slru.c module.
23 : * All notification messages are placed in the queue and later read out
24 : * by listening backends. The single queue allows us to guarantee that
25 : * notifications are received in commit order.
26 : *
27 : * Although there is only one queue, notifications are treated as being
28 : * database-local; this is done by including the sender's database OID
29 : * in each notification message. Listening backends ignore messages
30 : * that don't match their database OID. This is important because it
31 : * ensures senders and receivers have the same database encoding and won't
32 : * misinterpret non-ASCII text in the channel name or payload string.
33 : *
34 : * Since notifications are not expected to survive database crashes,
35 : * we can simply clean out the pg_notify data at any reboot, and there
36 : * is no need for WAL support or fsync'ing.
37 : *
38 : * 3. Every backend that is listening on at least one channel registers by
39 : * entering its PID into the array in AsyncQueueControl. It then scans all
40 : * incoming notifications in the central queue and first compares the
41 : * database OID of the notification with its own database OID and then
42 : * compares the notified channel with the list of channels that it listens
43 : * to. In case there is a match it delivers the notification event to its
44 : * frontend. Non-matching events are simply skipped.
45 : *
46 : * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
47 : * a backend-local list which will not be processed until transaction end.
48 : *
49 : * Duplicate notifications from the same transaction are sent out as one
50 : * notification only. This is done to save work when for example a trigger
51 : * on a 2 million row table fires a notification for each row that has been
52 : * changed. If the application needs to receive every single notification
53 : * that has been sent, it can easily add some unique string into the extra
54 : * payload parameter.
55 : *
56 : * When the transaction is ready to commit, PreCommit_Notify() adds the
57 : * pending notifications to the head of the queue. The head pointer of the
58 : * queue always points to the next free position and a position is just a
59 : * page number and the offset in that page. This is done before marking the
60 : * transaction as committed in clog. If we run into problems writing the
61 : * notifications, we can still call elog(ERROR, ...) and the transaction
62 : * will roll back safely.
63 : *
64 : * Once we have put all of the notifications into the queue, we return to
65 : * CommitTransaction() which will then do the actual transaction commit.
66 : *
67 : * After commit we are called another time (AtCommit_Notify()). Here we
68 : * make any required updates to the effective listen state (see below).
69 : * Then we signal any backends that may be interested in our messages
70 : * (including our own backend, if listening). This is done by
71 : * SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
72 : * each relevant backend, as described below.
73 : *
74 : * Finally, after we are out of the transaction altogether and about to go
75 : * idle, we scan the queue for messages that need to be sent to our
76 : * frontend (which might be notifies from other backends, or self-notifies
77 : * from our own). This step is not part of the CommitTransaction sequence
78 : * for two important reasons. First, we could get errors while sending
79 : * data to our frontend, and it's really bad for errors to happen in
80 : * post-commit cleanup. Second, in cases where a procedure issues commits
81 : * within a single frontend command, we don't want to send notifies to our
82 : * frontend until the command is done; but notifies to other backends
83 : * should go out immediately after each commit.
84 : *
85 : * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
86 : * sets the process's latch, which triggers the event to be processed
87 : * immediately if this backend is idle (i.e., it is waiting for a frontend
88 : * command and is not within a transaction block. C.f.
89 : * ProcessClientReadInterrupt()). Otherwise the handler may only set a
90 : * flag, which will cause the processing to occur just before we next go
91 : * idle.
92 : *
93 : * Inbound-notify processing consists of reading all of the notifications
94 : * that have arrived since scanning last time. We read every notification
95 : * until we reach either a notification from an uncommitted transaction or
96 : * the head pointer's position.
97 : *
98 : * 6. To limit disk space consumption, the tail pointer needs to be advanced
99 : * so that old pages can be truncated. This is relatively expensive
100 : * (notably, it requires an exclusive lock), so we don't want to do it
101 : * often. We make sending backends do this work if they advanced the queue
102 : * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
103 : *
104 : * 7. So far we have not discussed how backends change their listening state,
105 : * nor how notification senders know which backends to awaken. To handle
106 : * the latter, we maintain a global channel table (implemented as a dynamic
107 : * shared hash table, or dshash) that maps channel names to the set of
108 : * backends listening on each channel. This table is created lazily on the
109 : * first LISTEN command and grows dynamically as needed. There is also a
110 : * local channel table (a plain dynahash table) in each listening backend,
111 : * tracking which channels that backend is listening to. The local table
112 : * serves to reduce the number of accesses needed to the shared table.
113 : *
114 : * If the current transaction has executed any LISTEN/UNLISTEN actions,
115 : * PreCommit_Notify() prepares to commit those. For LISTEN, it
116 : * pre-allocates entries in both the per-backend localChannelTable and the
117 : * shared globalChannelTable (with listening=false so that these entries
118 : * are no-ops for the moment). It also records the final per-channel
119 : * intent in pendingListenActions, so post-commit/abort processing can
120 : * apply that in a single step. Since all these allocations happen before
121 : * committing to clog, we can safely abort the transaction on failure.
122 : *
123 : * After commit, AtCommit_Notify() runs through pendingListenActions and
124 : * updates the backend's per-channel listening flags to activate or
125 : * deactivate listening. This happens before sending signals.
126 : *
127 : * SignalBackends() consults the shared global channel table to identify
128 : * listeners for the channels that the current transaction sent
129 : * notification(s) to. Each selected backend is marked as having a wakeup
130 : * pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
131 : * signal is sent to it.
132 : *
133 : * 8. While writing notifications, PreCommit_Notify() records the queue head
134 : * position both before and after the write. Because all writers serialize
135 : * on a cluster-wide heavyweight lock, no other backend can insert entries
136 : * between these two points. SignalBackends() uses this fact to directly
137 : * advance the queue pointer for any backend that is still positioned at
138 : * the old head, or within the range written, but is not interested in any
139 : * of our notifications. This avoids unnecessary wakeups for idle
140 : * listeners that have nothing to read. Backends that are not interested
141 : * in our notifications, but cannot be directly advanced, are signaled only
142 : * if they are far behind the current queue head; that is to ensure that
143 : * we can advance the queue tail without undue delay.
144 : *
145 : * An application that listens on the same channel it notifies will get
146 : * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
147 : * by comparing be_pid in the NOTIFY message to the application's own backend's
148 : * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
149 : * frontend during startup.) The above design guarantees that notifies from
150 : * other backends will never be missed by ignoring self-notifies.
151 : *
152 : * The amount of shared memory used for notify management (notify_buffers)
153 : * can be varied without affecting anything but performance. The maximum
154 : * amount of notification data that can be queued at one time is determined
155 : * by the max_notify_queue_pages GUC.
156 : *-------------------------------------------------------------------------
157 : */
158 :
159 : #include "postgres.h"
160 :
161 : #include <limits.h>
162 : #include <unistd.h>
163 : #include <signal.h>
164 :
165 : #include "access/parallel.h"
166 : #include "access/slru.h"
167 : #include "access/transam.h"
168 : #include "access/xact.h"
169 : #include "catalog/pg_database.h"
170 : #include "commands/async.h"
171 : #include "common/hashfn.h"
172 : #include "funcapi.h"
173 : #include "lib/dshash.h"
174 : #include "libpq/libpq.h"
175 : #include "libpq/pqformat.h"
176 : #include "miscadmin.h"
177 : #include "storage/dsm_registry.h"
178 : #include "storage/ipc.h"
179 : #include "storage/latch.h"
180 : #include "storage/lmgr.h"
181 : #include "storage/procsignal.h"
182 : #include "tcop/tcopprot.h"
183 : #include "utils/builtins.h"
184 : #include "utils/dsa.h"
185 : #include "utils/guc_hooks.h"
186 : #include "utils/memutils.h"
187 : #include "utils/ps_status.h"
188 : #include "utils/snapmgr.h"
189 : #include "utils/timestamp.h"
190 :
191 :
192 : /*
193 : * Maximum size of a NOTIFY payload, including terminating NULL. This
194 : * must be kept small enough so that a notification message fits on one
195 : * SLRU page. The magic fudge factor here is noncritical as long as it's
196 : * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
197 : * than that, so changes in that data structure won't affect user-visible
198 : * restrictions.
199 : */
200 : #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
201 :
202 : /*
203 : * Struct representing an entry in the global notify queue
204 : *
205 : * This struct declaration has the maximal length, but in a real queue entry
206 : * the data area is only big enough for the actual channel and payload strings
207 : * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
208 : * entry size, if both channel and payload strings are empty (but note it
209 : * doesn't include alignment padding).
210 : *
211 : * The "length" field should always be rounded up to the next QUEUEALIGN
212 : * multiple so that all fields are properly aligned.
213 : */
typedef struct AsyncQueueEntry
{
	int			length;			/* total allocated length of entry */
	Oid			dboid;			/* sender's database OID */
	TransactionId xid;			/* sender's XID */
	int32		srcPid;			/* sender's PID */
	/* maximal-size data area: holds channel name, then payload, each NUL-terminated */
	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
222 :
223 : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
224 : #define QUEUEALIGN(len) INTALIGN(len)
225 :
226 : #define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
227 :
228 : /*
229 : * Struct describing a queue position, and assorted macros for working with it
230 : */
typedef struct QueuePosition
{
	int64		page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} QueuePosition;
236 :
237 : #define QUEUE_POS_PAGE(x) ((x).page)
238 : #define QUEUE_POS_OFFSET(x) ((x).offset)
239 :
240 : #define SET_QUEUE_POS(x,y,z) \
241 : do { \
242 : (x).page = (y); \
243 : (x).offset = (z); \
244 : } while (0)
245 :
246 : #define QUEUE_POS_EQUAL(x,y) \
247 : ((x).page == (y).page && (x).offset == (y).offset)
248 :
249 : #define QUEUE_POS_IS_ZERO(x) \
250 : ((x).page == 0 && (x).offset == 0)
251 :
252 : /* choose logically smaller QueuePosition */
253 : #define QUEUE_POS_MIN(x,y) \
254 : (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
255 : (x).page != (y).page ? (y) : \
256 : (x).offset < (y).offset ? (x) : (y))
257 :
258 : /* choose logically larger QueuePosition */
259 : #define QUEUE_POS_MAX(x,y) \
260 : (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
261 : (x).page != (y).page ? (x) : \
262 : (x).offset > (y).offset ? (x) : (y))
263 :
264 : /* returns true if x comes before y in queue order */
265 : #define QUEUE_POS_PRECEDES(x,y) \
266 : (asyncQueuePagePrecedes((x).page, (y).page) || \
267 : ((x).page == (y).page && (x).offset < (y).offset))
268 :
269 : /*
270 : * Parameter determining how often we try to advance the tail pointer:
271 : * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
272 : * also the distance by which a backend that's not interested in our
273 : * notifications needs to be behind before we'll decide we need to wake it
274 : * up so it can advance its pointer.
275 : *
276 : * Resist the temptation to make this really large. While that would save
277 : * work in some places, it would add cost in others. In particular, this
278 : * should likely be less than notify_buffers, to ensure that backends
279 : * catch up before the pages they'll need to read fall out of SLRU cache.
280 : */
281 : #define QUEUE_CLEANUP_DELAY 4
282 :
283 : /*
284 : * Struct describing a listening backend's status
285 : */
typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid (slot unused) */
	Oid			dboid;			/* backend's database OID, or InvalidOid */
	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
	QueuePosition pos;			/* backend has read queue up to here */
	bool		wakeupPending;	/* signal sent to backend, not yet processed */
	bool		isAdvancing;	/* backend is advancing its position; others
								 * must not move "pos" on its behalf */
} QueueBackendStatus;
295 :
296 : /*
297 : * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
298 : *
299 : * The AsyncQueueControl structure is protected by the NotifyQueueLock and
300 : * NotifyQueueTailLock.
301 : *
302 : * When holding NotifyQueueLock in SHARED mode, backends may only inspect
303 : * their own entries as well as the head and tail pointers. Consequently we
304 : * can allow a backend to update its own record while holding only SHARED lock
305 : * (since no other backend will inspect it).
306 : *
307 : * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
308 : * entries of other backends and also change the head pointer. They can
309 : * also advance other backends' queue positions, unless the other backend
310 : * has isAdvancing set (i.e., is in process of doing that itself).
311 : *
312 : * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
313 : * mode, backends can change the tail pointers.
314 : *
315 : * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
316 : * the control lock for the pg_notify SLRU buffers.
317 : * In order to avoid deadlocks, whenever we need multiple locks, we first get
318 : * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
319 : * globalChannelTable partition locks.
320 : *
321 : * Each backend uses the backend[] array entry with index equal to its
322 : * ProcNumber. We rely on this to make SendProcSignal fast.
323 : *
324 : * The backend[] array entries for actively-listening backends are threaded
325 : * together using firstListener and the nextListener links, so that we can
326 : * scan them without having to iterate over inactive entries. We keep this
327 : * list in order by ProcNumber so that the scan is cache-friendly when there
328 : * are many active entries.
329 : */
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
	QueuePosition tail;			/* tail must be <= the queue position of every
								 * listening backend */
	int64		stopPage;		/* oldest unrecycled page; must be <=
								 * tail.page */
	ProcNumber	firstListener;	/* id of first listener, or
								 * INVALID_PROC_NUMBER */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	dsa_handle	globalChannelTableDSA;	/* global channel table's DSA handle */
	dshash_table_handle globalChannelTableDSH;	/* and its dshash handle */
	/* Array with room for MaxBackends entries, indexed by ProcNumber: */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
345 :
346 : static AsyncQueueControl *asyncQueueControl;
347 :
348 : #define QUEUE_HEAD (asyncQueueControl->head)
349 : #define QUEUE_TAIL (asyncQueueControl->tail)
350 : #define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
351 : #define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
352 : #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
353 : #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
354 : #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
355 : #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
356 : #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
357 : #define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
358 :
359 : /*
360 : * The SLRU buffer area through which we access the notification queue
361 : */
362 : static SlruCtlData NotifyCtlData;
363 :
364 : #define NotifyCtl (&NotifyCtlData)
365 : #define QUEUE_PAGESIZE BLCKSZ
366 :
367 : #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
368 :
369 : /*
370 : * Global channel table definitions
371 : *
372 : * This hash table maps (database OID, channel name) keys to arrays of
373 : * ProcNumbers representing the backends listening or about to listen
374 : * on each channel. The "listening" flags allow us to create hash table
375 : * entries pre-commit and not have to assume that creating them post-commit
376 : * will succeed.
377 : */
378 : #define INITIAL_LISTENERS_ARRAY_SIZE 4
379 :
typedef struct GlobalChannelKey
{
	Oid			dboid;			/* database the channel belongs to */
	char		channel[NAMEDATALEN];	/* channel name; unused tail bytes are
										 * zeroed (see GlobalChannelKeyInit) so
										 * memcmp-based key comparison works */
} GlobalChannelKey;

typedef struct ListenerEntry
{
	ProcNumber	procNo;			/* listener's ProcNumber */
	bool		listening;		/* true if committed listener; false for
								 * pre-commit placeholder entries */
} ListenerEntry;

typedef struct GlobalChannelEntry
{
	GlobalChannelKey key;		/* hash key */
	dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
	int			numListeners;	/* Number of listeners currently stored */
	int			allocatedListeners; /* Allocated size of array */
} GlobalChannelEntry;
399 :
400 : static dshash_table *globalChannelTable = NULL;
401 : static dsa_area *globalChannelDSA = NULL;
402 :
403 : /*
404 : * localChannelTable caches the channel names this backend is listening on
405 : * (including those we have staged to be listened on, but not yet committed).
406 : * Used by IsListeningOn() for fast lookups when reading notifications.
407 : */
408 : static HTAB *localChannelTable = NULL;
409 :
410 : /* We test this condition to detect that we're not listening at all */
411 : #define LocalChannelTableIsEmpty() \
412 : (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
413 :
414 : /*
415 : * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
416 : * all actions requested in the current transaction. As explained above,
417 : * we don't actually change listen state until we reach transaction commit.
418 : *
419 : * The list is kept in CurTransactionContext. In subtransactions, each
420 : * subtransaction has its own list in its own CurTransactionContext, but
421 : * successful subtransactions attach their lists to their parent's list.
422 : * Failed subtransactions simply discard their lists.
423 : */
typedef enum
{
	LISTEN_LISTEN,				/* LISTEN <channel> */
	LISTEN_UNLISTEN,			/* UNLISTEN <channel> */
	LISTEN_UNLISTEN_ALL,		/* UNLISTEN * */
} ListenActionKind;

typedef struct
{
	ListenActionKind action;
	char		channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string;
												 * empty for UNLISTEN ALL */
} ListenAction;

typedef struct ActionList
{
	int			nestingLevel;	/* current transaction nesting depth */
	List	   *actions;		/* list of ListenAction structs */
	struct ActionList *upper;	/* details for upper transaction levels */
} ActionList;
443 :
444 : static ActionList *pendingActions = NULL;
445 :
446 : /*
447 : * Hash table recording the final listen/unlisten intent per channel for
448 : * the current transaction. Key is channel name, value is PENDING_LISTEN or
449 : * PENDING_UNLISTEN. This keeps critical commit/abort processing to one step
450 : * per channel instead of replaying every action. This is built from the
451 : * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
452 : * AtAbort_Notify.
453 : */
454 : typedef enum
455 : {
456 : PENDING_LISTEN,
457 : PENDING_UNLISTEN,
458 : } PendingListenAction;
459 :
460 : typedef struct PendingListenEntry
461 : {
462 : char channel[NAMEDATALEN]; /* hash key */
463 : PendingListenAction action; /* which action should we perform? */
464 : } PendingListenEntry;
465 :
466 : static HTAB *pendingListenActions = NULL;
467 :
468 : /*
469 : * State for outbound notifies consists of a list of all channels+payloads
470 : * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
471 : * until and unless the transaction commits. pendingNotifies is NULL if no
472 : * NOTIFYs have been done in the current (sub) transaction.
473 : *
474 : * We discard duplicate notify events issued in the same transaction.
475 : * Hence, in addition to the list proper (which we need to track the order
476 : * of the events, since we guarantee to deliver them in order), we build a
477 : * hash table which we can probe to detect duplicates. Since building the
478 : * hash table is somewhat expensive, we do so only once we have at least
479 : * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
480 : * before that we just scan the events linearly.
481 : *
482 : * The list is kept in CurTransactionContext. In subtransactions, each
483 : * subtransaction has its own list in its own CurTransactionContext, but
484 : * successful subtransactions add their entries to their parent's list.
485 : * Failed subtransactions simply discard their lists. Since these lists
486 : * are independent, there may be notify events in a subtransaction's list
487 : * that duplicate events in some ancestor (sub) transaction; we get rid of
488 : * the dups when merging the subtransaction's list into its parent's.
489 : *
490 : * Note: the action and notify lists do not interact within a transaction.
491 : * In particular, if a transaction does NOTIFY and then LISTEN on the same
492 : * condition name, it will get a self-notify at commit. This is a bit odd
493 : * but is consistent with our historical behavior.
494 : */
495 : typedef struct Notification
496 : {
497 : uint16 channel_len; /* length of channel-name string */
498 : uint16 payload_len; /* length of payload string */
499 : /* null-terminated channel name, then null-terminated payload follow */
500 : char data[FLEXIBLE_ARRAY_MEMBER];
501 : } Notification;
502 :
503 : typedef struct NotificationList
504 : {
505 : int nestingLevel; /* current transaction nesting depth */
506 : List *events; /* list of Notification structs */
507 : HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
508 : List *uniqueChannelNames; /* unique channel names being notified */
509 : HTAB *uniqueChannelHash; /* hash of unique channel names, or NULL */
510 : struct NotificationList *upper; /* details for upper transaction levels */
511 : } NotificationList;
512 :
513 : #define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
514 :
515 : struct NotificationHash
516 : {
517 : Notification *event; /* => the actual Notification struct */
518 : };
519 :
520 : static NotificationList *pendingNotifies = NULL;
521 :
522 : /*
523 : * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
524 : * (both just carry the channel name, with no payload).
525 : */
526 : typedef struct ChannelName
527 : {
528 : char channel[NAMEDATALEN]; /* hash key */
529 : } ChannelName;
530 :
531 : /*
532 : * Inbound notifications are initially processed by HandleNotifyInterrupt(),
533 : * called from inside a signal handler. That just sets the
534 : * notifyInterruptPending flag and sets the process
535 : * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
536 : * actually deal with the interrupt.
537 : */
538 : volatile sig_atomic_t notifyInterruptPending = false;
539 :
540 : /* True if we've registered an on_shmem_exit cleanup */
541 : static bool unlistenExitRegistered = false;
542 :
543 : /* True if we're currently registered as a listener in asyncQueueControl */
544 : static bool amRegisteredListener = false;
545 :
546 : /*
547 : * Queue head positions for direct advancement.
548 : * These are captured during PreCommit_Notify while holding the heavyweight
549 : * lock on database 0, ensuring no other backend can insert notifications
550 : * between them. SignalBackends uses these to advance idle backends.
551 : */
552 : static QueuePosition queueHeadBeforeWrite;
553 : static QueuePosition queueHeadAfterWrite;
554 :
555 : /*
556 : * Workspace arrays for SignalBackends. These are preallocated in
557 : * PreCommit_Notify to avoid needing memory allocation after committing to
558 : * clog.
559 : */
560 : static int32 *signalPids = NULL;
561 : static ProcNumber *signalProcnos = NULL;
562 :
563 : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
564 : static bool tryAdvanceTail = false;
565 :
566 : /* GUC parameters */
567 : bool Trace_notify = false;
568 :
569 : /* For 8 KB pages this gives 8 GB of disk space */
570 : int max_notify_queue_pages = 1048576;
571 :
572 : /* local function prototypes */
573 : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
574 : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
575 : static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
576 : const char *channel);
577 : static dshash_hash globalChannelTableHash(const void *key, size_t size,
578 : void *arg);
579 : static void initGlobalChannelTable(void);
580 : static void initLocalChannelTable(void);
581 : static void queue_listen(ListenActionKind action, const char *channel);
582 : static void Async_UnlistenOnExit(int code, Datum arg);
583 : static void BecomeRegisteredListener(void);
584 : static void PrepareTableEntriesForListen(const char *channel);
585 : static void PrepareTableEntriesForUnlisten(const char *channel);
586 : static void PrepareTableEntriesForUnlistenAll(void);
587 : static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
588 : ListenerEntry *listeners,
589 : int idx);
590 : static void ApplyPendingListenActions(bool isCommit);
591 : static void CleanupListenersOnExit(void);
592 : static bool IsListeningOn(const char *channel);
593 : static void asyncQueueUnregister(void);
594 : static bool asyncQueueIsFull(void);
595 : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
596 : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
597 : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
598 : static double asyncQueueUsage(void);
599 : static void asyncQueueFillWarning(void);
600 : static void SignalBackends(void);
601 : static void asyncQueueReadAllNotifications(void);
602 : static bool asyncQueueProcessPageEntries(QueuePosition *current,
603 : QueuePosition stop,
604 : Snapshot snapshot);
605 : static void asyncQueueAdvanceTail(void);
606 : static void ProcessIncomingNotify(bool flush);
607 : static bool AsyncExistsPendingNotify(Notification *n);
608 : static void AddEventToPendingNotifies(Notification *n);
609 : static uint32 notification_hash(const void *key, Size keysize);
610 : static int notification_match(const void *key1, const void *key2, Size keysize);
611 : static void ClearPendingActionsAndNotifies(void);
612 :
613 : /*
614 : * Compute the difference between two queue page numbers.
615 : * Previously this function accounted for a wraparound.
616 : */
617 : static inline int64
618 0 : asyncQueuePageDiff(int64 p, int64 q)
619 : {
620 0 : return p - q;
621 : }
622 :
623 : /*
624 : * Determines whether p precedes q.
625 : * Previously this function accounted for a wraparound.
626 : */
627 : static inline bool
628 142 : asyncQueuePagePrecedes(int64 p, int64 q)
629 : {
630 142 : return p < q;
631 : }
632 :
633 : /*
634 : * GlobalChannelKeyInit
635 : * Prepare a global channel table key for hashing.
636 : */
637 : static inline void
638 216 : GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
639 : {
640 216 : memset(key, 0, sizeof(GlobalChannelKey));
641 216 : key->dboid = dboid;
642 216 : strlcpy(key->channel, channel, NAMEDATALEN);
643 216 : }
644 :
645 : /*
646 : * globalChannelTableHash
647 : * Hash function for global channel table keys.
648 : */
649 : static dshash_hash
650 216 : globalChannelTableHash(const void *key, size_t size, void *arg)
651 : {
652 216 : const GlobalChannelKey *k = (const GlobalChannelKey *) key;
653 : dshash_hash h;
654 :
655 216 : h = DatumGetUInt32(hash_uint32(k->dboid));
656 216 : h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
657 216 : strnlen(k->channel, NAMEDATALEN)));
658 :
659 216 : return h;
660 : }
661 :
662 : /* parameters for the global channel table */
663 : static const dshash_parameters globalChannelTableDSHParams = {
664 : sizeof(GlobalChannelKey),
665 : sizeof(GlobalChannelEntry),
666 : dshash_memcmp,
667 : globalChannelTableHash,
668 : dshash_memcpy,
669 : LWTRANCHE_NOTIFY_CHANNEL_HASH
670 : };
671 :
672 : /*
673 : * initGlobalChannelTable
674 : * Lazy initialization of the global channel table.
675 : */
676 : static void
677 156 : initGlobalChannelTable(void)
678 : {
679 : MemoryContext oldcontext;
680 :
681 : /* Quick exit if we already did this */
682 156 : if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
683 149 : globalChannelTable != NULL)
684 127 : return;
685 :
686 : /* Otherwise, use a lock to ensure only one process creates the table */
687 29 : LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
688 :
689 : /* Be sure any local memory allocated by DSA routines is persistent */
690 29 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
691 :
692 29 : if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
693 : {
694 : /* Initialize dynamic shared hash table for global channels */
695 7 : globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
696 7 : dsa_pin(globalChannelDSA);
697 7 : dsa_pin_mapping(globalChannelDSA);
698 7 : globalChannelTable = dshash_create(globalChannelDSA,
699 : &globalChannelTableDSHParams,
700 : NULL);
701 :
702 : /* Store handles in shared memory for other backends to use */
703 7 : asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
704 7 : asyncQueueControl->globalChannelTableDSH =
705 7 : dshash_get_hash_table_handle(globalChannelTable);
706 : }
707 22 : else if (!globalChannelTable)
708 : {
709 : /* Attach to existing dynamic shared hash table */
710 22 : globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
711 22 : dsa_pin_mapping(globalChannelDSA);
712 22 : globalChannelTable = dshash_attach(globalChannelDSA,
713 : &globalChannelTableDSHParams,
714 22 : asyncQueueControl->globalChannelTableDSH,
715 : NULL);
716 : }
717 :
718 29 : MemoryContextSwitchTo(oldcontext);
719 29 : LWLockRelease(NotifyQueueLock);
720 : }
721 :
722 : /*
723 : * initLocalChannelTable
724 : * Lazy initialization of the local channel table.
725 : * Once created, this table lasts for the life of the session.
726 : */
727 : static void
728 91 : initLocalChannelTable(void)
729 : {
730 : HASHCTL hash_ctl;
731 :
732 : /* Quick exit if we already did this */
733 91 : if (localChannelTable != NULL)
734 74 : return;
735 :
736 : /* Initialize local hash table for this backend's listened channels */
737 17 : hash_ctl.keysize = NAMEDATALEN;
738 17 : hash_ctl.entrysize = sizeof(ChannelName);
739 :
740 17 : localChannelTable =
741 17 : hash_create("Local Listen Channels",
742 : 64,
743 : &hash_ctl,
744 : HASH_ELEM | HASH_STRINGS);
745 : }
746 :
747 : /*
748 : * initPendingListenActions
749 : * Lazy initialization of the pending listen actions hash table.
750 : * This is allocated in CurTransactionContext during PreCommit_Notify,
751 : * and destroyed at transaction end.
752 : */
753 : static void
754 91 : initPendingListenActions(void)
755 : {
756 : HASHCTL hash_ctl;
757 :
758 91 : if (pendingListenActions != NULL)
759 0 : return;
760 :
761 91 : hash_ctl.keysize = NAMEDATALEN;
762 91 : hash_ctl.entrysize = sizeof(PendingListenEntry);
763 91 : hash_ctl.hcxt = CurTransactionContext;
764 :
765 91 : pendingListenActions =
766 91 : hash_create("Pending Listen Actions",
767 91 : list_length(pendingActions->actions),
768 : &hash_ctl,
769 : HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
770 : }
771 :
772 : /*
773 : * Report space needed for our shared memory area
774 : */
775 : Size
776 2163 : AsyncShmemSize(void)
777 : {
778 : Size size;
779 :
780 : /* This had better match AsyncShmemInit */
781 2163 : size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
782 2163 : size = add_size(size, offsetof(AsyncQueueControl, backend));
783 :
784 2163 : size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
785 :
786 2163 : return size;
787 : }
788 :
789 : /*
790 : * Initialize our shared memory area
791 : */
792 : void
793 1158 : AsyncShmemInit(void)
794 : {
795 : bool found;
796 : Size size;
797 :
798 : /*
799 : * Create or attach to the AsyncQueueControl structure.
800 : */
801 1158 : size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
802 1158 : size = add_size(size, offsetof(AsyncQueueControl, backend));
803 :
804 1158 : asyncQueueControl = (AsyncQueueControl *)
805 1158 : ShmemInitStruct("Async Queue Control", size, &found);
806 :
807 1158 : if (!found)
808 : {
809 : /* First time through, so initialize it */
810 1158 : SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
811 1158 : SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
812 1158 : QUEUE_STOP_PAGE = 0;
813 1158 : QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
814 1158 : asyncQueueControl->lastQueueFillWarn = 0;
815 1158 : asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
816 1158 : asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
817 107850 : for (int i = 0; i < MaxBackends; i++)
818 : {
819 106692 : QUEUE_BACKEND_PID(i) = InvalidPid;
820 106692 : QUEUE_BACKEND_DBOID(i) = InvalidOid;
821 106692 : QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
822 106692 : SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
823 106692 : QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
824 106692 : QUEUE_BACKEND_IS_ADVANCING(i) = false;
825 : }
826 : }
827 :
828 : /*
829 : * Set up SLRU management of the pg_notify data. Note that long segment
830 : * names are used in order to avoid wraparound.
831 : */
832 1158 : NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
833 1158 : SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
834 : "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
835 : SYNC_HANDLER_NONE, true);
836 :
837 1158 : if (!found)
838 : {
839 : /*
840 : * During start or reboot, clean out the pg_notify directory.
841 : */
842 1158 : (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
843 : }
844 1158 : }
845 :
846 :
847 : /*
848 : * pg_notify -
849 : * SQL function to send a notification event
850 : */
851 : Datum
852 1072 : pg_notify(PG_FUNCTION_ARGS)
853 : {
854 : const char *channel;
855 : const char *payload;
856 :
857 1072 : if (PG_ARGISNULL(0))
858 3 : channel = "";
859 : else
860 1069 : channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
861 :
862 1072 : if (PG_ARGISNULL(1))
863 6 : payload = "";
864 : else
865 1066 : payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
866 :
867 : /* For NOTIFY as a statement, this is checked in ProcessUtility */
868 1072 : PreventCommandDuringRecovery("NOTIFY");
869 :
870 1072 : Async_Notify(channel, payload);
871 :
872 1063 : PG_RETURN_VOID();
873 : }
874 :
875 :
876 : /*
877 : * Async_Notify
878 : *
879 : * This is executed by the SQL notify command.
880 : *
881 : * Adds the message to the list of pending notifies.
882 : * Actual notification happens during transaction commit.
883 : * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
884 : */
885 : void
886 1138 : Async_Notify(const char *channel, const char *payload)
887 : {
888 1138 : int my_level = GetCurrentTransactionNestLevel();
889 : size_t channel_len;
890 : size_t payload_len;
891 : Notification *n;
892 : MemoryContext oldcontext;
893 :
894 1138 : if (IsParallelWorker())
895 0 : elog(ERROR, "cannot send notifications from a parallel worker");
896 :
897 1138 : if (Trace_notify)
898 0 : elog(DEBUG1, "Async_Notify(%s)", channel);
899 :
900 1138 : channel_len = channel ? strlen(channel) : 0;
901 1138 : payload_len = payload ? strlen(payload) : 0;
902 :
903 : /* a channel name must be specified */
904 1138 : if (channel_len == 0)
905 6 : ereport(ERROR,
906 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
907 : errmsg("channel name cannot be empty")));
908 :
909 : /* enforce length limits */
910 1132 : if (channel_len >= NAMEDATALEN)
911 3 : ereport(ERROR,
912 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
913 : errmsg("channel name too long")));
914 :
915 1129 : if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
916 0 : ereport(ERROR,
917 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
918 : errmsg("payload string too long")));
919 :
920 : /*
921 : * We must construct the Notification entry, even if we end up not using
922 : * it, in order to compare it cheaply to existing list entries.
923 : *
924 : * The notification list needs to live until end of transaction, so store
925 : * it in the transaction context.
926 : */
927 1129 : oldcontext = MemoryContextSwitchTo(CurTransactionContext);
928 :
929 1129 : n = (Notification *) palloc(offsetof(Notification, data) +
930 1129 : channel_len + payload_len + 2);
931 1129 : n->channel_len = channel_len;
932 1129 : n->payload_len = payload_len;
933 1129 : strcpy(n->data, channel);
934 1129 : if (payload)
935 1115 : strcpy(n->data + channel_len + 1, payload);
936 : else
937 14 : n->data[channel_len + 1] = '\0';
938 :
939 1129 : if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
940 67 : {
941 : NotificationList *notifies;
942 :
943 : /*
944 : * First notify event in current (sub)xact. Note that we allocate the
945 : * NotificationList in TopTransactionContext; the nestingLevel might
946 : * get changed later by AtSubCommit_Notify.
947 : */
948 : notifies = (NotificationList *)
949 67 : MemoryContextAlloc(TopTransactionContext,
950 : sizeof(NotificationList));
951 67 : notifies->nestingLevel = my_level;
952 67 : notifies->events = list_make1(n);
953 : /* We certainly don't need a hashtable yet */
954 67 : notifies->hashtab = NULL;
955 : /* We won't build uniqueChannelNames/Hash till later, either */
956 67 : notifies->uniqueChannelNames = NIL;
957 67 : notifies->uniqueChannelHash = NULL;
958 67 : notifies->upper = pendingNotifies;
959 67 : pendingNotifies = notifies;
960 : }
961 : else
962 : {
963 : /* Now check for duplicates */
964 1062 : if (AsyncExistsPendingNotify(n))
965 : {
966 : /* It's a dup, so forget it */
967 13 : pfree(n);
968 13 : MemoryContextSwitchTo(oldcontext);
969 13 : return;
970 : }
971 :
972 : /* Append more events to existing list */
973 1049 : AddEventToPendingNotifies(n);
974 : }
975 :
976 1116 : MemoryContextSwitchTo(oldcontext);
977 : }
978 :
979 : /*
980 : * queue_listen
981 : * Common code for listen, unlisten, unlisten all commands.
982 : *
983 : * Adds the request to the list of pending actions.
984 : * Actual update of localChannelTable and globalChannelTable happens during
985 : * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
986 : */
987 : static void
988 110 : queue_listen(ListenActionKind action, const char *channel)
989 : {
990 : MemoryContext oldcontext;
991 : ListenAction *actrec;
992 110 : int my_level = GetCurrentTransactionNestLevel();
993 :
994 : /*
995 : * Unlike Async_Notify, we don't try to collapse out duplicates here. We
996 : * keep the ordered list to preserve interactions like UNLISTEN ALL; the
997 : * final per-channel intent is computed during PreCommit_Notify.
998 : */
999 110 : oldcontext = MemoryContextSwitchTo(CurTransactionContext);
1000 :
1001 : /* space for terminating null is included in sizeof(ListenAction) */
1002 110 : actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
1003 110 : strlen(channel) + 1);
1004 110 : actrec->action = action;
1005 110 : strcpy(actrec->channel, channel);
1006 :
1007 110 : if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
1008 94 : {
1009 : ActionList *actions;
1010 :
1011 : /*
1012 : * First action in current sub(xact). Note that we allocate the
1013 : * ActionList in TopTransactionContext; the nestingLevel might get
1014 : * changed later by AtSubCommit_Notify.
1015 : */
1016 : actions = (ActionList *)
1017 94 : MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
1018 94 : actions->nestingLevel = my_level;
1019 94 : actions->actions = list_make1(actrec);
1020 94 : actions->upper = pendingActions;
1021 94 : pendingActions = actions;
1022 : }
1023 : else
1024 16 : pendingActions->actions = lappend(pendingActions->actions, actrec);
1025 :
1026 110 : MemoryContextSwitchTo(oldcontext);
1027 110 : }
1028 :
1029 : /*
1030 : * Async_Listen
1031 : *
1032 : * This is executed by the SQL listen command.
1033 : */
1034 : void
1035 58 : Async_Listen(const char *channel)
1036 : {
1037 58 : if (Trace_notify)
1038 0 : elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
1039 :
1040 58 : queue_listen(LISTEN_LISTEN, channel);
1041 58 : }
1042 :
1043 : /*
1044 : * Async_Unlisten
1045 : *
1046 : * This is executed by the SQL unlisten command.
1047 : */
1048 : void
1049 3 : Async_Unlisten(const char *channel)
1050 : {
1051 3 : if (Trace_notify)
1052 0 : elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
1053 :
1054 : /* If we couldn't possibly be listening, no need to queue anything */
1055 3 : if (pendingActions == NULL && !unlistenExitRegistered)
1056 0 : return;
1057 :
1058 3 : queue_listen(LISTEN_UNLISTEN, channel);
1059 : }
1060 :
1061 : /*
1062 : * Async_UnlistenAll
1063 : *
1064 : * This is invoked by UNLISTEN * command, and also at backend exit.
1065 : */
1066 : void
1067 78 : Async_UnlistenAll(void)
1068 : {
1069 78 : if (Trace_notify)
1070 0 : elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
1071 :
1072 : /* If we couldn't possibly be listening, no need to queue anything */
1073 78 : if (pendingActions == NULL && !unlistenExitRegistered)
1074 29 : return;
1075 :
1076 49 : queue_listen(LISTEN_UNLISTEN_ALL, "");
1077 : }
1078 :
1079 : /*
1080 : * SQL function: return a set of the channel names this backend is actively
1081 : * listening to.
1082 : *
1083 : * Note: this coding relies on the fact that the localChannelTable cannot
1084 : * change within a transaction.
1085 : */
1086 : Datum
1087 9 : pg_listening_channels(PG_FUNCTION_ARGS)
1088 : {
1089 : FuncCallContext *funcctx;
1090 : HASH_SEQ_STATUS *status;
1091 :
1092 : /* stuff done only on the first call of the function */
1093 9 : if (SRF_IS_FIRSTCALL())
1094 : {
1095 : /* create a function context for cross-call persistence */
1096 6 : funcctx = SRF_FIRSTCALL_INIT();
1097 :
1098 : /* Initialize hash table iteration if we have any channels */
1099 6 : if (localChannelTable != NULL)
1100 : {
1101 : MemoryContext oldcontext;
1102 :
1103 6 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1104 6 : status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
1105 6 : hash_seq_init(status, localChannelTable);
1106 6 : funcctx->user_fctx = status;
1107 6 : MemoryContextSwitchTo(oldcontext);
1108 : }
1109 : else
1110 : {
1111 0 : funcctx->user_fctx = NULL;
1112 : }
1113 : }
1114 :
1115 : /* stuff done on every call of the function */
1116 9 : funcctx = SRF_PERCALL_SETUP();
1117 9 : status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
1118 :
1119 9 : if (status != NULL)
1120 : {
1121 : ChannelName *entry;
1122 :
1123 9 : entry = (ChannelName *) hash_seq_search(status);
1124 9 : if (entry != NULL)
1125 3 : SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
1126 : }
1127 :
1128 6 : SRF_RETURN_DONE(funcctx);
1129 : }
1130 :
1131 : /*
1132 : * Async_UnlistenOnExit
1133 : *
1134 : * This is executed at backend exit if we have done any LISTENs in this
1135 : * backend. It might not be necessary anymore, if the user UNLISTENed
1136 : * everything, but we don't try to detect that case.
1137 : */
1138 : static void
1139 17 : Async_UnlistenOnExit(int code, Datum arg)
1140 : {
1141 17 : CleanupListenersOnExit();
1142 17 : asyncQueueUnregister();
1143 17 : }
1144 :
1145 : /*
1146 : * AtPrepare_Notify
1147 : *
1148 : * This is called at the prepare phase of a two-phase
1149 : * transaction. Save the state for possible commit later.
1150 : */
1151 : void
1152 327 : AtPrepare_Notify(void)
1153 : {
1154 : /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
1155 327 : if (pendingActions || pendingNotifies)
1156 0 : ereport(ERROR,
1157 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1158 : errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
1159 327 : }
1160 :
1161 : /*
1162 : * PreCommit_Notify
1163 : *
1164 : * This is called at transaction commit, before actually committing to
1165 : * clog.
1166 : *
1167 : * If there are pending LISTEN actions, make sure we are listed in the
1168 : * shared-memory listener array. This must happen before commit to
1169 : * ensure we don't miss any notifies from transactions that commit
1170 : * just after ours.
1171 : *
1172 : * If there are outbound notify requests in the pendingNotifies list,
1173 : * add them to the global queue. We do that before commit so that
1174 : * we can still throw error if we run out of queue space.
1175 : */
1176 : void
1177 513131 : PreCommit_Notify(void)
1178 : {
1179 : ListCell *p;
1180 :
1181 513131 : if (!pendingActions && !pendingNotifies)
1182 512975 : return; /* no relevant statements in this xact */
1183 :
1184 156 : if (Trace_notify)
1185 0 : elog(DEBUG1, "PreCommit_Notify");
1186 :
1187 : /* Preflight for any pending listen/unlisten actions */
1188 156 : initGlobalChannelTable();
1189 :
1190 156 : if (pendingActions != NULL)
1191 : {
1192 : /* Ensure we have a local channel table */
1193 91 : initLocalChannelTable();
1194 : /* Create pendingListenActions hash table for this transaction */
1195 91 : initPendingListenActions();
1196 :
1197 : /* Stage all the actions this transaction wants to perform */
1198 198 : foreach(p, pendingActions->actions)
1199 : {
1200 107 : ListenAction *actrec = (ListenAction *) lfirst(p);
1201 :
1202 107 : switch (actrec->action)
1203 : {
1204 56 : case LISTEN_LISTEN:
1205 56 : BecomeRegisteredListener();
1206 56 : PrepareTableEntriesForListen(actrec->channel);
1207 56 : break;
1208 3 : case LISTEN_UNLISTEN:
1209 3 : PrepareTableEntriesForUnlisten(actrec->channel);
1210 3 : break;
1211 48 : case LISTEN_UNLISTEN_ALL:
1212 48 : PrepareTableEntriesForUnlistenAll();
1213 48 : break;
1214 : }
1215 : }
1216 : }
1217 :
1218 : /* Queue any pending notifies (must happen after the above) */
1219 156 : if (pendingNotifies)
1220 : {
1221 : ListCell *nextNotify;
1222 65 : bool firstIteration = true;
1223 :
1224 : /*
1225 : * Build list of unique channel names being notified for use by
1226 : * SignalBackends().
1227 : *
1228 : * If uniqueChannelHash is available, use it to efficiently get the
1229 : * unique channels. Otherwise, fall back to the O(N^2) approach.
1230 : */
1231 65 : pendingNotifies->uniqueChannelNames = NIL;
1232 65 : if (pendingNotifies->uniqueChannelHash != NULL)
1233 : {
1234 : HASH_SEQ_STATUS status;
1235 : ChannelName *channelEntry;
1236 :
1237 2 : hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
1238 4 : while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
1239 2 : pendingNotifies->uniqueChannelNames =
1240 2 : lappend(pendingNotifies->uniqueChannelNames,
1241 2 : channelEntry->channel);
1242 : }
1243 : else
1244 : {
1245 : /* O(N^2) approach is better for small number of notifications */
1246 219 : foreach_ptr(Notification, n, pendingNotifies->events)
1247 : {
1248 93 : char *channel = n->data;
1249 93 : bool found = false;
1250 :
1251 : /* Name present in list? */
1252 190 : foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
1253 : {
1254 31 : if (strcmp(oldchan, channel) == 0)
1255 : {
1256 27 : found = true;
1257 27 : break;
1258 : }
1259 : }
1260 : /* Add if not already in list */
1261 93 : if (!found)
1262 66 : pendingNotifies->uniqueChannelNames =
1263 66 : lappend(pendingNotifies->uniqueChannelNames,
1264 : channel);
1265 : }
1266 : }
1267 :
1268 : /* Preallocate workspace that will be needed by SignalBackends() */
1269 65 : if (signalPids == NULL)
1270 22 : signalPids = MemoryContextAlloc(TopMemoryContext,
1271 : MaxBackends * sizeof(int32));
1272 :
1273 65 : if (signalProcnos == NULL)
1274 22 : signalProcnos = MemoryContextAlloc(TopMemoryContext,
1275 : MaxBackends * sizeof(ProcNumber));
1276 :
1277 : /*
1278 : * Make sure that we have an XID assigned to the current transaction.
1279 : * GetCurrentTransactionId is cheap if we already have an XID, but not
1280 : * so cheap if we don't, and we'd prefer not to do that work while
1281 : * holding NotifyQueueLock.
1282 : */
1283 65 : (void) GetCurrentTransactionId();
1284 :
1285 : /*
1286 : * Serialize writers by acquiring a special lock that we hold till
1287 : * after commit. This ensures that queue entries appear in commit
1288 : * order, and in particular that there are never uncommitted queue
1289 : * entries ahead of committed ones, so an uncommitted transaction
1290 : * can't block delivery of deliverable notifications.
1291 : *
1292 : * We use a heavyweight lock so that it'll automatically be released
1293 : * after either commit or abort. This also allows deadlocks to be
1294 : * detected, though really a deadlock shouldn't be possible here.
1295 : *
1296 : * The lock is on "database 0", which is pretty ugly but it doesn't
1297 : * seem worth inventing a special locktag category just for this.
1298 : * (Historical note: before PG 9.0, a similar lock on "database 0" was
1299 : * used by the flatfiles mechanism.)
1300 : */
1301 65 : LockSharedObject(DatabaseRelationId, InvalidOid, 0,
1302 : AccessExclusiveLock);
1303 :
1304 : /*
1305 : * For the direct advancement optimization in SignalBackends(), we
1306 : * need to ensure that no other backend can insert queue entries
1307 : * between queueHeadBeforeWrite and queueHeadAfterWrite. The
1308 : * heavyweight lock above provides this guarantee, since it serializes
1309 : * all writers.
1310 : *
1311 : * Note: if the heavyweight lock were ever removed for scalability
1312 : * reasons, we could achieve the same guarantee by holding
1313 : * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
1314 : * than releasing and reacquiring it for each page as we do below.
1315 : */
1316 :
1317 : /* Initialize values to a safe default in case list is empty */
1318 65 : SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
1319 65 : SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
1320 :
1321 : /* Now push the notifications into the queue */
1322 65 : nextNotify = list_head(pendingNotifies->events);
1323 165 : while (nextNotify != NULL)
1324 : {
1325 : /*
1326 : * Add the pending notifications to the queue. We acquire and
1327 : * release NotifyQueueLock once per page, which might be overkill
1328 : * but it does allow readers to get in while we're doing this.
1329 : *
1330 : * A full queue is very uncommon and should really not happen,
1331 : * given that we have so much space available in the SLRU pages.
1332 : * Nevertheless we need to deal with this possibility. Note that
1333 : * when we get here we are in the process of committing our
1334 : * transaction, but we have not yet committed to clog, so at this
1335 : * point in time we can still roll the transaction back.
1336 : */
1337 100 : LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1338 100 : if (firstIteration)
1339 : {
1340 65 : queueHeadBeforeWrite = QUEUE_HEAD;
1341 65 : firstIteration = false;
1342 : }
1343 100 : asyncQueueFillWarning();
1344 100 : if (asyncQueueIsFull())
1345 0 : ereport(ERROR,
1346 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1347 : errmsg("too many notifications in the NOTIFY queue")));
1348 100 : nextNotify = asyncQueueAddEntries(nextNotify);
1349 100 : queueHeadAfterWrite = QUEUE_HEAD;
1350 100 : LWLockRelease(NotifyQueueLock);
1351 : }
1352 :
1353 : /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
1354 : }
1355 : }
1356 :
1357 : /*
1358 : * AtCommit_Notify
1359 : *
1360 : * This is called at transaction commit, after committing to clog.
1361 : *
1362 : * Apply pending listen/unlisten changes and clear transaction-local state.
1363 : *
1364 : * If we issued any notifications in the transaction, send signals to
1365 : * listening backends (possibly including ourselves) to process them.
1366 : * Also, if we filled enough queue pages with new notifies, try to
1367 : * advance the queue tail pointer.
1368 : */
1369 : void
1370 512976 : AtCommit_Notify(void)
1371 : {
1372 : /*
1373 : * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
1374 : * return as soon as possible
1375 : */
1376 512976 : if (!pendingActions && !pendingNotifies)
1377 512820 : return;
1378 :
1379 156 : if (Trace_notify)
1380 0 : elog(DEBUG1, "AtCommit_Notify");
1381 :
1382 : /* Apply staged listen/unlisten changes */
1383 156 : ApplyPendingListenActions(true);
1384 :
1385 : /* If no longer listening to anything, get out of listener array */
1386 156 : if (amRegisteredListener && LocalChannelTableIsEmpty())
1387 23 : asyncQueueUnregister();
1388 :
1389 : /*
1390 : * Send signals to listening backends. We need do this only if there are
1391 : * pending notifies, which were previously added to the shared queue by
1392 : * PreCommit_Notify().
1393 : */
1394 156 : if (pendingNotifies != NULL)
1395 65 : SignalBackends();
1396 :
1397 : /*
1398 : * If it's time to try to advance the global tail pointer, do that.
1399 : *
1400 : * (It might seem odd to do this in the sender, when more than likely the
1401 : * listeners won't yet have read the messages we just sent. However,
1402 : * there's less contention if only the sender does it, and there is little
1403 : * need for urgency in advancing the global tail. So this typically will
1404 : * be clearing out messages that were sent some time ago.)
1405 : */
1406 156 : if (tryAdvanceTail)
1407 : {
1408 8 : tryAdvanceTail = false;
1409 8 : asyncQueueAdvanceTail();
1410 : }
1411 :
1412 : /* And clean up */
1413 156 : ClearPendingActionsAndNotifies();
1414 : }
1415 :
1416 : /*
1417 : * BecomeRegisteredListener --- subroutine for PreCommit_Notify
1418 : *
1419 : * This function must make sure we are ready to catch any incoming messages.
1420 : */
1421 : static void
1422 56 : BecomeRegisteredListener(void)
1423 : {
1424 : QueuePosition head;
1425 : QueuePosition max;
1426 : ProcNumber prevListener;
1427 :
1428 : /*
1429 : * Nothing to do if we are already listening to something, nor if we
1430 : * already ran this routine in this transaction.
1431 : */
1432 56 : if (amRegisteredListener)
1433 27 : return;
1434 :
1435 29 : if (Trace_notify)
1436 0 : elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
1437 :
1438 : /*
1439 : * Before registering, make sure we will unlisten before dying. (Note:
1440 : * this action does not get undone if we abort later.)
1441 : */
1442 29 : if (!unlistenExitRegistered)
1443 : {
1444 17 : before_shmem_exit(Async_UnlistenOnExit, 0);
1445 17 : unlistenExitRegistered = true;
1446 : }
1447 :
1448 : /*
1449 : * This is our first LISTEN, so establish our pointer.
1450 : *
1451 : * We set our pointer to the global tail pointer and then move it forward
1452 : * over already-committed notifications. This ensures we cannot miss any
1453 : * not-yet-committed notifications. We might get a few more but that
1454 : * doesn't hurt.
1455 : *
1456 : * In some scenarios there might be a lot of committed notifications that
1457 : * have not yet been pruned away (because some backend is being lazy about
1458 : * reading them). To reduce our startup time, we can look at other
1459 : * backends and adopt the maximum "pos" pointer of any backend that's in
1460 : * our database; any notifications it's already advanced over are surely
1461 : * committed and need not be re-examined by us. (We must consider only
1462 : * backends connected to our DB, because others will not have bothered to
1463 : * check committed-ness of notifications in our DB.)
1464 : *
1465 : * We need exclusive lock here so we can look at other backends' entries
1466 : * and manipulate the list links.
1467 : */
1468 29 : LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1469 29 : head = QUEUE_HEAD;
1470 29 : max = QUEUE_TAIL;
1471 29 : prevListener = INVALID_PROC_NUMBER;
1472 44 : for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
1473 : {
1474 15 : if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
1475 15 : max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1476 : /* Also find last listening backend before this one */
1477 15 : if (i < MyProcNumber)
1478 9 : prevListener = i;
1479 : }
1480 29 : QUEUE_BACKEND_POS(MyProcNumber) = max;
1481 29 : QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
1482 29 : QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
1483 29 : QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
1484 29 : QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
1485 : /* Insert backend into list of listeners at correct position */
1486 29 : if (prevListener != INVALID_PROC_NUMBER)
1487 : {
1488 4 : QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
1489 4 : QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
1490 : }
1491 : else
1492 : {
1493 25 : QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
1494 25 : QUEUE_FIRST_LISTENER = MyProcNumber;
1495 : }
1496 29 : LWLockRelease(NotifyQueueLock);
1497 :
1498 : /* Now we are listed in the global array, so remember we're listening */
1499 29 : amRegisteredListener = true;
1500 :
1501 : /*
1502 : * Try to move our pointer forward as far as possible. This will skip
1503 : * over already-committed notifications, which we want to do because they
1504 : * might be quite stale. Note that we are not yet listening on anything,
1505 : * so we won't deliver such notifications to our frontend. Also, although
1506 : * our transaction might have executed NOTIFY, those message(s) aren't
1507 : * queued yet so we won't skip them here.
1508 : */
1509 29 : if (!QUEUE_POS_EQUAL(max, head))
1510 15 : asyncQueueReadAllNotifications();
1511 : }
1512 :
1513 : /*
1514 : * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
1515 : *
1516 : * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
1517 : * an entry in localChannelTable, and pre-allocating an entry in the shared
1518 : * globalChannelTable with listening=false. The listening flag will be set
1519 : * to true in AtCommit_Notify. If we abort later, unwanted table entries
1520 : * will be removed.
1521 : */
1522 : static void
1523 56 : PrepareTableEntriesForListen(const char *channel)
1524 : {
1525 : GlobalChannelKey key;
1526 : GlobalChannelEntry *entry;
1527 : bool found;
1528 : ListenerEntry *listeners;
1529 : PendingListenEntry *pending;
1530 :
1531 : /*
1532 : * Record in local pending hash that we want to LISTEN, overwriting any
1533 : * earlier attempt to UNLISTEN.
1534 : */
1535 : pending = (PendingListenEntry *)
1536 56 : hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
1537 56 : pending->action = PENDING_LISTEN;
1538 :
1539 : /*
1540 : * Ensure that there is an entry for the channel in localChannelTable.
1541 : * (Should this fail, we can just roll back.) If the transaction fails
1542 : * after this point, we will remove the entry if appropriate during
1543 : * ApplyPendingListenActions. Note that this entry allows IsListeningOn()
1544 : * to return TRUE; we assume nothing is going to consult that before
1545 : * AtCommit_Notify/AtAbort_Notify. However, if later actions attempt to
1546 : * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
1547 : * present to ensure they do the right things; see
1548 : * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
1549 : */
1550 56 : (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
1551 :
1552 : /* Pre-allocate entry in shared globalChannelTable with listening=false */
1553 56 : GlobalChannelKeyInit(&key, MyDatabaseId, channel);
1554 56 : entry = dshash_find_or_insert(globalChannelTable, &key, &found);
1555 :
1556 56 : if (!found)
1557 : {
1558 : /* New channel entry, so initialize it to a safe state */
1559 34 : entry->listenersArray = InvalidDsaPointer;
1560 34 : entry->numListeners = 0;
1561 34 : entry->allocatedListeners = 0;
1562 : }
1563 :
1564 : /*
1565 : * Create listenersArray if entry doesn't have one. It's tempting to fold
1566 : * this into the !found case, but this coding allows us to cope in case
1567 : * dsa_allocate() failed in an earlier attempt.
1568 : */
1569 56 : if (!DsaPointerIsValid(entry->listenersArray))
1570 : {
1571 34 : entry->listenersArray = dsa_allocate(globalChannelDSA,
1572 : sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
1573 34 : entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
1574 : }
1575 :
1576 : listeners = (ListenerEntry *)
1577 56 : dsa_get_address(globalChannelDSA, entry->listenersArray);
1578 :
1579 : /*
1580 : * Check if we already have a ListenerEntry (possibly from earlier in this
1581 : * transaction)
1582 : */
1583 75 : for (int i = 0; i < entry->numListeners; i++)
1584 : {
1585 30 : if (listeners[i].procNo == MyProcNumber)
1586 : {
1587 : /* Already have an entry; listening flag stays as-is until commit */
1588 11 : dshash_release_lock(globalChannelTable, entry);
1589 11 : return;
1590 : }
1591 : }
1592 :
1593 : /* Need to add a new entry; grow array if necessary */
1594 45 : if (entry->numListeners >= entry->allocatedListeners)
1595 : {
1596 1 : int new_size = entry->allocatedListeners * 2;
1597 1 : dsa_pointer old_array = entry->listenersArray;
1598 1 : dsa_pointer new_array = dsa_allocate(globalChannelDSA,
1599 : sizeof(ListenerEntry) * new_size);
1600 1 : ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
1601 :
1602 1 : memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
1603 1 : entry->listenersArray = new_array;
1604 1 : entry->allocatedListeners = new_size;
1605 1 : dsa_free(globalChannelDSA, old_array);
1606 1 : listeners = new_listeners;
1607 : }
1608 :
1609 45 : listeners[entry->numListeners].procNo = MyProcNumber;
1610 45 : listeners[entry->numListeners].listening = false; /* staged, not yet
1611 : * committed */
1612 45 : entry->numListeners++;
1613 :
1614 45 : dshash_release_lock(globalChannelTable, entry);
1615 : }
1616 :
1617 : /*
1618 : * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
1619 : *
1620 : * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
1621 : * we're currently listening (committed or staged). We don't touch
1622 : * globalChannelTable yet - the listener keeps receiving signals until
1623 : * commit, when the entry is removed.
1624 : */
1625 : static void
1626 3 : PrepareTableEntriesForUnlisten(const char *channel)
1627 : {
1628 : PendingListenEntry *pending;
1629 :
1630 : /*
1631 : * If the channel name is not in localChannelTable, then we are neither
1632 : * listening on it nor preparing to listen on it, so we don't need to
1633 : * record an UNLISTEN action.
1634 : */
1635 : Assert(localChannelTable != NULL);
1636 3 : if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
1637 0 : return;
1638 :
1639 : /*
1640 : * Record in local pending hash that we want to UNLISTEN, overwriting any
1641 : * earlier attempt to LISTEN. Don't touch localChannelTable or
1642 : * globalChannelTable yet - we keep receiving signals until commit.
1643 : */
1644 : pending = (PendingListenEntry *)
1645 3 : hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
1646 3 : pending->action = PENDING_UNLISTEN;
1647 : }
1648 :
1649 : /*
1650 : * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
1651 : *
1652 : * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
1653 : * about-to-be-listened channels in pendingListenActions.
1654 : */
1655 : static void
1656 48 : PrepareTableEntriesForUnlistenAll(void)
1657 : {
1658 : HASH_SEQ_STATUS seq;
1659 : ChannelName *channelEntry;
1660 : PendingListenEntry *pending;
1661 :
1662 : /*
1663 : * Scan localChannelTable, which will have the names of all channels that
1664 : * we are listening on or have prepared to listen on. Record an UNLISTEN
1665 : * action for each one, overwriting any earlier attempt to LISTEN.
1666 : */
1667 48 : hash_seq_init(&seq, localChannelTable);
1668 82 : while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
1669 : {
1670 : pending = (PendingListenEntry *)
1671 34 : hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
1672 34 : pending->action = PENDING_UNLISTEN;
1673 : }
1674 48 : }
1675 :
1676 : /*
1677 : * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
1678 : *
1679 : * Decrements numListeners, compacts the array, and frees the entry if empty.
1680 : * Sets *entry_ptr to NULL if the entry was deleted.
1681 : *
1682 : * We could get the listeners pointer from the entry, but all callers
1683 : * already have it at hand.
1684 : */
1685 : static void
1686 37 : RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
1687 : ListenerEntry *listeners,
1688 : int idx)
1689 : {
1690 37 : GlobalChannelEntry *entry = *entry_ptr;
1691 :
1692 37 : entry->numListeners--;
1693 37 : if (idx < entry->numListeners)
1694 8 : memmove(&listeners[idx], &listeners[idx + 1],
1695 8 : sizeof(ListenerEntry) * (entry->numListeners - idx));
1696 :
1697 37 : if (entry->numListeners == 0)
1698 : {
1699 26 : dsa_free(globalChannelDSA, entry->listenersArray);
1700 26 : dshash_delete_entry(globalChannelTable, entry);
1701 : /* tells caller not to release the entry's lock: */
1702 26 : *entry_ptr = NULL;
1703 : }
1704 37 : }
1705 :
/*
 * ApplyPendingListenActions
 *
 * Apply (isCommit == true), or revert (isCommit == false), the staged
 * listen/unlisten changes in pendingListenActions, updating both the
 * backend-local localChannelTable and the shared globalChannelTable.
 *
 * Called during post-commit/abort processing, so it must not fail
 * (hence the PANIC-level checks for can't-happen conditions).
 */
static void
ApplyPendingListenActions(bool isCommit)
{
	HASH_SEQ_STATUS seq;
	PendingListenEntry *pending;

	/* Quick exit if nothing to do */
	if (pendingListenActions == NULL)
		return;

	/* We made a globalChannelTable before building pendingListenActions */
	if (globalChannelTable == NULL)
		elog(PANIC, "global channel table missing post-commit/abort");

	/* For each staged action ... */
	hash_seq_init(&seq, pendingListenActions);
	while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
	{
		GlobalChannelKey key;
		GlobalChannelEntry *entry;
		bool		removeLocal = true;
		bool		foundListener = false;

		/*
		 * Find the global entry for this channel. If isCommit, it had better
		 * exist (it was created in PreCommit). In an abort, it might not
		 * exist, in which case we are not listening and should discard any
		 * local entry that PreCommit may have managed to create.
		 *
		 * NOTE(review): the third argument presumably requests an exclusive
		 * lock on the found entry, which we need since we may modify or
		 * delete it below -- confirm against the dshash API.
		 */
		GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
		entry = dshash_find(globalChannelTable, &key, true);
		if (entry != NULL)
		{
			/* Scan entry to find the ListenerEntry for this backend */
			ListenerEntry *listeners;

			listeners = (ListenerEntry *)
				dsa_get_address(globalChannelDSA, entry->listenersArray);

			for (int i = 0; i < entry->numListeners; i++)
			{
				if (listeners[i].procNo != MyProcNumber)
					continue;
				foundListener = true;
				if (isCommit)
				{
					if (pending->action == PENDING_LISTEN)
					{
						/*
						 * LISTEN being committed: set listening=true.
						 * localChannelTable entry was created during
						 * PreCommit and should be kept.
						 */
						listeners[i].listening = true;
						removeLocal = false;
					}
					else
					{
						/*
						 * UNLISTEN being committed: remove pre-allocated
						 * entries from both tables.  Note this may set
						 * entry to NULL if the channel entry was deleted.
						 */
						RemoveListenerFromChannel(&entry, listeners, i);
					}
				}
				else
				{
					/*
					 * Note: this part is reachable only if the transaction
					 * aborts after PreCommit_Notify() has made some
					 * pendingListenActions entries, so it's pretty hard to
					 * test.
					 */
					if (!listeners[i].listening)
					{
						/*
						 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
						 * and we weren't listening before, so remove
						 * pre-allocated entries from both tables.
						 */
						RemoveListenerFromChannel(&entry, listeners, i);
					}
					else
					{
						/*
						 * We're aborting, but the previous state was that
						 * we're listening, so keep localChannelTable entry.
						 */
						removeLocal = false;
					}
				}
				break;			/* there shouldn't be another match */
			}

			/* We might have already released the entry by removing it */
			if (entry != NULL)
				dshash_release_lock(globalChannelTable, entry);
		}

		/*
		 * If we're committing a LISTEN action, we should have found a
		 * matching ListenerEntry, but otherwise it's okay if we didn't.
		 */
		if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
			elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
				 pending->channel, MyProcNumber);

		/*
		 * If we did not find a globalChannelTable entry for our backend, or
		 * if we are unlistening, remove any localChannelTable entry that may
		 * exist. (Note in particular that this cleans up if we created a
		 * localChannelTable entry and then failed while trying to create a
		 * globalChannelTable entry.)
		 */
		if (removeLocal && localChannelTable != NULL)
			(void) hash_search(localChannelTable, pending->channel,
							   HASH_REMOVE, NULL);
	}
}
1831 :
/*
 * CleanupListenersOnExit --- called from Async_UnlistenOnExit
 *
 * Remove this backend from all channels in the shared global table.
 * Runs at backend exit, so it must tolerate partially-initialized state
 * (e.g. a NULL globalChannelTable).
 */
static void
CleanupListenersOnExit(void)
{
	dshash_seq_status status;
	GlobalChannelEntry *entry;

	if (Trace_notify)
		elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);

	/* Clear our local cache (not really necessary, but be consistent) */
	if (localChannelTable != NULL)
	{
		hash_destroy(localChannelTable);
		localChannelTable = NULL;
	}

	/* Now remove our entries from the shared globalChannelTable */
	if (globalChannelTable == NULL)
		return;

	/*
	 * NOTE(review): the 'true' presumably requests an exclusive scan so
	 * that dshash_delete_current() below is legal -- confirm against the
	 * dshash API.
	 */
	dshash_seq_init(&status, globalChannelTable, true);
	while ((entry = dshash_seq_next(&status)) != NULL)
	{
		ListenerEntry *listeners;

		if (entry->key.dboid != MyDatabaseId)
			continue;			/* not relevant */

		listeners = (ListenerEntry *)
			dsa_get_address(globalChannelDSA, entry->listenersArray);

		for (int i = 0; i < entry->numListeners; i++)
		{
			if (listeners[i].procNo == MyProcNumber)
			{
				/* Remove our slot and compact the remainder of the array */
				entry->numListeners--;
				if (i < entry->numListeners)
					memmove(&listeners[i], &listeners[i + 1],
							sizeof(ListenerEntry) * (entry->numListeners - i));

				/* Drop the whole channel entry once no listeners remain */
				if (entry->numListeners == 0)
				{
					dsa_free(globalChannelDSA, entry->listenersArray);
					dshash_delete_current(&status);
				}
				break;			/* at most one slot per backend per channel */
			}
		}
	}
	dshash_seq_term(&status);
}
1888 :
1889 : /*
1890 : * Test whether we are actively listening on the given channel name.
1891 : *
1892 : * Note: this function is executed for every notification found in the queue.
1893 : */
1894 : static bool
1895 56 : IsListeningOn(const char *channel)
1896 : {
1897 56 : if (localChannelTable == NULL)
1898 0 : return false;
1899 :
1900 56 : return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
1901 : }
1902 :
/*
 * Remove our entry from the listeners array when we are no longer listening
 * on any channel. NB: must not fail if we're already not listening.
 */
static void
asyncQueueUnregister(void)
{
	Assert(LocalChannelTableIsEmpty());	/* else caller error */

	if (!amRegisteredListener)	/* nothing to do */
		return;

	/*
	 * Need exclusive lock here to manipulate list links.
	 */
	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
	/* Mark our entry as invalid */
	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
	QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
	/* and remove it from the list (singly-linked, so find our predecessor) */
	if (QUEUE_FIRST_LISTENER == MyProcNumber)
		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
	else
	{
		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
		{
			if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
			{
				/* splice ourselves out of the chain */
				QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
				break;
			}
		}
	}
	QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
	LWLockRelease(NotifyQueueLock);

	/* mark ourselves as no longer listed in the global array */
	amRegisteredListener = false;
}
1944 :
1945 : /*
1946 : * Test whether there is room to insert more notification messages.
1947 : *
1948 : * Caller must hold at least shared NotifyQueueLock.
1949 : */
1950 : static bool
1951 100 : asyncQueueIsFull(void)
1952 : {
1953 100 : int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1954 100 : int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1955 100 : int64 occupied = headPage - tailPage;
1956 :
1957 100 : return occupied >= max_notify_queue_pages;
1958 : }
1959 :
1960 : /*
1961 : * Advance the QueuePosition to the next entry, assuming that the current
1962 : * entry is of length entryLength. If we jump to a new page the function
1963 : * returns true, else false.
1964 : */
1965 : static bool
1966 2454 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
1967 : {
1968 2454 : int64 pageno = QUEUE_POS_PAGE(*position);
1969 2454 : int offset = QUEUE_POS_OFFSET(*position);
1970 2454 : bool pageJump = false;
1971 :
1972 : /*
1973 : * Move to the next writing position: First jump over what we have just
1974 : * written or read.
1975 : */
1976 2454 : offset += entryLength;
1977 : Assert(offset <= QUEUE_PAGESIZE);
1978 :
1979 : /*
1980 : * In a second step check if another entry can possibly be written to the
1981 : * page. If so, stay here, we have reached the next position. If not, then
1982 : * we need to move on to the next page.
1983 : */
1984 2454 : if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
1985 : {
1986 38 : pageno++;
1987 38 : offset = 0;
1988 38 : pageJump = true;
1989 : }
1990 :
1991 2454 : SET_QUEUE_POS(*position, pageno, offset);
1992 2454 : return pageJump;
1993 : }
1994 :
1995 : /*
1996 : * Fill the AsyncQueueEntry at *qe with an outbound notification message.
1997 : */
1998 : static void
1999 1142 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
2000 : {
2001 1142 : size_t channellen = n->channel_len;
2002 1142 : size_t payloadlen = n->payload_len;
2003 : int entryLength;
2004 :
2005 : Assert(channellen < NAMEDATALEN);
2006 : Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
2007 :
2008 : /* The terminators are already included in AsyncQueueEntryEmptySize */
2009 1142 : entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
2010 1142 : entryLength = QUEUEALIGN(entryLength);
2011 1142 : qe->length = entryLength;
2012 1142 : qe->dboid = MyDatabaseId;
2013 1142 : qe->xid = GetCurrentTransactionId();
2014 1142 : qe->srcPid = MyProcPid;
2015 1142 : memcpy(qe->data, n->data, channellen + payloadlen + 2);
2016 1142 : }
2017 :
/*
 * Add pending notifications to the queue.
 *
 * We go page by page here, i.e. we stop once we have to go to a new page but
 * we will be called again and then fill that next page. If an entry does not
 * fit into the current page, we write a dummy entry with an InvalidOid as the
 * database OID in order to fill the page. So every page is always used up to
 * the last byte which simplifies reading the page later.
 *
 * We are passed the list cell (in pendingNotifies->events) containing the next
 * notification to write and return the first still-unwritten cell back.
 * Eventually we will return NULL indicating all is done.
 *
 * We are holding NotifyQueueLock already from the caller and grab
 * page specific SLRU bank lock locally in this function.
 */
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
	AsyncQueueEntry qe;
	QueuePosition queue_head;
	int64		pageno;
	int			offset;
	int			slotno;
	LWLock	   *prevlock;

	/*
	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
	 * memory upon exiting. The reason for this is that if we have to advance
	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
	 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
	 * subsequent insertions would try to put entries into a page that slru.c
	 * thinks doesn't exist yet.) So, use a local position variable. Note
	 * that if we do fail, any already-inserted queue entries are forgotten;
	 * this is okay, since they'd be useless anyway after our transaction
	 * rolls back.
	 */
	queue_head = QUEUE_HEAD;

	/*
	 * If this is the first write since the postmaster started, we need to
	 * initialize the first page of the async SLRU. Otherwise, the current
	 * page should be initialized already, so just fetch it.
	 */
	pageno = QUEUE_POS_PAGE(queue_head);
	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);

	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
	LWLockAcquire(prevlock, LW_EXCLUSIVE);

	if (QUEUE_POS_IS_ZERO(queue_head))
		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
	else
		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
								   InvalidTransactionId);

	/* Note we mark the page dirty before writing in it */
	NotifyCtl->shared->page_dirty[slotno] = true;

	while (nextNotify != NULL)
	{
		Notification *n = (Notification *) lfirst(nextNotify);

		/* Construct a valid queue entry in local variable qe */
		asyncQueueNotificationToEntry(n, &qe);

		offset = QUEUE_POS_OFFSET(queue_head);

		/* Check whether the entry really fits on the current page */
		if (offset + qe.length <= QUEUE_PAGESIZE)
		{
			/* OK, so advance nextNotify past this item */
			nextNotify = lnext(pendingNotifies->events, nextNotify);
		}
		else
		{
			/*
			 * Write a dummy entry to fill up the page. Actually readers will
			 * only check dboid and since it won't match any reader's database
			 * OID, they will ignore this entry and move on.  (The real entry
			 * will be written on the next page, in a later iteration/call.)
			 */
			qe.length = QUEUE_PAGESIZE - offset;
			qe.dboid = InvalidOid;
			qe.xid = InvalidTransactionId;
			qe.data[0] = '\0';	/* empty channel */
			qe.data[1] = '\0';	/* empty payload */
		}

		/* Now copy qe into the shared buffer page */
		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
			   &qe,
			   qe.length);

		/* Advance queue_head appropriately, and detect if page is full */
		if (asyncQueueAdvance(&(queue_head), qe.length))
		{
			LWLock	   *lock;

			/* The new page may be served by a different SLRU bank lock */
			pageno = QUEUE_POS_PAGE(queue_head);
			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
			if (lock != prevlock)
			{
				LWLockRelease(prevlock);
				LWLockAcquire(lock, LW_EXCLUSIVE);
				prevlock = lock;
			}

			/*
			 * Page is full, so we're done here, but first fill the next page
			 * with zeroes. The reason to do this is to ensure that slru.c's
			 * idea of the head page is always the same as ours, which avoids
			 * boundary problems in SimpleLruTruncate. The test in
			 * asyncQueueIsFull() ensured that there is room to create this
			 * page without overrunning the queue.
			 */
			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));

			/*
			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
			 * set flag to remember that we should try to advance the tail
			 * pointer (we don't want to actually do that right here).
			 */
			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
				tryAdvanceTail = true;

			/* And exit the loop */
			break;
		}
	}

	/* Success, so update the global QUEUE_HEAD */
	QUEUE_HEAD = queue_head;

	LWLockRelease(prevlock);

	return nextNotify;
}
2155 :
2156 : /*
2157 : * SQL function to return the fraction of the notification queue currently
2158 : * occupied.
2159 : */
2160 : Datum
2161 5 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
2162 : {
2163 : double usage;
2164 :
2165 : /* Advance the queue tail so we don't report a too-large result */
2166 5 : asyncQueueAdvanceTail();
2167 :
2168 5 : LWLockAcquire(NotifyQueueLock, LW_SHARED);
2169 5 : usage = asyncQueueUsage();
2170 5 : LWLockRelease(NotifyQueueLock);
2171 :
2172 5 : PG_RETURN_FLOAT8(usage);
2173 : }
2174 :
2175 : /*
2176 : * Return the fraction of the queue that is currently occupied.
2177 : *
2178 : * The caller must hold NotifyQueueLock in (at least) shared mode.
2179 : *
2180 : * Note: we measure the distance to the logical tail page, not the physical
2181 : * tail page. In some sense that's wrong, but the relative position of the
2182 : * physical tail is affected by details such as SLRU segment boundaries,
2183 : * so that a result based on that is unpleasantly unstable.
2184 : */
2185 : static double
2186 105 : asyncQueueUsage(void)
2187 : {
2188 105 : int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
2189 105 : int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
2190 105 : int64 occupied = headPage - tailPage;
2191 :
2192 105 : if (occupied == 0)
2193 66 : return (double) 0; /* fast exit for common case */
2194 :
2195 39 : return (double) occupied / (double) max_notify_queue_pages;
2196 : }
2197 :
/*
 * Check whether the queue is at least half full, and emit a warning if so.
 *
 * This is unlikely given the size of the queue, but possible.
 * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
 *
 * Caller must hold exclusive NotifyQueueLock.
 */
static void
asyncQueueFillWarning(void)
{
	double		fillDegree;
	TimestampTz t;

	fillDegree = asyncQueueUsage();
	if (fillDegree < 0.5)
		return;

	t = GetCurrentTimestamp();

	/* Rate-limit the warning */
	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
								   t, QUEUE_FULL_WARN_INTERVAL))
	{
		QueuePosition min = QUEUE_HEAD;
		int32		minPid = InvalidPid;

		/*
		 * Find the listener with the minimum (oldest) queue position, i.e.
		 * the one holding back truncation of the queue, to name in the
		 * warning's detail message.
		 */
		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
		{
			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
				minPid = QUEUE_BACKEND_PID(i);
		}

		/* detail/hint are included only when a culprit PID was identified */
		ereport(WARNING,
				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
				 (minPid != InvalidPid ?
				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
				  : 0),
				 (minPid != InvalidPid ?
				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
				  : 0)));

		asyncQueueControl->lastQueueFillWarn = t;
	}
}
2244 :
/*
 * Send signals to listening backends.
 *
 * Normally we signal only backends that are interested in the notifies that
 * we just sent. However, that will leave idle listeners falling further and
 * further behind. Waken them anyway if they're far enough behind, so they'll
 * advance their queue position pointers, allowing the global tail to advance.
 *
 * Since we know the ProcNumber and the Pid the signaling is quite cheap.
 *
 * This is called during CommitTransaction(), so it's important for it
 * to have very low probability of failure.
 */
static void
SignalBackends(void)
{
	int			count;

	/* Can't get here without PreCommit_Notify having made the global table */
	Assert(globalChannelTable != NULL);

	/* It should have set up these arrays, too */
	Assert(signalPids != NULL && signalProcnos != NULL);

	/*
	 * Identify backends that we need to signal. We don't want to send
	 * signals while holding the NotifyQueueLock, so this part just builds a
	 * list of target PIDs in signalPids[] and signalProcnos[].
	 */
	count = 0;

	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);

	/* Scan each channel name that we notified in this transaction */
	foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
	{
		GlobalChannelKey key;
		GlobalChannelEntry *entry;
		ListenerEntry *listeners;

		/* NOTE(review): 'false' presumably means a shared (read-only) lock
		 * on the found entry -- we only read it here; confirm vs dshash API */
		GlobalChannelKeyInit(&key, MyDatabaseId, channel);
		entry = dshash_find(globalChannelTable, &key, false);
		if (entry == NULL)
			continue;			/* nobody is listening */

		listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
													  entry->listenersArray);

		/* Identify listeners that now need waking, add them to arrays */
		for (int j = 0; j < entry->numListeners; j++)
		{
			ProcNumber	i;
			int32		pid;
			QueuePosition pos;

			if (!listeners[j].listening)
				continue;		/* ignore not-yet-committed listeners */

			i = listeners[j].procNo;

			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
				continue;		/* already signaled, no need to repeat */

			pid = QUEUE_BACKEND_PID(i);
			pos = QUEUE_BACKEND_POS(i);

			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
				continue;		/* it's fully caught up already */

			Assert(pid != InvalidPid);

			/* Mark it pending and queue it for signaling after lock release */
			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
			signalPids[count] = pid;
			signalProcnos[count] = i;
			count++;
		}

		dshash_release_lock(globalChannelTable, entry);
	}

	/*
	 * Scan all listeners. Any that are not already pending wakeup must not
	 * be interested in our notifications (else we'd have set their wakeup
	 * flags above). Check to see if we can directly advance their queue
	 * pointers to save a wakeup. Otherwise, if they are far behind, wake
	 * them anyway so they will catch up.
	 */
	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
	{
		int32		pid;
		QueuePosition pos;

		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
			continue;

		/* If it's currently advancing, we should not touch it */
		if (QUEUE_BACKEND_IS_ADVANCING(i))
			continue;

		pid = QUEUE_BACKEND_PID(i);
		pos = QUEUE_BACKEND_POS(i);

		/*
		 * We can directly advance the other backend's queue pointer if it's
		 * not currently advancing (else there are race conditions), and its
		 * current pointer is not behind queueHeadBeforeWrite (else we'd make
		 * it miss some older messages), and we'd not be moving the pointer
		 * backward.
		 */
		if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
			QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
		{
			/* We can directly advance its pointer past what we wrote */
			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
		}
		else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
									QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
		{
			/* It's idle and far behind, so wake it up */
			Assert(pid != InvalidPid);

			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
			signalPids[count] = pid;
			signalProcnos[count] = i;
			count++;
		}
	}

	LWLockRelease(NotifyQueueLock);

	/* Now send signals */
	for (int i = 0; i < count; i++)
	{
		int32		pid = signalPids[i];

		/*
		 * If we are signaling our own process, no need to involve the kernel;
		 * just set the flag directly.
		 */
		if (pid == MyProcPid)
		{
			notifyInterruptPending = true;
			continue;
		}

		/*
		 * Note: assuming things aren't broken, a signal failure here could
		 * only occur if the target backend exited since we released
		 * NotifyQueueLock; which is unlikely but certainly possible. So we
		 * just log a low-level debug message if it happens.
		 */
		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
	}
}
2400 :
2401 : /*
2402 : * AtAbort_Notify
2403 : *
2404 : * This is called at transaction abort.
2405 : *
2406 : * Revert any staged listen/unlisten changes and clean up transaction state.
2407 : * This only does anything if we abort after PreCommit_Notify has staged
2408 : * some entries.
2409 : */
2410 : void
2411 26563 : AtAbort_Notify(void)
2412 : {
2413 : /* Revert staged listen/unlisten changes */
2414 26563 : ApplyPendingListenActions(false);
2415 :
2416 : /* If we're no longer listening on anything, unregister */
2417 26563 : if (amRegisteredListener && LocalChannelTableIsEmpty())
2418 0 : asyncQueueUnregister();
2419 :
2420 : /* And clean up */
2421 26563 : ClearPendingActionsAndNotifies();
2422 26563 : }
2423 :
/*
 * AtSubCommit_Notify() --- Take care of subtransaction commit.
 *
 * Reassign all items in the pending lists to the parent transaction:
 * either by just decrementing the top list's nesting level (when the
 * parent has no list of its own to merge into), or by concatenating /
 * merging the child's items into the parent's list.
 */
void
AtSubCommit_Notify(void)
{
	int			my_level = GetCurrentTransactionNestLevel();

	/* If there are actions at our nesting level, we must reparent them. */
	if (pendingActions != NULL &&
		pendingActions->nestingLevel >= my_level)
	{
		if (pendingActions->upper == NULL ||
			pendingActions->upper->nestingLevel < my_level - 1)
		{
			/* nothing to merge; give the whole thing to the parent */
			--pendingActions->nestingLevel;
		}
		else
		{
			ActionList *childPendingActions = pendingActions;

			pendingActions = pendingActions->upper;

			/*
			 * Mustn't try to eliminate duplicates here --- see queue_listen()
			 */
			pendingActions->actions =
				list_concat(pendingActions->actions,
							childPendingActions->actions);
			pfree(childPendingActions);
		}
	}

	/* If there are notifies at our nesting level, we must reparent them. */
	if (pendingNotifies != NULL &&
		pendingNotifies->nestingLevel >= my_level)
	{
		Assert(pendingNotifies->nestingLevel == my_level);

		if (pendingNotifies->upper == NULL ||
			pendingNotifies->upper->nestingLevel < my_level - 1)
		{
			/* nothing to merge; give the whole thing to the parent */
			--pendingNotifies->nestingLevel;
		}
		else
		{
			/*
			 * Formerly, we didn't bother to eliminate duplicates here, but
			 * now we must, else we fall foul of "Assert(!found)", either here
			 * or during a later attempt to build the parent-level hashtable.
			 */
			NotificationList *childPendingNotifies = pendingNotifies;
			ListCell   *l;

			pendingNotifies = pendingNotifies->upper;
			/* Insert all the subxact's events into parent, except for dups */
			foreach(l, childPendingNotifies->events)
			{
				Notification *childn = (Notification *) lfirst(l);

				if (!AsyncExistsPendingNotify(childn))
					AddEventToPendingNotifies(childn);
			}
			pfree(childPendingNotifies);
		}
	}
}
2495 :
2496 : /*
2497 : * AtSubAbort_Notify() --- Take care of subtransaction abort.
2498 : */
2499 : void
2500 4713 : AtSubAbort_Notify(void)
2501 : {
2502 4713 : int my_level = GetCurrentTransactionNestLevel();
2503 :
2504 : /*
2505 : * All we have to do is pop the stack --- the actions/notifies made in
2506 : * this subxact are no longer interesting, and the space will be freed
2507 : * when CurTransactionContext is recycled. We still have to free the
2508 : * ActionList and NotificationList objects themselves, though, because
2509 : * those are allocated in TopTransactionContext.
2510 : *
2511 : * Note that there might be no entries at all, or no entries for the
2512 : * current subtransaction level, either because none were ever created, or
2513 : * because we reentered this routine due to trouble during subxact abort.
2514 : */
2515 4714 : while (pendingActions != NULL &&
2516 1 : pendingActions->nestingLevel >= my_level)
2517 : {
2518 1 : ActionList *childPendingActions = pendingActions;
2519 :
2520 1 : pendingActions = pendingActions->upper;
2521 1 : pfree(childPendingActions);
2522 : }
2523 :
2524 4714 : while (pendingNotifies != NULL &&
2525 2 : pendingNotifies->nestingLevel >= my_level)
2526 : {
2527 1 : NotificationList *childPendingNotifies = pendingNotifies;
2528 :
2529 1 : pendingNotifies = pendingNotifies->upper;
2530 1 : pfree(childPendingNotifies);
2531 : }
2532 4713 : }
2533 :
2534 : /*
2535 : * HandleNotifyInterrupt
2536 : *
2537 : * Signal handler portion of interrupt handling. Let the backend know
2538 : * that there's a pending notify interrupt. If we're currently reading
2539 : * from the client, this will interrupt the read and
2540 : * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
2541 : */
2542 : void
2543 9 : HandleNotifyInterrupt(void)
2544 : {
2545 : /*
2546 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what
2547 : * you do here.
2548 : */
2549 :
2550 : /* signal that work needs to be done */
2551 9 : notifyInterruptPending = true;
2552 :
2553 : /* make sure the event is processed in due course */
2554 9 : SetLatch(MyLatch);
2555 9 : }
2556 :
2557 : /*
2558 : * ProcessNotifyInterrupt
2559 : *
2560 : * This is called if we see notifyInterruptPending set, just before
2561 : * transmitting ReadyForQuery at the end of a frontend command, and
2562 : * also if a notify signal occurs while reading from the frontend.
2563 : * HandleNotifyInterrupt() will cause the read to be interrupted
2564 : * via the process's latch, and this routine will get called.
2565 : * If we are truly idle (ie, *not* inside a transaction block),
2566 : * process the incoming notifies.
2567 : *
2568 : * If "flush" is true, force any frontend messages out immediately.
2569 : * This can be false when being called at the end of a frontend command,
2570 : * since we'll flush after sending ReadyForQuery.
2571 : */
2572 : void
2573 35 : ProcessNotifyInterrupt(bool flush)
2574 : {
2575 35 : if (IsTransactionOrTransactionBlock())
2576 6 : return; /* not really idle */
2577 :
2578 : /* Loop in case another signal arrives while sending messages */
2579 58 : while (notifyInterruptPending)
2580 29 : ProcessIncomingNotify(flush);
2581 : }
2582 :
2583 :
2584 : /*
2585 : * Read all pending notifications from the queue, and deliver appropriate
2586 : * ones to my frontend. Stop when we reach queue head or an uncommitted
2587 : * notification.
2588 : */
2589 : static void
2590 44 : asyncQueueReadAllNotifications(void)
2591 : {
2592 : QueuePosition pos;
2593 : QueuePosition head;
2594 : Snapshot snapshot;
2595 :
2596 : /*
2597 : * Fetch current state, indicate to others that we have woken up, and that
2598 : * we are in process of advancing our position.
2599 : */
2600 44 : LWLockAcquire(NotifyQueueLock, LW_SHARED);
2601 : /* Assert checks that we have a valid state entry */
2602 : Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
2603 44 : QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
2604 44 : pos = QUEUE_BACKEND_POS(MyProcNumber);
2605 44 : head = QUEUE_HEAD;
2606 :
2607 44 : if (QUEUE_POS_EQUAL(pos, head))
2608 : {
2609 : /* Nothing to do, we have read all notifications already. */
2610 0 : LWLockRelease(NotifyQueueLock);
2611 0 : return;
2612 : }
2613 :
2614 44 : QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
2615 44 : LWLockRelease(NotifyQueueLock);
2616 :
2617 : /*----------
2618 : * Get snapshot we'll use to decide which xacts are still in progress.
2619 : * This is trickier than it might seem, because of race conditions.
2620 : * Consider the following example:
2621 : *
2622 : * Backend 1: Backend 2:
2623 : *
2624 : * transaction starts
2625 : * UPDATE foo SET ...;
2626 : * NOTIFY foo;
2627 : * commit starts
2628 : * queue the notify message
2629 : * transaction starts
2630 : * LISTEN foo; -- first LISTEN in session
2631 : * SELECT * FROM foo WHERE ...;
2632 : * commit to clog
2633 : * commit starts
2634 : * add backend 2 to array of listeners
2635 : * advance to queue head (this code)
2636 : * commit to clog
2637 : *
2638 : * Transaction 2's SELECT has not seen the UPDATE's effects, since that
2639 : * wasn't committed yet. Ideally we'd ensure that client 2 would
2640 : * eventually get transaction 1's notify message, but there's no way
2641 : * to do that; until we're in the listener array, there's no guarantee
2642 : * that the notify message doesn't get removed from the queue.
2643 : *
2644 : * Therefore the coding technique transaction 2 is using is unsafe:
2645 : * applications must commit a LISTEN before inspecting database state,
2646 : * if they want to ensure they will see notifications about subsequent
2647 : * changes to that state.
2648 : *
2649 : * What we do guarantee is that we'll see all notifications from
2650 : * transactions committing after the snapshot we take here.
2651 : * BecomeRegisteredListener has already added us to the listener array,
2652 : * so no not-yet-committed messages can be removed from the queue
2653 : * before we see them.
2654 : *----------
2655 : */
2656 44 : snapshot = RegisterSnapshot(GetLatestSnapshot());
2657 :
2658 : /*
2659 : * It is possible that we fail while trying to send a message to our
2660 : * frontend (for example, because of encoding conversion failure). If
2661 : * that happens it is critical that we not try to send the same message
2662 : * over and over again. Therefore, we set ExitOnAnyError to upgrade any
2663 : * ERRORs to FATAL, causing the client connection to be closed on error.
2664 : *
2665 : * We used to only skip over the offending message and try to soldier on,
2666 : * but it was somewhat questionable to lose a notification and give the
2667 : * client an ERROR instead. A client application is not be prepared for
2668 : * that and can't tell that a notification was missed. It was also not
2669 : * very useful in practice because notifications are often processed while
2670 : * a connection is idle and reading a message from the client, and in that
2671 : * state, any error is upgraded to FATAL anyway. Closing the connection
2672 : * is a clear signal to the application that it might have missed
2673 : * notifications.
2674 : */
2675 : {
2676 44 : bool save_ExitOnAnyError = ExitOnAnyError;
2677 : bool reachedStop;
2678 :
2679 44 : ExitOnAnyError = true;
2680 :
2681 : do
2682 : {
2683 : /*
2684 : * Process messages up to the stop position, end of page, or an
2685 : * uncommitted message.
2686 : *
2687 : * Our stop position is what we found to be the head's position
2688 : * when we entered this function. It might have changed already.
2689 : * But if it has, we will receive (or have already received and
2690 : * queued) another signal and come here again.
2691 : *
2692 : * We are not holding NotifyQueueLock here! The queue can only
2693 : * extend beyond the head pointer (see above) and we leave our
2694 : * backend's pointer where it is so nobody will truncate or
2695 : * rewrite pages under us. Especially we don't want to hold a lock
2696 : * while sending the notifications to the frontend.
2697 : */
2698 47 : reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
2699 47 : } while (!reachedStop);
2700 :
2701 : /* Update shared state */
2702 44 : LWLockAcquire(NotifyQueueLock, LW_SHARED);
2703 44 : QUEUE_BACKEND_POS(MyProcNumber) = pos;
2704 44 : QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
2705 44 : LWLockRelease(NotifyQueueLock);
2706 :
2707 44 : ExitOnAnyError = save_ExitOnAnyError;
2708 : }
2709 :
2710 : /* Done with snapshot */
2711 44 : UnregisterSnapshot(snapshot);
2712 : }
2713 :
2714 : /*
2715 : * Fetch notifications from the shared queue, beginning at position current,
2716 : * and deliver relevant ones to my frontend.
2717 : *
2718 : * The function returns true once we have reached the stop position or an
2719 : * uncommitted notification, and false if we have finished with the page.
2720 : * In other words: once it returns true there is no need to look further.
2721 : * The QueuePosition *current is advanced past all processed messages.
2722 : */
2723 : static bool
2724 47 : asyncQueueProcessPageEntries(QueuePosition *current,
2725 : QueuePosition stop,
2726 : Snapshot snapshot)
2727 : {
2728 47 : int64 curpage = QUEUE_POS_PAGE(*current);
2729 : int slotno;
2730 : char *page_buffer;
2731 47 : bool reachedStop = false;
2732 : bool reachedEndOfPage;
2733 :
2734 : /*
2735 : * We copy the entries into a local buffer to avoid holding the SLRU lock
2736 : * while we transmit them to our frontend. The local buffer must be
2737 : * adequately aligned.
2738 : */
2739 : alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
2740 47 : char *local_buf_end = local_buf;
2741 :
2742 47 : slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
2743 : InvalidTransactionId);
2744 47 : page_buffer = NotifyCtl->shared->page_buffer[slotno];
2745 :
2746 : do
2747 : {
2748 1326 : QueuePosition thisentry = *current;
2749 : AsyncQueueEntry *qe;
2750 :
2751 1326 : if (QUEUE_POS_EQUAL(thisentry, stop))
2752 44 : break;
2753 :
2754 1282 : qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2755 :
2756 : /*
2757 : * Advance *current over this message, possibly to the next page. As
2758 : * noted in the comments for asyncQueueReadAllNotifications, we must
2759 : * do this before possibly failing while processing the message.
2760 : */
2761 1282 : reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2762 :
2763 : /* Ignore messages destined for other databases */
2764 1282 : if (qe->dboid == MyDatabaseId)
2765 : {
2766 1282 : if (XidInMVCCSnapshot(qe->xid, snapshot))
2767 : {
2768 : /*
2769 : * The source transaction is still in progress, so we can't
2770 : * process this message yet. Break out of the loop, but first
2771 : * back up *current so we will reprocess the message next
2772 : * time. (Note: it is unlikely but not impossible for
2773 : * TransactionIdDidCommit to fail, so we can't really avoid
2774 : * this advance-then-back-up behavior when dealing with an
2775 : * uncommitted message.)
2776 : *
2777 : * Note that we must test XidInMVCCSnapshot before we test
2778 : * TransactionIdDidCommit, else we might return a message from
2779 : * a transaction that is not yet visible to snapshots; compare
2780 : * the comments at the head of heapam_visibility.c.
2781 : *
2782 : * Also, while our own xact won't be listed in the snapshot,
2783 : * we need not check for TransactionIdIsCurrentTransactionId
2784 : * because our transaction cannot (yet) have queued any
2785 : * messages.
2786 : */
2787 0 : *current = thisentry;
2788 0 : reachedStop = true;
2789 0 : break;
2790 : }
2791 :
2792 : /*
2793 : * Quick check for the case that we're not listening on any
2794 : * channels, before calling TransactionIdDidCommit(). This makes
2795 : * that case a little faster, but more importantly, it ensures
2796 : * that if there's a bad entry in the queue for which
2797 : * TransactionIdDidCommit() fails for some reason, we can skip
2798 : * over it on the first LISTEN in a session, and not get stuck on
2799 : * it indefinitely. (This is a little trickier than it looks: it
2800 : * works because BecomeRegisteredListener runs this code before we
2801 : * have made the first entry in localChannelTable.)
2802 : */
2803 1282 : if (LocalChannelTableIsEmpty())
2804 1226 : continue;
2805 :
2806 56 : if (TransactionIdDidCommit(qe->xid))
2807 : {
2808 56 : memcpy(local_buf_end, qe, qe->length);
2809 56 : local_buf_end += qe->length;
2810 : }
2811 : else
2812 : {
2813 : /*
2814 : * The source transaction aborted or crashed, so we just
2815 : * ignore its notifications.
2816 : */
2817 : }
2818 : }
2819 :
2820 : /* Loop back if we're not at end of page */
2821 1282 : } while (!reachedEndOfPage);
2822 :
2823 : /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2824 47 : LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
2825 :
2826 : /*
2827 : * Now that we have let go of the SLRU bank lock, send the notifications
2828 : * to our backend
2829 : */
2830 : Assert(local_buf_end - local_buf <= BLCKSZ);
2831 103 : for (char *p = local_buf; p < local_buf_end;)
2832 : {
2833 56 : AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
2834 :
2835 : /* qe->data is the null-terminated channel name */
2836 56 : char *channel = qe->data;
2837 :
2838 56 : if (IsListeningOn(channel))
2839 : {
2840 : /* payload follows channel name */
2841 55 : char *payload = qe->data + strlen(channel) + 1;
2842 :
2843 55 : NotifyMyFrontEnd(channel, payload, qe->srcPid);
2844 : }
2845 :
2846 56 : p += qe->length;
2847 : }
2848 :
2849 47 : if (QUEUE_POS_EQUAL(*current, stop))
2850 44 : reachedStop = true;
2851 :
2852 47 : return reachedStop;
2853 : }
2854 :
2855 : /*
2856 : * Advance the shared queue tail variable to the minimum of all the
2857 : * per-backend tail pointers. Truncate pg_notify space if possible.
2858 : *
2859 : * This is (usually) called during CommitTransaction(), so it's important for
2860 : * it to have very low probability of failure.
2861 : */
2862 : static void
2863 13 : asyncQueueAdvanceTail(void)
2864 : {
2865 : QueuePosition min;
2866 : int64 oldtailpage;
2867 : int64 newtailpage;
2868 : int64 boundary;
2869 :
2870 : /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2871 13 : LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2872 :
2873 : /*
2874 : * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2875 : * (ie, exactly match at least one backend's queue position), so it must
2876 : * be updated atomically with the actual computation. Since v13, we could
2877 : * get away with not doing it like that, but it seems prudent to keep it
2878 : * so.
2879 : *
2880 : * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2881 : * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2882 : * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2883 : * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2884 : * there are pages we can truncate but haven't yet finished doing so.
2885 : *
2886 : * For concurrency's sake, we don't want to hold NotifyQueueLock while
2887 : * performing SimpleLruTruncate. This is OK because no backend will try
2888 : * to access the pages we are in the midst of truncating.
2889 : */
2890 13 : LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2891 13 : min = QUEUE_HEAD;
2892 23 : for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
2893 : {
2894 : Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
2895 10 : min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2896 : }
2897 13 : QUEUE_TAIL = min;
2898 13 : oldtailpage = QUEUE_STOP_PAGE;
2899 13 : LWLockRelease(NotifyQueueLock);
2900 :
2901 : /*
2902 : * We can truncate something if the global tail advanced across an SLRU
2903 : * segment boundary.
2904 : *
2905 : * XXX it might be better to truncate only once every several segments, to
2906 : * reduce the number of directory scans.
2907 : */
2908 13 : newtailpage = QUEUE_POS_PAGE(min);
2909 13 : boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2910 13 : if (asyncQueuePagePrecedes(oldtailpage, boundary))
2911 : {
2912 : /*
2913 : * SimpleLruTruncate() will ask for SLRU bank locks but will also
2914 : * release the lock again.
2915 : */
2916 1 : SimpleLruTruncate(NotifyCtl, newtailpage);
2917 :
2918 1 : LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2919 1 : QUEUE_STOP_PAGE = newtailpage;
2920 1 : LWLockRelease(NotifyQueueLock);
2921 : }
2922 :
2923 13 : LWLockRelease(NotifyQueueTailLock);
2924 13 : }
2925 :
2926 : /*
2927 : * AsyncNotifyFreezeXids
2928 : *
2929 : * Prepare the async notification queue for CLOG truncation by freezing
2930 : * transaction IDs that are about to become inaccessible.
2931 : *
2932 : * This function is called by VACUUM before advancing datfrozenxid. It scans
2933 : * the notification queue and replaces XIDs that would become inaccessible
2934 : * after CLOG truncation with special markers:
2935 : * - Committed transactions are set to FrozenTransactionId
2936 : * - Aborted/crashed transactions are set to InvalidTransactionId
2937 : *
2938 : * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
2939 : * pages will be truncated. If XID < newFrozenXid, it cannot still be running
2940 : * (or it would have held back newFrozenXid through ProcArray).
2941 : * Therefore, if TransactionIdDidCommit returns false, we know the transaction
2942 : * either aborted explicitly or crashed, and we can safely mark it invalid.
2943 : */
2944 : void
2945 1079 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
2946 : {
2947 : QueuePosition pos;
2948 : QueuePosition head;
2949 1079 : int64 curpage = -1;
2950 1079 : int slotno = -1;
2951 1079 : char *page_buffer = NULL;
2952 1079 : bool page_dirty = false;
2953 :
2954 : /*
2955 : * Acquire locks in the correct order to avoid deadlocks. As per the
2956 : * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2957 : * bank locks.
2958 : *
2959 : * We only need SHARED mode since we're just reading the head/tail
2960 : * positions, not modifying them.
2961 : */
2962 1079 : LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
2963 1079 : LWLockAcquire(NotifyQueueLock, LW_SHARED);
2964 :
2965 1079 : pos = QUEUE_TAIL;
2966 1079 : head = QUEUE_HEAD;
2967 :
2968 : /* Release NotifyQueueLock early, we only needed to read the positions */
2969 1079 : LWLockRelease(NotifyQueueLock);
2970 :
2971 : /*
2972 : * Scan the queue from tail to head, freezing XIDs as needed. We hold
2973 : * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2974 : * we're working.
2975 : */
2976 1109 : while (!QUEUE_POS_EQUAL(pos, head))
2977 : {
2978 : AsyncQueueEntry *qe;
2979 : TransactionId xid;
2980 30 : int64 pageno = QUEUE_POS_PAGE(pos);
2981 30 : int offset = QUEUE_POS_OFFSET(pos);
2982 :
2983 : /* If we need a different page, release old lock and get new one */
2984 30 : if (pageno != curpage)
2985 : {
2986 : LWLock *lock;
2987 :
2988 : /* Release previous page if any */
2989 3 : if (slotno >= 0)
2990 : {
2991 0 : if (page_dirty)
2992 : {
2993 0 : NotifyCtl->shared->page_dirty[slotno] = true;
2994 0 : page_dirty = false;
2995 : }
2996 0 : LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
2997 : }
2998 :
2999 3 : lock = SimpleLruGetBankLock(NotifyCtl, pageno);
3000 3 : LWLockAcquire(lock, LW_EXCLUSIVE);
3001 3 : slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
3002 : InvalidTransactionId);
3003 3 : page_buffer = NotifyCtl->shared->page_buffer[slotno];
3004 3 : curpage = pageno;
3005 : }
3006 :
3007 30 : qe = (AsyncQueueEntry *) (page_buffer + offset);
3008 30 : xid = qe->xid;
3009 :
3010 60 : if (TransactionIdIsNormal(xid) &&
3011 30 : TransactionIdPrecedes(xid, newFrozenXid))
3012 : {
3013 10 : if (TransactionIdDidCommit(xid))
3014 : {
3015 10 : qe->xid = FrozenTransactionId;
3016 10 : page_dirty = true;
3017 : }
3018 : else
3019 : {
3020 0 : qe->xid = InvalidTransactionId;
3021 0 : page_dirty = true;
3022 : }
3023 : }
3024 :
3025 : /* Advance to next entry */
3026 30 : asyncQueueAdvance(&pos, qe->length);
3027 : }
3028 :
3029 : /* Release final page lock if we acquired one */
3030 1079 : if (slotno >= 0)
3031 : {
3032 3 : if (page_dirty)
3033 1 : NotifyCtl->shared->page_dirty[slotno] = true;
3034 3 : LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
3035 : }
3036 :
3037 1079 : LWLockRelease(NotifyQueueTailLock);
3038 1079 : }
3039 :
3040 : /*
3041 : * ProcessIncomingNotify
3042 : *
3043 : * Scan the queue for arriving notifications and report them to the front
3044 : * end. The notifications might be from other sessions, or our own;
3045 : * there's no need to distinguish here.
3046 : *
3047 : * If "flush" is true, force any frontend messages out immediately.
3048 : *
3049 : * NOTE: since we are outside any transaction, we must create our own.
3050 : */
3051 : static void
3052 29 : ProcessIncomingNotify(bool flush)
3053 : {
3054 : /* We *must* reset the flag */
3055 29 : notifyInterruptPending = false;
3056 :
3057 : /* Do nothing else if we aren't actively listening */
3058 29 : if (LocalChannelTableIsEmpty())
3059 0 : return;
3060 :
3061 29 : if (Trace_notify)
3062 0 : elog(DEBUG1, "ProcessIncomingNotify");
3063 :
3064 29 : set_ps_display("notify interrupt");
3065 :
3066 : /*
3067 : * We must run asyncQueueReadAllNotifications inside a transaction, else
3068 : * bad things happen if it gets an error.
3069 : */
3070 29 : StartTransactionCommand();
3071 :
3072 29 : asyncQueueReadAllNotifications();
3073 :
3074 29 : CommitTransactionCommand();
3075 :
3076 : /*
3077 : * If this isn't an end-of-command case, we must flush the notify messages
3078 : * to ensure frontend gets them promptly.
3079 : */
3080 29 : if (flush)
3081 7 : pq_flush();
3082 :
3083 29 : set_ps_display("idle");
3084 :
3085 29 : if (Trace_notify)
3086 0 : elog(DEBUG1, "ProcessIncomingNotify: done");
3087 : }
3088 :
3089 : /*
3090 : * Send NOTIFY message to my front end.
3091 : */
3092 : void
3093 55 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
3094 : {
3095 55 : if (whereToSendOutput == DestRemote)
3096 : {
3097 : StringInfoData buf;
3098 :
3099 55 : pq_beginmessage(&buf, PqMsg_NotificationResponse);
3100 55 : pq_sendint32(&buf, srcPid);
3101 55 : pq_sendstring(&buf, channel);
3102 55 : pq_sendstring(&buf, payload);
3103 55 : pq_endmessage(&buf);
3104 :
3105 : /*
3106 : * NOTE: we do not do pq_flush() here. Some level of caller will
3107 : * handle it later, allowing this message to be combined into a packet
3108 : * with other ones.
3109 : */
3110 : }
3111 : else
3112 0 : elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
3113 55 : }
3114 :
3115 : /* Does pendingNotifies include a match for the given event? */
3116 : static bool
3117 1066 : AsyncExistsPendingNotify(Notification *n)
3118 : {
3119 1066 : if (pendingNotifies == NULL)
3120 0 : return false;
3121 :
3122 1066 : if (pendingNotifies->hashtab != NULL)
3123 : {
3124 : /* Use the hash table to probe for a match */
3125 984 : if (hash_search(pendingNotifies->hashtab,
3126 : &n,
3127 : HASH_FIND,
3128 : NULL))
3129 1 : return true;
3130 : }
3131 : else
3132 : {
3133 : /* Must scan the event list */
3134 : ListCell *l;
3135 :
3136 425 : foreach(l, pendingNotifies->events)
3137 : {
3138 357 : Notification *oldn = (Notification *) lfirst(l);
3139 :
3140 357 : if (n->channel_len == oldn->channel_len &&
3141 357 : n->payload_len == oldn->payload_len &&
3142 190 : memcmp(n->data, oldn->data,
3143 190 : n->channel_len + n->payload_len + 2) == 0)
3144 14 : return true;
3145 : }
3146 : }
3147 :
3148 1051 : return false;
3149 : }
3150 :
3151 : /*
3152 : * Add a notification event to a pre-existing pendingNotifies list.
3153 : *
3154 : * Because pendingNotifies->events is already nonempty, this works
3155 : * correctly no matter what CurrentMemoryContext is.
3156 : */
3157 : static void
3158 1051 : AddEventToPendingNotifies(Notification *n)
3159 : {
3160 : Assert(pendingNotifies->events != NIL);
3161 :
3162 : /* Create the hash tables if it's time to */
3163 1051 : if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
3164 985 : pendingNotifies->hashtab == NULL)
3165 : {
3166 : HASHCTL hash_ctl;
3167 : ListCell *l;
3168 :
3169 : /* Create the hash table */
3170 2 : hash_ctl.keysize = sizeof(Notification *);
3171 2 : hash_ctl.entrysize = sizeof(struct NotificationHash);
3172 2 : hash_ctl.hash = notification_hash;
3173 2 : hash_ctl.match = notification_match;
3174 2 : hash_ctl.hcxt = CurTransactionContext;
3175 4 : pendingNotifies->hashtab =
3176 2 : hash_create("Pending Notifies",
3177 : 256L,
3178 : &hash_ctl,
3179 : HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
3180 :
3181 : /* Create the unique channel name table */
3182 : Assert(pendingNotifies->uniqueChannelHash == NULL);
3183 2 : hash_ctl.keysize = NAMEDATALEN;
3184 2 : hash_ctl.entrysize = sizeof(ChannelName);
3185 2 : hash_ctl.hcxt = CurTransactionContext;
3186 4 : pendingNotifies->uniqueChannelHash =
3187 2 : hash_create("Pending Notify Channel Names",
3188 : 64L,
3189 : &hash_ctl,
3190 : HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
3191 :
3192 : /* Insert all the already-existing events */
3193 34 : foreach(l, pendingNotifies->events)
3194 : {
3195 32 : Notification *oldn = (Notification *) lfirst(l);
3196 32 : char *channel = oldn->data;
3197 : bool found;
3198 :
3199 32 : (void) hash_search(pendingNotifies->hashtab,
3200 : &oldn,
3201 : HASH_ENTER,
3202 : &found);
3203 : Assert(!found);
3204 :
3205 : /* Add channel name to uniqueChannelHash; might be there already */
3206 32 : (void) hash_search(pendingNotifies->uniqueChannelHash,
3207 : channel,
3208 : HASH_ENTER,
3209 : NULL);
3210 : }
3211 : }
3212 :
3213 : /* Add new event to the list, in order */
3214 1051 : pendingNotifies->events = lappend(pendingNotifies->events, n);
3215 :
3216 : /* Add event to the hash tables if needed */
3217 1051 : if (pendingNotifies->hashtab != NULL)
3218 : {
3219 985 : char *channel = n->data;
3220 : bool found;
3221 :
3222 985 : (void) hash_search(pendingNotifies->hashtab,
3223 : &n,
3224 : HASH_ENTER,
3225 : &found);
3226 : Assert(!found);
3227 :
3228 : /* Add channel name to uniqueChannelHash; might be there already */
3229 985 : (void) hash_search(pendingNotifies->uniqueChannelHash,
3230 : channel,
3231 : HASH_ENTER,
3232 : NULL);
3233 : }
3234 1051 : }
3235 :
3236 : /*
3237 : * notification_hash: hash function for notification hash table
3238 : *
3239 : * The hash "keys" are pointers to Notification structs.
3240 : */
3241 : static uint32
3242 2001 : notification_hash(const void *key, Size keysize)
3243 : {
3244 2001 : const Notification *k = *(const Notification *const *) key;
3245 :
3246 : Assert(keysize == sizeof(Notification *));
3247 : /* We don't bother to include the payload's trailing null in the hash */
3248 2001 : return DatumGetUInt32(hash_any((const unsigned char *) k->data,
3249 2001 : k->channel_len + k->payload_len + 1));
3250 : }
3251 :
3252 : /*
3253 : * notification_match: match function to use with notification_hash
3254 : */
3255 : static int
3256 1 : notification_match(const void *key1, const void *key2, Size keysize)
3257 : {
3258 1 : const Notification *k1 = *(const Notification *const *) key1;
3259 1 : const Notification *k2 = *(const Notification *const *) key2;
3260 :
3261 : Assert(keysize == sizeof(Notification *));
3262 1 : if (k1->channel_len == k2->channel_len &&
3263 1 : k1->payload_len == k2->payload_len &&
3264 1 : memcmp(k1->data, k2->data,
3265 1 : k1->channel_len + k1->payload_len + 2) == 0)
3266 1 : return 0; /* equal */
3267 0 : return 1; /* not equal */
3268 : }
3269 :
3270 : /* Clear the pendingActions and pendingNotifies lists. */
3271 : static void
3272 26719 : ClearPendingActionsAndNotifies(void)
3273 : {
3274 : /*
3275 : * Everything's allocated in either TopTransactionContext or the context
3276 : * for the subtransaction to which it corresponds. So, there's nothing to
3277 : * do here except reset the pointers; the space will be reclaimed when the
3278 : * contexts are deleted.
3279 : */
3280 26719 : pendingActions = NULL;
3281 26719 : pendingNotifies = NULL;
3282 : /* Also clear pendingListenActions, which is derived from pendingActions */
3283 26719 : pendingListenActions = NULL;
3284 26719 : }
3285 :
3286 : /*
3287 : * GUC check_hook for notify_buffers
3288 : */
3289 : bool
3290 1192 : check_notify_buffers(int *newval, void **extra, GucSource source)
3291 : {
3292 1192 : return check_slru_buffers("notify_buffers", newval);
3293 : }
|