LCOV - code coverage report
Current view: top level - src/backend/commands - async.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 519 605 85.8 %
Date: 2025-11-20 16:17:43 Functions: 44 46 95.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * async.c
       4             :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/commands/async.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : /*-------------------------------------------------------------------------
      16             :  * Async Notification Model as of 9.0:
      17             :  *
      18             :  * 1. Multiple backends on same machine. Multiple backends listening on
      19             :  *    several channels. (Channels are also called "conditions" in other
      20             :  *    parts of the code.)
      21             :  *
      22             :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      23             :  *    with actively-used pages mapped into shared memory by the slru.c module.
      24             :  *    All notification messages are placed in the queue and later read out
      25             :  *    by listening backends.
      26             :  *
      27             :  *    There is no central knowledge of which backend listens on which channel;
      28             :  *    every backend has its own list of interesting channels.
      29             :  *
      30             :  *    Although there is only one queue, notifications are treated as being
      31             :  *    database-local; this is done by including the sender's database OID
      32             :  *    in each notification message.  Listening backends ignore messages
      33             :  *    that don't match their database OID.  This is important because it
      34             :  *    ensures senders and receivers have the same database encoding and won't
      35             :  *    misinterpret non-ASCII text in the channel name or payload string.
      36             :  *
      37             :  *    Since notifications are not expected to survive database crashes,
      38             :  *    we can simply clean out the pg_notify data at any reboot, and there
      39             :  *    is no need for WAL support or fsync'ing.
      40             :  *
      41             :  * 3. Every backend that is listening on at least one channel registers by
      42             :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      43             :  *    incoming notifications in the central queue and first compares the
      44             :  *    database OID of the notification with its own database OID and then
      45             :  *    compares the notified channel with the list of channels that it listens
      46             :  *    to. In case there is a match it delivers the notification event to its
      47             :  *    frontend.  Non-matching events are simply skipped.
      48             :  *
      49             :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      50             :  *    a backend-local list which will not be processed until transaction end.
      51             :  *
      52             :  *    Duplicate notifications from the same transaction are sent out as one
      53             :  *    notification only. This is done to save work when for example a trigger
      54             :  *    on a 2 million row table fires a notification for each row that has been
      55             :  *    changed. If the application needs to receive every single notification
      56             :  *    that has been sent, it can easily add some unique string into the extra
      57             :  *    payload parameter.
      58             :  *
      59             :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      60             :  *    pending notifications to the head of the queue. The head pointer of the
      61             :  *    queue always points to the next free position and a position is just a
      62             :  *    page number and the offset in that page. This is done before marking the
      63             :  *    transaction as committed in clog. If we run into problems writing the
      64             :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      65             :  *    will roll back.
      66             :  *
      67             :  *    Once we have put all of the notifications into the queue, we return to
      68             :  *    CommitTransaction() which will then do the actual transaction commit.
      69             :  *
      70             :  *    After commit we are called another time (AtCommit_Notify()). Here we
      71             :  *    make any actual updates to the effective listen state (listenChannels).
      72             :  *    Then we signal any backends that may be interested in our messages
      73             :  *    (including our own backend, if listening).  This is done by
      74             :  *    SignalBackends(), which scans the list of listening backends and sends a
      75             :  *    PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
      76             :  *    know which backend is listening on which channel so we must signal them
      77             :  *    all).  We can exclude backends that are already up to date, though, and
      78             :  *    we can also exclude backends that are in other databases (unless they
      79             :  *    are way behind and should be kicked to make them advance their
      80             :  *    pointers).
      81             :  *
      82             :  *    Finally, after we are out of the transaction altogether and about to go
      83             :  *    idle, we scan the queue for messages that need to be sent to our
      84             :  *    frontend (which might be notifies from other backends, or self-notifies
      85             :  *    from our own).  This step is not part of the CommitTransaction sequence
      86             :  *    for two important reasons.  First, we could get errors while sending
      87             :  *    data to our frontend, and it's really bad for errors to happen in
      88             :  *    post-commit cleanup.  Second, in cases where a procedure issues commits
      89             :  *    within a single frontend command, we don't want to send notifies to our
      90             :  *    frontend until the command is done; but notifies to other backends
      91             :  *    should go out immediately after each commit.
      92             :  *
      93             :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      94             :  *    sets the process's latch, which triggers the event to be processed
      95             :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
      96             :  *    command and is not within a transaction block; cf.
      97             :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      98             :  *    flag, which will cause the processing to occur just before we next go
      99             :  *    idle.
     100             :  *
     101             :  *    Inbound-notify processing consists of reading all of the notifications
     102             :  *    that have arrived since scanning last time. We read every notification
     103             :  *    until we reach either a notification from an uncommitted transaction or
     104             :  *    the head pointer's position.
     105             :  *
     106             :  * 6. To limit disk space consumption, the tail pointer needs to be advanced
     107             :  *    so that old pages can be truncated. This is relatively expensive
     108             :  *    (notably, it requires an exclusive lock), so we don't want to do it
     109             :  *    often. We make sending backends do this work if they advanced the queue
     110             :  *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
     111             :  *
     112             :  * An application that listens on the same channel it notifies will get
     113             :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     114             :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     115             :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     116             :  * frontend during startup.)  The above design guarantees that notifies from
     117             :  * other backends will never be missed by ignoring self-notifies.
     118             :  *
     119             :  * The amount of shared memory used for notify management (notify_buffers)
     120             :  * can be varied without affecting anything but performance.  The maximum
     121             :  * amount of notification data that can be queued at one time is determined
     122             :  * by the max_notify_queue_pages GUC.
     123             :  *-------------------------------------------------------------------------
     124             :  */
     125             : 
     126             : #include "postgres.h"
     127             : 
     128             : #include <limits.h>
     129             : #include <unistd.h>
     130             : #include <signal.h>
     131             : 
     132             : #include "access/parallel.h"
     133             : #include "access/slru.h"
     134             : #include "access/transam.h"
     135             : #include "access/xact.h"
     136             : #include "catalog/pg_database.h"
     137             : #include "commands/async.h"
     138             : #include "common/hashfn.h"
     139             : #include "funcapi.h"
     140             : #include "libpq/libpq.h"
     141             : #include "libpq/pqformat.h"
     142             : #include "miscadmin.h"
     143             : #include "storage/ipc.h"
     144             : #include "storage/lmgr.h"
     145             : #include "storage/procsignal.h"
     146             : #include "tcop/tcopprot.h"
     147             : #include "utils/builtins.h"
     148             : #include "utils/guc_hooks.h"
     149             : #include "utils/memutils.h"
     150             : #include "utils/ps_status.h"
     151             : #include "utils/snapmgr.h"
     152             : #include "utils/timestamp.h"
     153             : 
     154             : 
     155             : /*
     156             :  * Maximum size of a NOTIFY payload, including terminating NUL.  This
     157             :  * must be kept small enough so that a notification message fits on one
     158             :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     159             :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     160             :  * than that, so changes in that data structure won't affect user-visible
     161             :  * restrictions.
     162             :  */
     163             : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
     164             : 
     165             : /*
     166             :  * Struct representing an entry in the global notify queue
     167             :  *
     168             :  * This struct declaration has the maximal length, but in a real queue entry
     169             :  * the data area is only big enough for the actual channel and payload strings
     170             :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     171             :  * entry size, if both channel and payload strings are empty (but note it
     172             :  * doesn't include alignment padding).
     173             :  *
     174             :  * The "length" field should always be rounded up to the next QUEUEALIGN
     175             :  * multiple so that all fields are properly aligned.
     176             :  */
     177             : typedef struct AsyncQueueEntry
     178             : {
     179             :     int         length;         /* total allocated length of entry */
     180             :     Oid         dboid;          /* sender's database OID */
     181             :     TransactionId xid;          /* sender's XID */
     182             :     int32       srcPid;         /* sender's PID */
     183             :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     184             : } AsyncQueueEntry;
     185             : 
     186             : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     187             : #define QUEUEALIGN(len)     INTALIGN(len)
     188             : 
     189             : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
     190             : 
     191             : /*
     192             :  * Struct describing a queue position, and assorted macros for working with it
     193             :  */
     194             : typedef struct QueuePosition
     195             : {
     196             :     int64       page;           /* SLRU page number */
     197             :     int         offset;         /* byte offset within page */
     198             : } QueuePosition;
     199             : 
     200             : #define QUEUE_POS_PAGE(x)       ((x).page)
     201             : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     202             : 
     203             : #define SET_QUEUE_POS(x,y,z) \
     204             :     do { \
     205             :         (x).page = (y); \
     206             :         (x).offset = (z); \
     207             :     } while (0)
     208             : 
     209             : #define QUEUE_POS_EQUAL(x,y) \
     210             :     ((x).page == (y).page && (x).offset == (y).offset)
     211             : 
     212             : #define QUEUE_POS_IS_ZERO(x) \
     213             :     ((x).page == 0 && (x).offset == 0)
     214             : 
     215             : /* choose logically smaller QueuePosition (evaluates x and y repeatedly) */
     216             : #define QUEUE_POS_MIN(x,y) \
     217             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     218             :      (x).page != (y).page ? (y) : \
     219             :      (x).offset < (y).offset ? (x) : (y))
     220             : 
     221             : /* choose logically larger QueuePosition (evaluates x and y repeatedly) */
     222             : #define QUEUE_POS_MAX(x,y) \
     223             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     224             :      (x).page != (y).page ? (x) : \
     225             :      (x).offset > (y).offset ? (x) : (y))
     226             : 
     227             : /*
     228             :  * Parameter determining how often we try to advance the tail pointer:
     229             :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     230             :  * also the distance by which a backend in another database needs to be
     231             :  * behind before we'll decide we need to wake it up to advance its pointer.
     232             :  *
     233             :  * Resist the temptation to make this really large.  While that would save
     234             :  * work in some places, it would add cost in others.  In particular, this
     235             :  * should likely be less than notify_buffers, to ensure that backends
     236             :  * catch up before the pages they'll need to read fall out of SLRU cache.
     237             :  */
     238             : #define QUEUE_CLEANUP_DELAY 4
     239             : 
     240             : /*
     241             :  * Struct describing a listening backend's status
     242             :  */
     243             : typedef struct QueueBackendStatus
     244             : {
     245             :     int32       pid;            /* either a PID or InvalidPid */
     246             :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     247             :     ProcNumber  nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
     248             :     QueuePosition pos;          /* backend has read queue up to here */
     249             : } QueueBackendStatus;
     250             : 
     251             : /*
     252             :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     253             :  *
     254             :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
     255             :  * NotifyQueueTailLock.
     256             :  *
     257             :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
     258             :  * their own entries as well as the head and tail pointers. Consequently we
     259             :  * can allow a backend to update its own record while holding only SHARED lock
     260             :  * (since no other backend will inspect it).
     261             :  *
     262             :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
     263             :  * entries of other backends and also change the head pointer. When holding
     264             :  * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
     265             :  * can change the tail pointers.
     266             :  *
     267             :  * The SLRU buffer pool is divided into banks, and a bank-wise SLRU lock is
     268             :  * used as the control lock for the pg_notify SLRU buffers.
     269             :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
     270             :  * NotifyQueueTailLock, then NotifyQueueLock, and lastly the SLRU bank lock.
     271             :  *
     272             :  * Each backend uses the backend[] array entry with index equal to its
     273             :  * ProcNumber.  We rely on this to make SendProcSignal fast.
     274             :  *
     275             :  * The backend[] array entries for actively-listening backends are threaded
     276             :  * together using firstListener and the nextListener links, so that we can
     277             :  * scan them without having to iterate over inactive entries.  We keep this
     278             :  * list in order by ProcNumber so that the scan is cache-friendly when there
     279             :  * are many active entries.
     280             :  */
     281             : typedef struct AsyncQueueControl
     282             : {
     283             :     QueuePosition head;         /* head points to the next free location */
     284             :     QueuePosition tail;         /* tail must be <= the queue position of every
     285             :                                  * listening backend */
     286             :     int64       stopPage;       /* oldest unrecycled page; must be <=
     287             :                                  * tail.page */
     288             :     ProcNumber  firstListener;  /* id of first listener, or
     289             :                                  * INVALID_PROC_NUMBER */
     290             :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     291             :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     292             : } AsyncQueueControl;
     293             : 
     294             : static AsyncQueueControl *asyncQueueControl;
     295             : 
     296             : #define QUEUE_HEAD                  (asyncQueueControl->head)
     297             : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     298             : #define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
     299             : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
     300             : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     301             : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     302             : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
     303             : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
     304             : 
     305             : /*
     306             :  * The SLRU buffer area through which we access the notification queue
     307             :  */
     308             : static SlruCtlData NotifyCtlData;
     309             : 
     310             : #define NotifyCtl                   (&NotifyCtlData)
     311             : #define QUEUE_PAGESIZE              BLCKSZ
     312             : 
     313             : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     314             : 
     315             : /*
     316             :  * listenChannels identifies the channels we are actually listening to
     317             :  * (ie, have committed a LISTEN on).  It is a simple list of channel names,
     318             :  * allocated in TopMemoryContext.
     319             :  */
     320             : static List *listenChannels = NIL;  /* list of C strings */
     321             : 
     322             : /*
     323             :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     324             :  * all actions requested in the current transaction.  As explained above,
     325             :  * we don't actually change listenChannels until we reach transaction commit.
     326             :  *
     327             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     328             :  * subtransaction has its own list in its own CurTransactionContext, but
     329             :  * successful subtransactions attach their lists to their parent's list.
     330             :  * Failed subtransactions simply discard their lists.
     331             :  */
     332             : typedef enum
     333             : {
     334             :     LISTEN_LISTEN,
     335             :     LISTEN_UNLISTEN,
     336             :     LISTEN_UNLISTEN_ALL,
     337             : } ListenActionKind;
     338             : 
     339             : typedef struct
     340             : {
     341             :     ListenActionKind action;
     342             :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     343             : } ListenAction;
     344             : 
     345             : typedef struct ActionList
     346             : {
     347             :     int         nestingLevel;   /* current transaction nesting depth */
     348             :     List       *actions;        /* list of ListenAction structs */
     349             :     struct ActionList *upper;   /* details for upper transaction levels */
     350             : } ActionList;
     351             : 
     352             : static ActionList *pendingActions = NULL;
     353             : 
     354             : /*
     355             :  * State for outbound notifies consists of a list of all channels+payloads
     356             :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     357             :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     358             :  * NOTIFYs have been done in the current (sub) transaction.
     359             :  *
     360             :  * We discard duplicate notify events issued in the same transaction.
     361             :  * Hence, in addition to the list proper (which we need to track the order
     362             :  * of the events, since we guarantee to deliver them in order), we build a
     363             :  * hash table which we can probe to detect duplicates.  Since building the
     364             :  * hash table is somewhat expensive, we do so only once we have at least
     365             :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     366             :  * before that we just scan the events linearly.
     367             :  *
     368             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     369             :  * subtransaction has its own list in its own CurTransactionContext, but
     370             :  * successful subtransactions add their entries to their parent's list.
     371             :  * Failed subtransactions simply discard their lists.  Since these lists
     372             :  * are independent, there may be notify events in a subtransaction's list
     373             :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     374             :  * the dups when merging the subtransaction's list into its parent's.
     375             :  *
     376             :  * Note: the action and notify lists do not interact within a transaction.
     377             :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     378             :  * condition name, it will get a self-notify at commit.  This is a bit odd
     379             :  * but is consistent with our historical behavior.
     380             :  */
     381             : typedef struct Notification
     382             : {
     383             :     uint16      channel_len;    /* length of channel-name string */
     384             :     uint16      payload_len;    /* length of payload string */
     385             :     /* null-terminated channel name, then null-terminated payload follow */
     386             :     char        data[FLEXIBLE_ARRAY_MEMBER];
     387             : } Notification;
     388             : 
     389             : typedef struct NotificationList
     390             : {
     391             :     int         nestingLevel;   /* current transaction nesting depth */
     392             :     List       *events;         /* list of Notification structs */
     393             :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
     394             :     struct NotificationList *upper; /* details for upper transaction levels */
     395             : } NotificationList;
     396             : 
     397             : #define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
     398             : 
     399             : struct NotificationHash
     400             : {
     401             :     Notification *event;        /* => the actual Notification struct */
     402             : };
     403             : 
     404             : static NotificationList *pendingNotifies = NULL;
     405             : 
     406             : /*
     407             :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     408             :  * called from inside a signal handler. That just sets the
     409             :  * notifyInterruptPending flag and sets the process
     410             :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     411             :  * actually deal with the interrupt.
     412             :  */
     413             : volatile sig_atomic_t notifyInterruptPending = false;
     414             : 
     415             : /* True if we've registered an on_shmem_exit cleanup */
     416             : static bool unlistenExitRegistered = false;
     417             : 
     418             : /* True if we're currently registered as a listener in asyncQueueControl */
     419             : static bool amRegisteredListener = false;
     420             : 
     421             : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     422             : static bool tryAdvanceTail = false;
     423             : 
     424             : /* GUC parameters */
     425             : bool        Trace_notify = false;
     426             : 
     427             : /* For 8 KB pages this gives 8 GB of disk space */
     428             : int         max_notify_queue_pages = 1048576;
     429             : 
     430             : /* local function prototypes */
     431             : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
     432             : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
     433             : static void queue_listen(ListenActionKind action, const char *channel);
     434             : static void Async_UnlistenOnExit(int code, Datum arg);
     435             : static void Exec_ListenPreCommit(void);
     436             : static void Exec_ListenCommit(const char *channel);
     437             : static void Exec_UnlistenCommit(const char *channel);
     438             : static void Exec_UnlistenAllCommit(void);
     439             : static bool IsListeningOn(const char *channel);
     440             : static void asyncQueueUnregister(void);
     441             : static bool asyncQueueIsFull(void);
     442             : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     443             : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     444             : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     445             : static double asyncQueueUsage(void);
     446             : static void asyncQueueFillWarning(void);
     447             : static void SignalBackends(void);
     448             : static void asyncQueueReadAllNotifications(void);
     449             : static bool asyncQueueProcessPageEntries(QueuePosition *current,
     450             :                                          QueuePosition stop,
     451             :                                          Snapshot snapshot);
     452             : static void asyncQueueAdvanceTail(void);
     453             : static void ProcessIncomingNotify(bool flush);
     454             : static bool AsyncExistsPendingNotify(Notification *n);
     455             : static void AddEventToPendingNotifies(Notification *n);
     456             : static uint32 notification_hash(const void *key, Size keysize);
     457             : static int  notification_match(const void *key1, const void *key2, Size keysize);
     458             : static void ClearPendingActionsAndNotifies(void);
     459             : 
     460             : /*
     461             :  * Return the signed distance p - q between two queue page numbers.
     462             :  * This is plain int64 subtraction; older code had to allow for wraparound.
     463             :  */
     464             : static inline int64
     465           0 : asyncQueuePageDiff(int64 p, int64 q)
     466             : {
     467           0 :     return p - q;
     468             : }
     469             : 
     470             : /*
     471             :  * Return true if queue page p comes strictly before queue page q.
     472             :  * This is a plain int64 comparison; older code had to allow for wraparound.
     473             :  */
     474             : static inline bool
     475          50 : asyncQueuePagePrecedes(int64 p, int64 q)
     476             : {
     477          50 :     return p < q;
     478             : }
     479             : 
     480             : /*
     481             :  * Report space needed for our shared memory area
     482             :  */
     483             : Size
     484        4100 : AsyncShmemSize(void)
     485             : {
     486             :     Size        size;
     487             : 
     488             :     /* This had better match AsyncShmemInit */
     489        4100 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     490        4100 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     491             : 
     492        4100 :     size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
     493             : 
     494        4100 :     return size;
     495             : }
     496             : 
     497             : /*
     498             :  * Initialize our shared memory area
     499             :  */
     500             : void
     501        2200 : AsyncShmemInit(void)
     502             : {
     503             :     bool        found;
     504             :     Size        size;
     505             : 
     506             :     /*
     507             :      * Create or attach to the AsyncQueueControl structure.
     508             :      */
     509        2200 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     510        2200 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     511             : 
     512        2200 :     asyncQueueControl = (AsyncQueueControl *)
     513        2200 :         ShmemInitStruct("Async Queue Control", size, &found);
     514             : 
     515        2200 :     if (!found)
     516             :     {
     517             :         /* First time through, so initialize it */
     518        2200 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     519        2200 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     520        2200 :         QUEUE_STOP_PAGE = 0;
     521        2200 :         QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
     522        2200 :         asyncQueueControl->lastQueueFillWarn = 0;
     523      205146 :         for (int i = 0; i < MaxBackends; i++)
     524             :         {
     525      202946 :             QUEUE_BACKEND_PID(i) = InvalidPid;
     526      202946 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
     527      202946 :             QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
     528      202946 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     529             :         }
     530             :     }
     531             : 
     532             :     /*
     533             :      * Set up SLRU management of the pg_notify data. Note that long segment
     534             :      * names are used in order to avoid wraparound.
     535             :      */
     536        2200 :     NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
     537        2200 :     SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
     538             :                   "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
     539             :                   SYNC_HANDLER_NONE, true);
     540             : 
     541        2200 :     if (!found)
     542             :     {
     543             :         /*
     544             :          * During start or reboot, clean out the pg_notify directory.
     545             :          */
     546        2200 :         (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     547             :     }
     548        2200 : }
     549             : 
     550             : 
     551             : /*
     552             :  * pg_notify -
     553             :  *    SQL function to send a notification event
     554             :  */
     555             : Datum
     556        2108 : pg_notify(PG_FUNCTION_ARGS)
     557             : {
     558             :     const char *channel;
     559             :     const char *payload;
     560             : 
     561        2108 :     if (PG_ARGISNULL(0))
     562           6 :         channel = "";
     563             :     else
     564        2102 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     565             : 
     566        2108 :     if (PG_ARGISNULL(1))
     567          12 :         payload = "";
     568             :     else
     569        2096 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     570             : 
     571             :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     572        2108 :     PreventCommandDuringRecovery("NOTIFY");
     573             : 
     574        2108 :     Async_Notify(channel, payload);
     575             : 
     576        2090 :     PG_RETURN_VOID();
     577             : }
     578             : 
     579             : 
     580             : /*
     581             :  * Async_Notify
     582             :  *
     583             :  *      This is executed by the SQL notify command.
     584             :  *
     585             :  *      Adds the message to the list of pending notifies.
     586             :  *      Actual notification happens during transaction commit.
     587             :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     588             :  */
     589             : void
     590        2226 : Async_Notify(const char *channel, const char *payload)
     591             : {
     592        2226 :     int         my_level = GetCurrentTransactionNestLevel();
     593             :     size_t      channel_len;
     594             :     size_t      payload_len;
     595             :     Notification *n;
     596             :     MemoryContext oldcontext;
     597             : 
     598        2226 :     if (IsParallelWorker())
     599           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     600             : 
     601        2226 :     if (Trace_notify)
     602           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     603             : 
     604        2226 :     channel_len = channel ? strlen(channel) : 0;
     605        2226 :     payload_len = payload ? strlen(payload) : 0;
     606             : 
     607             :     /* a channel name must be specified */
     608        2226 :     if (channel_len == 0)
     609          12 :         ereport(ERROR,
     610             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     611             :                  errmsg("channel name cannot be empty")));
     612             : 
     613             :     /* enforce length limits */
     614        2214 :     if (channel_len >= NAMEDATALEN)
     615           6 :         ereport(ERROR,
     616             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     617             :                  errmsg("channel name too long")));
     618             : 
     619        2208 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     620           0 :         ereport(ERROR,
     621             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     622             :                  errmsg("payload string too long")));
     623             : 
     624             :     /*
     625             :      * We must construct the Notification entry, even if we end up not using
     626             :      * it, in order to compare it cheaply to existing list entries.
     627             :      *
     628             :      * The notification list needs to live until end of transaction, so store
     629             :      * it in the transaction context.
     630             :      */
     631        2208 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     632             : 
     633        2208 :     n = (Notification *) palloc(offsetof(Notification, data) +
     634        2208 :                                 channel_len + payload_len + 2);
     635        2208 :     n->channel_len = channel_len;
     636        2208 :     n->payload_len = payload_len;
     637        2208 :     strcpy(n->data, channel);
     638        2208 :     if (payload)
     639        2182 :         strcpy(n->data + channel_len + 1, payload);
     640             :     else
     641          26 :         n->data[channel_len + 1] = '\0';
     642             : 
     643        2208 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     644         120 :     {
     645             :         NotificationList *notifies;
     646             : 
     647             :         /*
     648             :          * First notify event in current (sub)xact. Note that we allocate the
     649             :          * NotificationList in TopTransactionContext; the nestingLevel might
     650             :          * get changed later by AtSubCommit_Notify.
     651             :          */
     652             :         notifies = (NotificationList *)
     653         120 :             MemoryContextAlloc(TopTransactionContext,
     654             :                                sizeof(NotificationList));
     655         120 :         notifies->nestingLevel = my_level;
     656         120 :         notifies->events = list_make1(n);
     657             :         /* We certainly don't need a hashtable yet */
     658         120 :         notifies->hashtab = NULL;
     659         120 :         notifies->upper = pendingNotifies;
     660         120 :         pendingNotifies = notifies;
     661             :     }
     662             :     else
     663             :     {
     664             :         /* Now check for duplicates */
     665        2088 :         if (AsyncExistsPendingNotify(n))
     666             :         {
     667             :             /* It's a dup, so forget it */
     668          24 :             pfree(n);
     669          24 :             MemoryContextSwitchTo(oldcontext);
     670          24 :             return;
     671             :         }
     672             : 
     673             :         /* Append more events to existing list */
     674        2064 :         AddEventToPendingNotifies(n);
     675             :     }
     676             : 
     677        2184 :     MemoryContextSwitchTo(oldcontext);
     678             : }
     679             : 
     680             : /*
     681             :  * queue_listen
     682             :  *      Common code for listen, unlisten, unlisten all commands.
     683             :  *
     684             :  *      Adds the request to the list of pending actions.
     685             :  *      Actual update of the listenChannels list happens during transaction
     686             :  *      commit.
     687             :  */
     688             : static void
     689         114 : queue_listen(ListenActionKind action, const char *channel)
     690             : {
     691             :     MemoryContext oldcontext;
     692             :     ListenAction *actrec;
     693         114 :     int         my_level = GetCurrentTransactionNestLevel();
     694             : 
     695             :     /*
     696             :      * Unlike Async_Notify, we don't try to collapse out duplicates. It would
     697             :      * be too complicated to ensure we get the right interactions of
     698             :      * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
     699             :      * would be any performance benefit anyway in sane applications.
     700             :      */
     701         114 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     702             : 
     703             :     /* the "+ 1" below provides space for the terminating null */
     704         114 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
     705         114 :                                      strlen(channel) + 1);
     706         114 :     actrec->action = action;
     707         114 :     strcpy(actrec->channel, channel);
     708             : 
     709         114 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
     710         100 :     {
     711             :         ActionList *actions;
     712             : 
     713             :         /*
     714             :          * First action in current (sub)xact. Note that we allocate the
     715             :          * ActionList in TopTransactionContext; the nestingLevel might get
     716             :          * changed later by AtSubCommit_Notify.
     717             :          */
     718             :         actions = (ActionList *)
     719         100 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
     720         100 :         actions->nestingLevel = my_level;
     721         100 :         actions->actions = list_make1(actrec);
     722         100 :         actions->upper = pendingActions;
     723         100 :         pendingActions = actions;
     724             :     }
     725             :     else
     726          14 :         pendingActions->actions = lappend(pendingActions->actions, actrec);
     727             : 
     728         114 :     MemoryContextSwitchTo(oldcontext);
     729         114 : }
     730             : 
     731             : /*
     732             :  * Async_Listen
     733             :  *
     734             :  *      This is executed by the SQL listen command.
     735             :  */
     736             : void
     737          74 : Async_Listen(const char *channel)
     738             : {
     739          74 :     if (Trace_notify)
     740           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
     741             : 
     742          74 :     queue_listen(LISTEN_LISTEN, channel);
     743          74 : }
     744             : 
     745             : /*
     746             :  * Async_Unlisten
     747             :  *
     748             :  *      This is executed by the SQL unlisten command.
     749             :  */
     750             : void
     751           6 : Async_Unlisten(const char *channel)
     752             : {
     753           6 :     if (Trace_notify)
     754           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
     755             : 
     756             :     /* If we couldn't possibly be listening, no need to queue anything */
     757           6 :     if (pendingActions == NULL && !unlistenExitRegistered)
     758           0 :         return;
     759             : 
     760           6 :     queue_listen(LISTEN_UNLISTEN, channel);
     761             : }
     762             : 
     763             : /*
     764             :  * Async_UnlistenAll
     765             :  *
     766             :  *      This is invoked by UNLISTEN * command, and also at backend exit.
     767             :  */
     768             : void
     769          38 : Async_UnlistenAll(void)
     770             : {
     771          38 :     if (Trace_notify)
     772           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
     773             : 
     774             :     /* If we couldn't possibly be listening, no need to queue anything */
     775          38 :     if (pendingActions == NULL && !unlistenExitRegistered)
     776           4 :         return;
     777             : 
     778          34 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
     779             : }
     780             : 
     781             : /*
     782             :  * SQL function: return a set of the channel names this backend is actively
     783             :  * listening to.
     784             :  *
     785             :  * Note: this coding relies on the fact that the listenChannels list cannot
     786             :  * change within a transaction.
     787             :  */
     788             : Datum
     789          18 : pg_listening_channels(PG_FUNCTION_ARGS)
     790             : {
     791             :     FuncCallContext *funcctx;
     792             : 
     793             :     /* stuff done only on the first call of the function */
     794          18 :     if (SRF_IS_FIRSTCALL())
     795             :     {
     796             :         /* create a function context for cross-call persistence */
     797          12 :         funcctx = SRF_FIRSTCALL_INIT();
     798             :     }
     799             : 
     800             :     /* stuff done on every call of the function */
     801          18 :     funcctx = SRF_PERCALL_SETUP();
     802             : 
     803          18 :     if (funcctx->call_cntr < list_length(listenChannels))
     804             :     {
     805           6 :         char       *channel = (char *) list_nth(listenChannels,
     806           6 :                                                 funcctx->call_cntr);
     807             : 
     808           6 :         SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
     809             :     }
     810             : 
     811          12 :     SRF_RETURN_DONE(funcctx);
     812             : }
     813             : 
     814             : /*
     815             :  * Async_UnlistenOnExit
     816             :  *
     817             :  * This is executed at backend exit if we have done any LISTENs in this
     818             :  * backend.  It might not be necessary anymore, if the user UNLISTENed
     819             :  * everything, but we don't try to detect that case.
     820             :  */
     821             : static void
     822          28 : Async_UnlistenOnExit(int code, Datum arg)
     823             : {
     824          28 :     Exec_UnlistenAllCommit();
     825          28 :     asyncQueueUnregister();
     826          28 : }
     827             : 
     828             : /*
     829             :  * AtPrepare_Notify
     830             :  *
     831             :  *      This is called at the prepare phase of a two-phase
     832             :  *      transaction.  Save the state for possible commit later.
     833             :  */
     834             : void
     835         622 : AtPrepare_Notify(void)
     836             : {
     837             :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
     838         622 :     if (pendingActions || pendingNotifies)
     839           0 :         ereport(ERROR,
     840             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     841             :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
     842         622 : }
     843             : 
     844             : /*
     845             :  * PreCommit_Notify
     846             :  *
     847             :  *      This is called at transaction commit, before actually committing to
     848             :  *      clog.
     849             :  *
     850             :  *      If there are pending LISTEN actions, make sure we are listed in the
     851             :  *      shared-memory listener array.  This must happen before commit to
     852             :  *      ensure we don't miss any notifies from transactions that commit
     853             :  *      just after ours.
     854             :  *
     855             :  *      If there are outbound notify requests in the pendingNotifies list,
     856             :  *      add them to the global queue.  We do that before commit so that
     857             :  *      we can still throw error if we run out of queue space.
     858             :  */
     859             : void
     860     1025684 : PreCommit_Notify(void)
     861             : {
     862             :     ListCell   *p;
     863             : 
     864     1025684 :     if (!pendingActions && !pendingNotifies)
     865     1025470 :         return;                 /* no relevant statements in this xact */
     866             : 
     867         214 :     if (Trace_notify)
     868           0 :         elog(DEBUG1, "PreCommit_Notify");
     869             : 
     870             :     /* Preflight for any pending listen/unlisten actions */
     871         214 :     if (pendingActions != NULL)
     872             :     {
     873         210 :         foreach(p, pendingActions->actions)
     874             :         {
     875         112 :             ListenAction *actrec = (ListenAction *) lfirst(p);
     876             : 
     877         112 :             switch (actrec->action)
     878             :             {
     879          74 :                 case LISTEN_LISTEN:
     880          74 :                     Exec_ListenPreCommit();
     881          74 :                     break;
     882           6 :                 case LISTEN_UNLISTEN:
     883             :                     /* there is no Exec_UnlistenPreCommit() */
     884           6 :                     break;
     885          32 :                 case LISTEN_UNLISTEN_ALL:
     886             :                     /* there is no Exec_UnlistenAllPreCommit() */
     887          32 :                     break;
     888             :             }
     889             :         }
     890             :     }
     891             : 
     892             :     /* Queue any pending notifies (must happen after the above) */
     893         214 :     if (pendingNotifies)
     894             :     {
     895             :         ListCell   *nextNotify;
     896             : 
     897             :         /*
     898             :          * Make sure that we have an XID assigned to the current transaction.
     899             :          * GetCurrentTransactionId is cheap if we already have an XID, but not
     900             :          * so cheap if we don't, and we'd prefer not to do that work while
     901             :          * holding NotifyQueueLock.
     902             :          */
     903         116 :         (void) GetCurrentTransactionId();
     904             : 
     905             :         /*
     906             :          * Serialize writers by acquiring a special lock that we hold till
     907             :          * after commit.  This ensures that queue entries appear in commit
     908             :          * order, and in particular that there are never uncommitted queue
     909             :          * entries ahead of committed ones, so an uncommitted transaction
     910             :          * can't block delivery of deliverable notifications.
     911             :          *
     912             :          * We use a heavyweight lock so that it'll automatically be released
     913             :          * after either commit or abort.  This also allows deadlocks to be
     914             :          * detected, though really a deadlock shouldn't be possible here.
     915             :          *
     916             :          * The lock is on "database 0", which is pretty ugly but it doesn't
     917             :          * seem worth inventing a special locktag category just for this.
     918             :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
     919             :          * used by the flatfiles mechanism.)
     920             :          */
     921         116 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
     922             :                          AccessExclusiveLock);
     923             : 
     924             :         /* Now push the notifications into the queue */
     925         116 :         nextNotify = list_head(pendingNotifies->events);
     926         302 :         while (nextNotify != NULL)
     927             :         {
     928             :             /*
     929             :              * Add the pending notifications to the queue.  We acquire and
     930             :              * release NotifyQueueLock once per page, which might be overkill
     931             :              * but it does allow readers to get in while we're doing this.
     932             :              *
     933             :              * A full queue is very uncommon and should really not happen,
     934             :              * given that we have so much space available in the SLRU pages.
     935             :              * Nevertheless we need to deal with this possibility. Note that
     936             :              * when we get here we are in the process of committing our
     937             :              * transaction, but we have not yet committed to clog, so at this
     938             :              * point in time we can still roll the transaction back.
     939             :              */
     940         186 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     941         186 :             asyncQueueFillWarning();
     942         186 :             if (asyncQueueIsFull())
     943           0 :                 ereport(ERROR,
     944             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     945             :                          errmsg("too many notifications in the NOTIFY queue")));
     946         186 :             nextNotify = asyncQueueAddEntries(nextNotify);
     947         186 :             LWLockRelease(NotifyQueueLock);
     948             :         }
     949             : 
     950             :         /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
     951             :     }
     952             : }
     953             : 
     954             : /*
     955             :  * AtCommit_Notify
     956             :  *
     957             :  *      This is called at transaction commit, after committing to clog.
     958             :  *
     959             :  *      Update listenChannels and clear transaction-local state.
     960             :  *
     961             :  *      If we issued any notifications in the transaction, send signals to
     962             :  *      listening backends (possibly including ourselves) to process them.
     963             :  *      Also, if we filled enough queue pages with new notifies, try to
     964             :  *      advance the queue tail pointer.
     965             :  */
     966             : void
     967     1025374 : AtCommit_Notify(void)
     968             : {
     969             :     ListCell   *p;
     970             : 
     971             :     /*
     972             :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     973             :      * return as soon as possible
     974             :      */
     975     1025374 :     if (!pendingActions && !pendingNotifies)
     976     1025160 :         return;
     977             : 
     978         214 :     if (Trace_notify)
     979           0 :         elog(DEBUG1, "AtCommit_Notify");
     980             : 
     981             :     /* Perform any pending listen/unlisten actions */
     982         214 :     if (pendingActions != NULL)
     983             :     {
     984         210 :         foreach(p, pendingActions->actions)
     985             :         {
     986         112 :             ListenAction *actrec = (ListenAction *) lfirst(p);
     987             : 
     988         112 :             switch (actrec->action)
     989             :             {
     990          74 :                 case LISTEN_LISTEN:
     991          74 :                     Exec_ListenCommit(actrec->channel);
     992          74 :                     break;
     993           6 :                 case LISTEN_UNLISTEN:
     994           6 :                     Exec_UnlistenCommit(actrec->channel);
     995           6 :                     break;
     996          32 :                 case LISTEN_UNLISTEN_ALL:
     997          32 :                     Exec_UnlistenAllCommit();
     998          32 :                     break;
     999             :             }
    1000             :         }
    1001             :     }
    1002             : 
    1003             :     /* If no longer listening to anything, get out of listener array */
    1004         214 :     if (amRegisteredListener && listenChannels == NIL)
    1005          26 :         asyncQueueUnregister();
    1006             : 
    1007             :     /*
    1008             :      * Send signals to listening backends.  We need do this only if there are
    1009             :      * pending notifies, which were previously added to the shared queue by
    1010             :      * PreCommit_Notify().
    1011             :      */
    1012         214 :     if (pendingNotifies != NULL)
    1013         116 :         SignalBackends();
    1014             : 
    1015             :     /*
    1016             :      * If it's time to try to advance the global tail pointer, do that.
    1017             :      *
    1018             :      * (It might seem odd to do this in the sender, when more than likely the
    1019             :      * listeners won't yet have read the messages we just sent.  However,
    1020             :      * there's less contention if only the sender does it, and there is little
    1021             :      * need for urgency in advancing the global tail.  So this typically will
    1022             :      * be clearing out messages that were sent some time ago.)
    1023             :      */
    1024         214 :     if (tryAdvanceTail)
    1025             :     {
    1026          16 :         tryAdvanceTail = false;
    1027          16 :         asyncQueueAdvanceTail();
    1028             :     }
    1029             : 
    1030             :     /* And clean up */
    1031         214 :     ClearPendingActionsAndNotifies();
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
    1036             :  *
    1037             :  * This function must make sure we are ready to catch any incoming messages.
    1038             :  */
    1039             : static void
    1040          74 : Exec_ListenPreCommit(void)
    1041             : {
    1042             :     QueuePosition head;
    1043             :     QueuePosition max;
    1044             :     ProcNumber  prevListener;
    1045             : 
    1046             :     /*
    1047             :      * Nothing to do if we are already listening to something, nor if we
    1048             :      * already ran this routine in this transaction.
    1049             :      */
    1050          74 :     if (amRegisteredListener)
    1051          36 :         return;
    1052             : 
    1053          38 :     if (Trace_notify)
    1054           0 :         elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
    1055             : 
    1056             :     /*
    1057             :      * Before registering, make sure we will unlisten before dying. (Note:
    1058             :      * this action does not get undone if we abort later.)
    1059             :      */
    1060          38 :     if (!unlistenExitRegistered)
    1061             :     {
    1062          28 :         before_shmem_exit(Async_UnlistenOnExit, 0);
    1063          28 :         unlistenExitRegistered = true;
    1064             :     }
    1065             : 
    1066             :     /*
    1067             :      * This is our first LISTEN, so establish our pointer.
    1068             :      *
    1069             :      * We set our pointer to the global tail pointer and then move it forward
    1070             :      * over already-committed notifications.  This ensures we cannot miss any
    1071             :      * not-yet-committed notifications.  We might get a few more but that
    1072             :      * doesn't hurt.
    1073             :      *
    1074             :      * In some scenarios there might be a lot of committed notifications that
    1075             :      * have not yet been pruned away (because some backend is being lazy about
    1076             :      * reading them).  To reduce our startup time, we can look at other
    1077             :      * backends and adopt the maximum "pos" pointer of any backend that's in
    1078             :      * our database; any notifications it's already advanced over are surely
    1079             :      * committed and need not be re-examined by us.  (We must consider only
    1080             :      * backends connected to our DB, because others will not have bothered to
    1081             :      * check committed-ness of notifications in our DB.)
    1082             :      *
    1083             :      * We need exclusive lock here so we can look at other backends' entries
    1084             :      * and manipulate the list links.
    1085             :      */
    1086          38 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1087          38 :     head = QUEUE_HEAD;
    1088          38 :     max = QUEUE_TAIL;
    1089          38 :     prevListener = INVALID_PROC_NUMBER;
    1090          42 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1091             :     {
    1092           4 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1093           4 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1094             :         /* Also find last listening backend before this one */
    1095           4 :         if (i < MyProcNumber)
    1096           4 :             prevListener = i;
    1097             :     }
    1098          38 :     QUEUE_BACKEND_POS(MyProcNumber) = max;
    1099          38 :     QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
    1100          38 :     QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
    1101             :     /* Insert backend into list of listeners at correct position */
    1102          38 :     if (prevListener != INVALID_PROC_NUMBER)
    1103             :     {
    1104           4 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
    1105           4 :         QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
    1106             :     }
    1107             :     else
    1108             :     {
    1109          34 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
    1110          34 :         QUEUE_FIRST_LISTENER = MyProcNumber;
    1111             :     }
    1112          38 :     LWLockRelease(NotifyQueueLock);
    1113             : 
    1114             :     /* Now we are listed in the global array, so remember we're listening */
    1115          38 :     amRegisteredListener = true;
    1116             : 
    1117             :     /*
    1118             :      * Try to move our pointer forward as far as possible.  This will skip
    1119             :      * over already-committed notifications, which we want to do because they
    1120             :      * might be quite stale.  Note that we are not yet listening on anything,
    1121             :      * so we won't deliver such notifications to our frontend.  Also, although
    1122             :      * our transaction might have executed NOTIFY, those message(s) aren't
    1123             :      * queued yet so we won't skip them here.
    1124             :      */
    1125          38 :     if (!QUEUE_POS_EQUAL(max, head))
    1126          22 :         asyncQueueReadAllNotifications();
    1127             : }
    1128             : 
    1129             : /*
    1130             :  * Exec_ListenCommit --- subroutine for AtCommit_Notify
    1131             :  *
    1132             :  * Add the channel to the list of channels we are listening on.
    1133             :  */
    1134             : static void
    1135          74 : Exec_ListenCommit(const char *channel)
    1136             : {
    1137             :     MemoryContext oldcontext;
    1138             : 
    1139             :     /* Do nothing if we are already listening on this channel */
    1140          74 :     if (IsListeningOn(channel))
    1141          20 :         return;
    1142             : 
    1143             :     /*
    1144             :      * Add the new channel name to listenChannels.
    1145             :      *
    1146             :      * XXX It is theoretically possible to get an out-of-memory failure here,
    1147             :      * which would be bad because we already committed.  For the moment it
    1148             :      * doesn't seem worth trying to guard against that, but maybe improve this
    1149             :      * later.
    1150             :      */
    1151          54 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1152          54 :     listenChannels = lappend(listenChannels, pstrdup(channel));
    1153          54 :     MemoryContextSwitchTo(oldcontext);
    1154             : }
    1155             : 
    1156             : /*
    1157             :  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
    1158             :  *
    1159             :  * Remove the specified channel name from listenChannels.
    1160             :  */
    1161             : static void
    1162           6 : Exec_UnlistenCommit(const char *channel)
    1163             : {
    1164             :     ListCell   *q;
    1165             : 
    1166           6 :     if (Trace_notify)
    1167           0 :         elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
    1168             : 
    1169           6 :     foreach(q, listenChannels)
    1170             :     {
    1171           6 :         char       *lchan = (char *) lfirst(q);
    1172             : 
    1173           6 :         if (strcmp(lchan, channel) == 0)
    1174             :         {
    1175           6 :             listenChannels = foreach_delete_current(listenChannels, q);
    1176           6 :             pfree(lchan);
    1177           6 :             break;
    1178             :         }
    1179             :     }
    1180             : 
    1181             :     /*
    1182             :      * We do not complain about unlistening something not being listened;
    1183             :      * should we?
    1184             :      */
    1185           6 : }
    1186             : 
    1187             : /*
    1188             :  * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
    1189             :  *
    1190             :  *      Unlisten on all channels for this backend.
    1191             :  */
    1192             : static void
    1193          60 : Exec_UnlistenAllCommit(void)
    1194             : {
    1195          60 :     if (Trace_notify)
    1196           0 :         elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
    1197             : 
    1198          60 :     list_free_deep(listenChannels);
    1199          60 :     listenChannels = NIL;
    1200          60 : }
    1201             : 
    1202             : /*
    1203             :  * Test whether we are actively listening on the given channel name.
    1204             :  *
    1205             :  * Note: this function is executed for every notification found in the queue.
    1206             :  * Perhaps it is worth further optimization, eg convert the list to a sorted
    1207             :  * array so we can binary-search it.  In practice the list is likely to be
    1208             :  * fairly short, though.
    1209             :  */
    1210             : static bool
    1211         210 : IsListeningOn(const char *channel)
    1212             : {
    1213             :     ListCell   *p;
    1214             : 
    1215         332 :     foreach(p, listenChannels)
    1216             :     {
    1217         204 :         char       *lchan = (char *) lfirst(p);
    1218             : 
    1219         204 :         if (strcmp(lchan, channel) == 0)
    1220          82 :             return true;
    1221             :     }
    1222         128 :     return false;
    1223             : }
    1224             : 
    1225             : /*
    1226             :  * Remove our entry from the listeners array when we are no longer listening
    1227             :  * on any channel.  NB: must not fail if we're already not listening.
    1228             :  */
    1229             : static void
    1230          54 : asyncQueueUnregister(void)
    1231             : {
    1232             :     Assert(listenChannels == NIL);  /* else caller error */
    1233             : 
    1234          54 :     if (!amRegisteredListener)  /* nothing to do */
    1235          16 :         return;
    1236             : 
    1237             :     /*
    1238             :      * Need exclusive lock here to manipulate list links.
    1239             :      */
    1240          38 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1241             :     /* Mark our entry as invalid */
    1242          38 :     QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
    1243          38 :     QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
    1244             :     /* and remove it from the list */
    1245          38 :     if (QUEUE_FIRST_LISTENER == MyProcNumber)
    1246          38 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
    1247             :     else
    1248             :     {
    1249           0 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1250             :         {
    1251           0 :             if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
    1252             :             {
    1253           0 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
    1254           0 :                 break;
    1255             :             }
    1256             :         }
    1257             :     }
    1258          38 :     QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
    1259          38 :     LWLockRelease(NotifyQueueLock);
    1260             : 
    1261             :     /* mark ourselves as no longer listed in the global array */
    1262          38 :     amRegisteredListener = false;
    1263             : }
    1264             : 
    1265             : /*
    1266             :  * Test whether there is room to insert more notification messages.
    1267             :  *
    1268             :  * Caller must hold at least shared NotifyQueueLock.
    1269             :  */
    1270             : static bool
    1271         186 : asyncQueueIsFull(void)
    1272             : {
    1273         186 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1274         186 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1275         186 :     int64       occupied = headPage - tailPage;
    1276             : 
    1277         186 :     return occupied >= max_notify_queue_pages;
    1278             : }
    1279             : 
    1280             : /*
    1281             :  * Advance the QueuePosition to the next entry, assuming that the current
    1282             :  * entry is of length entryLength.  If we jump to a new page the function
    1283             :  * returns true, else false.
    1284             :  */
    1285             : static bool
    1286        4662 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1287             : {
    1288        4662 :     int64       pageno = QUEUE_POS_PAGE(*position);
    1289        4662 :     int         offset = QUEUE_POS_OFFSET(*position);
    1290        4662 :     bool        pageJump = false;
    1291             : 
    1292             :     /*
    1293             :      * Move to the next writing position: First jump over what we have just
    1294             :      * written or read.
    1295             :      */
    1296        4662 :     offset += entryLength;
    1297             :     Assert(offset <= QUEUE_PAGESIZE);
    1298             : 
    1299             :     /*
    1300             :      * In a second step check if another entry can possibly be written to the
    1301             :      * page. If so, stay here, we have reached the next position. If not, then
    1302             :      * we need to move on to the next page.
    1303             :      */
    1304        4662 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1305             :     {
    1306         140 :         pageno++;
    1307         140 :         offset = 0;
    1308         140 :         pageJump = true;
    1309             :     }
    1310             : 
    1311        4662 :     SET_QUEUE_POS(*position, pageno, offset);
    1312        4662 :     return pageJump;
    1313             : }
    1314             : 
    1315             : /*
    1316             :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    1317             :  */
    1318             : static void
    1319        2238 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1320             : {
    1321        2238 :     size_t      channellen = n->channel_len;
    1322        2238 :     size_t      payloadlen = n->payload_len;
    1323             :     int         entryLength;
    1324             : 
    1325             :     Assert(channellen < NAMEDATALEN);
    1326             :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    1327             : 
    1328             :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    1329        2238 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    1330        2238 :     entryLength = QUEUEALIGN(entryLength);
    1331        2238 :     qe->length = entryLength;
    1332        2238 :     qe->dboid = MyDatabaseId;
    1333        2238 :     qe->xid = GetCurrentTransactionId();
    1334        2238 :     qe->srcPid = MyProcPid;
    1335        2238 :     memcpy(qe->data, n->data, channellen + payloadlen + 2);
    1336        2238 : }
    1337             : 
    1338             : /*
    1339             :  * Add pending notifications to the queue.
    1340             :  *
    1341             :  * We go page by page here, i.e. we stop once we have to go to a new page but
    1342             :  * we will be called again and then fill that next page. If an entry does not
    1343             :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    1344             :  * database OID in order to fill the page. So every page is always used up to
    1345             :  * the last byte which simplifies reading the page later.
    1346             :  *
    1347             :  * We are passed the list cell (in pendingNotifies->events) containing the next
    1348             :  * notification to write and return the first still-unwritten cell back.
    1349             :  * Eventually we will return NULL indicating all is done.
    1350             :  *
    1351             :  * We are holding NotifyQueueLock already from the caller and grab
    1352             :  * page specific SLRU bank lock locally in this function.
    1353             :  */
    1354             : static ListCell *
    1355         186 : asyncQueueAddEntries(ListCell *nextNotify)
    1356             : {
    1357             :     AsyncQueueEntry qe;
    1358             :     QueuePosition queue_head;
    1359             :     int64       pageno;
    1360             :     int         offset;
    1361             :     int         slotno;
    1362             :     LWLock     *prevlock;
    1363             : 
    1364             :     /*
    1365             :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    1366             :      * memory upon exiting.  The reason for this is that if we have to advance
    1367             :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    1368             :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    1369             :      * subsequent insertions would try to put entries into a page that slru.c
    1370             :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    1371             :      * that if we do fail, any already-inserted queue entries are forgotten;
    1372             :      * this is okay, since they'd be useless anyway after our transaction
    1373             :      * rolls back.
    1374             :      */
    1375         186 :     queue_head = QUEUE_HEAD;
    1376             : 
    1377             :     /*
    1378             :      * If this is the first write since the postmaster started, we need to
    1379             :      * initialize the first page of the async SLRU.  Otherwise, the current
    1380             :      * page should be initialized already, so just fetch it.
    1381             :      */
    1382         186 :     pageno = QUEUE_POS_PAGE(queue_head);
    1383         186 :     prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
    1384             : 
    1385             :     /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
    1386         186 :     LWLockAcquire(prevlock, LW_EXCLUSIVE);
    1387             : 
    1388         186 :     if (QUEUE_POS_IS_ZERO(queue_head))
    1389          16 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    1390             :     else
    1391         170 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    1392             :                                    InvalidTransactionId);
    1393             : 
    1394             :     /* Note we mark the page dirty before writing in it */
    1395         186 :     NotifyCtl->shared->page_dirty[slotno] = true;
    1396             : 
    1397        2354 :     while (nextNotify != NULL)
    1398             :     {
    1399        2238 :         Notification *n = (Notification *) lfirst(nextNotify);
    1400             : 
    1401             :         /* Construct a valid queue entry in local variable qe */
    1402        2238 :         asyncQueueNotificationToEntry(n, &qe);
    1403             : 
    1404        2238 :         offset = QUEUE_POS_OFFSET(queue_head);
    1405             : 
    1406             :         /* Check whether the entry really fits on the current page */
    1407        2238 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    1408             :         {
    1409             :             /* OK, so advance nextNotify past this item */
    1410        2172 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
    1411             :         }
    1412             :         else
    1413             :         {
    1414             :             /*
    1415             :              * Write a dummy entry to fill up the page. Actually readers will
    1416             :              * only check dboid and since it won't match any reader's database
    1417             :              * OID, they will ignore this entry and move on.
    1418             :              */
    1419          66 :             qe.length = QUEUE_PAGESIZE - offset;
    1420          66 :             qe.dboid = InvalidOid;
    1421          66 :             qe.xid = InvalidTransactionId;
    1422          66 :             qe.data[0] = '\0';  /* empty channel */
    1423          66 :             qe.data[1] = '\0';  /* empty payload */
    1424             :         }
    1425             : 
    1426             :         /* Now copy qe into the shared buffer page */
    1427        2238 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    1428             :                &qe,
    1429        2238 :                qe.length);
    1430             : 
    1431             :         /* Advance queue_head appropriately, and detect if page is full */
    1432        2238 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    1433             :         {
    1434             :             LWLock     *lock;
    1435             : 
    1436          70 :             pageno = QUEUE_POS_PAGE(queue_head);
    1437          70 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    1438          70 :             if (lock != prevlock)
    1439             :             {
    1440           0 :                 LWLockRelease(prevlock);
    1441           0 :                 LWLockAcquire(lock, LW_EXCLUSIVE);
    1442           0 :                 prevlock = lock;
    1443             :             }
    1444             : 
    1445             :             /*
    1446             :              * Page is full, so we're done here, but first fill the next page
    1447             :              * with zeroes.  The reason to do this is to ensure that slru.c's
    1448             :              * idea of the head page is always the same as ours, which avoids
    1449             :              * boundary problems in SimpleLruTruncate.  The test in
    1450             :              * asyncQueueIsFull() ensured that there is room to create this
    1451             :              * page without overrunning the queue.
    1452             :              */
    1453          70 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    1454             : 
    1455             :             /*
    1456             :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    1457             :              * set flag to remember that we should try to advance the tail
    1458             :              * pointer (we don't want to actually do that right here).
    1459             :              */
    1460          70 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    1461          16 :                 tryAdvanceTail = true;
    1462             : 
    1463             :             /* And exit the loop */
    1464          70 :             break;
    1465             :         }
    1466             :     }
    1467             : 
    1468             :     /* Success, so update the global QUEUE_HEAD */
    1469         186 :     QUEUE_HEAD = queue_head;
    1470             : 
    1471         186 :     LWLockRelease(prevlock);
    1472             : 
    1473         186 :     return nextNotify;
    1474             : }
    1475             : 
    1476             : /*
    1477             :  * SQL function to return the fraction of the notification queue currently
    1478             :  * occupied.
    1479             :  */
    1480             : Datum
    1481          10 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    1482             : {
    1483             :     double      usage;
    1484             : 
    1485             :     /* Advance the queue tail so we don't report a too-large result */
    1486          10 :     asyncQueueAdvanceTail();
    1487             : 
    1488          10 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1489          10 :     usage = asyncQueueUsage();
    1490          10 :     LWLockRelease(NotifyQueueLock);
    1491             : 
    1492          10 :     PG_RETURN_FLOAT8(usage);
    1493             : }
    1494             : 
    1495             : /*
    1496             :  * Return the fraction of the queue that is currently occupied.
    1497             :  *
    1498             :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    1499             :  *
    1500             :  * Note: we measure the distance to the logical tail page, not the physical
    1501             :  * tail page.  In some sense that's wrong, but the relative position of the
    1502             :  * physical tail is affected by details such as SLRU segment boundaries,
    1503             :  * so that a result based on that is unpleasantly unstable.
    1504             :  */
    1505             : static double
    1506         196 : asyncQueueUsage(void)
    1507             : {
    1508         196 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1509         196 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1510         196 :     int64       occupied = headPage - tailPage;
    1511             : 
    1512         196 :     if (occupied == 0)
    1513         102 :         return (double) 0;      /* fast exit for common case */
    1514             : 
    1515          94 :     return (double) occupied / (double) max_notify_queue_pages;
    1516             : }
    1517             : 
    1518             : /*
    1519             :  * Check whether the queue is at least half full, and emit a warning if so.
    1520             :  *
    1521             :  * This is unlikely given the size of the queue, but possible.
    1522             :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    1523             :  *
    1524             :  * Caller must hold exclusive NotifyQueueLock.
    1525             :  */
    1526             : static void
    1527         186 : asyncQueueFillWarning(void)
    1528             : {
    1529             :     double      fillDegree;
    1530             :     TimestampTz t;
    1531             : 
    1532         186 :     fillDegree = asyncQueueUsage();
    1533         186 :     if (fillDegree < 0.5)
    1534         186 :         return;
    1535             : 
    1536           0 :     t = GetCurrentTimestamp();
    1537             : 
    1538           0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    1539             :                                    t, QUEUE_FULL_WARN_INTERVAL))
    1540             :     {
    1541           0 :         QueuePosition min = QUEUE_HEAD;
    1542           0 :         int32       minPid = InvalidPid;
    1543             : 
    1544           0 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1545             :         {
    1546             :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    1547           0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    1548           0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    1549           0 :                 minPid = QUEUE_BACKEND_PID(i);
    1550             :         }
    1551             : 
    1552           0 :         ereport(WARNING,
    1553             :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    1554             :                  (minPid != InvalidPid ?
    1555             :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    1556             :                   : 0),
    1557             :                  (minPid != InvalidPid ?
    1558             :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    1559             :                   : 0)));
    1560             : 
    1561           0 :         asyncQueueControl->lastQueueFillWarn = t;
    1562             :     }
    1563             : }
    1564             : 
    1565             : /*
    1566             :  * Send signals to listening backends.
    1567             :  *
    1568             :  * Normally we signal only backends in our own database, since only those
    1569             :  * backends could be interested in notifies we send.  However, if there's
    1570             :  * notify traffic in our database but no traffic in another database that
    1571             :  * does have listener(s), those listeners will fall further and further
    1572             :  * behind.  Waken them anyway if they're far enough behind, so that they'll
    1573             :  * advance their queue position pointers, allowing the global tail to advance.
    1574             :  *
    1575             :  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
    1576             :  *
    1577             :  * This is called during CommitTransaction(), so it's important for it
    1578             :  * to have very low probability of failure.
    1579             :  */
    1580             : static void
    1581         116 : SignalBackends(void)
    1582             : {
    1583             :     int32      *pids;
    1584             :     ProcNumber *procnos;
    1585             :     int         count;
    1586             : 
    1587             :     /*
    1588             :      * Identify backends that we need to signal.  We don't want to send
    1589             :      * signals while holding the NotifyQueueLock, so this loop just builds a
    1590             :      * list of target PIDs.
    1591             :      *
    1592             :      * XXX in principle these pallocs could fail, which would be bad. Maybe
    1593             :      * preallocate the arrays?  They're not that large, though.
    1594             :      */
    1595         116 :     pids = (int32 *) palloc(MaxBackends * sizeof(int32));
    1596         116 :     procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
    1597         116 :     count = 0;
    1598             : 
    1599         116 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1600         216 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1601             :     {
    1602         100 :         int32       pid = QUEUE_BACKEND_PID(i);
    1603             :         QueuePosition pos;
    1604             : 
    1605             :         Assert(pid != InvalidPid);
    1606         100 :         pos = QUEUE_BACKEND_POS(i);
    1607         100 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1608             :         {
    1609             :             /*
    1610             :              * Always signal listeners in our own database, unless they're
    1611             :              * already caught up (unlikely, but possible).
    1612             :              */
    1613         100 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    1614           0 :                 continue;
    1615             :         }
    1616             :         else
    1617             :         {
    1618             :             /*
    1619             :              * Listeners in other databases should be signaled only if they
    1620             :              * are far behind.
    1621             :              */
    1622           0 :             if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    1623             :                                    QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
    1624           0 :                 continue;
    1625             :         }
    1626             :         /* OK, need to signal this one */
    1627         100 :         pids[count] = pid;
    1628         100 :         procnos[count] = i;
    1629         100 :         count++;
    1630             :     }
    1631         116 :     LWLockRelease(NotifyQueueLock);
    1632             : 
    1633             :     /* Now send signals */
    1634         216 :     for (int i = 0; i < count; i++)
    1635             :     {
    1636         100 :         int32       pid = pids[i];
    1637             : 
    1638             :         /*
    1639             :          * If we are signaling our own process, no need to involve the kernel;
    1640             :          * just set the flag directly.
    1641             :          */
    1642         100 :         if (pid == MyProcPid)
    1643             :         {
    1644          40 :             notifyInterruptPending = true;
    1645          40 :             continue;
    1646             :         }
    1647             : 
    1648             :         /*
    1649             :          * Note: assuming things aren't broken, a signal failure here could
    1650             :          * only occur if the target backend exited since we released
    1651             :          * NotifyQueueLock; which is unlikely but certainly possible. So we
    1652             :          * just log a low-level debug message if it happens.
    1653             :          */
    1654          60 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
    1655           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    1656             :     }
    1657             : 
    1658         116 :     pfree(pids);
    1659         116 :     pfree(procnos);
    1660         116 : }
    1661             : 
    1662             : /*
    1663             :  * AtAbort_Notify
    1664             :  *
    1665             :  *  This is called at transaction abort.
    1666             :  *
    1667             :  *  Gets rid of pending actions and outbound notifies that we would have
    1668             :  *  executed if the transaction got committed.
    1669             :  */
    1670             : void
    1671       50420 : AtAbort_Notify(void)
    1672             : {
    1673             :     /*
    1674             :      * If we LISTEN but then roll back the transaction after PreCommit_Notify,
    1675             :      * we have registered as a listener but have not made any entry in
    1676             :      * listenChannels.  In that case, deregister again.
    1677             :      */
    1678       50420 :     if (amRegisteredListener && listenChannels == NIL)
    1679           0 :         asyncQueueUnregister();
    1680             : 
    1681             :     /* And clean up */
    1682       50420 :     ClearPendingActionsAndNotifies();
    1683       50420 : }
    1684             : 
    1685             : /*
    1686             :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    1687             :  *
    1688             :  * Reassign all items in the pending lists to the parent transaction.
    1689             :  */
    1690             : void
    1691       10748 : AtSubCommit_Notify(void)
    1692             : {
    1693       10748 :     int         my_level = GetCurrentTransactionNestLevel();
    1694             : 
    1695             :     /* If there are actions at our nesting level, we must reparent them. */
    1696       10748 :     if (pendingActions != NULL &&
    1697           0 :         pendingActions->nestingLevel >= my_level)
    1698             :     {
    1699           0 :         if (pendingActions->upper == NULL ||
    1700           0 :             pendingActions->upper->nestingLevel < my_level - 1)
    1701             :         {
    1702             :             /* nothing to merge; give the whole thing to the parent */
    1703           0 :             --pendingActions->nestingLevel;
    1704             :         }
    1705             :         else
    1706             :         {
    1707           0 :             ActionList *childPendingActions = pendingActions;
    1708             : 
    1709           0 :             pendingActions = pendingActions->upper;
    1710             : 
    1711             :             /*
    1712             :              * Mustn't try to eliminate duplicates here --- see queue_listen()
    1713             :              */
    1714           0 :             pendingActions->actions =
    1715           0 :                 list_concat(pendingActions->actions,
    1716           0 :                             childPendingActions->actions);
    1717           0 :             pfree(childPendingActions);
    1718             :         }
    1719             :     }
    1720             : 
    1721             :     /* If there are notifies at our nesting level, we must reparent them. */
    1722       10748 :     if (pendingNotifies != NULL &&
    1723           4 :         pendingNotifies->nestingLevel >= my_level)
    1724             :     {
    1725             :         Assert(pendingNotifies->nestingLevel == my_level);
    1726             : 
    1727           2 :         if (pendingNotifies->upper == NULL ||
    1728           2 :             pendingNotifies->upper->nestingLevel < my_level - 1)
    1729             :         {
    1730             :             /* nothing to merge; give the whole thing to the parent */
    1731           0 :             --pendingNotifies->nestingLevel;
    1732             :         }
    1733             :         else
    1734             :         {
    1735             :             /*
    1736             :              * Formerly, we didn't bother to eliminate duplicates here, but
    1737             :              * now we must, else we fall foul of "Assert(!found)", either here
    1738             :              * or during a later attempt to build the parent-level hashtable.
    1739             :              */
    1740           2 :             NotificationList *childPendingNotifies = pendingNotifies;
    1741             :             ListCell   *l;
    1742             : 
    1743           2 :             pendingNotifies = pendingNotifies->upper;
    1744             :             /* Insert all the subxact's events into parent, except for dups */
    1745          10 :             foreach(l, childPendingNotifies->events)
    1746             :             {
    1747           8 :                 Notification *childn = (Notification *) lfirst(l);
    1748             : 
    1749           8 :                 if (!AsyncExistsPendingNotify(childn))
    1750           4 :                     AddEventToPendingNotifies(childn);
    1751             :             }
    1752           2 :             pfree(childPendingNotifies);
    1753             :         }
    1754             :     }
    1755       10748 : }
    1756             : 
    1757             : /*
    1758             :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    1759             :  */
    1760             : void
    1761        9380 : AtSubAbort_Notify(void)
    1762             : {
    1763        9380 :     int         my_level = GetCurrentTransactionNestLevel();
    1764             : 
    1765             :     /*
    1766             :      * All we have to do is pop the stack --- the actions/notifies made in
    1767             :      * this subxact are no longer interesting, and the space will be freed
    1768             :      * when CurTransactionContext is recycled. We still have to free the
    1769             :      * ActionList and NotificationList objects themselves, though, because
    1770             :      * those are allocated in TopTransactionContext.
    1771             :      *
    1772             :      * Note that there might be no entries at all, or no entries for the
    1773             :      * current subtransaction level, either because none were ever created, or
    1774             :      * because we reentered this routine due to trouble during subxact abort.
    1775             :      */
    1776        9380 :     while (pendingActions != NULL &&
    1777           0 :            pendingActions->nestingLevel >= my_level)
    1778             :     {
    1779           0 :         ActionList *childPendingActions = pendingActions;
    1780             : 
    1781           0 :         pendingActions = pendingActions->upper;
    1782           0 :         pfree(childPendingActions);
    1783             :     }
    1784             : 
    1785        9382 :     while (pendingNotifies != NULL &&
    1786           4 :            pendingNotifies->nestingLevel >= my_level)
    1787             :     {
    1788           2 :         NotificationList *childPendingNotifies = pendingNotifies;
    1789             : 
    1790           2 :         pendingNotifies = pendingNotifies->upper;
    1791           2 :         pfree(childPendingNotifies);
    1792             :     }
    1793        9380 : }
    1794             : 
    1795             : /*
    1796             :  * HandleNotifyInterrupt
    1797             :  *
    1798             :  *      Signal handler portion of interrupt handling. Let the backend know
    1799             :  *      that there's a pending notify interrupt. If we're currently reading
    1800             :  *      from the client, this will interrupt the read and
    1801             :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    1802             :  */
    1803             : void
    1804          40 : HandleNotifyInterrupt(void)
    1805             : {
    1806             :     /*
    1807             :      * Note: this is called by a SIGNAL HANDLER. You must be very wary what
    1808             :      * you do here.
    1809             :      */
    1810             : 
    1811             :     /* signal that work needs to be done */
    1812          40 :     notifyInterruptPending = true;
    1813             : 
    1814             :     /* make sure the event is processed in due course */
    1815          40 :     SetLatch(MyLatch);
    1816          40 : }
    1817             : 
    1818             : /*
    1819             :  * ProcessNotifyInterrupt
    1820             :  *
    1821             :  *      This is called if we see notifyInterruptPending set, just before
    1822             :  *      transmitting ReadyForQuery at the end of a frontend command, and
    1823             :  *      also if a notify signal occurs while reading from the frontend.
    1824             :  *      HandleNotifyInterrupt() will cause the read to be interrupted
    1825             :  *      via the process's latch, and this routine will get called.
    1826             :  *      If we are truly idle (ie, *not* inside a transaction block),
    1827             :  *      process the incoming notifies.
    1828             :  *
    1829             :  *      If "flush" is true, force any frontend messages out immediately.
    1830             :  *      This can be false when being called at the end of a frontend command,
    1831             :  *      since we'll flush after sending ReadyForQuery.
    1832             :  */
    1833             : void
    1834         190 : ProcessNotifyInterrupt(bool flush)
    1835             : {
    1836         190 :     if (IsTransactionOrTransactionBlock())
    1837         112 :         return;                 /* not really idle */
    1838             : 
    1839             :     /* Loop in case another signal arrives while sending messages */
    1840         156 :     while (notifyInterruptPending)
    1841          78 :         ProcessIncomingNotify(flush);
    1842             : }
    1843             : 
    1844             : 
    1845             : /*
    1846             :  * Read all pending notifications from the queue, and deliver appropriate
    1847             :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    1848             :  * notification.
    1849             :  */
    1850             : static void
    1851         100 : asyncQueueReadAllNotifications(void)
    1852             : {
    1853             :     QueuePosition pos;
    1854             :     QueuePosition head;
    1855             :     Snapshot    snapshot;
    1856             : 
    1857             :     /* Fetch current state */
    1858         100 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1859             :     /* Assert checks that we have a valid state entry */
    1860             :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
    1861         100 :     pos = QUEUE_BACKEND_POS(MyProcNumber);
    1862         100 :     head = QUEUE_HEAD;
    1863         100 :     LWLockRelease(NotifyQueueLock);
    1864             : 
    1865         100 :     if (QUEUE_POS_EQUAL(pos, head))
    1866             :     {
    1867             :         /* Nothing to do, we have read all notifications already. */
    1868           0 :         return;
    1869             :     }
    1870             : 
    1871             :     /*----------
    1872             :      * Get snapshot we'll use to decide which xacts are still in progress.
    1873             :      * This is trickier than it might seem, because of race conditions.
    1874             :      * Consider the following example:
    1875             :      *
    1876             :      * Backend 1:                    Backend 2:
    1877             :      *
    1878             :      * transaction starts
    1879             :      * UPDATE foo SET ...;
    1880             :      * NOTIFY foo;
    1881             :      * commit starts
    1882             :      * queue the notify message
    1883             :      *                               transaction starts
    1884             :      *                               LISTEN foo;  -- first LISTEN in session
    1885             :      *                               SELECT * FROM foo WHERE ...;
    1886             :      * commit to clog
    1887             :      *                               commit starts
    1888             :      *                               add backend 2 to array of listeners
    1889             :      *                               advance to queue head (this code)
    1890             :      *                               commit to clog
    1891             :      *
    1892             :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    1893             :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
    1894             :      * eventually get transaction 1's notify message, but there's no way
    1895             :      * to do that; until we're in the listener array, there's no guarantee
    1896             :      * that the notify message doesn't get removed from the queue.
    1897             :      *
    1898             :      * Therefore the coding technique transaction 2 is using is unsafe:
    1899             :      * applications must commit a LISTEN before inspecting database state,
    1900             :      * if they want to ensure they will see notifications about subsequent
    1901             :      * changes to that state.
    1902             :      *
    1903             :      * What we do guarantee is that we'll see all notifications from
    1904             :      * transactions committing after the snapshot we take here.
    1905             :      * Exec_ListenPreCommit has already added us to the listener array,
    1906             :      * so no not-yet-committed messages can be removed from the queue
    1907             :      * before we see them.
    1908             :      *----------
    1909             :      */
    1910         100 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
    1911             : 
    1912             :     /*
    1913             :      * It is possible that we fail while trying to send a message to our
    1914             :      * frontend (for example, because of encoding conversion failure).  If
    1915             :      * that happens it is critical that we not try to send the same message
    1916             :      * over and over again.  Therefore, we set ExitOnAnyError to upgrade any
    1917             :      * ERRORs to FATAL, causing the client connection to be closed on error.
    1918             :      *
    1919             :      * We used to only skip over the offending message and try to soldier on,
    1920             :      * but it was somewhat questionable to lose a notification and give the
    1921             :      * client an ERROR instead.  A client application is not prepared for
    1922             :      * that and can't tell that a notification was missed.  It was also not
    1923             :      * very useful in practice because notifications are often processed while
    1924             :      * a connection is idle and reading a message from the client, and in that
    1925             :      * state, any error is upgraded to FATAL anyway.  Closing the connection
    1926             :      * is a clear signal to the application that it might have missed
    1927             :      * notifications.
    1928             :      */
    1929             :     {
    1930         100 :         bool        save_ExitOnAnyError = ExitOnAnyError;
    1931             :         bool        reachedStop;
    1932             : 
    1933         100 :         ExitOnAnyError = true;
    1934             : 
    1935             :         do
    1936             :         {
    1937             :             /*
    1938             :              * Process messages up to the stop position, end of page, or an
    1939             :              * uncommitted message.
    1940             :              *
    1941             :              * Our stop position is what we found to be the head's position
    1942             :              * when we entered this function. It might have changed already.
    1943             :              * But if it has, we will receive (or have already received and
    1944             :              * queued) another signal and come here again.
    1945             :              *
    1946             :              * We are not holding NotifyQueueLock here! The queue can only
    1947             :              * extend beyond the head pointer (see above) and we leave our
    1948             :              * backend's pointer where it is so nobody will truncate or
    1949             :              * rewrite pages under us. Especially we don't want to hold a lock
    1950             :              * while sending the notifications to the frontend.
    1951             :              */
    1952         170 :             reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
    1953         170 :         } while (!reachedStop);
    1954             : 
    1955             :         /* Update shared state */
    1956         100 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1957         100 :         QUEUE_BACKEND_POS(MyProcNumber) = pos;
    1958         100 :         LWLockRelease(NotifyQueueLock);
    1959             : 
    1960         100 :         ExitOnAnyError = save_ExitOnAnyError;
    1961             :     }
    1962             : 
    1963             :     /* Done with snapshot */
    1964         100 :     UnregisterSnapshot(snapshot);
    1965             : }
    1966             : 
    1967             : /*
    1968             :  * Fetch notifications from the shared queue, beginning at position current,
    1969             :  * and deliver relevant ones to my frontend.
    1970             :  *
    1971             :  * The function returns true once we have reached the stop position or an
    1972             :  * uncommitted notification, and false if we have finished with the page.
    1973             :  * In other words: once it returns true there is no need to look further.
    1974             :  * The QueuePosition *current is advanced past all processed messages.
    1975             :  */
    1976             : static bool
    1977         170 : asyncQueueProcessPageEntries(QueuePosition *current,
    1978             :                              QueuePosition stop,
    1979             :                              Snapshot snapshot)
    1980             : {
    1981         170 :     int64       curpage = QUEUE_POS_PAGE(*current);
    1982             :     int         slotno;
    1983             :     char       *page_buffer;
    1984         170 :     bool        reachedStop = false;
    1985             :     bool        reachedEndOfPage;
    1986             : 
    1987             :     /*
    1988             :      * We copy the entries into a local buffer to avoid holding the SLRU lock
    1989             :      * while we transmit them to our frontend.  The local buffer must be
    1990             :      * adequately aligned, so use a union.
    1991             :      */
    1992             :     union
    1993             :     {
    1994             :         char        buf[QUEUE_PAGESIZE];
    1995             :         AsyncQueueEntry align;
    1996             :     }           local_buf;
    1997         170 :     char       *local_buf_end = local_buf.buf;
    1998             : 
    1999         170 :     slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
    2000             :                                         InvalidTransactionId);
    2001         170 :     page_buffer = NotifyCtl->shared->page_buffer[slotno];
    2002             : 
    2003             :     do
    2004             :     {
    2005        2484 :         QueuePosition thisentry = *current;
    2006             :         AsyncQueueEntry *qe;
    2007             : 
    2008        2484 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    2009         100 :             break;
    2010             : 
    2011        2384 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2012             : 
    2013             :         /*
    2014             :          * Advance *current over this message, possibly to the next page. As
    2015             :          * noted in the comments for asyncQueueReadAllNotifications, we must
    2016             :          * do this before possibly failing while processing the message.
    2017             :          */
    2018        2384 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2019             : 
    2020             :         /* Ignore messages destined for other databases */
    2021        2384 :         if (qe->dboid == MyDatabaseId)
    2022             :         {
    2023        2318 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
    2024             :             {
    2025             :                 /*
    2026             :                  * The source transaction is still in progress, so we can't
    2027             :                  * process this message yet.  Break out of the loop, but first
    2028             :                  * back up *current so we will reprocess the message next
    2029             :                  * time.  (Note: it is unlikely but not impossible for
    2030             :                  * TransactionIdDidCommit to fail, so we can't really avoid
    2031             :                  * this advance-then-back-up behavior when dealing with an
    2032             :                  * uncommitted message.)
    2033             :                  *
    2034             :                  * Note that we must test XidInMVCCSnapshot before we test
    2035             :                  * TransactionIdDidCommit, else we might return a message from
    2036             :                  * a transaction that is not yet visible to snapshots; compare
    2037             :                  * the comments at the head of heapam_visibility.c.
    2038             :                  *
    2039             :                  * Also, while our own xact won't be listed in the snapshot,
    2040             :                  * we need not check for TransactionIdIsCurrentTransactionId
    2041             :                  * because our transaction cannot (yet) have queued any
    2042             :                  * messages.
    2043             :                  */
    2044           0 :                 *current = thisentry;
    2045           0 :                 reachedStop = true;
    2046           0 :                 break;
    2047             :             }
    2048             : 
    2049             :             /*
    2050             :              * Quick check for the case that we're not listening on any
    2051             :              * channels, before calling TransactionIdDidCommit().  This makes
    2052             :              * that case a little faster, but more importantly, it ensures
    2053             :              * that if there's a bad entry in the queue for which
    2054             :              * TransactionIdDidCommit() fails for some reason, we can skip
    2055             :              * over it on the first LISTEN in a session, and not get stuck on
    2056             :              * it indefinitely.
    2057             :              */
    2058        2318 :             if (listenChannels == NIL)
    2059        2182 :                 continue;
    2060             : 
    2061         136 :             if (TransactionIdDidCommit(qe->xid))
    2062             :             {
    2063         136 :                 memcpy(local_buf_end, qe, qe->length);
    2064         136 :                 local_buf_end += qe->length;
    2065             :             }
    2066             :             else
    2067             :             {
    2068             :                 /*
    2069             :                  * The source transaction aborted or crashed, so we just
    2070             :                  * ignore its notifications.
    2071             :                  */
    2072             :             }
    2073             :         }
    2074             : 
    2075             :         /* Loop back if we're not at end of page */
    2076        2384 :     } while (!reachedEndOfPage);
    2077             : 
    2078             :     /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2079         170 :     LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2080             : 
    2081             :     /*
    2082             :      * Now that we have let go of the SLRU bank lock, send the notifications
    2083             :      * to our frontend.
    2084             :      */
    2085             :     Assert(local_buf_end - local_buf.buf <= BLCKSZ);
    2086         306 :     for (char *p = local_buf.buf; p < local_buf_end;)
    2087             :     {
    2088         136 :         AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
    2089             : 
    2090             :         /* qe->data is the null-terminated channel name */
    2091         136 :         char       *channel = qe->data;
    2092             : 
    2093         136 :         if (IsListeningOn(channel))
    2094             :         {
    2095             :             /* payload follows channel name */
    2096          62 :             char       *payload = qe->data + strlen(channel) + 1;
    2097             : 
    2098          62 :             NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2099             :         }
    2100             : 
    2101         136 :         p += qe->length;
    2102             :     }
    2103             : 
    2104         170 :     if (QUEUE_POS_EQUAL(*current, stop))
    2105         100 :         reachedStop = true;
    2106             : 
    2107         170 :     return reachedStop;
    2108             : }
    2109             : 
    2110             : /*
    2111             :  * Advance the shared queue tail variable to the minimum of all the
    2112             :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2113             :  *
    2114             :  * This is (usually) called during CommitTransaction(), so it's important for
    2115             :  * it to have very low probability of failure.
    2116             :  */
    2117             : static void
    2118          26 : asyncQueueAdvanceTail(void)
    2119             : {
    2120             :     QueuePosition min;
    2121             :     int64       oldtailpage;
    2122             :     int64       newtailpage;
    2123             :     int64       boundary;
    2124             : 
    2125             :     /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
    2126          26 :     LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
    2127             : 
    2128             :     /*
    2129             :      * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
    2130             :      * (ie, exactly match at least one backend's queue position), so it must
    2131             :      * be updated atomically with the actual computation.  Since v13, we could
    2132             :      * get away with not doing it like that, but it seems prudent to keep it
    2133             :      * so.
    2134             :      *
    2135             :      * Also, because incoming backends will scan forward from QUEUE_TAIL, that
    2136             :      * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
    2137             :      * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
    2138             :      * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
    2139             :      * there are pages we can truncate but haven't yet finished doing so.
    2140             :      *
    2141             :      * For concurrency's sake, we don't want to hold NotifyQueueLock while
    2142             :      * performing SimpleLruTruncate.  This is OK because no backend will try
    2143             :      * to access the pages we are in the midst of truncating.
    2144             :      */
    2145          26 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2146          26 :     min = QUEUE_HEAD;
    2147          46 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2148             :     {
    2149             :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2150          20 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2151             :     }
    2152          26 :     QUEUE_TAIL = min;
    2153          26 :     oldtailpage = QUEUE_STOP_PAGE;
    2154          26 :     LWLockRelease(NotifyQueueLock);
    2155             : 
    2156             :     /*
    2157             :      * We can truncate something if the global tail advanced across an SLRU
    2158             :      * segment boundary.
    2159             :      *
    2160             :      * XXX it might be better to truncate only once every several segments, to
    2161             :      * reduce the number of directory scans.
    2162             :      */
    2163          26 :     newtailpage = QUEUE_POS_PAGE(min);
    2164          26 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2165          26 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2166             :     {
    2167             :         /*
    2168             :          * SimpleLruTruncate() will ask for SLRU bank locks but will also
    2169             :          * release the lock again.
    2170             :          */
    2171           0 :         SimpleLruTruncate(NotifyCtl, newtailpage);
    2172             : 
    2173           0 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2174           0 :         QUEUE_STOP_PAGE = newtailpage;
    2175           0 :         LWLockRelease(NotifyQueueLock);
    2176             :     }
    2177             : 
    2178          26 :     LWLockRelease(NotifyQueueTailLock);
    2179          26 : }
    2180             : 
    2181             : /*
    2182             :  * AsyncNotifyFreezeXids
    2183             :  *
    2184             :  * Prepare the async notification queue for CLOG truncation by freezing
    2185             :  * transaction IDs that are about to become inaccessible.
    2186             :  *
    2187             :  * This function is called by VACUUM before advancing datfrozenxid. It scans
    2188             :  * the notification queue and replaces XIDs that would become inaccessible
    2189             :  * after CLOG truncation with special markers:
    2190             :  * - Committed transactions are set to FrozenTransactionId
    2191             :  * - Aborted/crashed transactions are set to InvalidTransactionId
    2192             :  *
    2193             :  * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
    2194             :  * pages will be truncated. If XID < newFrozenXid, it cannot still be running
    2195             :  * (or it would have held back newFrozenXid through ProcArray).
    2196             :  * Therefore, if TransactionIdDidCommit returns false, we know the transaction
    2197             :  * either aborted explicitly or crashed, and we can safely mark it invalid.
    2198             :  */
    2199             : void
    2200        2190 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
    2201             : {
    2202             :     QueuePosition pos;
    2203             :     QueuePosition head;
    2204        2190 :     int64       curpage = -1;
    2205        2190 :     int         slotno = -1;
    2206        2190 :     char       *page_buffer = NULL;
    2207        2190 :     bool        page_dirty = false;
    2208             : 
    2209             :     /*
    2210             :      * Acquire locks in the correct order to avoid deadlocks. As per the
    2211             :      * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
    2212             :      * bank locks.
    2213             :      *
    2214             :      * We only need SHARED mode since we're just reading the head/tail
    2215             :      * positions, not modifying them.
    2216             :      */
    2217        2190 :     LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
    2218        2190 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2219             : 
    2220        2190 :     pos = QUEUE_TAIL;
    2221        2190 :     head = QUEUE_HEAD;
    2222             : 
    2223             :     /* Release NotifyQueueLock early, we only needed to read the positions */
    2224        2190 :     LWLockRelease(NotifyQueueLock);
    2225             : 
    2226             :     /*
    2227             :      * Scan the queue from tail to head, freezing XIDs as needed. We hold
    2228             :      * NotifyQueueTailLock throughout to ensure the tail doesn't move while
    2229             :      * we're working.
    2230             :      */
    2231        2230 :     while (!QUEUE_POS_EQUAL(pos, head))
    2232             :     {
    2233             :         AsyncQueueEntry *qe;
    2234             :         TransactionId xid;
    2235          40 :         int64       pageno = QUEUE_POS_PAGE(pos);
    2236          40 :         int         offset = QUEUE_POS_OFFSET(pos);
    2237             : 
    2238             :         /* If we need a different page, release old lock and get new one */
    2239          40 :         if (pageno != curpage)
    2240             :         {
    2241             :             LWLock     *lock;
    2242             : 
    2243             :             /* Release previous page if any */
    2244           4 :             if (slotno >= 0)
    2245             :             {
    2246           0 :                 if (page_dirty)
    2247             :                 {
    2248           0 :                     NotifyCtl->shared->page_dirty[slotno] = true;
    2249           0 :                     page_dirty = false;
    2250             :                 }
    2251           0 :                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2252             :             }
    2253             : 
    2254           4 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2255           4 :             LWLockAcquire(lock, LW_EXCLUSIVE);
    2256           4 :             slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    2257             :                                        InvalidTransactionId);
    2258           4 :             page_buffer = NotifyCtl->shared->page_buffer[slotno];
    2259           4 :             curpage = pageno;
    2260             :         }
    2261             : 
    2262          40 :         qe = (AsyncQueueEntry *) (page_buffer + offset);
    2263          40 :         xid = qe->xid;
    2264             : 
    2265          80 :         if (TransactionIdIsNormal(xid) &&
    2266          40 :             TransactionIdPrecedes(xid, newFrozenXid))
    2267             :         {
    2268           0 :             if (TransactionIdDidCommit(xid))
    2269             :             {
    2270           0 :                 qe->xid = FrozenTransactionId;
    2271           0 :                 page_dirty = true;
    2272             :             }
    2273             :             else
    2274             :             {
    2275           0 :                 qe->xid = InvalidTransactionId;
    2276           0 :                 page_dirty = true;
    2277             :             }
    2278             :         }
    2279             : 
    2280             :         /* Advance to next entry */
    2281          40 :         asyncQueueAdvance(&pos, qe->length);
    2282             :     }
    2283             : 
    2284             :     /* Release final page lock if we acquired one */
    2285        2190 :     if (slotno >= 0)
    2286             :     {
    2287           4 :         if (page_dirty)
    2288           0 :             NotifyCtl->shared->page_dirty[slotno] = true;
    2289           4 :         LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2290             :     }
    2291             : 
    2292        2190 :     LWLockRelease(NotifyQueueTailLock);
    2293        2190 : }
    2294             : 
    2295             : /*
    2296             :  * ProcessIncomingNotify
    2297             :  *
    2298             :  *      Scan the queue for arriving notifications and report them to the front
    2299             :  *      end.  The notifications might be from other sessions, or our own;
    2300             :  *      there's no need to distinguish here.
    2301             :  *
    2302             :  *      If "flush" is true, force any frontend messages out immediately.
    2303             :  *
    2304             :  *      NOTE: since we are outside any transaction, we must create our own.
    2305             :  */
    2306             : static void
    2307          78 : ProcessIncomingNotify(bool flush)
    2308             : {
    2309             :     /* We *must* reset the flag */
    2310          78 :     notifyInterruptPending = false;
    2311             : 
    2312             :     /* Do nothing else if we aren't actively listening */
    2313          78 :     if (listenChannels == NIL)
    2314           0 :         return;
    2315             : 
    2316          78 :     if (Trace_notify)
    2317           0 :         elog(DEBUG1, "ProcessIncomingNotify");
    2318             : 
    2319          78 :     set_ps_display("notify interrupt");
    2320             : 
    2321             :     /*
    2322             :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    2323             :      * bad things happen if it gets an error.
    2324             :      */
    2325          78 :     StartTransactionCommand();
    2326             : 
    2327          78 :     asyncQueueReadAllNotifications();
    2328             : 
    2329          78 :     CommitTransactionCommand();
    2330             : 
    2331             :     /*
    2332             :      * If this isn't an end-of-command case, we must flush the notify messages
    2333             :      * to ensure frontend gets them promptly.
    2334             :      */
    2335          78 :     if (flush)
    2336          20 :         pq_flush();
    2337             : 
    2338          78 :     set_ps_display("idle");
    2339             : 
    2340          78 :     if (Trace_notify)
    2341           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    2342             : }
    2343             : 
    2344             : /*
    2345             :  * Send NOTIFY message to my front end.
    2346             :  */
    2347             : void
    2348          62 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    2349             : {
    2350          62 :     if (whereToSendOutput == DestRemote)
    2351             :     {
    2352             :         StringInfoData buf;
    2353             : 
    2354          62 :         pq_beginmessage(&buf, PqMsg_NotificationResponse);
    2355          62 :         pq_sendint32(&buf, srcPid);
    2356          62 :         pq_sendstring(&buf, channel);
    2357          62 :         pq_sendstring(&buf, payload);
    2358          62 :         pq_endmessage(&buf);
    2359             : 
    2360             :         /*
    2361             :          * NOTE: we do not do pq_flush() here.  Some level of caller will
    2362             :          * handle it later, allowing this message to be combined into a packet
    2363             :          * with other ones.
    2364             :          */
    2365             :     }
    2366             :     else
    2367           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    2368          62 : }
    2369             : 
    2370             : /* Does pendingNotifies include a match for the given event? */
    2371             : static bool
    2372        2096 : AsyncExistsPendingNotify(Notification *n)
    2373             : {
    2374        2096 :     if (pendingNotifies == NULL)
    2375           0 :         return false;
    2376             : 
    2377        2096 :     if (pendingNotifies->hashtab != NULL)
    2378             :     {
    2379             :         /* Use the hash table to probe for a match */
    2380        1966 :         if (hash_search(pendingNotifies->hashtab,
    2381             :                         &n,
    2382             :                         HASH_FIND,
    2383             :                         NULL))
    2384           0 :             return true;
    2385             :     }
    2386             :     else
    2387             :     {
    2388             :         /* Must scan the event list */
    2389             :         ListCell   *l;
    2390             : 
    2391         542 :         foreach(l, pendingNotifies->events)
    2392             :         {
    2393         440 :             Notification *oldn = (Notification *) lfirst(l);
    2394             : 
    2395         440 :             if (n->channel_len == oldn->channel_len &&
    2396         440 :                 n->payload_len == oldn->payload_len &&
    2397         250 :                 memcmp(n->data, oldn->data,
    2398         250 :                        n->channel_len + n->payload_len + 2) == 0)
    2399          28 :                 return true;
    2400             :         }
    2401             :     }
    2402             : 
    2403        2068 :     return false;
    2404             : }
    2405             : 
    2406             : /*
    2407             :  * Add a notification event to a pre-existing pendingNotifies list.
    2408             :  *
    2409             :  * Because pendingNotifies->events is already nonempty, this works
    2410             :  * correctly no matter what CurrentMemoryContext is.
    2411             :  */
    2412             : static void
    2413        2068 : AddEventToPendingNotifies(Notification *n)
    2414             : {
    2415             :     Assert(pendingNotifies->events != NIL);
    2416             : 
    2417             :     /* Create the hash table if it's time to */
    2418        2068 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    2419        1968 :         pendingNotifies->hashtab == NULL)
    2420             :     {
    2421             :         HASHCTL     hash_ctl;
    2422             :         ListCell   *l;
    2423             : 
    2424             :         /* Create the hash table */
    2425           2 :         hash_ctl.keysize = sizeof(Notification *);
    2426           2 :         hash_ctl.entrysize = sizeof(struct NotificationHash);
    2427           2 :         hash_ctl.hash = notification_hash;
    2428           2 :         hash_ctl.match = notification_match;
    2429           2 :         hash_ctl.hcxt = CurTransactionContext;
    2430           4 :         pendingNotifies->hashtab =
    2431           2 :             hash_create("Pending Notifies",
    2432             :                         256L,
    2433             :                         &hash_ctl,
    2434             :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    2435             : 
    2436             :         /* Insert all the already-existing events */
    2437          34 :         foreach(l, pendingNotifies->events)
    2438             :         {
    2439          32 :             Notification *oldn = (Notification *) lfirst(l);
    2440             :             bool        found;
    2441             : 
    2442          32 :             (void) hash_search(pendingNotifies->hashtab,
    2443             :                                &oldn,
    2444             :                                HASH_ENTER,
    2445             :                                &found);
    2446             :             Assert(!found);
    2447             :         }
    2448             :     }
    2449             : 
    2450             :     /* Add new event to the list, in order */
    2451        2068 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
    2452             : 
    2453             :     /* Add event to the hash table if needed */
    2454        2068 :     if (pendingNotifies->hashtab != NULL)
    2455             :     {
    2456             :         bool        found;
    2457             : 
    2458        1968 :         (void) hash_search(pendingNotifies->hashtab,
    2459             :                            &n,
    2460             :                            HASH_ENTER,
    2461             :                            &found);
    2462             :         Assert(!found);
    2463             :     }
    2464        2068 : }
    2465             : 
    2466             : /*
    2467             :  * notification_hash: hash function for notification hash table
    2468             :  *
    2469             :  * The hash "keys" are pointers to Notification structs.
    2470             :  */
    2471             : static uint32
    2472        3966 : notification_hash(const void *key, Size keysize)
    2473             : {
    2474        3966 :     const Notification *k = *(const Notification *const *) key;
    2475             : 
    2476             :     Assert(keysize == sizeof(Notification *));
    2477             :     /* We don't bother to include the payload's trailing null in the hash */
    2478        3966 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    2479        3966 :                                    k->channel_len + k->payload_len + 1));
    2480             : }
    2481             : 
    2482             : /*
    2483             :  * notification_match: match function to use with notification_hash
    2484             :  */
    2485             : static int
    2486           0 : notification_match(const void *key1, const void *key2, Size keysize)
    2487             : {
    2488           0 :     const Notification *k1 = *(const Notification *const *) key1;
    2489           0 :     const Notification *k2 = *(const Notification *const *) key2;
    2490             : 
    2491             :     Assert(keysize == sizeof(Notification *));
    2492           0 :     if (k1->channel_len == k2->channel_len &&
    2493           0 :         k1->payload_len == k2->payload_len &&
    2494           0 :         memcmp(k1->data, k2->data,
    2495           0 :                k1->channel_len + k1->payload_len + 2) == 0)
    2496           0 :         return 0;               /* equal */
    2497           0 :     return 1;                   /* not equal */
    2498             : }
    2499             : 
    2500             : /* Clear the pendingActions and pendingNotifies lists. */
    2501             : static void
    2502       50634 : ClearPendingActionsAndNotifies(void)
    2503             : {
    2504             :     /*
    2505             :      * Everything's allocated in either TopTransactionContext or the context
    2506             :      * for the subtransaction to which it corresponds.  So, there's nothing to
    2507             :      * do here except reset the pointers; the space will be reclaimed when the
    2508             :      * contexts are deleted.
    2509             :      */
    2510       50634 :     pendingActions = NULL;
    2511       50634 :     pendingNotifies = NULL;
    2512       50634 : }
    2513             : 
    2514             : /*
    2515             :  * GUC check_hook for notify_buffers
    2516             :  */
    2517             : bool
    2518        2276 : check_notify_buffers(int *newval, void **extra, GucSource source)
    2519             : {
    2520        2276 :     return check_slru_buffers("notify_buffers", newval);
    2521             : }

Generated by: LCOV version 1.16