LCOV - code coverage report
Current view: top level - src/backend/commands - async.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 90.1 % 809 729
Test Date: 2026-06-12 20:16:36 Functions: 96.4 % 55 53
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * async.c
       4              :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/commands/async.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : /*-------------------------------------------------------------------------
      16              :  * Async Notification Model as of v19:
      17              :  *
      18              :  * 1. Multiple backends on same machine.  Multiple backends may be listening
      19              :  *    on each of several channels.
      20              :  *
      21              :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      22              :  *    with actively-used pages mapped into shared memory by the slru.c module.
      23              :  *    All notification messages are placed in the queue and later read out
      24              :  *    by listening backends.  The single queue allows us to guarantee that
      25              :  *    notifications are received in commit order.
      26              :  *
      27              :  *    Although there is only one queue, notifications are treated as being
      28              :  *    database-local; this is done by including the sender's database OID
      29              :  *    in each notification message.  Listening backends ignore messages
      30              :  *    that don't match their database OID.  This is important because it
      31              :  *    ensures senders and receivers have the same database encoding and won't
      32              :  *    misinterpret non-ASCII text in the channel name or payload string.
      33              :  *
      34              :  *    Since notifications are not expected to survive database crashes,
      35              :  *    we can simply clean out the pg_notify data at any reboot, and there
      36              :  *    is no need for WAL support or fsync'ing.
      37              :  *
      38              :  * 3. Every backend that is listening on at least one channel registers by
      39              :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      40              :  *    incoming notifications in the central queue and first compares the
      41              :  *    database OID of the notification with its own database OID and then
      42              :  *    compares the notified channel with the list of channels that it listens
      43              :  *    to. In case there is a match it delivers the notification event to its
      44              :  *    frontend.  Non-matching events are simply skipped.
      45              :  *
      46              :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      47              :  *    a backend-local list which will not be processed until transaction end.
      48              :  *
      49              :  *    Duplicate notifications from the same transaction are sent out as one
      50              :  *    notification only. This is done to save work when for example a trigger
      51              :  *    on a 2 million row table fires a notification for each row that has been
      52              :  *    changed. If the application needs to receive every single notification
      53              :  *    that has been sent, it can easily add some unique string into the extra
      54              :  *    payload parameter.
      55              :  *
      56              :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      57              :  *    pending notifications to the head of the queue. The head pointer of the
      58              :  *    queue always points to the next free position and a position is just a
      59              :  *    page number and the offset in that page. This is done before marking the
      60              :  *    transaction as committed in clog. If we run into problems writing the
      61              :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      62              :  *    will roll back safely.
      63              :  *
      64              :  *    Once we have put all of the notifications into the queue, we return to
      65              :  *    CommitTransaction() which will then do the actual transaction commit.
      66              :  *
      67              :  *    After commit we are called another time (AtCommit_Notify()). Here we
      68              :  *    make any required updates to the effective listen state (see below).
      69              :  *    Then we signal any backends that may be interested in our messages
      70              :  *    (including our own backend, if listening).  This is done by
      71              :  *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
      72              :  *    each relevant backend, as described below.
      73              :  *
      74              :  *    Finally, after we are out of the transaction altogether and about to go
      75              :  *    idle, we scan the queue for messages that need to be sent to our
      76              :  *    frontend (which might be notifies from other backends, or self-notifies
      77              :  *    from our own).  This step is not part of the CommitTransaction sequence
      78              :  *    for two important reasons.  First, we could get errors while sending
      79              :  *    data to our frontend, and it's really bad for errors to happen in
      80              :  *    post-commit cleanup.  Second, in cases where a procedure issues commits
      81              :  *    within a single frontend command, we don't want to send notifies to our
      82              :  *    frontend until the command is done; but notifies to other backends
      83              :  *    should go out immediately after each commit.
      84              :  *
      85              :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      86              :  *    sets the process's latch, which triggers the event to be processed
      87              :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
       88              :  *    command and is not within a transaction block; cf.
      89              :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      90              :  *    flag, which will cause the processing to occur just before we next go
      91              :  *    idle.
      92              :  *
      93              :  *    Inbound-notify processing consists of reading all of the notifications
      94              :  *    that have arrived since scanning last time. We read every notification
      95              :  *    until we reach either a notification from an uncommitted transaction or
      96              :  *    the head pointer's position.
      97              :  *
      98              :  * 6. To limit disk space consumption, the tail pointer needs to be advanced
      99              :  *    so that old pages can be truncated. This is relatively expensive
     100              :  *    (notably, it requires an exclusive lock), so we don't want to do it
     101              :  *    often. We make sending backends do this work if they advanced the queue
     102              :  *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
     103              :  *
     104              :  * 7. So far we have not discussed how backends change their listening state,
     105              :  *    nor how notification senders know which backends to awaken.  To handle
     106              :  *    the latter, we maintain a global channel table (implemented as a dynamic
     107              :  *    shared hash table, or dshash) that maps channel names to the set of
     108              :  *    backends listening on each channel.  This table is created lazily on the
     109              :  *    first LISTEN command and grows dynamically as needed.  There is also a
     110              :  *    local channel table (a plain dynahash table) in each listening backend,
     111              :  *    tracking which channels that backend is listening to.  The local table
     112              :  *    serves to reduce the number of accesses needed to the shared table.
     113              :  *
     114              :  *    If the current transaction has executed any LISTEN/UNLISTEN actions,
     115              :  *    PreCommit_Notify() prepares to commit those.  For LISTEN, it
     116              :  *    pre-allocates entries in both the per-backend localChannelTable and the
     117              :  *    shared globalChannelTable, marking new shared entries removeOnAbort.
     118              :  *    It also records the final per-channel intent in pendingListenActions,
     119              :  *    so post-commit/abort processing can apply that in a single step.
     120              :  *    Since all these allocations happen before committing to clog, we can
     121              :  *    safely abort the transaction on failure.
     122              :  *
     123              :  *    After commit, AtCommit_Notify() runs through pendingListenActions and
     124              :  *    applies the final per-channel listen/unlisten state.  This happens
     125              :  *    before sending signals.
     126              :  *
     127              :  *    SignalBackends() consults the shared global channel table to identify
     128              :  *    listeners for the channels that the current transaction sent
     129              :  *    notification(s) to.  Each selected backend is marked as having a wakeup
     130              :  *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
     131              :  *    signal is sent to it.
     132              :  *
     133              :  * 8. While writing notifications, PreCommit_Notify() records the queue head
     134              :  *    position both before and after the write.  Because all writers serialize
     135              :  *    on a cluster-wide heavyweight lock, no other backend can insert entries
     136              :  *    between these two points.  SignalBackends() uses this fact to directly
     137              :  *    advance the queue pointer for any backend that is still positioned at
     138              :  *    the old head, or within the range written, but is not interested in any
     139              :  *    of our notifications.  This avoids unnecessary wakeups for idle
     140              :  *    listeners that have nothing to read.  Backends that are not interested
     141              :  *    in our notifications, but cannot be directly advanced, are signaled only
     142              :  *    if they are far behind the current queue head; that is to ensure that
     143              :  *    we can advance the queue tail without undue delay.
     144              :  *
     145              :  * An application that listens on the same channel it notifies will get
     146              :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     147              :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     148              :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     149              :  * frontend during startup.)  The above design guarantees that notifies from
     150              :  * other backends will never be missed by ignoring self-notifies.
     151              :  *
     152              :  * The amount of shared memory used for notify management (notify_buffers)
     153              :  * can be varied without affecting anything but performance.  The maximum
     154              :  * amount of notification data that can be queued at one time is determined
     155              :  * by the max_notify_queue_pages GUC.
     156              :  *-------------------------------------------------------------------------
     157              :  */
     158              : 
     159              : #include "postgres.h"
     160              : 
     161              : #include <limits.h>
     162              : #include <unistd.h>
     163              : #include <signal.h>
     164              : 
     165              : #include "access/parallel.h"
     166              : #include "access/slru.h"
     167              : #include "access/transam.h"
     168              : #include "access/xact.h"
     169              : #include "catalog/pg_database.h"
     170              : #include "commands/async.h"
     171              : #include "common/hashfn.h"
     172              : #include "funcapi.h"
     173              : #include "lib/dshash.h"
     174              : #include "libpq/libpq.h"
     175              : #include "libpq/pqformat.h"
     176              : #include "miscadmin.h"
     177              : #include "storage/dsm_registry.h"
     178              : #include "storage/ipc.h"
     179              : #include "storage/latch.h"
     180              : #include "storage/lmgr.h"
     181              : #include "storage/procsignal.h"
     182              : #include "storage/subsystems.h"
     183              : #include "tcop/tcopprot.h"
     184              : #include "utils/builtins.h"
     185              : #include "utils/dsa.h"
     186              : #include "utils/guc_hooks.h"
     187              : #include "utils/memutils.h"
     188              : #include "utils/ps_status.h"
     189              : #include "utils/snapmgr.h"
     190              : #include "utils/timestamp.h"
     191              : 
     192              : 
     193              : /*
      194              :  * Maximum size of a NOTIFY payload, including terminating NUL.  This
     195              :  * must be kept small enough so that a notification message fits on one
     196              :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     197              :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     198              :  * than that, so changes in that data structure won't affect user-visible
     199              :  * restrictions.
     200              :  */
     201              : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
     202              : 
     203              : /*
     204              :  * Struct representing an entry in the global notify queue
     205              :  *
     206              :  * This struct declaration has the maximal length, but in a real queue entry
     207              :  * the data area is only big enough for the actual channel and payload strings
     208              :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     209              :  * entry size, if both channel and payload strings are empty (but note it
     210              :  * doesn't include alignment padding).
     211              :  *
     212              :  * The "length" field should always be rounded up to the next QUEUEALIGN
     213              :  * multiple so that all fields are properly aligned.
     214              :  */
     215              : typedef struct AsyncQueueEntry
     216              : {
     217              :     int         length;         /* total allocated length of entry */
     218              :     Oid         dboid;          /* sender's database OID */
     219              :     TransactionId xid;          /* sender's XID */
     220              :     int32       srcPid;         /* sender's PID */
     221              :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     222              : } AsyncQueueEntry;
     223              : 
     224              : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     225              : #define QUEUEALIGN(len)     INTALIGN(len)
     226              : 
     227              : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
     228              : 
     229              : /*
     230              :  * Struct describing a queue position, and assorted macros for working with it
     231              :  */
     232              : typedef struct QueuePosition
     233              : {
     234              :     int64       page;           /* SLRU page number */
     235              :     int         offset;         /* byte offset within page */
     236              : } QueuePosition;
     237              : 
     238              : #define QUEUE_POS_PAGE(x)       ((x).page)
     239              : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     240              : 
     241              : #define SET_QUEUE_POS(x,y,z) \
     242              :     do { \
     243              :         (x).page = (y); \
     244              :         (x).offset = (z); \
     245              :     } while (0)
     246              : 
     247              : #define QUEUE_POS_EQUAL(x,y) \
     248              :     ((x).page == (y).page && (x).offset == (y).offset)
     249              : 
     250              : #define QUEUE_POS_IS_ZERO(x) \
     251              :     ((x).page == 0 && (x).offset == 0)
     252              : 
     253              : /* choose logically smaller QueuePosition */
     254              : #define QUEUE_POS_MIN(x,y) \
     255              :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     256              :      (x).page != (y).page ? (y) : \
     257              :      (x).offset < (y).offset ? (x) : (y))
     258              : 
     259              : /* choose logically larger QueuePosition */
     260              : #define QUEUE_POS_MAX(x,y) \
     261              :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     262              :      (x).page != (y).page ? (x) : \
     263              :      (x).offset > (y).offset ? (x) : (y))
     264              : 
     265              : /* returns true if x comes before y in queue order */
     266              : #define QUEUE_POS_PRECEDES(x,y) \
     267              :     (asyncQueuePagePrecedes((x).page, (y).page) || \
     268              :      ((x).page == (y).page && (x).offset < (y).offset))
     269              : 
     270              : /*
     271              :  * Parameter determining how often we try to advance the tail pointer:
     272              :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     273              :  * also the distance by which a backend that's not interested in our
     274              :  * notifications needs to be behind before we'll decide we need to wake it
     275              :  * up so it can advance its pointer.
     276              :  *
     277              :  * Resist the temptation to make this really large.  While that would save
     278              :  * work in some places, it would add cost in others.  In particular, this
     279              :  * should likely be less than notify_buffers, to ensure that backends
     280              :  * catch up before the pages they'll need to read fall out of SLRU cache.
     281              :  */
     282              : #define QUEUE_CLEANUP_DELAY 4
     283              : 
     284              : /*
     285              :  * Struct describing a listening backend's status
     286              :  */
     287              : typedef struct QueueBackendStatus
     288              : {
     289              :     int32       pid;            /* either a PID or InvalidPid */
     290              :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     291              :     ProcNumber  nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
     292              :     QueuePosition pos;          /* backend has read queue up to here */
     293              :     bool        wakeupPending;  /* signal sent to backend, not yet processed */
     294              :     bool        isAdvancing;    /* backend is advancing its position */
     295              : } QueueBackendStatus;
     296              : 
     297              : /*
     298              :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     299              :  *
     300              :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
     301              :  * NotifyQueueTailLock.
     302              :  *
     303              :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
     304              :  * their own entries as well as the head and tail pointers. Consequently we
     305              :  * can allow a backend to update its own record while holding only SHARED lock
     306              :  * (since no other backend will inspect it).
     307              :  *
     308              :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
     309              :  * entries of other backends and also change the head pointer. They can
     310              :  * also advance other backends' queue positions, unless the other backend
     311              :  * has isAdvancing set (i.e., is in process of doing that itself).
     312              :  *
     313              :  * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
     314              :  * mode, backends can change the tail pointers.
     315              :  *
      316              :  * The SLRU buffer pool is divided into banks, and a bank-wise SLRU lock is
      317              :  * used as the control lock for the pg_notify SLRU buffers.
     318              :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
     319              :  * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
     320              :  * globalChannelTable partition locks.
     321              :  *
     322              :  * Each backend uses the backend[] array entry with index equal to its
     323              :  * ProcNumber.  We rely on this to make SendProcSignal fast.
     324              :  *
     325              :  * The backend[] array entries for actively-listening backends are threaded
     326              :  * together using firstListener and the nextListener links, so that we can
     327              :  * scan them without having to iterate over inactive entries.  We keep this
     328              :  * list in order by ProcNumber so that the scan is cache-friendly when there
     329              :  * are many active entries.
     330              :  */
     331              : typedef struct AsyncQueueControl
     332              : {
     333              :     QueuePosition head;         /* head points to the next free location */
     334              :     QueuePosition tail;         /* tail must be <= the queue position of every
     335              :                                  * listening backend */
     336              :     int64       stopPage;       /* oldest unrecycled page; must be <=
     337              :                                  * tail.page */
     338              :     ProcNumber  firstListener;  /* id of first listener, or
     339              :                                  * INVALID_PROC_NUMBER */
     340              :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     341              :     dsa_handle  globalChannelTableDSA;  /* global channel table's DSA handle */
     342              :     dshash_table_handle globalChannelTableDSH;  /* and its dshash handle */
     343              :     /* Array with room for MaxBackends entries: */
     344              :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     345              : } AsyncQueueControl;
     346              : 
     347              : static AsyncQueueControl *asyncQueueControl;
     348              : 
     349              : static void AsyncShmemRequest(void *arg);
     350              : static void AsyncShmemInit(void *arg);
     351              : 
     352              : const ShmemCallbacks AsyncShmemCallbacks = {
     353              :     .request_fn = AsyncShmemRequest,
     354              :     .init_fn = AsyncShmemInit,
     355              : };
     356              : 
     357              : 
     358              : #define QUEUE_HEAD                  (asyncQueueControl->head)
     359              : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     360              : #define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
     361              : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
     362              : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     363              : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     364              : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
     365              : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
     366              : #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
     367              : #define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
     368              : 
     369              : /*
     370              :  * The SLRU buffer area through which we access the notification queue
     371              :  */
     372              : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
     373              : static int  asyncQueueErrdetailForIoError(const void *opaque_data);
     374              : 
     375              : static SlruDesc NotifySlruDesc;
     376              : 
     377              : 
     378              : #define NotifyCtl                   (&NotifySlruDesc)
     379              : #define QUEUE_PAGESIZE              BLCKSZ
     380              : 
     381              : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     382              : 
     383              : /*
     384              :  * Global channel table definitions
     385              :  *
     386              :  * This hash table maps (database OID, channel name) keys to arrays of
     387              :  * ProcNumbers representing the backends listening or about to listen
     388              :  * on each channel.  The removeOnAbort flags allow us to create hash table
     389              :  * entries pre-commit and not have to assume that creating them post-commit
     390              :  * will succeed.
     391              :  */
     392              : #define INITIAL_LISTENERS_ARRAY_SIZE 4
     393              : 
     394              : typedef struct GlobalChannelKey
     395              : {
     396              :     Oid         dboid;
     397              :     char        channel[NAMEDATALEN];
     398              : } GlobalChannelKey;
     399              : 
     400              : typedef struct ListenerEntry
     401              : {
     402              :     ProcNumber  procNo;         /* listener's ProcNumber */
     403              :     bool        removeOnAbort;  /* remove entry if current xact aborts */
     404              : } ListenerEntry;
     405              : 
     406              : typedef struct GlobalChannelEntry
     407              : {
     408              :     GlobalChannelKey key;       /* hash key */
     409              :     dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
     410              :     int         numListeners;   /* Number of listeners currently stored */
     411              :     int         allocatedListeners; /* Allocated size of array */
     412              : } GlobalChannelEntry;
     413              : 
     414              : static dshash_table *globalChannelTable = NULL;
     415              : static dsa_area *globalChannelDSA = NULL;
     416              : 
     417              : /*
     418              :  * localChannelTable caches the channel names this backend is listening on
     419              :  * (including those we have staged to be listened on, but not yet committed).
     420              :  * Used by IsListeningOn() for fast lookups when reading notifications.
     421              :  */
     422              : static HTAB *localChannelTable = NULL;
     423              : 
     424              : /* We test this condition to detect that we're not listening at all */
     425              : #define LocalChannelTableIsEmpty() \
     426              :     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
     427              : 
     428              : /*
     429              :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     430              :  * all actions requested in the current transaction.  As explained above,
     431              :  * we don't actually change listen state until we reach transaction commit.
     432              :  *
     433              :  * The list is kept in CurTransactionContext.  In subtransactions, each
     434              :  * subtransaction has its own list in its own CurTransactionContext, but
     435              :  * successful subtransactions attach their lists to their parent's list.
     436              :  * Failed subtransactions simply discard their lists.
     437              :  */
     438              : typedef enum
     439              : {
     440              :     LISTEN_LISTEN,
     441              :     LISTEN_UNLISTEN,
     442              :     LISTEN_UNLISTEN_ALL,
     443              : } ListenActionKind;
     444              : 
     445              : typedef struct
     446              : {
     447              :     ListenActionKind action;
     448              :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     449              : } ListenAction;
     450              : 
     451              : typedef struct ActionList
     452              : {
     453              :     int         nestingLevel;   /* current transaction nesting depth */
     454              :     List       *actions;        /* list of ListenAction structs */
     455              :     struct ActionList *upper;   /* details for upper transaction levels */
     456              : } ActionList;
     457              : 
     458              : static ActionList *pendingActions = NULL;
     459              : 
     460              : /*
     461              :  * Hash table recording the final listen/unlisten intent per channel for
     462              :  * the current transaction.  Key is channel name, value is PENDING_LISTEN or
     463              :  * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
     464              :  * per channel instead of replaying every action.  This is built from the
     465              :  * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
     466              :  * AtAbort_Notify.
     467              :  */
     468              : typedef enum
     469              : {
     470              :     PENDING_LISTEN,
     471              :     PENDING_UNLISTEN,
     472              : } PendingListenAction;
     473              : 
     474              : typedef struct PendingListenEntry
     475              : {
     476              :     char        channel[NAMEDATALEN];   /* hash key */
     477              :     PendingListenAction action; /* which action should we perform? */
     478              : } PendingListenEntry;
     479              : 
     480              : static HTAB *pendingListenActions = NULL;
     481              : 
     482              : /*
     483              :  * State for outbound notifies consists of a list of all channels+payloads
     484              :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     485              :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     486              :  * NOTIFYs have been done in the current (sub) transaction.
     487              :  *
     488              :  * We discard duplicate notify events issued in the same transaction.
     489              :  * Hence, in addition to the list proper (which we need to track the order
     490              :  * of the events, since we guarantee to deliver them in order), we build a
     491              :  * hash table which we can probe to detect duplicates.  Since building the
     492              :  * hash table is somewhat expensive, we do so only once we have at least
     493              :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     494              :  * before that we just scan the events linearly.
     495              :  *
     496              :  * The list is kept in CurTransactionContext.  In subtransactions, each
     497              :  * subtransaction has its own list in its own CurTransactionContext, but
     498              :  * successful subtransactions add their entries to their parent's list.
     499              :  * Failed subtransactions simply discard their lists.  Since these lists
     500              :  * are independent, there may be notify events in a subtransaction's list
     501              :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     502              :  * the dups when merging the subtransaction's list into its parent's.
     503              :  *
     504              :  * Note: the action and notify lists do not interact within a transaction.
     505              :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     506              :  * condition name, it will get a self-notify at commit.  This is a bit odd
     507              :  * but is consistent with our historical behavior.
     508              :  */
     509              : typedef struct Notification
     510              : {
     511              :     uint16      channel_len;    /* length of channel-name string */
     512              :     uint16      payload_len;    /* length of payload string */
     513              :     /* null-terminated channel name, then null-terminated payload follow */
     514              :     char        data[FLEXIBLE_ARRAY_MEMBER];
     515              : } Notification;
     516              : 
     517              : typedef struct NotificationList
     518              : {
     519              :     int         nestingLevel;   /* current transaction nesting depth */
     520              :     List       *events;         /* list of Notification structs */
     521              :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
     522              :     List       *uniqueChannelNames; /* unique channel names being notified */
     523              :     HTAB       *uniqueChannelHash;  /* hash of unique channel names, or NULL */
     524              :     struct NotificationList *upper; /* details for upper transaction levels */
     525              : } NotificationList;
     526              : 
     527              : #define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
     528              : 
     529              : struct NotificationHash
     530              : {
     531              :     Notification *event;        /* => the actual Notification struct */
     532              : };
     533              : 
     534              : static NotificationList *pendingNotifies = NULL;
     535              : 
     536              : /*
     537              :  * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
     538              :  * (both just carry the channel name, with no payload).
     539              :  */
     540              : typedef struct ChannelName
     541              : {
     542              :     char        channel[NAMEDATALEN];   /* hash key */
     543              : } ChannelName;
     544              : 
     545              : /*
     546              :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     547              :  * called from inside a signal handler. That just sets the
     548              :  * notifyInterruptPending flag and sets the process
     549              :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     550              :  * actually deal with the interrupt.
     551              :  */
     552              : volatile sig_atomic_t notifyInterruptPending = false;
     553              : 
     554              : /* True if we've registered an on_shmem_exit cleanup */
     555              : static bool unlistenExitRegistered = false;
     556              : 
     557              : /* True if we're currently registered as a listener in asyncQueueControl */
     558              : static bool amRegisteredListener = false;
     559              : 
     560              : /*
     561              :  * Queue head positions for direct advancement.
     562              :  * These are captured during PreCommit_Notify while holding the heavyweight
     563              :  * lock on database 0, ensuring no other backend can insert notifications
     564              :  * between them.  SignalBackends uses these to advance idle backends.
     565              :  */
     566              : static QueuePosition queueHeadBeforeWrite;
     567              : static QueuePosition queueHeadAfterWrite;
     568              : 
     569              : /*
     570              :  * Workspace arrays for SignalBackends.  These are preallocated in
     571              :  * PreCommit_Notify to avoid needing memory allocation after committing to
     572              :  * clog.
     573              :  */
     574              : static int32 *signalPids = NULL;
     575              : static ProcNumber *signalProcnos = NULL;
     576              : 
     577              : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     578              : static bool tryAdvanceTail = false;
     579              : 
     580              : /* GUC parameters */
     581              : bool        Trace_notify = false;
     582              : 
     583              : /* For 8 KB pages this gives 8 GB of disk space */
     584              : int         max_notify_queue_pages = 1048576;
     585              : 
     586              : /* local function prototypes */
     587              : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
     588              : static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
     589              :                                         const char *channel);
     590              : static dshash_hash globalChannelTableHash(const void *key, size_t size,
     591              :                                           void *arg);
     592              : static void initGlobalChannelTable(void);
     593              : static void initLocalChannelTable(void);
     594              : static void queue_listen(ListenActionKind action, const char *channel);
     595              : static void Async_UnlistenOnExit(int code, Datum arg);
     596              : static void BecomeRegisteredListener(void);
     597              : static void PrepareTableEntriesForListen(const char *channel);
     598              : static void PrepareTableEntriesForUnlisten(const char *channel);
     599              : static void PrepareTableEntriesForUnlistenAll(void);
     600              : static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
     601              :                                       ListenerEntry *listeners,
     602              :                                       int idx);
     603              : static void ApplyPendingListenActions(bool isCommit);
     604              : static void CleanupListenersOnExit(void);
     605              : static bool IsListeningOn(const char *channel);
     606              : static void asyncQueueUnregister(void);
     607              : static bool asyncQueueIsFull(void);
     608              : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     609              : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     610              : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     611              : static double asyncQueueUsage(void);
     612              : static void asyncQueueFillWarning(void);
     613              : static void SignalBackends(void);
     614              : static void asyncQueueReadAllNotifications(void);
     615              : static bool asyncQueueProcessPageEntries(QueuePosition *current,
     616              :                                          QueuePosition stop,
     617              :                                          Snapshot snapshot);
     618              : static void asyncQueueAdvanceTail(void);
     619              : static void ProcessIncomingNotify(bool flush);
     620              : static bool AsyncExistsPendingNotify(Notification *n);
     621              : static void AddEventToPendingNotifies(Notification *n);
     622              : static uint32 notification_hash(const void *key, Size keysize);
     623              : static int  notification_match(const void *key1, const void *key2, Size keysize);
     624              : static void ClearPendingActionsAndNotifies(void);
     625              : 
     626              : static int
     627            0 : asyncQueueErrdetailForIoError(const void *opaque_data)
     628              : {
     629            0 :     const QueuePosition *position = opaque_data;
     630              : 
     631            0 :     return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
     632            0 :                      position->page, position->offset);
     633              : }
     634              : 
     635              : /*
     636              :  * Compute the difference between two queue page numbers.
     637              :  * Previously this function accounted for a wraparound.
     638              :  */
     639              : static inline int64
     640            0 : asyncQueuePageDiff(int64 p, int64 q)
     641              : {
     642            0 :     return p - q;
     643              : }
     644              : 
     645              : /*
     646              :  * Determines whether p precedes q.
     647              :  * Previously this function accounted for a wraparound.
     648              :  */
     649              : static inline bool
     650          120 : asyncQueuePagePrecedes(int64 p, int64 q)
     651              : {
     652          120 :     return p < q;
     653              : }
     654              : 
     655              : /*
     656              :  * GlobalChannelKeyInit
     657              :  *      Prepare a global channel table key for hashing.
     658              :  */
     659              : static inline void
     660          203 : GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
     661              : {
     662          203 :     memset(key, 0, sizeof(GlobalChannelKey));
     663          203 :     key->dboid = dboid;
     664          203 :     strlcpy(key->channel, channel, NAMEDATALEN);
     665          203 : }
     666              : 
     667              : /*
     668              :  * globalChannelTableHash
     669              :  *      Hash function for global channel table keys.
     670              :  */
     671              : static dshash_hash
     672          203 : globalChannelTableHash(const void *key, size_t size, void *arg)
     673              : {
     674          203 :     const GlobalChannelKey *k = (const GlobalChannelKey *) key;
     675              :     dshash_hash h;
     676              : 
     677          203 :     h = DatumGetUInt32(hash_uint32(k->dboid));
     678          203 :     h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
     679          203 :                                  strnlen(k->channel, NAMEDATALEN)));
     680              : 
     681          203 :     return h;
     682              : }
     683              : 
     684              : /* parameters for the global channel table */
     685              : static const dshash_parameters globalChannelTableDSHParams = {
     686              :     sizeof(GlobalChannelKey),
     687              :     sizeof(GlobalChannelEntry),
     688              :     dshash_memcmp,
     689              :     globalChannelTableHash,
     690              :     dshash_memcpy,
     691              :     LWTRANCHE_NOTIFY_CHANNEL_HASH
     692              : };
     693              : 
     694              : /*
     695              :  * initGlobalChannelTable
     696              :  *      Lazy initialization of the global channel table.
     697              :  */
     698              : static void
     699          151 : initGlobalChannelTable(void)
     700              : {
     701              :     MemoryContext oldcontext;
     702              : 
     703              :     /* Quick exit if we already did this */
     704          151 :     if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
     705          143 :         globalChannelTable != NULL)
     706          121 :         return;
     707              : 
     708              :     /* Otherwise, use a lock to ensure only one process creates the table */
     709           30 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     710              : 
     711              :     /* Be sure any local memory allocated by DSA routines is persistent */
     712           30 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
     713              : 
     714           30 :     if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
     715              :     {
     716              :         /* Initialize dynamic shared hash table for global channels */
     717            8 :         globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
     718            8 :         dsa_pin(globalChannelDSA);
     719            8 :         dsa_pin_mapping(globalChannelDSA);
     720            8 :         globalChannelTable = dshash_create(globalChannelDSA,
     721              :                                            &globalChannelTableDSHParams,
     722              :                                            NULL);
     723              : 
     724              :         /* Store handles in shared memory for other backends to use */
     725            8 :         asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
     726            8 :         asyncQueueControl->globalChannelTableDSH =
     727            8 :             dshash_get_hash_table_handle(globalChannelTable);
     728              :     }
     729           22 :     else if (!globalChannelTable)
     730              :     {
     731              :         /* Attach to existing dynamic shared hash table */
     732           22 :         globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
     733           22 :         dsa_pin_mapping(globalChannelDSA);
     734           22 :         globalChannelTable = dshash_attach(globalChannelDSA,
     735              :                                            &globalChannelTableDSHParams,
     736           22 :                                            asyncQueueControl->globalChannelTableDSH,
     737              :                                            NULL);
     738              :     }
     739              : 
     740           30 :     MemoryContextSwitchTo(oldcontext);
     741           30 :     LWLockRelease(NotifyQueueLock);
     742              : }
     743              : 
     744              : /*
     745              :  * initLocalChannelTable
     746              :  *      Lazy initialization of the local channel table.
     747              :  *      Once created, this table lasts for the life of the session.
     748              :  */
     749              : static void
     750           82 : initLocalChannelTable(void)
     751              : {
     752              :     HASHCTL     hash_ctl;
     753              : 
     754              :     /* Quick exit if we already did this */
     755           82 :     if (localChannelTable != NULL)
     756           64 :         return;
     757              : 
     758              :     /* Initialize local hash table for this backend's listened channels */
     759           18 :     hash_ctl.keysize = NAMEDATALEN;
     760           18 :     hash_ctl.entrysize = sizeof(ChannelName);
     761              : 
     762           18 :     localChannelTable =
     763           18 :         hash_create("Local Listen Channels",
     764              :                     64,
     765              :                     &hash_ctl,
     766              :                     HASH_ELEM | HASH_STRINGS);
     767              : }
     768              : 
     769              : /*
     770              :  * initPendingListenActions
     771              :  *      Lazy initialization of the pending listen actions hash table.
     772              :  *      This is allocated in CurTransactionContext during PreCommit_Notify,
     773              :  *      and destroyed at transaction end.
     774              :  */
     775              : static void
     776           82 : initPendingListenActions(void)
     777              : {
     778              :     HASHCTL     hash_ctl;
     779              : 
     780           82 :     if (pendingListenActions != NULL)
     781            0 :         return;
     782              : 
     783           82 :     hash_ctl.keysize = NAMEDATALEN;
     784           82 :     hash_ctl.entrysize = sizeof(PendingListenEntry);
     785           82 :     hash_ctl.hcxt = CurTransactionContext;
     786              : 
     787           82 :     pendingListenActions =
     788           82 :         hash_create("Pending Listen Actions",
     789           82 :                     list_length(pendingActions->actions),
     790              :                     &hash_ctl,
     791              :                     HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
     792              : }
     793              : 
     794              : /*
     795              :  * Register our shared memory needs
     796              :  */
     797              : static void
     798         1255 : AsyncShmemRequest(void *arg)
     799              : {
     800              :     Size        size;
     801              : 
     802         1255 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     803         1255 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     804              : 
     805         1255 :     ShmemRequestStruct(.name = "Async Queue Control",
     806              :                        .size = size,
     807              :                        .ptr = (void **) &asyncQueueControl,
     808              :         );
     809              : 
     810         1255 :     SimpleLruRequest(.desc = &NotifySlruDesc,
     811              :                      .name = "notify",
     812              :                      .Dir = "pg_notify",
     813              : 
     814              :     /* long segment names are used in order to avoid wraparound */
     815              :                      .long_segment_names = true,
     816              : 
     817              :                      .nslots = notify_buffers,
     818              : 
     819              :                      .sync_handler = SYNC_HANDLER_NONE,
     820              :                      .PagePrecedes = asyncQueuePagePrecedes,
     821              :                      .errdetail_for_io_error = asyncQueueErrdetailForIoError,
     822              : 
     823              :                      .buffer_tranche_id = LWTRANCHE_NOTIFY_BUFFER,
     824              :                      .bank_tranche_id = LWTRANCHE_NOTIFY_SLRU,
     825              :         );
     826         1255 : }
     827              : 
     828              : static void
     829         1252 : AsyncShmemInit(void *arg)
     830              : {
     831         1252 :     SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     832         1252 :     SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     833         1252 :     QUEUE_STOP_PAGE = 0;
     834         1252 :     QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
     835         1252 :     asyncQueueControl->lastQueueFillWarn = 0;
     836         1252 :     asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
     837         1252 :     asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
     838       117894 :     for (int i = 0; i < MaxBackends; i++)
     839              :     {
     840       116642 :         QUEUE_BACKEND_PID(i) = InvalidPid;
     841       116642 :         QUEUE_BACKEND_DBOID(i) = InvalidOid;
     842       116642 :         QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
     843       116642 :         SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     844       116642 :         QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
     845       116642 :         QUEUE_BACKEND_IS_ADVANCING(i) = false;
     846              :     }
     847              : 
     848              :     /*
     849              :      * During start or reboot, clean out the pg_notify directory.
     850              :      */
     851         1252 :     (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     852         1252 : }
     853              : 
     854              : 
     855              : /*
     856              :  * pg_notify -
     857              :  *    SQL function to send a notification event
     858              :  */
     859              : Datum
     860         1078 : pg_notify(PG_FUNCTION_ARGS)
     861              : {
     862              :     const char *channel;
     863              :     const char *payload;
     864              : 
     865         1078 :     if (PG_ARGISNULL(0))
     866            4 :         channel = "";
     867              :     else
     868         1074 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     869              : 
     870         1078 :     if (PG_ARGISNULL(1))
     871            7 :         payload = "";
     872              :     else
     873         1071 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     874              : 
     875              :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     876         1078 :     PreventCommandDuringRecovery("NOTIFY");
     877              : 
     878         1078 :     Async_Notify(channel, payload);
     879              : 
     880         1066 :     PG_RETURN_VOID();
     881              : }
     882              : 
     883              : 
     884              : /*
     885              :  * Async_Notify
     886              :  *
     887              :  *      This is executed by the SQL notify command.
     888              :  *
     889              :  *      Adds the message to the list of pending notifies.
     890              :  *      Actual notification happens during transaction commit.
     891              :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     892              :  */
     893              : void
     894         1145 : Async_Notify(const char *channel, const char *payload)
     895              : {
     896         1145 :     int         my_level = GetCurrentTransactionNestLevel();
     897              :     size_t      channel_len;
     898              :     size_t      payload_len;
     899              :     Notification *n;
     900              :     MemoryContext oldcontext;
     901              : 
     902         1145 :     if (IsParallelWorker())
     903            0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     904              : 
     905         1145 :     if (Trace_notify)
     906            0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     907              : 
     908         1145 :     channel_len = channel ? strlen(channel) : 0;
     909         1145 :     payload_len = payload ? strlen(payload) : 0;
     910              : 
     911              :     /* a channel name must be specified */
     912         1145 :     if (channel_len == 0)
     913            8 :         ereport(ERROR,
     914              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     915              :                  errmsg("channel name cannot be empty")));
     916              : 
     917              :     /* enforce length limits */
     918         1137 :     if (channel_len >= NAMEDATALEN)
     919            4 :         ereport(ERROR,
     920              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     921              :                  errmsg("channel name too long")));
     922              : 
     923         1133 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     924            0 :         ereport(ERROR,
     925              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     926              :                  errmsg("payload string too long")));
     927              : 
     928              :     /*
     929              :      * We must construct the Notification entry, even if we end up not using
     930              :      * it, in order to compare it cheaply to existing list entries.
     931              :      *
     932              :      * The notification list needs to live until end of transaction, so store
     933              :      * it in the transaction context.
     934              :      */
     935         1133 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     936              : 
     937         1133 :     n = (Notification *) palloc(offsetof(Notification, data) +
     938         1133 :                                 channel_len + payload_len + 2);
     939         1133 :     n->channel_len = channel_len;
     940         1133 :     n->payload_len = payload_len;
     941         1133 :     strcpy(n->data, channel);
     942         1133 :     if (payload)
     943         1118 :         strcpy(n->data + channel_len + 1, payload);
     944              :     else
     945           15 :         n->data[channel_len + 1] = '\0';
     946              : 
     947         1133 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     948           71 :     {
     949              :         NotificationList *notifies;
     950              : 
     951              :         /*
     952              :          * First notify event in current (sub)xact. Note that we allocate the
     953              :          * NotificationList in TopTransactionContext; the nestingLevel might
     954              :          * get changed later by AtSubCommit_Notify.
     955              :          */
     956              :         notifies = (NotificationList *)
     957           71 :             MemoryContextAlloc(TopTransactionContext,
     958              :                                sizeof(NotificationList));
     959           71 :         notifies->nestingLevel = my_level;
     960           71 :         notifies->events = list_make1(n);
     961              :         /* We certainly don't need a hashtable yet */
     962           71 :         notifies->hashtab = NULL;
     963              :         /* We won't build uniqueChannelNames/Hash till later, either */
     964           71 :         notifies->uniqueChannelNames = NIL;
     965           71 :         notifies->uniqueChannelHash = NULL;
     966           71 :         notifies->upper = pendingNotifies;
     967           71 :         pendingNotifies = notifies;
     968              :     }
     969              :     else
     970              :     {
     971              :         /* Now check for duplicates */
     972         1062 :         if (AsyncExistsPendingNotify(n))
     973              :         {
     974              :             /* It's a dup, so forget it */
     975           13 :             pfree(n);
     976           13 :             MemoryContextSwitchTo(oldcontext);
     977           13 :             return;
     978              :         }
     979              : 
     980              :         /* Append more events to existing list */
     981         1049 :         AddEventToPendingNotifies(n);
     982              :     }
     983              : 
     984         1120 :     MemoryContextSwitchTo(oldcontext);
     985              : }
     986              : 
     987              : /*
     988              :  * queue_listen
     989              :  *      Common code for listen, unlisten, unlisten all commands.
     990              :  *
     991              :  *      Adds the request to the list of pending actions.
     992              :  *      Actual update of localChannelTable and globalChannelTable happens during
     993              :  *      PreCommit_Notify, with staged changes committed in AtCommit_Notify.
     994              :  */
     995              : static void
     996           98 : queue_listen(ListenActionKind action, const char *channel)
     997              : {
     998              :     MemoryContext oldcontext;
     999              :     ListenAction *actrec;
    1000           98 :     int         my_level = GetCurrentTransactionNestLevel();
    1001              : 
    1002              :     /*
    1003              :      * Unlike Async_Notify, we don't try to collapse out duplicates here. We
    1004              :      * keep the ordered list to preserve interactions like UNLISTEN ALL; the
    1005              :      * final per-channel intent is computed during PreCommit_Notify.
    1006              :      */
    1007           98 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
    1008              : 
    1009              :     /* space for terminating null is included in sizeof(ListenAction) */
    1010           98 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
    1011           98 :                                      strlen(channel) + 1);
    1012           98 :     actrec->action = action;
    1013           98 :     strcpy(actrec->channel, channel);
    1014              : 
    1015           98 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
    1016           85 :     {
    1017              :         ActionList *actions;
    1018              : 
    1019              :         /*
    1020              :          * First action in current sub(xact). Note that we allocate the
    1021              :          * ActionList in TopTransactionContext; the nestingLevel might get
    1022              :          * changed later by AtSubCommit_Notify.
    1023              :          */
    1024              :         actions = (ActionList *)
    1025           85 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
    1026           85 :         actions->nestingLevel = my_level;
    1027           85 :         actions->actions = list_make1(actrec);
    1028           85 :         actions->upper = pendingActions;
    1029           85 :         pendingActions = actions;
    1030              :     }
    1031              :     else
    1032           13 :         pendingActions->actions = lappend(pendingActions->actions, actrec);
    1033              : 
    1034           98 :     MemoryContextSwitchTo(oldcontext);
    1035           98 : }
    1036              : 
    1037              : /*
    1038              :  * Async_Listen
    1039              :  *
    1040              :  *      This is executed by the SQL listen command.
    1041              :  */
    1042              : void
    1043           52 : Async_Listen(const char *channel)
    1044              : {
    1045           52 :     if (Trace_notify)
    1046            0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
    1047              : 
    1048           52 :     queue_listen(LISTEN_LISTEN, channel);
    1049           52 : }
    1050              : 
    1051              : /*
    1052              :  * Async_Unlisten
    1053              :  *
    1054              :  *      This is executed by the SQL unlisten command.
    1055              :  */
    1056              : void
    1057            4 : Async_Unlisten(const char *channel)
    1058              : {
    1059            4 :     if (Trace_notify)
    1060            0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
    1061              : 
    1062              :     /* If we couldn't possibly be listening, no need to queue anything */
    1063            4 :     if (pendingActions == NULL && !unlistenExitRegistered)
    1064            0 :         return;
    1065              : 
    1066            4 :     queue_listen(LISTEN_UNLISTEN, channel);
    1067              : }
    1068              : 
    1069              : /*
    1070              :  * Async_UnlistenAll
    1071              :  *
    1072              :  *      This is invoked by UNLISTEN * command, and also at backend exit.
    1073              :  */
    1074              : void
    1075           62 : Async_UnlistenAll(void)
    1076              : {
    1077           62 :     if (Trace_notify)
    1078            0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
    1079              : 
    1080              :     /* If we couldn't possibly be listening, no need to queue anything */
    1081           62 :     if (pendingActions == NULL && !unlistenExitRegistered)
    1082           20 :         return;
    1083              : 
    1084           42 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
    1085              : }
    1086              : 
    1087              : /*
    1088              :  * SQL function: return a set of the channel names this backend is actively
    1089              :  * listening to.
    1090              :  *
    1091              :  * Note: this coding relies on the fact that the localChannelTable cannot
    1092              :  * change within a transaction.
    1093              :  */
    1094              : Datum
    1095           12 : pg_listening_channels(PG_FUNCTION_ARGS)
    1096              : {
    1097              :     FuncCallContext *funcctx;
    1098              :     HASH_SEQ_STATUS *status;
    1099              : 
    1100              :     /* stuff done only on the first call of the function */
    1101           12 :     if (SRF_IS_FIRSTCALL())
    1102              :     {
    1103              :         /* create a function context for cross-call persistence */
    1104            8 :         funcctx = SRF_FIRSTCALL_INIT();
    1105              : 
    1106              :         /* Initialize hash table iteration if we have any channels */
    1107            8 :         if (localChannelTable != NULL)
    1108              :         {
    1109              :             MemoryContext oldcontext;
    1110              : 
    1111            8 :             oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1112            8 :             status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
    1113            8 :             hash_seq_init(status, localChannelTable);
    1114            8 :             funcctx->user_fctx = status;
    1115            8 :             MemoryContextSwitchTo(oldcontext);
    1116              :         }
    1117              :         else
    1118              :         {
    1119            0 :             funcctx->user_fctx = NULL;
    1120              :         }
    1121              :     }
    1122              : 
    1123              :     /* stuff done on every call of the function */
    1124           12 :     funcctx = SRF_PERCALL_SETUP();
    1125           12 :     status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
    1126              : 
    1127           12 :     if (status != NULL)
    1128              :     {
    1129              :         ChannelName *entry;
    1130              : 
    1131           12 :         entry = (ChannelName *) hash_seq_search(status);
    1132           12 :         if (entry != NULL)
    1133            4 :             SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
    1134              :     }
    1135              : 
    1136            8 :     SRF_RETURN_DONE(funcctx);
    1137              : }
    1138              : 
    1139              : /*
    1140              :  * Async_UnlistenOnExit
    1141              :  *
    1142              :  * This is executed at backend exit if we have done any LISTENs in this
    1143              :  * backend.  It might not be necessary anymore, if the user UNLISTENed
    1144              :  * everything, but we don't try to detect that case.
    1145              :  */
    1146              : static void
    1147           18 : Async_UnlistenOnExit(int code, Datum arg)
    1148              : {
    1149           18 :     CleanupListenersOnExit();
    1150           18 :     asyncQueueUnregister();
    1151           18 : }
    1152              : 
    1153              : /*
    1154              :  * AtPrepare_Notify
    1155              :  *
    1156              :  *      This is called at the prepare phase of a two-phase
    1157              :  *      transaction.  Save the state for possible commit later.
    1158              :  */
    1159              : void
    1160          312 : AtPrepare_Notify(void)
    1161              : {
    1162              :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
    1163          312 :     if (pendingActions || pendingNotifies)
    1164            0 :         ereport(ERROR,
    1165              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1166              :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
    1167          312 : }
    1168              : 
    1169              : /*
    1170              :  * PreCommit_Notify
    1171              :  *
    1172              :  *      This is called at transaction commit, before actually committing to
    1173              :  *      clog.
    1174              :  *
    1175              :  *      If there are pending LISTEN actions, make sure we are listed in the
    1176              :  *      shared-memory listener array.  This must happen before commit to
    1177              :  *      ensure we don't miss any notifies from transactions that commit
    1178              :  *      just after ours.
    1179              :  *
    1180              :  *      If there are outbound notify requests in the pendingNotifies list,
    1181              :  *      add them to the global queue.  We do that before commit so that
    1182              :  *      we can still throw error if we run out of queue space.
    1183              :  */
    1184              : void
    1185       612594 : PreCommit_Notify(void)
    1186              : {
    1187              :     ListCell   *p;
    1188              : 
    1189       612594 :     if (!pendingActions && !pendingNotifies)
    1190       612443 :         return;                 /* no relevant statements in this xact */
    1191              : 
    1192          151 :     if (Trace_notify)
    1193            0 :         elog(DEBUG1, "PreCommit_Notify");
    1194              : 
    1195              :     /* Preflight for any pending listen/unlisten actions */
    1196          151 :     initGlobalChannelTable();
    1197              : 
    1198          151 :     if (pendingActions != NULL)
    1199              :     {
    1200              :         /* Ensure we have a local channel table */
    1201           82 :         initLocalChannelTable();
    1202              :         /* Create pendingListenActions hash table for this transaction */
    1203           82 :         initPendingListenActions();
    1204              : 
    1205              :         /* Stage all the actions this transaction wants to perform */
    1206          177 :         foreach(p, pendingActions->actions)
    1207              :         {
    1208           95 :             ListenAction *actrec = (ListenAction *) lfirst(p);
    1209              : 
    1210           95 :             switch (actrec->action)
    1211              :             {
    1212           50 :                 case LISTEN_LISTEN:
    1213           50 :                     BecomeRegisteredListener();
    1214           50 :                     PrepareTableEntriesForListen(actrec->channel);
    1215           50 :                     break;
    1216            4 :                 case LISTEN_UNLISTEN:
    1217            4 :                     PrepareTableEntriesForUnlisten(actrec->channel);
    1218            4 :                     break;
    1219           41 :                 case LISTEN_UNLISTEN_ALL:
    1220           41 :                     PrepareTableEntriesForUnlistenAll();
    1221           41 :                     break;
    1222              :             }
    1223              :         }
    1224              :     }
    1225              : 
    1226              :     /* Queue any pending notifies (must happen after the above) */
    1227          151 :     if (pendingNotifies)
    1228              :     {
    1229              :         ListCell   *nextNotify;
    1230           69 :         bool        firstIteration = true;
    1231              : 
    1232              :         /*
    1233              :          * Build list of unique channel names being notified for use by
    1234              :          * SignalBackends().
    1235              :          *
    1236              :          * If uniqueChannelHash is available, use it to efficiently get the
    1237              :          * unique channels.  Otherwise, fall back to the O(N^2) approach.
    1238              :          */
    1239           69 :         pendingNotifies->uniqueChannelNames = NIL;
    1240           69 :         if (pendingNotifies->uniqueChannelHash != NULL)
    1241              :         {
    1242              :             HASH_SEQ_STATUS status;
    1243              :             ChannelName *channelEntry;
    1244              : 
    1245            2 :             hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
    1246            4 :             while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
    1247            2 :                 pendingNotifies->uniqueChannelNames =
    1248            2 :                     lappend(pendingNotifies->uniqueChannelNames,
    1249            2 :                             channelEntry->channel);
    1250              :         }
    1251              :         else
    1252              :         {
    1253              :             /* O(N^2) approach is better for small number of notifications */
    1254          231 :             foreach_ptr(Notification, n, pendingNotifies->events)
    1255              :             {
    1256           97 :                 char       *channel = n->data;
    1257           97 :                 bool        found = false;
    1258              : 
    1259              :                 /* Name present in list? */
    1260          198 :                 foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
    1261              :                 {
    1262           31 :                     if (strcmp(oldchan, channel) == 0)
    1263              :                     {
    1264           27 :                         found = true;
    1265           27 :                         break;
    1266              :                     }
    1267              :                 }
    1268              :                 /* Add if not already in list */
    1269           97 :                 if (!found)
    1270           70 :                     pendingNotifies->uniqueChannelNames =
    1271           70 :                         lappend(pendingNotifies->uniqueChannelNames,
    1272              :                                 channel);
    1273              :             }
    1274              :         }
    1275              : 
    1276              :         /* Preallocate workspace that will be needed by SignalBackends() */
    1277           69 :         if (signalPids == NULL)
    1278           23 :             signalPids = MemoryContextAlloc(TopMemoryContext,
    1279              :                                             MaxBackends * sizeof(int32));
    1280              : 
    1281           69 :         if (signalProcnos == NULL)
    1282           23 :             signalProcnos = MemoryContextAlloc(TopMemoryContext,
    1283              :                                                MaxBackends * sizeof(ProcNumber));
    1284              : 
    1285              :         /*
    1286              :          * Make sure that we have an XID assigned to the current transaction.
    1287              :          * GetCurrentTransactionId is cheap if we already have an XID, but not
    1288              :          * so cheap if we don't, and we'd prefer not to do that work while
    1289              :          * holding NotifyQueueLock.
    1290              :          */
    1291           69 :         (void) GetCurrentTransactionId();
    1292              : 
    1293              :         /*
    1294              :          * Serialize writers by acquiring a special lock that we hold till
    1295              :          * after commit.  This ensures that queue entries appear in commit
    1296              :          * order, and in particular that there are never uncommitted queue
    1297              :          * entries ahead of committed ones, so an uncommitted transaction
    1298              :          * can't block delivery of deliverable notifications.
    1299              :          *
    1300              :          * We use a heavyweight lock so that it'll automatically be released
    1301              :          * after either commit or abort.  This also allows deadlocks to be
    1302              :          * detected, though really a deadlock shouldn't be possible here.
    1303              :          *
    1304              :          * The lock is on "database 0", which is pretty ugly but it doesn't
    1305              :          * seem worth inventing a special locktag category just for this.
    1306              :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
    1307              :          * used by the flatfiles mechanism.)
    1308              :          */
    1309           69 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
    1310              :                          AccessExclusiveLock);
    1311              : 
    1312              :         /*
    1313              :          * For the direct advancement optimization in SignalBackends(), we
    1314              :          * need to ensure that no other backend can insert queue entries
    1315              :          * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
    1316              :          * heavyweight lock above provides this guarantee, since it serializes
    1317              :          * all writers.
    1318              :          *
    1319              :          * Note: if the heavyweight lock were ever removed for scalability
    1320              :          * reasons, we could achieve the same guarantee by holding
    1321              :          * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
    1322              :          * than releasing and reacquiring it for each page as we do below.
    1323              :          */
    1324              : 
    1325              :         /* Initialize values to a safe default in case list is empty */
    1326           69 :         SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
    1327           69 :         SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
    1328              : 
    1329              :         /* Now push the notifications into the queue */
    1330           69 :         nextNotify = list_head(pendingNotifies->events);
    1331          173 :         while (nextNotify != NULL)
    1332              :         {
    1333              :             /*
    1334              :              * Add the pending notifications to the queue.  We acquire and
    1335              :              * release NotifyQueueLock once per page, which might be overkill
    1336              :              * but it does allow readers to get in while we're doing this.
    1337              :              *
    1338              :              * A full queue is very uncommon and should really not happen,
    1339              :              * given that we have so much space available in the SLRU pages.
    1340              :              * Nevertheless we need to deal with this possibility. Note that
    1341              :              * when we get here we are in the process of committing our
    1342              :              * transaction, but we have not yet committed to clog, so at this
    1343              :              * point in time we can still roll the transaction back.
    1344              :              */
    1345          104 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1346          104 :             if (firstIteration)
    1347              :             {
    1348           69 :                 queueHeadBeforeWrite = QUEUE_HEAD;
    1349           69 :                 firstIteration = false;
    1350              :             }
    1351          104 :             asyncQueueFillWarning();
    1352          104 :             if (asyncQueueIsFull())
    1353            0 :                 ereport(ERROR,
    1354              :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    1355              :                          errmsg("too many notifications in the NOTIFY queue")));
    1356          104 :             nextNotify = asyncQueueAddEntries(nextNotify);
    1357          104 :             queueHeadAfterWrite = QUEUE_HEAD;
    1358          104 :             LWLockRelease(NotifyQueueLock);
    1359              :         }
    1360              : 
    1361              :         /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
    1362              :     }
    1363              : }
    1364              : 
    1365              : /*
    1366              :  * AtCommit_Notify
    1367              :  *
    1368              :  *      This is called at transaction commit, after committing to clog.
    1369              :  *
    1370              :  *      Apply pending listen/unlisten changes and clear transaction-local state.
    1371              :  *
    1372              :  *      If we issued any notifications in the transaction, send signals to
    1373              :  *      listening backends (possibly including ourselves) to process them.
    1374              :  *      Also, if we filled enough queue pages with new notifies, try to
    1375              :  *      advance the queue tail pointer.
    1376              :  */
    1377              : void
    1378       612439 : AtCommit_Notify(void)
    1379              : {
    1380              :     /*
    1381              :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
    1382              :      * return as soon as possible
    1383              :      */
    1384       612439 :     if (!pendingActions && !pendingNotifies)
    1385       612288 :         return;
    1386              : 
    1387          151 :     if (Trace_notify)
    1388            0 :         elog(DEBUG1, "AtCommit_Notify");
    1389              : 
    1390              :     /* Apply staged listen/unlisten changes */
    1391          151 :     ApplyPendingListenActions(true);
    1392              : 
    1393              :     /* If no longer listening to anything, get out of listener array */
    1394          151 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
    1395           21 :         asyncQueueUnregister();
    1396              : 
    1397              :     /*
    1398              :      * Send signals to listening backends.  We need do this only if there are
    1399              :      * pending notifies, which were previously added to the shared queue by
    1400              :      * PreCommit_Notify().
    1401              :      */
    1402          151 :     if (pendingNotifies != NULL)
    1403           69 :         SignalBackends();
    1404              : 
    1405              :     /*
    1406              :      * If it's time to try to advance the global tail pointer, do that.
    1407              :      *
    1408              :      * (It might seem odd to do this in the sender, when more than likely the
    1409              :      * listeners won't yet have read the messages we just sent.  However,
    1410              :      * there's less contention if only the sender does it, and there is little
    1411              :      * need for urgency in advancing the global tail.  So this typically will
    1412              :      * be clearing out messages that were sent some time ago.)
    1413              :      */
    1414          151 :     if (tryAdvanceTail)
    1415              :     {
    1416            8 :         tryAdvanceTail = false;
    1417            8 :         asyncQueueAdvanceTail();
    1418              :     }
    1419              : 
    1420              :     /* And clean up */
    1421          151 :     ClearPendingActionsAndNotifies();
    1422              : }
    1423              : 
    1424              : /*
    1425              :  * BecomeRegisteredListener --- subroutine for PreCommit_Notify
    1426              :  *
    1427              :  * This function must make sure we are ready to catch any incoming messages.
    1428              :  */
    1429              : static void
    1430           50 : BecomeRegisteredListener(void)
    1431              : {
    1432              :     QueuePosition head;
    1433              :     QueuePosition max;
    1434              :     ProcNumber  prevListener;
    1435              : 
    1436              :     /*
    1437              :      * Nothing to do if we are already listening to something, nor if we
    1438              :      * already ran this routine in this transaction.
    1439              :      */
    1440           50 :     if (amRegisteredListener)
    1441           23 :         return;
    1442              : 
    1443           27 :     if (Trace_notify)
    1444            0 :         elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
    1445              : 
    1446              :     /*
    1447              :      * Before registering, make sure we will unlisten before dying. (Note:
    1448              :      * this action does not get undone if we abort later.)
    1449              :      */
    1450           27 :     if (!unlistenExitRegistered)
    1451              :     {
    1452           18 :         before_shmem_exit(Async_UnlistenOnExit, 0);
    1453           18 :         unlistenExitRegistered = true;
    1454              :     }
    1455              : 
    1456              :     /*
    1457              :      * This is our first LISTEN, so establish our pointer.
    1458              :      *
    1459              :      * We set our pointer to the global tail pointer and then move it forward
    1460              :      * over already-committed notifications.  This ensures we cannot miss any
    1461              :      * not-yet-committed notifications.  We might get a few more but that
    1462              :      * doesn't hurt.
    1463              :      *
    1464              :      * In some scenarios there might be a lot of committed notifications that
    1465              :      * have not yet been pruned away (because some backend is being lazy about
    1466              :      * reading them).  To reduce our startup time, we can look at other
    1467              :      * backends and adopt the maximum "pos" pointer of any backend that's in
    1468              :      * our database; any notifications it's already advanced over are surely
    1469              :      * committed and need not be re-examined by us.  (We must consider only
    1470              :      * backends connected to our DB, because others will not have bothered to
    1471              :      * check committed-ness of notifications in our DB.)
    1472              :      *
    1473              :      * We need exclusive lock here so we can look at other backends' entries
    1474              :      * and manipulate the list links.
    1475              :      */
    1476           27 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1477           27 :     head = QUEUE_HEAD;
    1478           27 :     max = QUEUE_TAIL;
    1479           27 :     prevListener = INVALID_PROC_NUMBER;
    1480           29 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1481              :     {
    1482            2 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1483            2 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1484              :         /* Also find last listening backend before this one */
    1485            2 :         if (i < MyProcNumber)
    1486            1 :             prevListener = i;
    1487              :     }
    1488           27 :     QUEUE_BACKEND_POS(MyProcNumber) = max;
    1489           27 :     QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
    1490           27 :     QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
    1491           27 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1492           27 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1493              :     /* Insert backend into list of listeners at correct position */
    1494           27 :     if (prevListener != INVALID_PROC_NUMBER)
    1495              :     {
    1496            1 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
    1497            1 :         QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
    1498              :     }
    1499              :     else
    1500              :     {
    1501           26 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
    1502           26 :         QUEUE_FIRST_LISTENER = MyProcNumber;
    1503              :     }
    1504           27 :     LWLockRelease(NotifyQueueLock);
    1505              : 
    1506              :     /* Now we are listed in the global array, so remember we're listening */
    1507           27 :     amRegisteredListener = true;
    1508              : 
    1509              :     /*
    1510              :      * Try to move our pointer forward as far as possible.  This will skip
    1511              :      * over already-committed notifications, which we want to do because they
    1512              :      * might be quite stale.  Note that we are not yet listening on anything,
    1513              :      * so we won't deliver such notifications to our frontend.  Also, although
    1514              :      * our transaction might have executed NOTIFY, those message(s) aren't
    1515              :      * queued yet so we won't skip them here.
    1516              :      */
    1517           27 :     if (!QUEUE_POS_EQUAL(max, head))
    1518           18 :         asyncQueueReadAllNotifications();
    1519              : }
    1520              : 
    1521              : /*
    1522              :  * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
    1523              :  *
    1524              :  * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
    1525              :  * an entry in localChannelTable, and pre-allocating an entry in the shared
    1526              :  * globalChannelTable with removeOnAbort set.  AtCommit_Notify will clear
    1527              :  * removeOnAbort; abort processing will remove entries still marked so.
    1528              :  */
    1529              : static void
    1530           50 : PrepareTableEntriesForListen(const char *channel)
    1531              : {
    1532              :     GlobalChannelKey key;
    1533              :     GlobalChannelEntry *entry;
    1534              :     bool        found;
    1535              :     ListenerEntry *listeners;
    1536              :     PendingListenEntry *pending;
    1537              : 
    1538              :     /*
    1539              :      * Record in local pending hash that we want to LISTEN, overwriting any
    1540              :      * earlier attempt to UNLISTEN.
    1541              :      */
    1542              :     pending = (PendingListenEntry *)
    1543           50 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1544           50 :     pending->action = PENDING_LISTEN;
    1545              : 
    1546              :     /*
    1547              :      * Ensure that there is an entry for the channel in localChannelTable.
    1548              :      * (Should this fail, we can just roll back.)  If the transaction fails
    1549              :      * after this point, we will remove the entry if appropriate during
    1550              :      * ApplyPendingListenActions.  Note that this entry allows IsListeningOn()
    1551              :      * to return TRUE; we assume nothing is going to consult that before
    1552              :      * AtCommit_Notify/AtAbort_Notify.  However, if later actions attempt to
    1553              :      * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
    1554              :      * present to ensure they do the right things; see
    1555              :      * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
    1556              :      */
    1557           50 :     (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
    1558              : 
    1559              :     /* Pre-allocate entry in shared globalChannelTable */
    1560           50 :     GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    1561           50 :     entry = dshash_find_or_insert(globalChannelTable, &key, &found);
    1562              : 
    1563           50 :     if (!found)
    1564              :     {
    1565              :         /* New channel entry, so initialize it to a safe state */
    1566           37 :         entry->listenersArray = InvalidDsaPointer;
    1567           37 :         entry->numListeners = 0;
    1568           37 :         entry->allocatedListeners = 0;
    1569              :     }
    1570              : 
    1571              :     /*
    1572              :      * Create listenersArray if entry doesn't have one.  It's tempting to fold
    1573              :      * this into the !found case, but this coding allows us to cope in case
    1574              :      * dsa_allocate() failed in an earlier attempt.
    1575              :      */
    1576           50 :     if (!DsaPointerIsValid(entry->listenersArray))
    1577              :     {
    1578           37 :         entry->listenersArray = dsa_allocate(globalChannelDSA,
    1579              :                                              sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
    1580           37 :         entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
    1581              :     }
    1582              : 
    1583              :     listeners = (ListenerEntry *)
    1584           50 :         dsa_get_address(globalChannelDSA, entry->listenersArray);
    1585              : 
    1586              :     /*
    1587              :      * Check if we already have a ListenerEntry (possibly from earlier in this
    1588              :      * transaction)
    1589              :      */
    1590           53 :     for (int i = 0; i < entry->numListeners; i++)
    1591              :     {
    1592           13 :         if (listeners[i].procNo == MyProcNumber)
    1593              :         {
    1594              :             /* Already have an entry; leave removeOnAbort as-is */
    1595           10 :             dshash_release_lock(globalChannelTable, entry);
    1596           10 :             return;
    1597              :         }
    1598              :     }
    1599              : 
    1600              :     /* Need to add a new entry; grow array if necessary */
    1601           40 :     if (entry->numListeners >= entry->allocatedListeners)
    1602              :     {
    1603            0 :         int         new_size = entry->allocatedListeners * 2;
    1604            0 :         dsa_pointer old_array = entry->listenersArray;
    1605            0 :         dsa_pointer new_array = dsa_allocate(globalChannelDSA,
    1606              :                                              sizeof(ListenerEntry) * new_size);
    1607            0 :         ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
    1608              : 
    1609            0 :         memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
    1610            0 :         entry->listenersArray = new_array;
    1611            0 :         entry->allocatedListeners = new_size;
    1612            0 :         dsa_free(globalChannelDSA, old_array);
    1613            0 :         listeners = new_listeners;
    1614              :     }
    1615              : 
    1616           40 :     listeners[entry->numListeners].procNo = MyProcNumber;
    1617           40 :     listeners[entry->numListeners].removeOnAbort = true;
    1618           40 :     entry->numListeners++;
    1619              : 
    1620           40 :     dshash_release_lock(globalChannelTable, entry);
    1621              : }
    1622              : 
    1623              : /*
    1624              :  * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
    1625              :  *
    1626              :  * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
    1627              :  * we're currently listening (committed or staged).  We don't touch
    1628              :  * globalChannelTable yet - the listener keeps receiving signals until
    1629              :  * commit, when the entry is removed.
    1630              :  */
    1631              : static void
    1632            4 : PrepareTableEntriesForUnlisten(const char *channel)
    1633              : {
    1634              :     PendingListenEntry *pending;
    1635              : 
    1636              :     /*
    1637              :      * If the channel name is not in localChannelTable, then we are neither
    1638              :      * listening on it nor preparing to listen on it, so we don't need to
    1639              :      * record an UNLISTEN action.
    1640              :      */
    1641              :     Assert(localChannelTable != NULL);
    1642            4 :     if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
    1643            0 :         return;
    1644              : 
    1645              :     /*
    1646              :      * Record in local pending hash that we want to UNLISTEN, overwriting any
    1647              :      * earlier attempt to LISTEN.  Don't touch localChannelTable or
    1648              :      * globalChannelTable yet - we keep receiving signals until commit.
    1649              :      */
    1650              :     pending = (PendingListenEntry *)
    1651            4 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1652            4 :     pending->action = PENDING_UNLISTEN;
    1653              : }
    1654              : 
    1655              : /*
    1656              :  * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
    1657              :  *
    1658              :  * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
    1659              :  * about-to-be-listened channels in pendingListenActions.
    1660              :  */
    1661              : static void
    1662           41 : PrepareTableEntriesForUnlistenAll(void)
    1663              : {
    1664              :     HASH_SEQ_STATUS seq;
    1665              :     ChannelName *channelEntry;
    1666              :     PendingListenEntry *pending;
    1667              : 
    1668              :     /*
    1669              :      * Scan localChannelTable, which will have the names of all channels that
    1670              :      * we are listening on or have prepared to listen on.  Record an UNLISTEN
    1671              :      * action for each one, overwriting any earlier attempt to LISTEN.
    1672              :      */
    1673           41 :     hash_seq_init(&seq, localChannelTable);
    1674           69 :     while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
    1675              :     {
    1676              :         pending = (PendingListenEntry *)
    1677           28 :             hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
    1678           28 :         pending->action = PENDING_UNLISTEN;
    1679              :     }
    1680           41 : }
    1681              : 
    1682              : /*
    1683              :  * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
    1684              :  *
    1685              :  * Decrements numListeners, compacts the array, and frees the entry if empty.
    1686              :  * Sets *entry_ptr to NULL if the entry was deleted.
    1687              :  *
    1688              :  * We could get the listeners pointer from the entry, but all callers
    1689              :  * already have it at hand.
    1690              :  */
    1691              : static void
    1692           32 : RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
    1693              :                           ListenerEntry *listeners,
    1694              :                           int idx)
    1695              : {
    1696           32 :     GlobalChannelEntry *entry = *entry_ptr;
    1697              : 
    1698           32 :     entry->numListeners--;
    1699           32 :     if (idx < entry->numListeners)
    1700            3 :         memmove(&listeners[idx], &listeners[idx + 1],
    1701            3 :                 sizeof(ListenerEntry) * (entry->numListeners - idx));
    1702              : 
    1703           32 :     if (entry->numListeners == 0)
    1704              :     {
    1705           29 :         dsa_free(globalChannelDSA, entry->listenersArray);
    1706           29 :         dshash_delete_entry(globalChannelTable, entry);
    1707              :         /* tells caller not to release the entry's lock: */
    1708           29 :         *entry_ptr = NULL;
    1709              :     }
    1710           32 : }
    1711              : 
/*
 * ApplyPendingListenActions
 *
 * Apply, or revert, staged listen/unlisten changes to the local and global
 * hash tables.
 *
 * isCommit is true when the staging transaction committed and false when it
 * aborted.  For each channel in pendingListenActions, the outcome is:
 *	 - commit of a LISTEN: our ListenerEntry becomes permanent
 *	   (removeOnAbort cleared) and the localChannelTable entry is kept;
 *	 - commit of an UNLISTEN: our ListenerEntry and the localChannelTable
 *	   entry are both removed;
 *	 - abort: a ListenerEntry still marked removeOnAbort is removed along
 *	   with the local entry, while one that predates the transaction is
 *	   kept together with its local entry.
 * A committed LISTEN with no matching ListenerEntry is a PANIC.  In every
 * other case, a channel for which we have no ListenerEntry has its
 * localChannelTable entry (if any) removed.
 */
static void
ApplyPendingListenActions(bool isCommit)
{
	HASH_SEQ_STATUS seq;
	PendingListenEntry *pending;

	/* Quick exit if nothing to do */
	if (pendingListenActions == NULL)
		return;

	/* We made a globalChannelTable before building pendingListenActions */
	if (globalChannelTable == NULL)
		elog(PANIC, "global channel table missing post-commit/abort");

	/* For each staged action ... */
	hash_seq_init(&seq, pendingListenActions);
	while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
	{
		GlobalChannelKey key;
		GlobalChannelEntry *entry;
		/* Cleared only when our local entry must survive this pass */
		bool		removeLocal = true;
		bool		foundListener = false;

		/*
		 * Find the global entry for this channel.  When committing a LISTEN,
		 * it had better exist, because PreCommit created it; that is checked
		 * below.  Otherwise (an UNLISTEN, or an abort) it might not exist.
		 * In that case we are not listening, and we should discard any local
		 * entry that PreCommit may have managed to create.  The lookup is
		 * exclusive because we may modify or delete the entry.
		 */
		GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
		entry = dshash_find(globalChannelTable, &key, true);
		if (entry != NULL)
		{
			/* Scan entry to find the ListenerEntry for this backend */
			ListenerEntry *listeners;

			listeners = (ListenerEntry *)
				dsa_get_address(globalChannelDSA, entry->listenersArray);

			for (int i = 0; i < entry->numListeners; i++)
			{
				if (listeners[i].procNo != MyProcNumber)
					continue;
				foundListener = true;
				if (isCommit)
				{
					if (pending->action == PENDING_LISTEN)
					{
						/*
						 * LISTEN being committed: entry is now permanent.
						 * localChannelTable entry was created during
						 * PreCommit and should be kept.
						 */
						listeners[i].removeOnAbort = false;
						removeLocal = false;
					}
					else
					{
						/*
						 * UNLISTEN being committed: remove pre-allocated
						 * entries from both tables.  This may delete the
						 * global entry and set "entry" to NULL.
						 */
						RemoveListenerFromChannel(&entry, listeners, i);
					}
				}
				else
				{
					/*
					 * Note: this part is reachable only if the transaction
					 * aborts after PreCommit_Notify() has made some
					 * pendingListenActions entries, so it's pretty hard to
					 * test.
					 */
					if (listeners[i].removeOnAbort)
					{
						/*
						 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
						 * so remove pre-allocated entries from both tables.
						 * This may delete the global entry and set "entry"
						 * to NULL.
						 */
						RemoveListenerFromChannel(&entry, listeners, i);
					}
					else
					{
						/*
						 * Entry predates this transaction, so keep the
						 * localChannelTable entry.
						 */
						removeLocal = false;
					}
				}
				break;			/* there shouldn't be another match */
			}

			/* We might have already released the entry by removing it */
			if (entry != NULL)
				dshash_release_lock(globalChannelTable, entry);
		}

		/*
		 * If we're committing a LISTEN action, we should have found a
		 * matching ListenerEntry, but otherwise it's okay if we didn't.
		 */
		if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
			elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
				 pending->channel, MyProcNumber);

		/*
		 * If we did not find a globalChannelTable entry for our backend, or
		 * if we are unlistening, remove any localChannelTable entry that may
		 * exist.  (Note in particular that this cleans up if we created a
		 * localChannelTable entry and then failed while trying to create a
		 * globalChannelTable entry.)
		 */
		if (removeLocal && localChannelTable != NULL)
			(void) hash_search(localChannelTable, pending->channel,
							   HASH_REMOVE, NULL);
	}
}
    1836              : 
    1837              : /*
    1838              :  * CleanupListenersOnExit --- called from Async_UnlistenOnExit
    1839              :  *
    1840              :  *      Remove this backend from all channels in the shared global table.
    1841              :  */
    1842              : static void
    1843           18 : CleanupListenersOnExit(void)
    1844              : {
    1845              :     dshash_seq_status status;
    1846              :     GlobalChannelEntry *entry;
    1847              : 
    1848           18 :     if (Trace_notify)
    1849            0 :         elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
    1850              : 
    1851              :     /* Clear our local cache (not really necessary, but be consistent) */
    1852           18 :     if (localChannelTable != NULL)
    1853              :     {
    1854           18 :         hash_destroy(localChannelTable);
    1855           18 :         localChannelTable = NULL;
    1856              :     }
    1857              : 
    1858              :     /* Now remove our entries from the shared globalChannelTable */
    1859           18 :     if (globalChannelTable == NULL)
    1860            0 :         return;
    1861              : 
    1862           18 :     dshash_seq_init(&status, globalChannelTable, true);
    1863           28 :     while ((entry = dshash_seq_next(&status)) != NULL)
    1864              :     {
    1865              :         ListenerEntry *listeners;
    1866              : 
    1867           10 :         if (entry->key.dboid != MyDatabaseId)
    1868            0 :             continue;           /* not relevant */
    1869              : 
    1870              :         listeners = (ListenerEntry *)
    1871           10 :             dsa_get_address(globalChannelDSA, entry->listenersArray);
    1872              : 
    1873           12 :         for (int i = 0; i < entry->numListeners; i++)
    1874              :         {
    1875           10 :             if (listeners[i].procNo == MyProcNumber)
    1876              :             {
    1877            8 :                 entry->numListeners--;
    1878            8 :                 if (i < entry->numListeners)
    1879            0 :                     memmove(&listeners[i], &listeners[i + 1],
    1880            0 :                             sizeof(ListenerEntry) * (entry->numListeners - i));
    1881              : 
    1882            8 :                 if (entry->numListeners == 0)
    1883              :                 {
    1884            8 :                     dsa_free(globalChannelDSA, entry->listenersArray);
    1885            8 :                     dshash_delete_current(&status);
    1886              :                 }
    1887            8 :                 break;
    1888              :             }
    1889              :         }
    1890              :     }
    1891           18 :     dshash_seq_term(&status);
    1892              : }
    1893              : 
    1894              : /*
    1895              :  * Test whether we are actively listening on the given channel name.
    1896              :  *
    1897              :  * Note: this function is executed for every notification found in the queue.
    1898              :  */
    1899              : static bool
    1900           53 : IsListeningOn(const char *channel)
    1901              : {
    1902           53 :     if (localChannelTable == NULL)
    1903            0 :         return false;
    1904              : 
    1905           53 :     return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
    1906              : }
    1907              : 
    1908              : /*
    1909              :  * Remove our entry from the listeners array when we are no longer listening
    1910              :  * on any channel.  NB: must not fail if we're already not listening.
    1911              :  */
    1912              : static void
    1913           39 : asyncQueueUnregister(void)
    1914              : {
    1915              :     Assert(LocalChannelTableIsEmpty()); /* else caller error */
    1916              : 
    1917           39 :     if (!amRegisteredListener)  /* nothing to do */
    1918           12 :         return;
    1919              : 
    1920              :     /*
    1921              :      * Need exclusive lock here to manipulate list links.
    1922              :      */
    1923           27 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1924              :     /* Mark our entry as invalid */
    1925           27 :     QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
    1926           27 :     QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
    1927           27 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1928           27 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1929              :     /* and remove it from the list */
    1930           27 :     if (QUEUE_FIRST_LISTENER == MyProcNumber)
    1931           26 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
    1932              :     else
    1933              :     {
    1934            1 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1935              :         {
    1936            1 :             if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
    1937              :             {
    1938            1 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
    1939            1 :                 break;
    1940              :             }
    1941              :         }
    1942              :     }
    1943           27 :     QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
    1944           27 :     LWLockRelease(NotifyQueueLock);
    1945              : 
    1946              :     /* mark ourselves as no longer listed in the global array */
    1947           27 :     amRegisteredListener = false;
    1948              : }
    1949              : 
    1950              : /*
    1951              :  * Test whether there is room to insert more notification messages.
    1952              :  *
    1953              :  * Caller must hold at least shared NotifyQueueLock.
    1954              :  */
    1955              : static bool
    1956          104 : asyncQueueIsFull(void)
    1957              : {
    1958          104 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1959          104 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1960          104 :     int64       occupied = headPage - tailPage;
    1961              : 
    1962          104 :     return occupied >= max_notify_queue_pages;
    1963              : }
    1964              : 
    1965              : /*
    1966              :  * Advance the QueuePosition to the next entry, assuming that the current
    1967              :  * entry is of length entryLength.  If we jump to a new page the function
    1968              :  * returns true, else false.
    1969              :  */
    1970              : static bool
    1971         2543 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1972              : {
    1973         2543 :     int64       pageno = QUEUE_POS_PAGE(*position);
    1974         2543 :     int         offset = QUEUE_POS_OFFSET(*position);
    1975         2543 :     bool        pageJump = false;
    1976              : 
    1977              :     /*
    1978              :      * Move to the next writing position: First jump over what we have just
    1979              :      * written or read.
    1980              :      */
    1981         2543 :     offset += entryLength;
    1982              :     Assert(offset <= QUEUE_PAGESIZE);
    1983              : 
    1984              :     /*
    1985              :      * In a second step check if another entry can possibly be written to the
    1986              :      * page. If so, stay here, we have reached the next position. If not, then
    1987              :      * we need to move on to the next page.
    1988              :      */
    1989         2543 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1990              :     {
    1991           38 :         pageno++;
    1992           38 :         offset = 0;
    1993           38 :         pageJump = true;
    1994              :     }
    1995              : 
    1996         2543 :     SET_QUEUE_POS(*position, pageno, offset);
    1997         2543 :     return pageJump;
    1998              : }
    1999              : 
    2000              : /*
    2001              :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    2002              :  */
    2003              : static void
    2004         1146 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    2005              : {
    2006         1146 :     size_t      channellen = n->channel_len;
    2007         1146 :     size_t      payloadlen = n->payload_len;
    2008              :     int         entryLength;
    2009              : 
    2010              :     Assert(channellen < NAMEDATALEN);
    2011              :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    2012              : 
    2013              :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    2014         1146 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    2015         1146 :     entryLength = QUEUEALIGN(entryLength);
    2016         1146 :     qe->length = entryLength;
    2017         1146 :     qe->dboid = MyDatabaseId;
    2018         1146 :     qe->xid = GetCurrentTransactionId();
    2019         1146 :     qe->srcPid = MyProcPid;
    2020         1146 :     memcpy(qe->data, n->data, channellen + payloadlen + 2);
    2021         1146 : }
    2022              : 
    2023              : /*
    2024              :  * Add pending notifications to the queue.
    2025              :  *
    2026              :  * We go page by page here, i.e. we stop once we have to go to a new page but
    2027              :  * we will be called again and then fill that next page. If an entry does not
    2028              :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    2029              :  * database OID in order to fill the page. So every page is always used up to
    2030              :  * the last byte which simplifies reading the page later.
    2031              :  *
    2032              :  * We are passed the list cell (in pendingNotifies->events) containing the next
    2033              :  * notification to write and return the first still-unwritten cell back.
    2034              :  * Eventually we will return NULL indicating all is done.
    2035              :  *
    2036              :  * We are holding NotifyQueueLock already from the caller and grab
    2037              :  * page specific SLRU bank lock locally in this function.
    2038              :  */
    2039              : static ListCell *
    2040          104 : asyncQueueAddEntries(ListCell *nextNotify)
    2041              : {
    2042              :     AsyncQueueEntry qe;
    2043              :     QueuePosition queue_head;
    2044              :     int64       pageno;
    2045              :     int         offset;
    2046              :     int         slotno;
    2047              :     LWLock     *prevlock;
    2048              : 
    2049              :     /*
    2050              :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    2051              :      * memory upon exiting.  The reason for this is that if we have to advance
    2052              :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    2053              :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    2054              :      * subsequent insertions would try to put entries into a page that slru.c
    2055              :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    2056              :      * that if we do fail, any already-inserted queue entries are forgotten;
    2057              :      * this is okay, since they'd be useless anyway after our transaction
    2058              :      * rolls back.
    2059              :      */
    2060          104 :     queue_head = QUEUE_HEAD;
    2061              : 
    2062              :     /*
    2063              :      * If this is the first write since the postmaster started, we need to
    2064              :      * initialize the first page of the async SLRU.  Otherwise, the current
    2065              :      * page should be initialized already, so just fetch it.
    2066              :      */
    2067          104 :     pageno = QUEUE_POS_PAGE(queue_head);
    2068          104 :     prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2069              : 
    2070              :     /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
    2071          104 :     LWLockAcquire(prevlock, LW_EXCLUSIVE);
    2072              : 
    2073          104 :     if (QUEUE_POS_IS_ZERO(queue_head))
    2074            9 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    2075              :     else
    2076           95 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
    2077              : 
    2078              :     /* Note we mark the page dirty before writing in it */
    2079          104 :     NotifyCtl->shared->page_dirty[slotno] = true;
    2080              : 
    2081         1215 :     while (nextNotify != NULL)
    2082              :     {
    2083         1146 :         Notification *n = (Notification *) lfirst(nextNotify);
    2084              : 
    2085              :         /* Construct a valid queue entry in local variable qe */
    2086         1146 :         asyncQueueNotificationToEntry(n, &qe);
    2087              : 
    2088         1146 :         offset = QUEUE_POS_OFFSET(queue_head);
    2089              : 
    2090              :         /* Check whether the entry really fits on the current page */
    2091         1146 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    2092              :         {
    2093              :             /* OK, so advance nextNotify past this item */
    2094         1114 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
    2095              :         }
    2096              :         else
    2097              :         {
    2098              :             /*
    2099              :              * Write a dummy entry to fill up the page. Actually readers will
    2100              :              * only check dboid and since it won't match any reader's database
    2101              :              * OID, they will ignore this entry and move on.
    2102              :              */
    2103           32 :             qe.length = QUEUE_PAGESIZE - offset;
    2104           32 :             qe.dboid = InvalidOid;
    2105           32 :             qe.xid = InvalidTransactionId;
    2106           32 :             qe.data[0] = '\0';  /* empty channel */
    2107           32 :             qe.data[1] = '\0';  /* empty payload */
    2108              :         }
    2109              : 
    2110              :         /* Now copy qe into the shared buffer page */
    2111         1146 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    2112              :                &qe,
    2113         1146 :                qe.length);
    2114              : 
    2115              :         /* Advance queue_head appropriately, and detect if page is full */
    2116         1146 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    2117              :         {
    2118              :             LWLock     *lock;
    2119              : 
    2120           35 :             pageno = QUEUE_POS_PAGE(queue_head);
    2121           35 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2122           35 :             if (lock != prevlock)
    2123              :             {
    2124            0 :                 LWLockRelease(prevlock);
    2125            0 :                 LWLockAcquire(lock, LW_EXCLUSIVE);
    2126            0 :                 prevlock = lock;
    2127              :             }
    2128              : 
    2129              :             /*
    2130              :              * Page is full, so we're done here, but first fill the next page
    2131              :              * with zeroes.  The reason to do this is to ensure that slru.c's
    2132              :              * idea of the head page is always the same as ours, which avoids
    2133              :              * boundary problems in SimpleLruTruncate.  The test in
    2134              :              * asyncQueueIsFull() ensured that there is room to create this
    2135              :              * page without overrunning the queue.
    2136              :              */
    2137           35 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    2138              : 
    2139              :             /*
    2140              :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    2141              :              * set flag to remember that we should try to advance the tail
    2142              :              * pointer (we don't want to actually do that right here).
    2143              :              */
    2144           35 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    2145            8 :                 tryAdvanceTail = true;
    2146              : 
    2147              :             /* And exit the loop */
    2148           35 :             break;
    2149              :         }
    2150              :     }
    2151              : 
    2152              :     /* Success, so update the global QUEUE_HEAD */
    2153          104 :     QUEUE_HEAD = queue_head;
    2154              : 
    2155          104 :     LWLockRelease(prevlock);
    2156              : 
    2157          104 :     return nextNotify;
    2158              : }
    2159              : 
    2160              : /*
    2161              :  * SQL function to return the fraction of the notification queue currently
    2162              :  * occupied.
    2163              :  */
    2164              : Datum
    2165            6 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    2166              : {
    2167              :     double      usage;
    2168              : 
    2169              :     /* Advance the queue tail so we don't report a too-large result */
    2170            6 :     asyncQueueAdvanceTail();
    2171              : 
    2172            6 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2173            6 :     usage = asyncQueueUsage();
    2174            6 :     LWLockRelease(NotifyQueueLock);
    2175              : 
    2176            6 :     PG_RETURN_FLOAT8(usage);
    2177              : }
    2178              : 
    2179              : /*
    2180              :  * Return the fraction of the queue that is currently occupied.
    2181              :  *
    2182              :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    2183              :  *
    2184              :  * Note: we measure the distance to the logical tail page, not the physical
    2185              :  * tail page.  In some sense that's wrong, but the relative position of the
    2186              :  * physical tail is affected by details such as SLRU segment boundaries,
    2187              :  * so that a result based on that is unpleasantly unstable.
    2188              :  */
    2189              : static double
    2190          110 : asyncQueueUsage(void)
    2191              : {
    2192          110 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    2193          110 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    2194          110 :     int64       occupied = headPage - tailPage;
    2195              : 
    2196          110 :     if (occupied == 0)
    2197           71 :         return (double) 0;      /* fast exit for common case */
    2198              : 
    2199           39 :     return (double) occupied / (double) max_notify_queue_pages;
    2200              : }
    2201              : 
    2202              : /*
    2203              :  * Check whether the queue is at least half full, and emit a warning if so.
    2204              :  *
    2205              :  * This is unlikely given the size of the queue, but possible.
    2206              :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    2207              :  *
    2208              :  * Caller must hold exclusive NotifyQueueLock.
    2209              :  */
    2210              : static void
    2211          104 : asyncQueueFillWarning(void)
    2212              : {
    2213              :     double      fillDegree;
    2214              :     TimestampTz t;
    2215              : 
    2216          104 :     fillDegree = asyncQueueUsage();
    2217          104 :     if (fillDegree < 0.5)
    2218          104 :         return;
    2219              : 
    2220            0 :     t = GetCurrentTimestamp();
    2221              : 
    2222            0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    2223              :                                    t, QUEUE_FULL_WARN_INTERVAL))
    2224              :     {
    2225            0 :         QueuePosition min = QUEUE_HEAD;
    2226            0 :         int32       minPid = InvalidPid;
    2227              : 
    2228            0 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2229              :         {
    2230              :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2231            0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2232            0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    2233            0 :                 minPid = QUEUE_BACKEND_PID(i);
    2234              :         }
    2235              : 
    2236            0 :         ereport(WARNING,
    2237              :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    2238              :                  (minPid != InvalidPid ?
    2239              :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    2240              :                   : 0),
    2241              :                  (minPid != InvalidPid ?
    2242              :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    2243              :                   : 0)));
    2244              : 
    2245            0 :         asyncQueueControl->lastQueueFillWarn = t;
    2246              :     }
    2247              : }
    2248              : 
    2249              : /*
    2250              :  * Send signals to listening backends.
    2251              :  *
    2252              :  * Normally we signal only backends that are interested in the notifies that
    2253              :  * we just sent.  However, that will leave idle listeners falling further and
    2254              :  * further behind.  Waken them anyway if they're far enough behind, so they'll
    2255              :  * advance their queue position pointers, allowing the global tail to advance.
    2256              :  *
    2257              :  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
    2258              :  *
    2259              :  * This is called during CommitTransaction(), so it's important for it
    2260              :  * to have very low probability of failure.
    2261              :  */
    2262              : static void
    2263           69 : SignalBackends(void)
    2264              : {
    2265              :     int         count;
    2266              : 
    2267              :     /* Can't get here without PreCommit_Notify having made the global table */
    2268              :     Assert(globalChannelTable != NULL);
    2269              : 
    2270              :     /* It should have set up these arrays, too */
    2271              :     Assert(signalPids != NULL && signalProcnos != NULL);
    2272              : 
    2273              :     /*
    2274              :      * Identify backends that we need to signal.  We don't want to send
    2275              :      * signals while holding the NotifyQueueLock, so this part just builds a
    2276              :      * list of target PIDs in signalPids[] and signalProcnos[].
    2277              :      */
    2278           69 :     count = 0;
    2279              : 
    2280           69 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2281              : 
    2282              :     /* Scan each channel name that we notified in this transaction */
    2283          210 :     foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
    2284              :     {
    2285              :         GlobalChannelKey key;
    2286              :         GlobalChannelEntry *entry;
    2287              :         ListenerEntry *listeners;
    2288              : 
    2289           72 :         GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    2290           72 :         entry = dshash_find(globalChannelTable, &key, false);
    2291           72 :         if (entry == NULL)
    2292           32 :             continue;           /* nobody is listening */
    2293              : 
    2294           40 :         listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
    2295              :                                                       entry->listenersArray);
    2296              : 
    2297              :         /*
    2298              :          * Identify listeners that now need waking, add them to arrays.
    2299              :          *
    2300              :          * Note that we signal listeners regardless of the state of their
    2301              :          * removeOnAbort flags.  Hence a new listener that reached PreCommit,
    2302              :          * but then failed before AtCommit_Notify, can receive a signal even
    2303              :          * though it was never really listening.  This is okay because it will
    2304              :          * not do anything in response to that signal.  If we did not do it
    2305              :          * like this then a new listener might miss some messages due to the
    2306              :          * direct-advance logic below.
    2307              :          */
    2308           83 :         for (int j = 0; j < entry->numListeners; j++)
    2309              :         {
    2310           43 :             ProcNumber  i = listeners[j].procNo;
    2311              :             int32       pid;
    2312              :             QueuePosition pos;
    2313              : 
    2314           43 :             if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2315           12 :                 continue;       /* already signaled, no need to repeat */
    2316              : 
    2317           31 :             pid = QUEUE_BACKEND_PID(i);
    2318           31 :             pos = QUEUE_BACKEND_POS(i);
    2319              : 
    2320           31 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    2321            0 :                 continue;       /* it's fully caught up already */
    2322              : 
    2323              :             Assert(pid != InvalidPid);
    2324              : 
    2325           31 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2326           31 :             signalPids[count] = pid;
    2327           31 :             signalProcnos[count] = i;
    2328           31 :             count++;
    2329              :         }
    2330              : 
    2331           40 :         dshash_release_lock(globalChannelTable, entry);
    2332              :     }
    2333              : 
    2334              :     /*
    2335              :      * Scan all listeners.  Any that are not already pending wakeup must not
    2336              :      * be interested in our notifications (else we'd have set their wakeup
    2337              :      * flags above).  Check to see if we can directly advance their queue
    2338              :      * pointers to save a wakeup.  Otherwise, if they are far behind, wake
    2339              :      * them anyway so they will catch up.
    2340              :      */
    2341          124 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2342              :     {
    2343              :         int32       pid;
    2344              :         QueuePosition pos;
    2345              : 
    2346           55 :         if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2347           40 :             continue;
    2348              : 
    2349              :         /* If it's currently advancing, we should not touch it */
    2350           15 :         if (QUEUE_BACKEND_IS_ADVANCING(i))
    2351            0 :             continue;
    2352              : 
    2353           15 :         pid = QUEUE_BACKEND_PID(i);
    2354           15 :         pos = QUEUE_BACKEND_POS(i);
    2355              : 
    2356              :         /*
    2357              :          * We can directly advance the other backend's queue pointer if it's
    2358              :          * not currently advancing (else there are race conditions), and its
    2359              :          * current pointer is not behind queueHeadBeforeWrite (else we'd make
    2360              :          * it miss some older messages), and we'd not be moving the pointer
    2361              :          * backward.
    2362              :          */
    2363           30 :         if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
    2364           19 :             QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
    2365              :         {
    2366              :             /* We can directly advance its pointer past what we wrote */
    2367           15 :             QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
    2368              :         }
    2369            0 :         else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    2370              :                                     QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
    2371              :         {
    2372              :             /* It's idle and far behind, so wake it up */
    2373              :             Assert(pid != InvalidPid);
    2374              : 
    2375            0 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2376            0 :             signalPids[count] = pid;
    2377            0 :             signalProcnos[count] = i;
    2378            0 :             count++;
    2379              :         }
    2380              :     }
    2381              : 
    2382           69 :     LWLockRelease(NotifyQueueLock);
    2383              : 
    2384              :     /* Now send signals */
    2385          100 :     for (int i = 0; i < count; i++)
    2386              :     {
    2387           31 :         int32       pid = signalPids[i];
    2388              : 
    2389              :         /*
    2390              :          * If we are signaling our own process, no need to involve the kernel;
    2391              :          * just set the flag directly.
    2392              :          */
    2393           31 :         if (pid == MyProcPid)
    2394              :         {
    2395           21 :             notifyInterruptPending = true;
    2396           21 :             continue;
    2397              :         }
    2398              : 
    2399              :         /*
    2400              :          * Note: assuming things aren't broken, a signal failure here could
    2401              :          * only occur if the target backend exited since we released
    2402              :          * NotifyQueueLock; which is unlikely but certainly possible. So we
    2403              :          * just log a low-level debug message if it happens.
    2404              :          */
    2405           10 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
    2406            0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    2407              :     }
    2408           69 : }
    2409              : 
    2410              : /*
    2411              :  * AtAbort_Notify
    2412              :  *
    2413              :  *  This is called at transaction abort.
    2414              :  *
    2415              :  *  Revert any staged listen/unlisten changes and clean up transaction state.
    2416              :  *  This only does anything if we abort after PreCommit_Notify has staged
    2417              :  *  some entries.
    2418              :  */
    2419              : void
    2420        35529 : AtAbort_Notify(void)
    2421              : {
    2422              :     /* Revert staged listen/unlisten changes */
    2423        35529 :     ApplyPendingListenActions(false);
    2424              : 
    2425              :     /* If we're no longer listening on anything, unregister */
    2426        35529 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
    2427            0 :         asyncQueueUnregister();
    2428              : 
    2429              :     /* And clean up */
    2430        35529 :     ClearPendingActionsAndNotifies();
    2431        35529 : }
    2432              : 
    2433              : /*
    2434              :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    2435              :  *
    2436              :  * Reassign all items in the pending lists to the parent transaction.
    2437              :  */
    2438              : void
    2439         5631 : AtSubCommit_Notify(void)
    2440              : {
    2441         5631 :     int         my_level = GetCurrentTransactionNestLevel();
    2442              : 
    2443              :     /* If there are actions at our nesting level, we must reparent them. */
    2444         5631 :     if (pendingActions != NULL &&
    2445            2 :         pendingActions->nestingLevel >= my_level)
    2446              :     {
    2447            2 :         if (pendingActions->upper == NULL ||
    2448            1 :             pendingActions->upper->nestingLevel < my_level - 1)
    2449              :         {
    2450              :             /* nothing to merge; give the whole thing to the parent */
    2451            1 :             --pendingActions->nestingLevel;
    2452              :         }
    2453              :         else
    2454              :         {
    2455            1 :             ActionList *childPendingActions = pendingActions;
    2456              : 
    2457            1 :             pendingActions = pendingActions->upper;
    2458              : 
    2459              :             /*
    2460              :              * Mustn't try to eliminate duplicates here --- see queue_listen()
    2461              :              */
    2462            2 :             pendingActions->actions =
    2463            1 :                 list_concat(pendingActions->actions,
    2464            1 :                             childPendingActions->actions);
    2465            1 :             pfree(childPendingActions);
    2466              :         }
    2467              :     }
    2468              : 
    2469              :     /* If there are notifies at our nesting level, we must reparent them. */
    2470         5631 :     if (pendingNotifies != NULL &&
    2471            3 :         pendingNotifies->nestingLevel >= my_level)
    2472              :     {
    2473              :         Assert(pendingNotifies->nestingLevel == my_level);
    2474              : 
    2475            2 :         if (pendingNotifies->upper == NULL ||
    2476            1 :             pendingNotifies->upper->nestingLevel < my_level - 1)
    2477              :         {
    2478              :             /* nothing to merge; give the whole thing to the parent */
    2479            1 :             --pendingNotifies->nestingLevel;
    2480              :         }
    2481              :         else
    2482              :         {
    2483              :             /*
    2484              :              * Formerly, we didn't bother to eliminate duplicates here, but
    2485              :              * now we must, else we fall foul of "Assert(!found)", either here
    2486              :              * or during a later attempt to build the parent-level hashtable.
    2487              :              */
    2488            1 :             NotificationList *childPendingNotifies = pendingNotifies;
    2489              :             ListCell   *l;
    2490              : 
    2491            1 :             pendingNotifies = pendingNotifies->upper;
    2492              :             /* Insert all the subxact's events into parent, except for dups */
    2493            5 :             foreach(l, childPendingNotifies->events)
    2494              :             {
    2495            4 :                 Notification *childn = (Notification *) lfirst(l);
    2496              : 
    2497            4 :                 if (!AsyncExistsPendingNotify(childn))
    2498            2 :                     AddEventToPendingNotifies(childn);
    2499              :             }
    2500            1 :             pfree(childPendingNotifies);
    2501              :         }
    2502              :     }
    2503         5631 : }
    2504              : 
    2505              : /*
    2506              :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    2507              :  */
    2508              : void
    2509         5421 : AtSubAbort_Notify(void)
    2510              : {
    2511         5421 :     int         my_level = GetCurrentTransactionNestLevel();
    2512              : 
    2513              :     /*
    2514              :      * All we have to do is pop the stack --- the actions/notifies made in
    2515              :      * this subxact are no longer interesting, and the space will be freed
    2516              :      * when CurTransactionContext is recycled. We still have to free the
    2517              :      * ActionList and NotificationList objects themselves, though, because
    2518              :      * those are allocated in TopTransactionContext.
    2519              :      *
    2520              :      * Note that there might be no entries at all, or no entries for the
    2521              :      * current subtransaction level, either because none were ever created, or
    2522              :      * because we reentered this routine due to trouble during subxact abort.
    2523              :      */
    2524         5422 :     while (pendingActions != NULL &&
    2525            1 :            pendingActions->nestingLevel >= my_level)
    2526              :     {
    2527            1 :         ActionList *childPendingActions = pendingActions;
    2528              : 
    2529            1 :         pendingActions = pendingActions->upper;
    2530            1 :         pfree(childPendingActions);
    2531              :     }
    2532              : 
    2533         5422 :     while (pendingNotifies != NULL &&
    2534            2 :            pendingNotifies->nestingLevel >= my_level)
    2535              :     {
    2536            1 :         NotificationList *childPendingNotifies = pendingNotifies;
    2537              : 
    2538            1 :         pendingNotifies = pendingNotifies->upper;
    2539            1 :         pfree(childPendingNotifies);
    2540              :     }
    2541         5421 : }
    2542              : 
    2543              : /*
    2544              :  * HandleNotifyInterrupt
    2545              :  *
    2546              :  *      Signal handler portion of interrupt handling. Let the backend know
    2547              :  *      that there's a pending notify interrupt. If we're currently reading
    2548              :  *      from the client, this will interrupt the read and
    2549              :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    2550              :  */
    2551              : void
    2552            9 : HandleNotifyInterrupt(void)
    2553              : {
    2554              :     /*
    2555              :      * Note: this is called by a SIGNAL HANDLER. You must be very wary what
    2556              :      * you do here.
    2557              :      */
    2558              : 
    2559              :     /* signal that work needs to be done */
    2560            9 :     notifyInterruptPending = true;
    2561              : 
    2562              :     /* latch will be set by procsignal_sigusr1_handler */
    2563            9 : }
    2564              : 
    2565              : /*
    2566              :  * ProcessNotifyInterrupt
    2567              :  *
    2568              :  *      This is called if we see notifyInterruptPending set, just before
    2569              :  *      transmitting ReadyForQuery at the end of a frontend command, and
    2570              :  *      also if a notify signal occurs while reading from the frontend.
    2571              :  *      HandleNotifyInterrupt() will cause the read to be interrupted
    2572              :  *      via the process's latch, and this routine will get called.
    2573              :  *      If we are truly idle (ie, *not* inside a transaction block),
    2574              :  *      process the incoming notifies.
    2575              :  *
    2576              :  *      If "flush" is true, force any frontend messages out immediately.
    2577              :  *      This can be false when being called at the end of a frontend command,
    2578              :  *      since we'll flush after sending ReadyForQuery.
    2579              :  */
    2580              : void
    2581           35 : ProcessNotifyInterrupt(bool flush)
    2582              : {
    2583           35 :     if (IsTransactionOrTransactionBlock())
    2584            6 :         return;                 /* not really idle */
    2585              : 
    2586              :     /* Loop in case another signal arrives while sending messages */
    2587           58 :     while (notifyInterruptPending)
    2588           29 :         ProcessIncomingNotify(flush);
    2589              : }
    2590              : 
    2591              : 
    2592              : /*
    2593              :  * Read all pending notifications from the queue, and deliver appropriate
    2594              :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    2595              :  * notification.
    2596              :  */
    2597              : static void
    2598           47 : asyncQueueReadAllNotifications(void)
    2599              : {
    2600              :     QueuePosition pos;
    2601              :     QueuePosition head;
    2602              :     Snapshot    snapshot;
    2603              : 
    2604              :     /*
    2605              :      * Fetch current state, indicate to others that we have woken up, and that
    2606              :      * we are in process of advancing our position.
    2607              :      */
    2608           47 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2609              :     /* Assert checks that we have a valid state entry */
    2610              :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
    2611           47 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    2612           47 :     pos = QUEUE_BACKEND_POS(MyProcNumber);
    2613           47 :     head = QUEUE_HEAD;
    2614              : 
    2615           47 :     if (QUEUE_POS_EQUAL(pos, head))
    2616              :     {
    2617              :         /* Nothing to do, we have read all notifications already. */
    2618            0 :         LWLockRelease(NotifyQueueLock);
    2619            0 :         return;
    2620              :     }
    2621              : 
    2622           47 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
    2623           47 :     LWLockRelease(NotifyQueueLock);
    2624              : 
    2625              :     /*----------
    2626              :      * Get snapshot we'll use to decide which xacts are still in progress.
    2627              :      * This is trickier than it might seem, because of race conditions.
    2628              :      * Consider the following example:
    2629              :      *
    2630              :      * Backend 1:                    Backend 2:
    2631              :      *
    2632              :      * transaction starts
    2633              :      * UPDATE foo SET ...;
    2634              :      * NOTIFY foo;
    2635              :      * commit starts
    2636              :      * queue the notify message
    2637              :      *                               transaction starts
    2638              :      *                               LISTEN foo;  -- first LISTEN in session
    2639              :      *                               SELECT * FROM foo WHERE ...;
    2640              :      * commit to clog
    2641              :      *                               commit starts
    2642              :      *                               add backend 2 to array of listeners
    2643              :      *                               advance to queue head (this code)
    2644              :      *                               commit to clog
    2645              :      *
    2646              :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    2647              :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
    2648              :      * eventually get transaction 1's notify message, but there's no way
    2649              :      * to do that; until we're in the listener array, there's no guarantee
    2650              :      * that the notify message doesn't get removed from the queue.
    2651              :      *
    2652              :      * Therefore the coding technique transaction 2 is using is unsafe:
    2653              :      * applications must commit a LISTEN before inspecting database state,
    2654              :      * if they want to ensure they will see notifications about subsequent
    2655              :      * changes to that state.
    2656              :      *
    2657              :      * What we do guarantee is that we'll see all notifications from
    2658              :      * transactions committing after the snapshot we take here.
    2659              :      * BecomeRegisteredListener has already added us to the listener array,
    2660              :      * so no not-yet-committed messages can be removed from the queue
    2661              :      * before we see them.
    2662              :      *----------
    2663              :      */
    2664           47 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
    2665              : 
    2666              :     /*
    2667              :      * It is possible that we fail while trying to send a message to our
    2668              :      * frontend (for example, because of encoding conversion failure).  If
    2669              :      * that happens it is critical that we not try to send the same message
    2670              :      * over and over again.  Therefore, we set ExitOnAnyError to upgrade any
    2671              :      * ERRORs to FATAL, causing the client connection to be closed on error.
    2672              :      *
    2673              :      * We used to only skip over the offending message and try to soldier on,
    2674              :      * but it was somewhat questionable to lose a notification and give the
    2675              :      * client an ERROR instead.  A client application is not be prepared for
    2676              :      * that and can't tell that a notification was missed.  It was also not
    2677              :      * very useful in practice because notifications are often processed while
    2678              :      * a connection is idle and reading a message from the client, and in that
    2679              :      * state, any error is upgraded to FATAL anyway.  Closing the connection
    2680              :      * is a clear signal to the application that it might have missed
    2681              :      * notifications.
    2682              :      */
    2683              :     {
    2684           47 :         bool        save_ExitOnAnyError = ExitOnAnyError;
    2685              :         bool        reachedStop;
    2686              : 
    2687           47 :         ExitOnAnyError = true;
    2688              : 
    2689              :         do
    2690              :         {
    2691              :             /*
    2692              :              * Process messages up to the stop position, end of page, or an
    2693              :              * uncommitted message.
    2694              :              *
    2695              :              * Our stop position is what we found to be the head's position
    2696              :              * when we entered this function. It might have changed already.
    2697              :              * But if it has, we will receive (or have already received and
    2698              :              * queued) another signal and come here again.
    2699              :              *
    2700              :              * We are not holding NotifyQueueLock here! The queue can only
    2701              :              * extend beyond the head pointer (see above) and we leave our
    2702              :              * backend's pointer where it is so nobody will truncate or
    2703              :              * rewrite pages under us. Especially we don't want to hold a lock
    2704              :              * while sending the notifications to the frontend.
    2705              :              */
    2706           50 :             reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
    2707           50 :         } while (!reachedStop);
    2708              : 
    2709              :         /* Update shared state */
    2710           47 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2711           47 :         QUEUE_BACKEND_POS(MyProcNumber) = pos;
    2712           47 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    2713           47 :         LWLockRelease(NotifyQueueLock);
    2714              : 
    2715           47 :         ExitOnAnyError = save_ExitOnAnyError;
    2716              :     }
    2717              : 
    2718              :     /* Done with snapshot */
    2719           47 :     UnregisterSnapshot(snapshot);
    2720              : }
    2721              : 
    2722              : /*
    2723              :  * Fetch notifications from the shared queue, beginning at position current,
    2724              :  * and deliver relevant ones to my frontend.
    2725              :  *
    2726              :  * The function returns true once we have reached the stop position or an
    2727              :  * uncommitted notification, and false if we have finished with the page.
    2728              :  * In other words: once it returns true there is no need to look further.
    2729              :  * The QueuePosition *current is advanced past all processed messages.
    2730              :  */
    2731              : static bool
    2732           50 : asyncQueueProcessPageEntries(QueuePosition *current,
    2733              :                              QueuePosition stop,
    2734              :                              Snapshot snapshot)
    2735              : {
    2736           50 :     int64       curpage = QUEUE_POS_PAGE(*current);
    2737              :     int         slotno;
    2738              :     char       *page_buffer;
    2739           50 :     bool        reachedStop = false;
    2740              :     bool        reachedEndOfPage;
    2741              : 
    2742              :     /*
    2743              :      * We copy the entries into a local buffer to avoid holding the SLRU lock
    2744              :      * while we transmit them to our frontend.  The local buffer must be
    2745              :      * adequately aligned.
    2746              :      */
    2747              :     alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
    2748           50 :     char       *local_buf_end = local_buf;
    2749              : 
    2750           50 :     slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, current);
    2751           50 :     page_buffer = NotifyCtl->shared->page_buffer[slotno];
    2752              : 
    2753              :     do
    2754              :     {
    2755         1414 :         QueuePosition thisentry = *current;
    2756              :         AsyncQueueEntry *qe;
    2757              : 
    2758         1414 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    2759           47 :             break;
    2760              : 
    2761         1367 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2762              : 
    2763              :         /*
    2764              :          * Advance *current over this message, possibly to the next page. As
    2765              :          * noted in the comments for asyncQueueReadAllNotifications, we must
    2766              :          * do this before possibly failing while processing the message.
    2767              :          */
    2768         1367 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2769              : 
    2770              :         /* Ignore messages destined for other databases */
    2771         1367 :         if (qe->dboid == MyDatabaseId)
    2772              :         {
    2773         1367 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
    2774              :             {
    2775              :                 /*
    2776              :                  * The source transaction is still in progress, so we can't
    2777              :                  * process this message yet.  Break out of the loop, but first
    2778              :                  * back up *current so we will reprocess the message next
    2779              :                  * time.  (Note: it is unlikely but not impossible for
    2780              :                  * TransactionIdDidCommit to fail, so we can't really avoid
    2781              :                  * this advance-then-back-up behavior when dealing with an
    2782              :                  * uncommitted message.)
    2783              :                  *
    2784              :                  * Note that we must test XidInMVCCSnapshot before we test
    2785              :                  * TransactionIdDidCommit, else we might return a message from
    2786              :                  * a transaction that is not yet visible to snapshots; compare
    2787              :                  * the comments at the head of heapam_visibility.c.
    2788              :                  *
    2789              :                  * Also, while our own xact won't be listed in the snapshot,
    2790              :                  * we need not check for TransactionIdIsCurrentTransactionId
    2791              :                  * because our transaction cannot (yet) have queued any
    2792              :                  * messages.
    2793              :                  */
    2794            0 :                 *current = thisentry;
    2795            0 :                 reachedStop = true;
    2796            0 :                 break;
    2797              :             }
    2798              : 
    2799              :             /*
    2800              :              * Quick check for the case that we're not listening on any
    2801              :              * channels, before calling TransactionIdDidCommit().  This makes
    2802              :              * that case a little faster, but more importantly, it ensures
    2803              :              * that if there's a bad entry in the queue for which
    2804              :              * TransactionIdDidCommit() fails for some reason, we can skip
    2805              :              * over it on the first LISTEN in a session, and not get stuck on
    2806              :              * it indefinitely.  (This is a little trickier than it looks: it
    2807              :              * works because BecomeRegisteredListener runs this code before we
    2808              :              * have made the first entry in localChannelTable.)
    2809              :              */
    2810         1367 :             if (LocalChannelTableIsEmpty())
    2811         1314 :                 continue;
    2812              : 
    2813           53 :             if (TransactionIdDidCommit(qe->xid))
    2814              :             {
    2815           53 :                 memcpy(local_buf_end, qe, qe->length);
    2816           53 :                 local_buf_end += qe->length;
    2817              :             }
    2818              :             else
    2819              :             {
    2820              :                 /*
    2821              :                  * The source transaction aborted or crashed, so we just
    2822              :                  * ignore its notifications.
    2823              :                  */
    2824              :             }
    2825              :         }
    2826              : 
    2827              :         /* Loop back if we're not at end of page */
    2828         1367 :     } while (!reachedEndOfPage);
    2829              : 
    2830              :     /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2831           50 :     LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2832              : 
    2833              :     /*
    2834              :      * Now that we have let go of the SLRU bank lock, send the notifications
    2835              :      * to our backend
    2836              :      */
    2837              :     Assert(local_buf_end - local_buf <= BLCKSZ);
    2838          103 :     for (char *p = local_buf; p < local_buf_end;)
    2839              :     {
    2840           53 :         AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
    2841              : 
    2842              :         /* qe->data is the null-terminated channel name */
    2843           53 :         char       *channel = qe->data;
    2844              : 
    2845           53 :         if (IsListeningOn(channel))
    2846              :         {
    2847              :             /* payload follows channel name */
    2848           53 :             char       *payload = qe->data + strlen(channel) + 1;
    2849              : 
    2850           53 :             NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2851              :         }
    2852              : 
    2853           53 :         p += qe->length;
    2854              :     }
    2855              : 
    2856           50 :     if (QUEUE_POS_EQUAL(*current, stop))
    2857           47 :         reachedStop = true;
    2858              : 
    2859           50 :     return reachedStop;
    2860              : }
    2861              : 
    2862              : /*
    2863              :  * Advance the shared queue tail variable to the minimum of all the
    2864              :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2865              :  *
    2866              :  * This is (usually) called during CommitTransaction(), so it's important for
    2867              :  * it to have very low probability of failure.
    2868              :  */
    2869              : static void
    2870           14 : asyncQueueAdvanceTail(void)
    2871              : {
    2872              :     QueuePosition min;
    2873              :     int64       oldtailpage;
    2874              :     int64       newtailpage;
    2875              :     int64       boundary;
    2876              : 
    2877              :     /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
    2878           14 :     LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
    2879              : 
    2880              :     /*
    2881              :      * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
    2882              :      * (ie, exactly match at least one backend's queue position), so it must
    2883              :      * be updated atomically with the actual computation.  Since v13, we could
    2884              :      * get away with not doing it like that, but it seems prudent to keep it
    2885              :      * so.
    2886              :      *
    2887              :      * Also, because incoming backends will scan forward from QUEUE_TAIL, that
    2888              :      * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
    2889              :      * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
    2890              :      * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
    2891              :      * there are pages we can truncate but haven't yet finished doing so.
    2892              :      *
    2893              :      * For concurrency's sake, we don't want to hold NotifyQueueLock while
    2894              :      * performing SimpleLruTruncate.  This is OK because no backend will try
    2895              :      * to access the pages we are in the midst of truncating.
    2896              :      */
    2897           14 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2898           14 :     min = QUEUE_HEAD;
    2899           24 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2900              :     {
    2901              :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2902           10 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2903              :     }
    2904           14 :     QUEUE_TAIL = min;
    2905           14 :     oldtailpage = QUEUE_STOP_PAGE;
    2906           14 :     LWLockRelease(NotifyQueueLock);
    2907              : 
    2908              :     /*
    2909              :      * We can truncate something if the global tail advanced across an SLRU
    2910              :      * segment boundary.
    2911              :      *
    2912              :      * XXX it might be better to truncate only once every several segments, to
    2913              :      * reduce the number of directory scans.
    2914              :      */
    2915           14 :     newtailpage = QUEUE_POS_PAGE(min);
    2916           14 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2917           14 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2918              :     {
    2919              :         /*
    2920              :          * SimpleLruTruncate() will ask for SLRU bank locks but will also
    2921              :          * release the lock again.
    2922              :          */
    2923            1 :         SimpleLruTruncate(NotifyCtl, newtailpage);
    2924              : 
    2925            1 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2926            1 :         QUEUE_STOP_PAGE = newtailpage;
    2927            1 :         LWLockRelease(NotifyQueueLock);
    2928              :     }
    2929              : 
    2930           14 :     LWLockRelease(NotifyQueueTailLock);
    2931           14 : }
    2932              : 
    2933              : /*
    2934              :  * AsyncNotifyFreezeXids
    2935              :  *
    2936              :  * Prepare the async notification queue for CLOG truncation by freezing
    2937              :  * transaction IDs that are about to become inaccessible.
    2938              :  *
    2939              :  * This function is called by VACUUM before advancing datfrozenxid. It scans
    2940              :  * the notification queue and replaces XIDs that would become inaccessible
    2941              :  * after CLOG truncation with special markers:
    2942              :  * - Committed transactions are set to FrozenTransactionId
    2943              :  * - Aborted/crashed transactions are set to InvalidTransactionId
    2944              :  *
    2945              :  * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
    2946              :  * pages will be truncated. If XID < newFrozenXid, it cannot still be running
    2947              :  * (or it would have held back newFrozenXid through ProcArray).
    2948              :  * Therefore, if TransactionIdDidCommit returns false, we know the transaction
    2949              :  * either aborted explicitly or crashed, and we can safely mark it invalid.
    2950              :  */
    2951              : void
    2952         1128 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
    2953              : {
    2954              :     QueuePosition pos;
    2955              :     QueuePosition head;
    2956         1128 :     int64       curpage = -1;
    2957         1128 :     int         slotno = -1;
    2958         1128 :     char       *page_buffer = NULL;
    2959         1128 :     bool        page_dirty = false;
    2960              : 
    2961              :     /*
    2962              :      * Acquire locks in the correct order to avoid deadlocks. As per the
    2963              :      * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
    2964              :      * bank locks.
    2965              :      *
    2966              :      * We only need SHARED mode since we're just reading the head/tail
    2967              :      * positions, not modifying them.
    2968              :      */
    2969         1128 :     LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
    2970         1128 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2971              : 
    2972         1128 :     pos = QUEUE_TAIL;
    2973         1128 :     head = QUEUE_HEAD;
    2974              : 
    2975              :     /* Release NotifyQueueLock early, we only needed to read the positions */
    2976         1128 :     LWLockRelease(NotifyQueueLock);
    2977              : 
    2978              :     /*
    2979              :      * Scan the queue from tail to head, freezing XIDs as needed. We hold
    2980              :      * NotifyQueueTailLock throughout to ensure the tail doesn't move while
    2981              :      * we're working.
    2982              :      */
    2983         1158 :     while (!QUEUE_POS_EQUAL(pos, head))
    2984              :     {
    2985              :         AsyncQueueEntry *qe;
    2986              :         TransactionId xid;
    2987           30 :         int64       pageno = QUEUE_POS_PAGE(pos);
    2988           30 :         int         offset = QUEUE_POS_OFFSET(pos);
    2989              : 
    2990              :         /* If we need a different page, release old lock and get new one */
    2991           30 :         if (pageno != curpage)
    2992              :         {
    2993              :             LWLock     *lock;
    2994              : 
    2995              :             /* Release previous page if any */
    2996            3 :             if (slotno >= 0)
    2997              :             {
    2998            0 :                 if (page_dirty)
    2999              :                 {
    3000            0 :                     NotifyCtl->shared->page_dirty[slotno] = true;
    3001            0 :                     page_dirty = false;
    3002              :                 }
    3003            0 :                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    3004              :             }
    3005              : 
    3006            3 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    3007            3 :             LWLockAcquire(lock, LW_EXCLUSIVE);
    3008            3 :             slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
    3009            3 :             page_buffer = NotifyCtl->shared->page_buffer[slotno];
    3010            3 :             curpage = pageno;
    3011              :         }
    3012              : 
    3013           30 :         qe = (AsyncQueueEntry *) (page_buffer + offset);
    3014           30 :         xid = qe->xid;
    3015              : 
    3016           60 :         if (TransactionIdIsNormal(xid) &&
    3017           30 :             TransactionIdPrecedes(xid, newFrozenXid))
    3018              :         {
    3019           10 :             if (TransactionIdDidCommit(xid))
    3020              :             {
    3021           10 :                 qe->xid = FrozenTransactionId;
    3022           10 :                 page_dirty = true;
    3023              :             }
    3024              :             else
    3025              :             {
    3026            0 :                 qe->xid = InvalidTransactionId;
    3027            0 :                 page_dirty = true;
    3028              :             }
    3029              :         }
    3030              : 
    3031              :         /* Advance to next entry */
    3032           30 :         asyncQueueAdvance(&pos, qe->length);
    3033              :     }
    3034              : 
    3035              :     /* Release final page lock if we acquired one */
    3036         1128 :     if (slotno >= 0)
    3037              :     {
    3038            3 :         if (page_dirty)
    3039            1 :             NotifyCtl->shared->page_dirty[slotno] = true;
    3040            3 :         LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    3041              :     }
    3042              : 
    3043         1128 :     LWLockRelease(NotifyQueueTailLock);
    3044         1128 : }
    3045              : 
    3046              : /*
    3047              :  * ProcessIncomingNotify
    3048              :  *
    3049              :  *      Scan the queue for arriving notifications and report them to the front
    3050              :  *      end.  The notifications might be from other sessions, or our own;
    3051              :  *      there's no need to distinguish here.
    3052              :  *
    3053              :  *      If "flush" is true, force any frontend messages out immediately.
    3054              :  *
    3055              :  *      NOTE: since we are outside any transaction, we must create our own.
    3056              :  */
    3057              : static void
    3058           29 : ProcessIncomingNotify(bool flush)
    3059              : {
    3060              :     /* We *must* reset the flag */
    3061           29 :     notifyInterruptPending = false;
    3062              : 
    3063              :     /* Do nothing else if we aren't actively listening */
    3064           29 :     if (LocalChannelTableIsEmpty())
    3065            0 :         return;
    3066              : 
    3067           29 :     if (Trace_notify)
    3068            0 :         elog(DEBUG1, "ProcessIncomingNotify");
    3069              : 
    3070           29 :     set_ps_display("notify interrupt");
    3071              : 
    3072              :     /*
    3073              :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    3074              :      * bad things happen if it gets an error.
    3075              :      */
    3076           29 :     StartTransactionCommand();
    3077              : 
    3078           29 :     asyncQueueReadAllNotifications();
    3079              : 
    3080           29 :     CommitTransactionCommand();
    3081              : 
    3082              :     /*
    3083              :      * If this isn't an end-of-command case, we must flush the notify messages
    3084              :      * to ensure frontend gets them promptly.
    3085              :      */
    3086           29 :     if (flush)
    3087            7 :         pq_flush();
    3088              : 
    3089           29 :     set_ps_display("idle");
    3090              : 
    3091           29 :     if (Trace_notify)
    3092            0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    3093              : }
    3094              : 
    3095              : /*
    3096              :  * Send NOTIFY message to my front end.
    3097              :  */
    3098              : void
    3099           53 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    3100              : {
    3101           53 :     if (whereToSendOutput == DestRemote)
    3102              :     {
    3103              :         StringInfoData buf;
    3104              : 
    3105           53 :         pq_beginmessage(&buf, PqMsg_NotificationResponse);
    3106           53 :         pq_sendint32(&buf, srcPid);
    3107           53 :         pq_sendstring(&buf, channel);
    3108           53 :         pq_sendstring(&buf, payload);
    3109           53 :         pq_endmessage(&buf);
    3110              : 
    3111              :         /*
    3112              :          * NOTE: we do not do pq_flush() here.  Some level of caller will
    3113              :          * handle it later, allowing this message to be combined into a packet
    3114              :          * with other ones.
    3115              :          */
    3116              :     }
    3117              :     else
    3118            0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    3119           53 : }
    3120              : 
    3121              : /* Does pendingNotifies include a match for the given event? */
    3122              : static bool
    3123         1066 : AsyncExistsPendingNotify(Notification *n)
    3124              : {
    3125         1066 :     if (pendingNotifies == NULL)
    3126            0 :         return false;
    3127              : 
    3128         1066 :     if (pendingNotifies->hashtab != NULL)
    3129              :     {
    3130              :         /* Use the hash table to probe for a match */
    3131          984 :         if (hash_search(pendingNotifies->hashtab,
    3132              :                         &n,
    3133              :                         HASH_FIND,
    3134              :                         NULL))
    3135            1 :             return true;
    3136              :     }
    3137              :     else
    3138              :     {
    3139              :         /* Must scan the event list */
    3140              :         ListCell   *l;
    3141              : 
    3142          425 :         foreach(l, pendingNotifies->events)
    3143              :         {
    3144          357 :             Notification *oldn = (Notification *) lfirst(l);
    3145              : 
    3146          357 :             if (n->channel_len == oldn->channel_len &&
    3147          357 :                 n->payload_len == oldn->payload_len &&
    3148          190 :                 memcmp(n->data, oldn->data,
    3149          190 :                        n->channel_len + n->payload_len + 2) == 0)
    3150           14 :                 return true;
    3151              :         }
    3152              :     }
    3153              : 
    3154         1051 :     return false;
    3155              : }
    3156              : 
    3157              : /*
    3158              :  * Add a notification event to a pre-existing pendingNotifies list.
    3159              :  *
    3160              :  * Because pendingNotifies->events is already nonempty, this works
    3161              :  * correctly no matter what CurrentMemoryContext is.
    3162              :  */
    3163              : static void
    3164         1051 : AddEventToPendingNotifies(Notification *n)
    3165              : {
    3166              :     Assert(pendingNotifies->events != NIL);
    3167              : 
    3168              :     /* Create the hash tables if it's time to */
    3169         1051 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    3170          985 :         pendingNotifies->hashtab == NULL)
    3171              :     {
    3172              :         HASHCTL     hash_ctl;
    3173              :         ListCell   *l;
    3174              : 
    3175              :         /* Create the hash table */
    3176            2 :         hash_ctl.keysize = sizeof(Notification *);
    3177            2 :         hash_ctl.entrysize = sizeof(struct NotificationHash);
    3178            2 :         hash_ctl.hash = notification_hash;
    3179            2 :         hash_ctl.match = notification_match;
    3180            2 :         hash_ctl.hcxt = CurTransactionContext;
    3181            4 :         pendingNotifies->hashtab =
    3182            2 :             hash_create("Pending Notifies",
    3183              :                         256L,
    3184              :                         &hash_ctl,
    3185              :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    3186              : 
    3187              :         /* Create the unique channel name table */
    3188              :         Assert(pendingNotifies->uniqueChannelHash == NULL);
    3189            2 :         hash_ctl.keysize = NAMEDATALEN;
    3190            2 :         hash_ctl.entrysize = sizeof(ChannelName);
    3191            2 :         hash_ctl.hcxt = CurTransactionContext;
    3192            4 :         pendingNotifies->uniqueChannelHash =
    3193            2 :             hash_create("Pending Notify Channel Names",
    3194              :                         64L,
    3195              :                         &hash_ctl,
    3196              :                         HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
    3197              : 
    3198              :         /* Insert all the already-existing events */
    3199           34 :         foreach(l, pendingNotifies->events)
    3200              :         {
    3201           32 :             Notification *oldn = (Notification *) lfirst(l);
    3202           32 :             char       *channel = oldn->data;
    3203              :             bool        found;
    3204              : 
    3205           32 :             (void) hash_search(pendingNotifies->hashtab,
    3206              :                                &oldn,
    3207              :                                HASH_ENTER,
    3208              :                                &found);
    3209              :             Assert(!found);
    3210              : 
    3211              :             /* Add channel name to uniqueChannelHash; might be there already */
    3212           32 :             (void) hash_search(pendingNotifies->uniqueChannelHash,
    3213              :                                channel,
    3214              :                                HASH_ENTER,
    3215              :                                NULL);
    3216              :         }
    3217              :     }
    3218              : 
    3219              :     /* Add new event to the list, in order */
    3220         1051 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
    3221              : 
    3222              :     /* Add event to the hash tables if needed */
    3223         1051 :     if (pendingNotifies->hashtab != NULL)
    3224              :     {
    3225          985 :         char       *channel = n->data;
    3226              :         bool        found;
    3227              : 
    3228          985 :         (void) hash_search(pendingNotifies->hashtab,
    3229              :                            &n,
    3230              :                            HASH_ENTER,
    3231              :                            &found);
    3232              :         Assert(!found);
    3233              : 
    3234              :         /* Add channel name to uniqueChannelHash; might be there already */
    3235          985 :         (void) hash_search(pendingNotifies->uniqueChannelHash,
    3236              :                            channel,
    3237              :                            HASH_ENTER,
    3238              :                            NULL);
    3239              :     }
    3240         1051 : }
    3241              : 
    3242              : /*
    3243              :  * notification_hash: hash function for notification hash table
    3244              :  *
    3245              :  * The hash "keys" are pointers to Notification structs.
    3246              :  */
    3247              : static uint32
    3248         2001 : notification_hash(const void *key, Size keysize)
    3249              : {
    3250         2001 :     const Notification *k = *(const Notification *const *) key;
    3251              : 
    3252              :     Assert(keysize == sizeof(Notification *));
    3253              :     /* We don't bother to include the payload's trailing null in the hash */
    3254         2001 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    3255         2001 :                                    k->channel_len + k->payload_len + 1));
    3256              : }
    3257              : 
    3258              : /*
    3259              :  * notification_match: match function to use with notification_hash
    3260              :  */
    3261              : static int
    3262            1 : notification_match(const void *key1, const void *key2, Size keysize)
    3263              : {
    3264            1 :     const Notification *k1 = *(const Notification *const *) key1;
    3265            1 :     const Notification *k2 = *(const Notification *const *) key2;
    3266              : 
    3267              :     Assert(keysize == sizeof(Notification *));
    3268            1 :     if (k1->channel_len == k2->channel_len &&
    3269            1 :         k1->payload_len == k2->payload_len &&
    3270            1 :         memcmp(k1->data, k2->data,
    3271            1 :                k1->channel_len + k1->payload_len + 2) == 0)
    3272            1 :         return 0;               /* equal */
    3273            0 :     return 1;                   /* not equal */
    3274              : }
    3275              : 
    3276              : /* Clear the pendingActions and pendingNotifies lists. */
    3277              : static void
    3278        35680 : ClearPendingActionsAndNotifies(void)
    3279              : {
    3280              :     /*
    3281              :      * Everything's allocated in either TopTransactionContext or the context
    3282              :      * for the subtransaction to which it corresponds.  So, there's nothing to
    3283              :      * do here except reset the pointers; the space will be reclaimed when the
    3284              :      * contexts are deleted.
    3285              :      */
    3286        35680 :     pendingActions = NULL;
    3287        35680 :     pendingNotifies = NULL;
    3288              :     /* Also clear pendingListenActions, which is derived from pendingActions */
    3289        35680 :     pendingListenActions = NULL;
    3290        35680 : }
    3291              : 
    3292              : /*
    3293              :  * GUC check_hook for notify_buffers
    3294              :  */
    3295              : bool
    3296         1296 : check_notify_buffers(int *newval, void **extra, GucSource source)
    3297              : {
    3298         1296 :     return check_slru_buffers("notify_buffers", newval);
    3299              : }
        

Generated by: LCOV version 2.0-1