LCOV - code coverage report
Current view: top level - src/backend/commands - async.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 748 815 91.8 %
Date: 2026-02-07 07:17:36 Functions: 53 54 98.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * async.c
       4             :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/commands/async.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : /*-------------------------------------------------------------------------
      16             :  * Async Notification Model as of v19:
      17             :  *
      18             :  * 1. Multiple backends on same machine.  Multiple backends may be listening
      19             :  *    on each of several channels.
      20             :  *
      21             :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      22             :  *    with actively-used pages mapped into shared memory by the slru.c module.
      23             :  *    All notification messages are placed in the queue and later read out
      24             :  *    by listening backends.  The single queue allows us to guarantee that
      25             :  *    notifications are received in commit order.
      26             :  *
      27             :  *    Although there is only one queue, notifications are treated as being
      28             :  *    database-local; this is done by including the sender's database OID
      29             :  *    in each notification message.  Listening backends ignore messages
      30             :  *    that don't match their database OID.  This is important because it
      31             :  *    ensures senders and receivers have the same database encoding and won't
      32             :  *    misinterpret non-ASCII text in the channel name or payload string.
      33             :  *
      34             :  *    Since notifications are not expected to survive database crashes,
      35             :  *    we can simply clean out the pg_notify data at any reboot, and there
      36             :  *    is no need for WAL support or fsync'ing.
      37             :  *
      38             :  * 3. Every backend that is listening on at least one channel registers by
      39             :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      40             :  *    incoming notifications in the central queue and first compares the
      41             :  *    database OID of the notification with its own database OID and then
      42             :  *    compares the notified channel with the list of channels that it listens
      43             :  *    to. In case there is a match it delivers the notification event to its
      44             :  *    frontend.  Non-matching events are simply skipped.
      45             :  *
      46             :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      47             :  *    a backend-local list which will not be processed until transaction end.
      48             :  *
      49             :  *    Duplicate notifications from the same transaction are sent out as one
      50             :  *    notification only. This is done to save work when for example a trigger
      51             :  *    on a 2 million row table fires a notification for each row that has been
      52             :  *    changed. If the application needs to receive every single notification
      53             :  *    that has been sent, it can easily add some unique string into the extra
      54             :  *    payload parameter.
      55             :  *
      56             :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      57             :  *    pending notifications to the head of the queue. The head pointer of the
      58             :  *    queue always points to the next free position and a position is just a
      59             :  *    page number and the offset in that page. This is done before marking the
      60             :  *    transaction as committed in clog. If we run into problems writing the
      61             :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      62             :  *    will roll back safely.
      63             :  *
      64             :  *    Once we have put all of the notifications into the queue, we return to
      65             :  *    CommitTransaction() which will then do the actual transaction commit.
      66             :  *
      67             :  *    After commit we are called another time (AtCommit_Notify()). Here we
      68             :  *    make any required updates to the effective listen state (see below).
      69             :  *    Then we signal any backends that may be interested in our messages
      70             :  *    (including our own backend, if listening).  This is done by
      71             :  *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
      72             :  *    each relevant backend, as described below.
      73             :  *
      74             :  *    Finally, after we are out of the transaction altogether and about to go
      75             :  *    idle, we scan the queue for messages that need to be sent to our
      76             :  *    frontend (which might be notifies from other backends, or self-notifies
      77             :  *    from our own).  This step is not part of the CommitTransaction sequence
      78             :  *    for two important reasons.  First, we could get errors while sending
      79             :  *    data to our frontend, and it's really bad for errors to happen in
      80             :  *    post-commit cleanup.  Second, in cases where a procedure issues commits
      81             :  *    within a single frontend command, we don't want to send notifies to our
      82             :  *    frontend until the command is done; but notifies to other backends
      83             :  *    should go out immediately after each commit.
      84             :  *
      85             :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      86             :  *    sets the process's latch, which triggers the event to be processed
      87             :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
      88             :  *    command and is not within a transaction block; cf.
      89             :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      90             :  *    flag, which will cause the processing to occur just before we next go
      91             :  *    idle.
      92             :  *
      93             :  *    Inbound-notify processing consists of reading all of the notifications
      94             :  *    that have arrived since scanning last time. We read every notification
      95             :  *    until we reach either a notification from an uncommitted transaction or
      96             :  *    the head pointer's position.
      97             :  *
      98             :  * 6. To limit disk space consumption, the tail pointer needs to be advanced
      99             :  *    so that old pages can be truncated. This is relatively expensive
     100             :  *    (notably, it requires an exclusive lock), so we don't want to do it
     101             :  *    often. We make sending backends do this work if they advanced the queue
     102             :  *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
     103             :  *
     104             :  * 7. So far we have not discussed how backends change their listening state,
     105             :  *    nor how notification senders know which backends to awaken.  To handle
     106             :  *    the latter, we maintain a global channel table (implemented as a dynamic
     107             :  *    shared hash table, or dshash) that maps channel names to the set of
     108             :  *    backends listening on each channel.  This table is created lazily on the
     109             :  *    first LISTEN command and grows dynamically as needed.  There is also a
     110             :  *    local channel table (a plain dynahash table) in each listening backend,
     111             :  *    tracking which channels that backend is listening to.  The local table
     112             :  *    serves to reduce the number of accesses needed to the shared table.
     113             :  *
     114             :  *    If the current transaction has executed any LISTEN/UNLISTEN actions,
     115             :  *    PreCommit_Notify() prepares to commit those.  For LISTEN, it
     116             :  *    pre-allocates entries in both the per-backend localChannelTable and the
     117             :  *    shared globalChannelTable (with listening=false so that these entries
     118             :  *    are no-ops for the moment).  It also records the final per-channel
     119             :  *    intent in pendingListenActions, so post-commit/abort processing can
     120             :  *    apply that in a single step.  Since all these allocations happen before
     121             :  *    committing to clog, we can safely abort the transaction on failure.
     122             :  *
     123             :  *    After commit, AtCommit_Notify() runs through pendingListenActions and
     124             :  *    updates the backend's per-channel listening flags to activate or
     125             :  *    deactivate listening.  This happens before sending signals.
     126             :  *
     127             :  *    SignalBackends() consults the shared global channel table to identify
     128             :  *    listeners for the channels that the current transaction sent
     129             :  *    notification(s) to.  Each selected backend is marked as having a wakeup
     130             :  *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
     131             :  *    signal is sent to it.
     132             :  *
     133             :  * 8. While writing notifications, PreCommit_Notify() records the queue head
     134             :  *    position both before and after the write.  Because all writers serialize
     135             :  *    on a cluster-wide heavyweight lock, no other backend can insert entries
     136             :  *    between these two points.  SignalBackends() uses this fact to directly
     137             :  *    advance the queue pointer for any backend that is still positioned at
     138             :  *    the old head, or within the range written, but is not interested in any
     139             :  *    of our notifications.  This avoids unnecessary wakeups for idle
     140             :  *    listeners that have nothing to read.  Backends that are not interested
     141             :  *    in our notifications, but cannot be directly advanced, are signaled only
     142             :  *    if they are far behind the current queue head; that is to ensure that
     143             :  *    we can advance the queue tail without undue delay.
     144             :  *
     145             :  * An application that listens on the same channel it notifies will get
     146             :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     147             :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     148             :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     149             :  * frontend during startup.)  The above design guarantees that notifies from
     150             :  * other backends will never be missed by ignoring self-notifies.
     151             :  *
     152             :  * The amount of shared memory used for notify management (notify_buffers)
     153             :  * can be varied without affecting anything but performance.  The maximum
     154             :  * amount of notification data that can be queued at one time is determined
     155             :  * by the max_notify_queue_pages GUC.
     156             :  *-------------------------------------------------------------------------
     157             :  */
     158             : 
     159             : #include "postgres.h"
     160             : 
     161             : #include <limits.h>
     162             : #include <unistd.h>
     163             : #include <signal.h>
     164             : 
     165             : #include "access/parallel.h"
     166             : #include "access/slru.h"
     167             : #include "access/transam.h"
     168             : #include "access/xact.h"
     169             : #include "catalog/pg_database.h"
     170             : #include "commands/async.h"
     171             : #include "common/hashfn.h"
     172             : #include "funcapi.h"
     173             : #include "lib/dshash.h"
     174             : #include "libpq/libpq.h"
     175             : #include "libpq/pqformat.h"
     176             : #include "miscadmin.h"
     177             : #include "storage/dsm_registry.h"
     178             : #include "storage/ipc.h"
     179             : #include "storage/lmgr.h"
     180             : #include "storage/procsignal.h"
     181             : #include "tcop/tcopprot.h"
     182             : #include "utils/builtins.h"
     183             : #include "utils/dsa.h"
     184             : #include "utils/guc_hooks.h"
     185             : #include "utils/memutils.h"
     186             : #include "utils/ps_status.h"
     187             : #include "utils/snapmgr.h"
     188             : #include "utils/timestamp.h"
     189             : 
     190             : 
     191             : /*
     192             :  * Maximum size of a NOTIFY payload, including terminating NUL.  This
     193             :  * must be kept small enough so that a notification message fits on one
     194             :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     195             :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     196             :  * than that, so changes in that data structure won't affect user-visible
     197             :  * restrictions.
     198             :  */
     199             : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
     200             : 
     201             : /*
     202             :  * Struct representing an entry in the global notify queue
     203             :  *
     204             :  * This struct declaration has the maximal length, but in a real queue entry
     205             :  * the data area is only big enough for the actual channel and payload strings
     206             :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     207             :  * entry size, if both channel and payload strings are empty (but note it
     208             :  * doesn't include alignment padding).
     209             :  *
     210             :  * The "length" field should always be rounded up to the next QUEUEALIGN
     211             :  * multiple so that all fields are properly aligned.
     212             :  */
     213             : typedef struct AsyncQueueEntry
     214             : {
     215             :     int         length;         /* total allocated length of entry */
     216             :     Oid         dboid;          /* sender's database OID */
     217             :     TransactionId xid;          /* sender's XID */
     218             :     int32       srcPid;         /* sender's PID */
     219             :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     220             : } AsyncQueueEntry;
     221             : 
     222             : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     223             : #define QUEUEALIGN(len)     INTALIGN(len)
     224             : 
     225             : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
     226             : 
     227             : /*
     228             :  * Struct describing a queue position, and assorted macros for working with it
     229             :  */
     230             : typedef struct QueuePosition
     231             : {
     232             :     int64       page;           /* SLRU page number */
     233             :     int         offset;         /* byte offset within page */
     234             : } QueuePosition;
     235             : 
     236             : #define QUEUE_POS_PAGE(x)       ((x).page)
     237             : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     238             : 
     239             : #define SET_QUEUE_POS(x,y,z) \
     240             :     do { \
     241             :         (x).page = (y); \
     242             :         (x).offset = (z); \
     243             :     } while (0)
     244             : 
     245             : #define QUEUE_POS_EQUAL(x,y) \
     246             :     ((x).page == (y).page && (x).offset == (y).offset)
     247             : 
     248             : #define QUEUE_POS_IS_ZERO(x) \
     249             :     ((x).page == 0 && (x).offset == 0)
     250             : 
     251             : /* choose logically smaller QueuePosition */
     252             : #define QUEUE_POS_MIN(x,y) \
     253             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     254             :      (x).page != (y).page ? (y) : \
     255             :      (x).offset < (y).offset ? (x) : (y))
     256             : 
     257             : /* choose logically larger QueuePosition */
     258             : #define QUEUE_POS_MAX(x,y) \
     259             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     260             :      (x).page != (y).page ? (x) : \
     261             :      (x).offset > (y).offset ? (x) : (y))
     262             : 
     263             : /* returns true if x comes before y in queue order */
     264             : #define QUEUE_POS_PRECEDES(x,y) \
     265             :     (asyncQueuePagePrecedes((x).page, (y).page) || \
     266             :      ((x).page == (y).page && (x).offset < (y).offset))
     267             : 
     268             : /*
     269             :  * Parameter determining how often we try to advance the tail pointer:
     270             :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     271             :  * also the distance by which a backend that's not interested in our
     272             :  * notifications needs to be behind before we'll decide we need to wake it
     273             :  * up so it can advance its pointer.
     274             :  *
     275             :  * Resist the temptation to make this really large.  While that would save
     276             :  * work in some places, it would add cost in others.  In particular, this
     277             :  * should likely be less than notify_buffers, to ensure that backends
     278             :  * catch up before the pages they'll need to read fall out of SLRU cache.
     279             :  */
     280             : #define QUEUE_CLEANUP_DELAY 4
     281             : 
     282             : /*
     283             :  * Struct describing a listening backend's status
     284             :  */
     285             : typedef struct QueueBackendStatus
     286             : {
     287             :     int32       pid;            /* either a PID or InvalidPid */
     288             :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     289             :     ProcNumber  nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
     290             :     QueuePosition pos;          /* backend has read queue up to here */
     291             :     bool        wakeupPending;  /* signal sent to backend, not yet processed */
     292             :     bool        isAdvancing;    /* backend is advancing its position */
     293             : } QueueBackendStatus;
     294             : 
     295             : /*
     296             :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     297             :  *
     298             :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
     299             :  * NotifyQueueTailLock.
     300             :  *
     301             :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
     302             :  * their own entries as well as the head and tail pointers. Consequently we
     303             :  * can allow a backend to update its own record while holding only SHARED lock
     304             :  * (since no other backend will inspect it).
     305             :  *
     306             :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
     307             :  * entries of other backends and also change the head pointer. They can
     308             :  * also advance other backends' queue positions, unless the other backend
     309             :  * has isAdvancing set (i.e., is in process of doing that itself).
     310             :  *
     311             :  * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
     312             :  * mode, backends can change the tail pointers.
     313             :  *
     314             :  * The SLRU buffer pool is divided into banks, and a bank-wise SLRU lock is
     315             :  * used as the control lock for the pg_notify SLRU buffers.
     316             :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
     317             :  * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
     318             :  * globalChannelTable partition locks.
     319             :  *
     320             :  * Each backend uses the backend[] array entry with index equal to its
     321             :  * ProcNumber.  We rely on this to make SendProcSignal fast.
     322             :  *
     323             :  * The backend[] array entries for actively-listening backends are threaded
     324             :  * together using firstListener and the nextListener links, so that we can
     325             :  * scan them without having to iterate over inactive entries.  We keep this
     326             :  * list in order by ProcNumber so that the scan is cache-friendly when there
     327             :  * are many active entries.
     328             :  */
     329             : typedef struct AsyncQueueControl
     330             : {
     331             :     QueuePosition head;         /* head points to the next free location */
     332             :     QueuePosition tail;         /* tail must be <= the queue position of every
     333             :                                  * listening backend */
     334             :     int64       stopPage;       /* oldest unrecycled page; must be <=
     335             :                                  * tail.page */
     336             :     ProcNumber  firstListener;  /* id of first listener, or
     337             :                                  * INVALID_PROC_NUMBER */
     338             :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     339             :     dsa_handle  globalChannelTableDSA;  /* global channel table's DSA handle */
     340             :     dshash_table_handle globalChannelTableDSH;  /* and its dshash handle */
     341             :     /* Array with room for MaxBackends entries: */
     342             :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     343             : } AsyncQueueControl;
     344             : 
     345             : static AsyncQueueControl *asyncQueueControl;
     346             : 
     347             : #define QUEUE_HEAD                  (asyncQueueControl->head)
     348             : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     349             : #define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
     350             : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
     351             : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     352             : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     353             : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
     354             : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
     355             : #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
     356             : #define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
     357             : 
     358             : /*
     359             :  * The SLRU buffer area through which we access the notification queue
     360             :  */
     361             : static SlruCtlData NotifyCtlData;
     362             : 
     363             : #define NotifyCtl                   (&NotifyCtlData)
     364             : #define QUEUE_PAGESIZE              BLCKSZ
     365             : 
     366             : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     367             : 
     368             : /*
     369             :  * Global channel table definitions
     370             :  *
     371             :  * This hash table maps (database OID, channel name) keys to arrays of
     372             :  * ProcNumbers representing the backends listening or about to listen
     373             :  * on each channel.  The "listening" flags allow us to create hash table
     374             :  * entries pre-commit and not have to assume that creating them post-commit
     375             :  * will succeed.
     376             :  */
     377             : #define INITIAL_LISTENERS_ARRAY_SIZE 4
     378             : 
     379             : typedef struct GlobalChannelKey
     380             : {
     381             :     Oid         dboid;
     382             :     char        channel[NAMEDATALEN];
     383             : } GlobalChannelKey;
     384             : 
     385             : typedef struct ListenerEntry
     386             : {
     387             :     ProcNumber  procNo;         /* listener's ProcNumber */
     388             :     bool        listening;      /* true if committed listener */
     389             : } ListenerEntry;
     390             : 
     391             : typedef struct GlobalChannelEntry
     392             : {
     393             :     GlobalChannelKey key;       /* hash key */
     394             :     dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
     395             :     int         numListeners;   /* Number of listeners currently stored */
     396             :     int         allocatedListeners; /* Allocated size of array */
     397             : } GlobalChannelEntry;
     398             : 
     399             : static dshash_table *globalChannelTable = NULL;
     400             : static dsa_area *globalChannelDSA = NULL;
     401             : 
     402             : /*
     403             :  * localChannelTable caches the channel names this backend is listening on
     404             :  * (including those we have staged to be listened on, but not yet committed).
     405             :  * Used by IsListeningOn() for fast lookups when reading notifications.
     406             :  */
     407             : static HTAB *localChannelTable = NULL;
     408             : 
     409             : /* We test this condition to detect that we're not listening at all */
     410             : #define LocalChannelTableIsEmpty() \
     411             :     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
     412             : 
     413             : /*
     414             :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     415             :  * all actions requested in the current transaction.  As explained above,
     416             :  * we don't actually change listen state until we reach transaction commit.
     417             :  *
     418             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     419             :  * subtransaction has its own list in its own CurTransactionContext, but
     420             :  * successful subtransactions attach their lists to their parent's list.
     421             :  * Failed subtransactions simply discard their lists.
     422             :  */
     423             : typedef enum
     424             : {
     425             :     LISTEN_LISTEN,
     426             :     LISTEN_UNLISTEN,
     427             :     LISTEN_UNLISTEN_ALL,
     428             : } ListenActionKind;
     429             : 
     430             : typedef struct
     431             : {
     432             :     ListenActionKind action;
     433             :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     434             : } ListenAction;
     435             : 
     436             : typedef struct ActionList
     437             : {
     438             :     int         nestingLevel;   /* current transaction nesting depth */
     439             :     List       *actions;        /* list of ListenAction structs */
     440             :     struct ActionList *upper;   /* details for upper transaction levels */
     441             : } ActionList;
     442             : 
     443             : static ActionList *pendingActions = NULL;
     444             : 
     445             : /*
     446             :  * Hash table recording the final listen/unlisten intent per channel for
     447             :  * the current transaction.  Key is channel name, value is PENDING_LISTEN or
     448             :  * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
     449             :  * per channel instead of replaying every action.  This is built from the
     450             :  * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
     451             :  * AtAbort_Notify.
     452             :  */
     453             : typedef enum
     454             : {
     455             :     PENDING_LISTEN,
     456             :     PENDING_UNLISTEN,
     457             : } PendingListenAction;
     458             : 
     459             : typedef struct PendingListenEntry
     460             : {
     461             :     char        channel[NAMEDATALEN];   /* hash key */
     462             :     PendingListenAction action; /* which action should we perform? */
     463             : } PendingListenEntry;
     464             : 
     465             : static HTAB *pendingListenActions = NULL;
     466             : 
     467             : /*
     468             :  * State for outbound notifies consists of a list of all channels+payloads
     469             :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     470             :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     471             :  * NOTIFYs have been done in the current (sub) transaction.
     472             :  *
     473             :  * We discard duplicate notify events issued in the same transaction.
     474             :  * Hence, in addition to the list proper (which we need to track the order
     475             :  * of the events, since we guarantee to deliver them in order), we build a
     476             :  * hash table which we can probe to detect duplicates.  Since building the
     477             :  * hash table is somewhat expensive, we do so only once we have at least
     478             :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     479             :  * before that we just scan the events linearly.
     480             :  *
     481             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     482             :  * subtransaction has its own list in its own CurTransactionContext, but
     483             :  * successful subtransactions add their entries to their parent's list.
     484             :  * Failed subtransactions simply discard their lists.  Since these lists
     485             :  * are independent, there may be notify events in a subtransaction's list
     486             :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     487             :  * the dups when merging the subtransaction's list into its parent's.
     488             :  *
     489             :  * Note: the action and notify lists do not interact within a transaction.
     490             :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     491             :  * channel name, it will get a self-notify at commit.  This is a bit odd
     492             :  * but is consistent with our historical behavior.
     493             :  */
     494             : typedef struct Notification
     495             : {
     496             :     uint16      channel_len;    /* strlen of channel name (terminator not counted) */
     497             :     uint16      payload_len;    /* strlen of payload (terminator not counted) */
     498             :     /* null-terminated channel name, then null-terminated payload follow */
     499             :     char        data[FLEXIBLE_ARRAY_MEMBER];
     500             : } Notification;
     501             : 
     502             : typedef struct NotificationList
     503             : {
     504             :     int         nestingLevel;   /* nesting depth of the (sub)transaction owning this list */
     505             :     List       *events;         /* list of Notification structs */
     506             :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
     507             :     List       *uniqueChannelNames; /* unique channel names being notified */
     508             :     HTAB       *uniqueChannelHash;  /* hash of unique channel names, or NULL */
     509             :     struct NotificationList *upper; /* list of the enclosing transaction level, or NULL */
     510             : } NotificationList;
     511             : 
     512             : #define MIN_HASHABLE_NOTIFIES 16    /* min number of queued events before hashtab is built */
     513             : 
     514             : struct NotificationHash             /* entry type stored in NotificationList.hashtab */
     515             : {
     516             :     Notification *event;        /* => the actual Notification struct */
     517             : };
     518             : 
     519             : static NotificationList *pendingNotifies = NULL;    /* most deeply nested list; older levels via ->upper */
     520             : 
     521             : /*
     522             :  * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
     523             :  * (both just carry the channel name, with no payload; the key is the entry).
     524             :  */
     525             : typedef struct ChannelName
     526             : {
     527             :     char        channel[NAMEDATALEN];   /* hash key: null-terminated channel name */
     528             : } ChannelName;
     529             : 
     530             : /*
     531             :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     532             :  * called from inside a signal handler.  That just sets the
     533             :  * notifyInterruptPending flag and sets the process
     534             :  * latch.  ProcessNotifyInterrupt() will then be called whenever it's safe to
     535             :  * actually deal with the interrupt.
     536             :  */
     537             : volatile sig_atomic_t notifyInterruptPending = false;
     538             : 
     539             : /* True if we've registered an on_shmem_exit cleanup */
     540             : static bool unlistenExitRegistered = false;
     541             : 
     542             : /* True if we're currently registered as a listener in asyncQueueControl */
     543             : static bool amRegisteredListener = false;
     544             : 
     545             : /*
     546             :  * Queue head positions for direct advancement.
     547             :  * These are captured during PreCommit_Notify while holding the heavyweight
     548             :  * lock on database 0, ensuring no other backend can insert notifications
     549             :  * between them.  SignalBackends uses these to advance idle backends.
     550             :  */
     551             : static QueuePosition queueHeadBeforeWrite;
     552             : static QueuePosition queueHeadAfterWrite;
     553             : 
     554             : /*
     555             :  * Workspace arrays for SignalBackends.  These are preallocated in
     556             :  * PreCommit_Notify to avoid needing memory allocation after committing to
     557             :  * clog.
     558             :  */
     559             : static int32 *signalPids = NULL;
     560             : static ProcNumber *signalProcnos = NULL;
     561             : 
     562             : /* Have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     563             : static bool tryAdvanceTail = false;
     564             : 
     565             : /* GUC parameters */
     566             : bool        Trace_notify = false;
     567             : 
     568             : /* Default is 1048576 pages; for 8 KB pages this gives 8 GB of disk space */
     569             : int         max_notify_queue_pages = 1048576;
     570             : 
     571             : /* local function prototypes (all static, i.e. private to this file) */
     572             : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
     573             : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
     574             : static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
     575             :                                         const char *channel);
     576             : static dshash_hash globalChannelTableHash(const void *key, size_t size,
     577             :                                           void *arg);
     578             : static void initGlobalChannelTable(void);
     579             : static void initLocalChannelTable(void);
     580             : static void queue_listen(ListenActionKind action, const char *channel);
     581             : static void Async_UnlistenOnExit(int code, Datum arg);
     582             : static void BecomeRegisteredListener(void);
     583             : static void PrepareTableEntriesForListen(const char *channel);
     584             : static void PrepareTableEntriesForUnlisten(const char *channel);
     585             : static void PrepareTableEntriesForUnlistenAll(void);
     586             : static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
     587             :                                       ListenerEntry *listeners,
     588             :                                       int idx);
     589             : static void ApplyPendingListenActions(bool isCommit);
     590             : static void CleanupListenersOnExit(void);
     591             : static bool IsListeningOn(const char *channel);
     592             : static void asyncQueueUnregister(void);
     593             : static bool asyncQueueIsFull(void);
     594             : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     595             : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     596             : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     597             : static double asyncQueueUsage(void);
     598             : static void asyncQueueFillWarning(void);
     599             : static void SignalBackends(void);
     600             : static void asyncQueueReadAllNotifications(void);
     601             : static bool asyncQueueProcessPageEntries(QueuePosition *current,
     602             :                                          QueuePosition stop,
     603             :                                          Snapshot snapshot);
     604             : static void asyncQueueAdvanceTail(void);
     605             : static void ProcessIncomingNotify(bool flush);
     606             : static bool AsyncExistsPendingNotify(Notification *n);
     607             : static void AddEventToPendingNotifies(Notification *n);
     608             : static uint32 notification_hash(const void *key, Size keysize);
     609             : static int  notification_match(const void *key1, const void *key2, Size keysize);
     610             : static void ClearPendingActionsAndNotifies(void);
     611             : 
     612             : /*
     613             :  * Compute the signed difference between two queue page numbers (p - q).
     614             :  * Previously this function accounted for a wraparound; now it is a plain subtraction.
     615             :  */
     616             : static inline int64
     617           0 : asyncQueuePageDiff(int64 p, int64 q)
     618             : {
     619           0 :     return p - q;
     620             : }
     621             : 
     622             : /*
     623             :  * Determines whether page p precedes page q (installed as NotifyCtl->PagePrecedes).
     624             :  * Previously this function accounted for a wraparound; now it is a plain comparison.
     625             :  */
     626             : static inline bool
     627         288 : asyncQueuePagePrecedes(int64 p, int64 q)
     628             : {
     629         288 :     return p < q;
     630             : }
     631             : 
     632             : /*
     633             :  * GlobalChannelKeyInit
     634             :  *      Fill *key with (dboid, channel) so it can be hashed and compared bytewise.
     635             :  */
     636             : static inline void
     637         432 : GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
     638             : {
     639         432 :     memset(key, 0, sizeof(GlobalChannelKey));   /* zero all bytes: keys are compared with dshash_memcmp */
     640         432 :     key->dboid = dboid;
     641         432 :     strlcpy(key->channel, channel, NAMEDATALEN);    /* copies at most NAMEDATALEN - 1 bytes plus terminator */
     642         432 : }
     643             : 
     644             : /*
     645             :  * globalChannelTableHash
     646             :  *      Hash function for global channel table keys; "size" and "arg" are not used.
     647             :  */
     648             : static dshash_hash
     649         432 : globalChannelTableHash(const void *key, size_t size, void *arg)
     650             : {
     651         432 :     const GlobalChannelKey *k = (const GlobalChannelKey *) key;
     652             :     dshash_hash h;
     653             : 
     654         432 :     h = DatumGetUInt32(hash_uint32(k->dboid));  /* start from the database OID */
     655         432 :     h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,    /* mix in the name bytes only, */
     656         432 :                                  strnlen(k->channel, NAMEDATALEN)));    /* stopping at the terminator */
     657             : 
     658         432 :     return h;
     659             : }
     660             : 
     661             : /* parameters for the global channel table (used for both dshash_create and dshash_attach) */
     662             : static const dshash_parameters globalChannelTableDSHParams = {
     663             :     sizeof(GlobalChannelKey),   /* key size */
     664             :     sizeof(GlobalChannelEntry), /* entry size */
     665             :     dshash_memcmp,              /* key comparison: bytewise */
     666             :     globalChannelTableHash,     /* key hashing */
     667             :     dshash_memcpy,              /* key copying: bytewise */
     668             :     LWTRANCHE_NOTIFY_CHANNEL_HASH   /* LWLock tranche */
     669             : };
     670             : 
     671             : /*
     672             :  * initGlobalChannelTable
     673             :  *      Lazy initialization of the global channel table: create it if no backend has yet, else attach to it.
     674             :  */
     675             : static void
     676         312 : initGlobalChannelTable(void)
     677             : {
     678             :     MemoryContext oldcontext;
     679             : 
     680             :     /* Quick exit if we already did this (unlocked test; the state is re-checked under the lock below) */
     681         312 :     if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
     682         298 :         globalChannelTable != NULL)
     683         254 :         return;
     684             : 
     685             :     /* Otherwise, use a lock to ensure only one process creates the table */
     686          58 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     687             : 
     688             :     /* Be sure any local memory allocated by DSA routines is persistent */
     689          58 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
     690             : 
     691          58 :     if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
     692             :     {
     693             :         /* No table exists yet: initialize dynamic shared hash table for global channels */
     694          14 :         globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
     695          14 :         dsa_pin(globalChannelDSA);
     696          14 :         dsa_pin_mapping(globalChannelDSA);
     697          14 :         globalChannelTable = dshash_create(globalChannelDSA,
     698             :                                            &globalChannelTableDSHParams,
     699             :                                            NULL);
     700             : 
     701             :         /* Store handles in shared memory for other backends to use */
     702          14 :         asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
     703          14 :         asyncQueueControl->globalChannelTableDSH =
     704          14 :             dshash_get_hash_table_handle(globalChannelTable);
     705             :     }
     706          44 :     else if (!globalChannelTable)   /* table exists, but this process has not attached yet */
     707             :     {
     708             :         /* Attach to existing dynamic shared hash table */
     709          44 :         globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
     710          44 :         dsa_pin_mapping(globalChannelDSA);
     711          44 :         globalChannelTable = dshash_attach(globalChannelDSA,
     712             :                                            &globalChannelTableDSHParams,
     713          44 :                                            asyncQueueControl->globalChannelTableDSH,
     714             :                                            NULL);
     715             :     }
     716             : 
     717          58 :     MemoryContextSwitchTo(oldcontext);
     718          58 :     LWLockRelease(NotifyQueueLock);
     719             : }
     720             : 
     721             : /*
     722             :  * initLocalChannelTable
     723             :  *      Lazy initialization of the local channel table (a no-op if it already exists).
     724             :  *      Once created, this table lasts for the life of the session.
     725             :  */
     726             : static void
     727         182 : initLocalChannelTable(void)
     728             : {
     729             :     HASHCTL     hash_ctl;
     730             : 
     731             :     /* Quick exit if we already did this */
     732         182 :     if (localChannelTable != NULL)
     733         148 :         return;
     734             : 
     735             :     /* Initialize local hash table for this backend's listened channels; entries are ChannelName structs */
     736          34 :     hash_ctl.keysize = NAMEDATALEN;
     737          34 :     hash_ctl.entrysize = sizeof(ChannelName);
     738             : 
     739          34 :     localChannelTable =
     740          34 :         hash_create("Local Listen Channels",
     741             :                     64,     /* initial size estimate */
     742             :                     &hash_ctl,
     743             :                     HASH_ELEM | HASH_STRINGS);  /* keys are C strings; no HASH_CONTEXT, so hcxt is unused */
     744             : }
     745             : 
     746             : /*
     747             :  * initPendingListenActions
     748             :  *      Lazy initialization of the pending listen actions hash table.
     749             :  *      This is allocated in CurTransactionContext during PreCommit_Notify,
     750             :  *      and destroyed at transaction end.  pendingActions must be non-NULL on entry.
     751             :  */
     752             : static void
     753         182 : initPendingListenActions(void)
     754             : {
     755             :     HASHCTL     hash_ctl;
     756             : 
     757         182 :     if (pendingListenActions != NULL)   /* already created; nothing to do */
     758           0 :         return;
     759             : 
     760         182 :     hash_ctl.keysize = NAMEDATALEN;
     761         182 :     hash_ctl.entrysize = sizeof(PendingListenEntry);
     762         182 :     hash_ctl.hcxt = CurTransactionContext;
     763             : 
     764         182 :     pendingListenActions =
     765         182 :         hash_create("Pending Listen Actions",
     766         182 :                     list_length(pendingActions->actions),  /* size estimate: one entry per queued action */
     767             :                     &hash_ctl,
     768             :                     HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
     769             : }
     770             : 
     771             : /*
     772             :  * Report space needed for our shared memory area (control struct plus SLRU buffers)
     773             :  */
     774             : Size
     775        4254 : AsyncShmemSize(void)
     776             : {
     777             :     Size        size;
     778             : 
     779             :     /* This had better match AsyncShmemInit */
     780        4254 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));   /* one status slot per backend */
     781        4254 :     size = add_size(size, offsetof(AsyncQueueControl, backend));    /* plus the fields ahead of backend[] */
     782             : 
     783        4254 :     size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));   /* plus the notify SLRU */
     784             : 
     785        4254 :     return size;
     786             : }
     787             : 
     788             : /*
     789             :  * Initialize our shared memory area: the AsyncQueueControl struct and the notify SLRU
     790             :  */
     791             : void
     792        2280 : AsyncShmemInit(void)
     793             : {
     794             :     bool        found;
     795             :     Size        size;
     796             : 
     797             :     /*
     798             :      * Create or attach to the AsyncQueueControl structure (size computed as in AsyncShmemSize).
     799             :      */
     800        2280 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
     801        2280 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     802             : 
     803        2280 :     asyncQueueControl = (AsyncQueueControl *)
     804        2280 :         ShmemInitStruct("Async Queue Control", size, &found);
     805             : 
     806        2280 :     if (!found)
     807             :     {
     808             :         /* First time through, so initialize it: empty queue, no listeners, no channel table */
     809        2280 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     810        2280 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     811        2280 :         QUEUE_STOP_PAGE = 0;
     812        2280 :         QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
     813        2280 :         asyncQueueControl->lastQueueFillWarn = 0;
     814        2280 :         asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
     815        2280 :         asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
     816      211480 :         for (int i = 0; i < MaxBackends; i++)   /* mark every backend slot as unused */
     817             :         {
     818      209200 :             QUEUE_BACKEND_PID(i) = InvalidPid;
     819      209200 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
     820      209200 :             QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
     821      209200 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     822      209200 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
     823      209200 :             QUEUE_BACKEND_IS_ADVANCING(i) = false;
     824             :         }
     825             :     }
     826             : 
     827             :     /*
     828             :      * Set up SLRU management of the pg_notify data.  Note that long segment
     829             :      * names are used in order to avoid wraparound.
     830             :      */
     831        2280 :     NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
     832        2280 :     SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
     833             :                   "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
     834             :                   SYNC_HANDLER_NONE, true);
     835             : 
     836        2280 :     if (!found)
     837             :     {
     838             :         /*
     839             :          * During start or reboot, clean out the pg_notify directory (delete every segment file).
     840             :          */
     841        2280 :         (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     842             :     }
     843        2280 : }
     844             : 
     845             : 
     846             : /*
     847             :  * pg_notify -
     848             :  *    SQL function to send a notification event; a NULL argument is treated as an empty string
     849             :  */
     850             : Datum
     851        2144 : pg_notify(PG_FUNCTION_ARGS)
     852             : {
     853             :     const char *channel;
     854             :     const char *payload;
     855             : 
     856        2144 :     if (PG_ARGISNULL(0))
     857           6 :         channel = "";       /* Async_Notify will reject the empty channel name */
     858             :     else
     859        2138 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     860             : 
     861        2144 :     if (PG_ARGISNULL(1))
     862          12 :         payload = "";
     863             :     else
     864        2132 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     865             : 
     866             :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     867        2144 :     PreventCommandDuringRecovery("NOTIFY");
     868             : 
     869        2144 :     Async_Notify(channel, payload);
     870             : 
     871        2126 :     PG_RETURN_VOID();
     872             : }
     873             : 
     874             : 
     875             : /*
     876             :  * Async_Notify
     877             :  *
     878             :  *      This is executed by the SQL notify command.
     879             :  *
     880             :  *      Adds the message to the list of pending notifies.
     881             :  *      Actual notification happens during transaction commit.
     882             :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     883             :  */
     884             : void
     885        2276 : Async_Notify(const char *channel, const char *payload)
     886             : {
     887        2276 :     int         my_level = GetCurrentTransactionNestLevel();
     888             :     size_t      channel_len;
     889             :     size_t      payload_len;
     890             :     Notification *n;
     891             :     MemoryContext oldcontext;
     892             : 
     893        2276 :     if (IsParallelWorker())
     894           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     895             : 
     896        2276 :     if (Trace_notify)
     897           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     898             : 
     899        2276 :     channel_len = channel ? strlen(channel) : 0;    /* a NULL pointer counts as empty */
     900        2276 :     payload_len = payload ? strlen(payload) : 0;
     901             : 
     902             :     /* a channel name must be specified */
     903        2276 :     if (channel_len == 0)
     904          12 :         ereport(ERROR,
     905             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     906             :                  errmsg("channel name cannot be empty")));
     907             : 
     908             :     /* enforce length limits */
     909        2264 :     if (channel_len >= NAMEDATALEN)
     910           6 :         ereport(ERROR,
     911             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     912             :                  errmsg("channel name too long")));
     913             : 
     914        2258 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     915           0 :         ereport(ERROR,
     916             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     917             :                  errmsg("payload string too long")));
     918             : 
     919             :     /*
     920             :      * We must construct the Notification entry, even if we end up not using
     921             :      * it, in order to compare it cheaply to existing list entries.
     922             :      *
     923             :      * The notification list needs to live until end of transaction, so store
     924             :      * it in the transaction context.
     925             :      */
     926        2258 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     927             : 
     928        2258 :     n = (Notification *) palloc(offsetof(Notification, data) +
     929        2258 :                                 channel_len + payload_len + 2);    /* + 2: one terminator per string */
     930        2258 :     n->channel_len = channel_len;
     931        2258 :     n->payload_len = payload_len;
     932        2258 :     strcpy(n->data, channel);   /* channel is non-NULL here, since channel_len > 0 */
     933        2258 :     if (payload)
     934        2230 :         strcpy(n->data + channel_len + 1, payload);
     935             :     else
     936          28 :         n->data[channel_len + 1] = '\0';    /* NULL payload is stored as an empty string */
     937             : 
     938        2258 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     939         134 :     {
     940             :         NotificationList *notifies;
     941             : 
     942             :         /*
     943             :          * First notify event in current (sub)xact.  Note that we allocate the
     944             :          * NotificationList in TopTransactionContext; the nestingLevel might
     945             :          * get changed later by AtSubCommit_Notify.
     946             :          */
     947             :         notifies = (NotificationList *)
     948         134 :             MemoryContextAlloc(TopTransactionContext,
     949             :                                sizeof(NotificationList));
     950         134 :         notifies->nestingLevel = my_level;
     951         134 :         notifies->events = list_make1(n);
     952             :         /* We certainly don't need a hashtable yet */
     953         134 :         notifies->hashtab = NULL;
     954             :         /* We won't build uniqueChannelNames/Hash till later, either */
     955         134 :         notifies->uniqueChannelNames = NIL;
     956         134 :         notifies->uniqueChannelHash = NULL;
     957         134 :         notifies->upper = pendingNotifies;  /* push onto the stack of per-level lists */
     958         134 :         pendingNotifies = notifies;
     959             :     }
     960             :     else
     961             :     {
     962             :         /* Now check for duplicates */
     963        2124 :         if (AsyncExistsPendingNotify(n))
     964             :         {
     965             :             /* It's a dup, so forget it */
     966          26 :             pfree(n);
     967          26 :             MemoryContextSwitchTo(oldcontext);
     968          26 :             return;
     969             :         }
     970             : 
     971             :         /* Append more events to existing list */
     972        2098 :         AddEventToPendingNotifies(n);
     973             :     }
     974             : 
     975        2232 :     MemoryContextSwitchTo(oldcontext);
     976             : }
     977             : 
     978             : /*
     979             :  * queue_listen
     980             :  *      Common code for listen, unlisten, unlisten all commands.
     981             :  *
     982             :  *      Adds the request to the list of pending actions.
     983             :  *      Actual update of localChannelTable and globalChannelTable happens during
     984             :  *      PreCommit_Notify, with staged changes committed in AtCommit_Notify.
     985             :  */
     986             : static void
     987         220 : queue_listen(ListenActionKind action, const char *channel)
     988             : {
     989             :     MemoryContext oldcontext;
     990             :     ListenAction *actrec;
     991         220 :     int         my_level = GetCurrentTransactionNestLevel();
     992             : 
     993             :     /*
     994             :      * Unlike Async_Notify, we don't try to collapse out duplicates here.  We
     995             :      * keep the ordered list to preserve interactions like UNLISTEN ALL; the
     996             :      * final per-channel intent is computed during PreCommit_Notify.
     997             :      */
     998         220 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     999             : 
    1000             :     /* allocate the fields ahead of channel[], plus the channel name and its terminating null */
    1001         220 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
    1002         220 :                                      strlen(channel) + 1);
    1003         220 :     actrec->action = action;
    1004         220 :     strcpy(actrec->channel, channel);
    1005             : 
    1006         220 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
    1007         188 :     {
    1008             :         ActionList *actions;
    1009             : 
    1010             :         /*
    1011             :          * First action in current (sub)xact.  Note that we allocate the
    1012             :          * ActionList in TopTransactionContext; the nestingLevel might get
    1013             :          * changed later by AtSubCommit_Notify.
    1014             :          */
    1015             :         actions = (ActionList *)
    1016         188 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
    1017         188 :         actions->nestingLevel = my_level;
    1018         188 :         actions->actions = list_make1(actrec);
    1019         188 :         actions->upper = pendingActions;    /* push onto the stack of per-level lists */
    1020         188 :         pendingActions = actions;
    1021             :     }
    1022             :     else
    1023          32 :         pendingActions->actions = lappend(pendingActions->actions, actrec);    /* same level: just append */
    1024             : 
    1025         220 :     MemoryContextSwitchTo(oldcontext);
    1026         220 : }
    1027             : 
    1028             : /*
    1029             :  * Async_Listen
    1030             :  *
    1031             :  *      This is executed by the SQL listen command; it only queues a LISTEN_LISTEN action.
    1032             :  */
    1033             : void
    1034         116 : Async_Listen(const char *channel)
    1035             : {
    1036         116 :     if (Trace_notify)
    1037           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
    1038             : 
    1039         116 :     queue_listen(LISTEN_LISTEN, channel);
    1040         116 : }
    1041             : 
    1042             : /*
    1043             :  * Async_Unlisten
    1044             :  *
    1045             :  *      This is executed by the SQL unlisten command.
    1046             :  */
    1047             : void
    1048           6 : Async_Unlisten(const char *channel)
    1049             : {
    1050           6 :     if (Trace_notify)
    1051           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
    1052             : 
    1053             :     /* If we couldn't possibly be listening, no need to queue anything */
    1054           6 :     if (pendingActions == NULL && !unlistenExitRegistered)
    1055           0 :         return;
    1056             : 
    1057           6 :     queue_listen(LISTEN_UNLISTEN, channel);
    1058             : }
    1059             : 
    1060             : /*
    1061             :  * Async_UnlistenAll
    1062             :  *
    1063             :  *      This is invoked by UNLISTEN * command, and also at backend exit.
    1064             :  */
    1065             : void
    1066         156 : Async_UnlistenAll(void)
    1067             : {
    1068         156 :     if (Trace_notify)
    1069           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
    1070             : 
    1071             :     /* If we couldn't possibly be listening, no need to queue anything */
    1072         156 :     if (pendingActions == NULL && !unlistenExitRegistered)
    1073          58 :         return;
    1074             : 
    1075          98 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
    1076             : }
    1077             : 
    1078             : /*
    1079             :  * SQL function: return a set of the channel names this backend is actively
    1080             :  * listening to.
    1081             :  *
    1082             :  * Note: this coding relies on the fact that the localChannelTable cannot
    1083             :  * change within a transaction.
    1084             :  */
    1085             : Datum
    1086          18 : pg_listening_channels(PG_FUNCTION_ARGS)
    1087             : {
    1088             :     FuncCallContext *funcctx;
    1089             :     HASH_SEQ_STATUS *status;
    1090             : 
    1091             :     /* stuff done only on the first call of the function */
    1092          18 :     if (SRF_IS_FIRSTCALL())
    1093             :     {
    1094             :         /* create a function context for cross-call persistence */
    1095          12 :         funcctx = SRF_FIRSTCALL_INIT();
    1096             : 
    1097             :         /* Initialize hash table iteration if we have any channels */
    1098          12 :         if (localChannelTable != NULL)
    1099             :         {
    1100             :             MemoryContext oldcontext;
    1101             : 
    1102          12 :             oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1103          12 :             status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
    1104          12 :             hash_seq_init(status, localChannelTable);
    1105          12 :             funcctx->user_fctx = status;
    1106          12 :             MemoryContextSwitchTo(oldcontext);
    1107             :         }
    1108             :         else
    1109             :         {
    1110           0 :             funcctx->user_fctx = NULL;
    1111             :         }
    1112             :     }
    1113             : 
    1114             :     /* stuff done on every call of the function */
    1115          18 :     funcctx = SRF_PERCALL_SETUP();
    1116          18 :     status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
    1117             : 
    1118          18 :     if (status != NULL)
    1119             :     {
    1120             :         ChannelName *entry;
    1121             : 
    1122          18 :         entry = (ChannelName *) hash_seq_search(status);
    1123          18 :         if (entry != NULL)
    1124           6 :             SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
    1125             :     }
    1126             : 
    1127          12 :     SRF_RETURN_DONE(funcctx);
    1128             : }
    1129             : 
    1130             : /*
    1131             :  * Async_UnlistenOnExit
    1132             :  *
    1133             :  * This is executed at backend exit if we have done any LISTENs in this
    1134             :  * backend.  It might not be necessary anymore, if the user UNLISTENed
    1135             :  * everything, but we don't try to detect that case.
    1136             :  */
    1137             : static void
    1138          34 : Async_UnlistenOnExit(int code, Datum arg)
    1139             : {
    1140          34 :     CleanupListenersOnExit();
    1141          34 :     asyncQueueUnregister();
    1142          34 : }
    1143             : 
    1144             : /*
    1145             :  * AtPrepare_Notify
    1146             :  *
    1147             :  *      This is called at the prepare phase of a two-phase
    1148             :  *      transaction.  Fails if LISTEN/UNLISTEN/NOTIFY actions are pending.
    1149             :  */
    1150             : void
    1151         618 : AtPrepare_Notify(void)
    1152             : {
    1153             :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
    1154         618 :     if (pendingActions || pendingNotifies)
    1155           0 :         ereport(ERROR,
    1156             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1157             :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
    1158         618 : }
    1159             : 
    1160             : /*
    1161             :  * PreCommit_Notify
    1162             :  *
    1163             :  *      This is called at transaction commit, before actually committing to
    1164             :  *      clog.
    1165             :  *
    1166             :  *      If there are pending LISTEN actions, make sure we are listed in the
    1167             :  *      shared-memory listener array.  This must happen before commit to
    1168             :  *      ensure we don't miss any notifies from transactions that commit
    1169             :  *      just after ours.
    1170             :  *
    1171             :  *      If there are outbound notify requests in the pendingNotifies list,
    1172             :  *      add them to the global queue.  We do that before commit so that
    1173             :  *      we can still throw error if we run out of queue space.
    1174             :  */
    1175             : void
    1176      901120 : PreCommit_Notify(void)
    1177             : {
    1178             :     ListCell   *p;
    1179             : 
    1180      901120 :     if (!pendingActions && !pendingNotifies)
    1181      900808 :         return;                 /* no relevant statements in this xact */
    1182             : 
    1183         312 :     if (Trace_notify)
    1184           0 :         elog(DEBUG1, "PreCommit_Notify");
    1185             : 
    1186             :     /* Preflight for any pending listen/unlisten actions */
    1187         312 :     initGlobalChannelTable();
    1188             : 
    1189         312 :     if (pendingActions != NULL)
    1190             :     {
    1191             :         /* Ensure we have a local channel table */
    1192         182 :         initLocalChannelTable();
    1193             :         /* Create pendingListenActions hash table for this transaction */
    1194         182 :         initPendingListenActions();
    1195             : 
    1196             :         /* Stage all the actions this transaction wants to perform */
    1197         396 :         foreach(p, pendingActions->actions)
    1198             :         {
    1199         214 :             ListenAction *actrec = (ListenAction *) lfirst(p);
    1200             : 
    1201         214 :             switch (actrec->action)
    1202             :             {
    1203         112 :                 case LISTEN_LISTEN:
    1204         112 :                     BecomeRegisteredListener();
    1205         112 :                     PrepareTableEntriesForListen(actrec->channel);
    1206         112 :                     break;
    1207           6 :                 case LISTEN_UNLISTEN:
    1208           6 :                     PrepareTableEntriesForUnlisten(actrec->channel);
    1209           6 :                     break;
    1210          96 :                 case LISTEN_UNLISTEN_ALL:
    1211          96 :                     PrepareTableEntriesForUnlistenAll();
    1212          96 :                     break;
    1213             :             }
    1214             :         }
    1215             :     }
    1216             : 
    1217             :     /* Queue any pending notifies (must happen after the above) */
    1218         312 :     if (pendingNotifies)
    1219             :     {
    1220             :         ListCell   *nextNotify;
    1221         130 :         bool        firstIteration = true;
    1222             : 
    1223             :         /*
    1224             :          * Build list of unique channel names being notified for use by
    1225             :          * SignalBackends().
    1226             :          *
    1227             :          * If uniqueChannelHash is available, use it to efficiently get the
    1228             :          * unique channels.  Otherwise, fall back to the O(N^2) approach.
    1229             :          */
    1230         130 :         pendingNotifies->uniqueChannelNames = NIL;
    1231         130 :         if (pendingNotifies->uniqueChannelHash != NULL)
    1232             :         {
    1233             :             HASH_SEQ_STATUS status;
    1234             :             ChannelName *channelEntry;
    1235             : 
    1236           4 :             hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
    1237           8 :             while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
    1238           4 :                 pendingNotifies->uniqueChannelNames =
    1239           4 :                     lappend(pendingNotifies->uniqueChannelNames,
    1240           4 :                             channelEntry->channel);
    1241             :         }
    1242             :         else
    1243             :         {
    1244             :             /* O(N^2) approach is better for small number of notifications */
    1245         438 :             foreach_ptr(Notification, n, pendingNotifies->events)
    1246             :             {
    1247         186 :                 char       *channel = n->data;
    1248         186 :                 bool        found = false;
    1249             : 
    1250             :                 /* Name present in list? */
    1251         380 :                 foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
    1252             :                 {
    1253          62 :                     if (strcmp(oldchan, channel) == 0)
    1254             :                     {
    1255          54 :                         found = true;
    1256          54 :                         break;
    1257             :                     }
    1258             :                 }
    1259             :                 /* Add if not already in list */
    1260         186 :                 if (!found)
    1261         132 :                     pendingNotifies->uniqueChannelNames =
    1262         132 :                         lappend(pendingNotifies->uniqueChannelNames,
    1263             :                                 channel);
    1264             :             }
    1265             :         }
    1266             : 
    1267             :         /* Preallocate workspace that will be needed by SignalBackends() */
    1268         130 :         if (signalPids == NULL)
    1269          44 :             signalPids = MemoryContextAlloc(TopMemoryContext,
    1270             :                                             MaxBackends * sizeof(int32));
    1271             : 
    1272         130 :         if (signalProcnos == NULL)
    1273          44 :             signalProcnos = MemoryContextAlloc(TopMemoryContext,
    1274             :                                                MaxBackends * sizeof(ProcNumber));
    1275             : 
    1276             :         /*
    1277             :          * Make sure that we have an XID assigned to the current transaction.
    1278             :          * GetCurrentTransactionId is cheap if we already have an XID, but not
    1279             :          * so cheap if we don't, and we'd prefer not to do that work while
    1280             :          * holding NotifyQueueLock.
    1281             :          */
    1282         130 :         (void) GetCurrentTransactionId();
    1283             : 
    1284             :         /*
    1285             :          * Serialize writers by acquiring a special lock that we hold till
    1286             :          * after commit.  This ensures that queue entries appear in commit
    1287             :          * order, and in particular that there are never uncommitted queue
    1288             :          * entries ahead of committed ones, so an uncommitted transaction
    1289             :          * can't block delivery of deliverable notifications.
    1290             :          *
    1291             :          * We use a heavyweight lock so that it'll automatically be released
    1292             :          * after either commit or abort.  This also allows deadlocks to be
    1293             :          * detected, though really a deadlock shouldn't be possible here.
    1294             :          *
    1295             :          * The lock is on "database 0", which is pretty ugly but it doesn't
    1296             :          * seem worth inventing a special locktag category just for this.
    1297             :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
    1298             :          * used by the flatfiles mechanism.)
    1299             :          */
    1300         130 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
    1301             :                          AccessExclusiveLock);
    1302             : 
    1303             :         /*
    1304             :          * For the direct advancement optimization in SignalBackends(), we
    1305             :          * need to ensure that no other backend can insert queue entries
    1306             :          * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
    1307             :          * heavyweight lock above provides this guarantee, since it serializes
    1308             :          * all writers.
    1309             :          *
    1310             :          * Note: if the heavyweight lock were ever removed for scalability
    1311             :          * reasons, we could achieve the same guarantee by holding
    1312             :          * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
    1313             :          * than releasing and reacquiring it for each page as we do below.
    1314             :          */
    1315             : 
    1316             :         /* Initialize values to a safe default in case list is empty */
    1317         130 :         SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
    1318         130 :         SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
    1319             : 
    1320             :         /* Now push the notifications into the queue */
    1321         130 :         nextNotify = list_head(pendingNotifies->events);
    1322         330 :         while (nextNotify != NULL)
    1323             :         {
    1324             :             /*
    1325             :              * Add the pending notifications to the queue.  We acquire and
    1326             :              * release NotifyQueueLock once per page, which might be overkill
    1327             :              * but it does allow readers to get in while we're doing this.
    1328             :              *
    1329             :              * A full queue is very uncommon and should really not happen,
    1330             :              * given that we have so much space available in the SLRU pages.
    1331             :              * Nevertheless we need to deal with this possibility. Note that
    1332             :              * when we get here we are in the process of committing our
    1333             :              * transaction, but we have not yet committed to clog, so at this
    1334             :              * point in time we can still roll the transaction back.
    1335             :              */
    1336         200 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1337         200 :             if (firstIteration)
    1338             :             {
    1339         130 :                 queueHeadBeforeWrite = QUEUE_HEAD;
    1340         130 :                 firstIteration = false;
    1341             :             }
    1342         200 :             asyncQueueFillWarning();
    1343         200 :             if (asyncQueueIsFull())
    1344           0 :                 ereport(ERROR,
    1345             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    1346             :                          errmsg("too many notifications in the NOTIFY queue")));
    1347         200 :             nextNotify = asyncQueueAddEntries(nextNotify);
    1348         200 :             queueHeadAfterWrite = QUEUE_HEAD;
    1349         200 :             LWLockRelease(NotifyQueueLock);
    1350             :         }
    1351             : 
    1352             :         /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
    1353             :     }
    1354             : }
    1355             : 
    1356             : /*
    1357             :  * AtCommit_Notify
    1358             :  *
    1359             :  *      This is called at transaction commit, after committing to clog.
    1360             :  *
    1361             :  *      Apply pending listen/unlisten changes and clear transaction-local state.
    1362             :  *
    1363             :  *      If we issued any notifications in the transaction, send signals to
    1364             :  *      listening backends (possibly including ourselves) to process them.
    1365             :  *      Also, if we filled enough queue pages with new notifies, try to
    1366             :  *      advance the queue tail pointer.
    1367             :  */
    1368             : void
    1369      900810 : AtCommit_Notify(void)
    1370             : {
    1371             :     /*
    1372             :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
    1373             :      * return as soon as possible
    1374             :      */
    1375      900810 :     if (!pendingActions && !pendingNotifies)
    1376      900498 :         return;
    1377             : 
    1378         312 :     if (Trace_notify)
    1379           0 :         elog(DEBUG1, "AtCommit_Notify");
    1380             : 
    1381             :     /* Apply staged listen/unlisten changes */
    1382         312 :     ApplyPendingListenActions(true);
    1383             : 
    1384             :     /* If no longer listening to anything, get out of listener array */
    1385         312 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
    1386          46 :         asyncQueueUnregister();
    1387             : 
    1388             :     /*
    1389             :      * Send signals to listening backends.  We need to do this only if there are
    1390             :      * pending notifies, which were previously added to the shared queue by
    1391             :      * PreCommit_Notify().
    1392             :      */
    1393         312 :     if (pendingNotifies != NULL)
    1394         130 :         SignalBackends();
    1395             : 
    1396             :     /*
    1397             :      * If it's time to try to advance the global tail pointer, do that.
    1398             :      *
    1399             :      * (It might seem odd to do this in the sender, when more than likely the
    1400             :      * listeners won't yet have read the messages we just sent.  However,
    1401             :      * there's less contention if only the sender does it, and there is little
    1402             :      * need for urgency in advancing the global tail.  So this typically will
    1403             :      * be clearing out messages that were sent some time ago.)
    1404             :      */
    1405         312 :     if (tryAdvanceTail)
    1406             :     {
    1407          16 :         tryAdvanceTail = false;
    1408          16 :         asyncQueueAdvanceTail();
    1409             :     }
    1410             : 
    1411             :     /* And clean up */
    1412         312 :     ClearPendingActionsAndNotifies();
    1413             : }
    1414             : 
    1415             : /*
    1416             :  * BecomeRegisteredListener --- subroutine for PreCommit_Notify
    1417             :  *
    1418             :  * This function must make sure we are ready to catch any incoming messages.
    1419             :  */
    1420             : static void
    1421         112 : BecomeRegisteredListener(void)
    1422             : {
    1423             :     QueuePosition head;
    1424             :     QueuePosition max;
    1425             :     ProcNumber  prevListener;
    1426             : 
    1427             :     /*
    1428             :      * Nothing to do if we are already listening to something, nor if we
    1429             :      * already ran this routine in this transaction.
    1430             :      */
    1431         112 :     if (amRegisteredListener)
    1432          54 :         return;
    1433             : 
    1434          58 :     if (Trace_notify)
    1435           0 :         elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
    1436             : 
    1437             :     /*
    1438             :      * Before registering, make sure we will unlisten before dying. (Note:
    1439             :      * this action does not get undone if we abort later.)
    1440             :      */
    1441          58 :     if (!unlistenExitRegistered)
    1442             :     {
    1443          34 :         before_shmem_exit(Async_UnlistenOnExit, 0);
    1444          34 :         unlistenExitRegistered = true;
    1445             :     }
    1446             : 
    1447             :     /*
    1448             :      * This is our first LISTEN, so establish our pointer.
    1449             :      *
    1450             :      * We set our pointer to the global tail pointer and then move it forward
    1451             :      * over already-committed notifications.  This ensures we cannot miss any
    1452             :      * not-yet-committed notifications.  We might get a few more but that
    1453             :      * doesn't hurt.
    1454             :      *
    1455             :      * In some scenarios there might be a lot of committed notifications that
    1456             :      * have not yet been pruned away (because some backend is being lazy about
    1457             :      * reading them).  To reduce our startup time, we can look at other
    1458             :      * backends and adopt the maximum "pos" pointer of any backend that's in
    1459             :      * our database; any notifications it's already advanced over are surely
    1460             :      * committed and need not be re-examined by us.  (We must consider only
    1461             :      * backends connected to our DB, because others will not have bothered to
    1462             :      * check committed-ness of notifications in our DB.)
    1463             :      *
    1464             :      * We need exclusive lock here so we can look at other backends' entries
    1465             :      * and manipulate the list links.
    1466             :      */
    1467          58 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1468          58 :     head = QUEUE_HEAD;
    1469          58 :     max = QUEUE_TAIL;
    1470          58 :     prevListener = INVALID_PROC_NUMBER;
    1471          88 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1472             :     {
    1473          30 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1474          30 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1475             :         /* Also find last listening backend before this one */
    1476          30 :         if (i < MyProcNumber)
    1477          26 :             prevListener = i;
    1478             :     }
    1479          58 :     QUEUE_BACKEND_POS(MyProcNumber) = max;
    1480          58 :     QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
    1481          58 :     QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
    1482          58 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1483          58 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1484             :     /* Insert backend into list of listeners at correct position */
    1485          58 :     if (prevListener != INVALID_PROC_NUMBER)
    1486             :     {
    1487          14 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
    1488          14 :         QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
    1489             :     }
    1490             :     else
    1491             :     {
    1492          44 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
    1493          44 :         QUEUE_FIRST_LISTENER = MyProcNumber;
    1494             :     }
    1495          58 :     LWLockRelease(NotifyQueueLock);
    1496             : 
    1497             :     /* Now we are listed in the global array, so remember we're listening */
    1498          58 :     amRegisteredListener = true;
    1499             : 
    1500             :     /*
    1501             :      * Try to move our pointer forward as far as possible.  This will skip
    1502             :      * over already-committed notifications, which we want to do because they
    1503             :      * might be quite stale.  Note that we are not yet listening on anything,
    1504             :      * so we won't deliver such notifications to our frontend.  Also, although
    1505             :      * our transaction might have executed NOTIFY, those message(s) aren't
    1506             :      * queued yet so we won't skip them here.
    1507             :      */
    1508          58 :     if (!QUEUE_POS_EQUAL(max, head))
    1509          30 :         asyncQueueReadAllNotifications();
    1510             : }
    1511             : 
    1512             : /*
    1513             :  * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
    1514             :  *
    1515             :  * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
    1516             :  * an entry in localChannelTable, and pre-allocating an entry in the shared
    1517             :  * globalChannelTable with listening=false.  The listening flag will be set
    1518             :  * to true in AtCommit_Notify.  If we abort later, unwanted table entries
    1519             :  * will be removed.
    1520             :  */
    1521             : static void
    1522         112 : PrepareTableEntriesForListen(const char *channel)
    1523             : {
    1524             :     GlobalChannelKey key;
    1525             :     GlobalChannelEntry *entry;
    1526             :     bool        found;
    1527             :     ListenerEntry *listeners;
    1528             :     PendingListenEntry *pending;
    1529             : 
    1530             :     /*
    1531             :      * Record in local pending hash that we want to LISTEN, overwriting any
    1532             :      * earlier attempt to UNLISTEN.
    1533             :      */
    1534             :     pending = (PendingListenEntry *)
    1535         112 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1536         112 :     pending->action = PENDING_LISTEN;
    1537             : 
    1538             :     /*
    1539             :      * Ensure that there is an entry for the channel in localChannelTable.
    1540             :      * (Should this fail, we can just roll back.)  If the transaction fails
    1541             :      * after this point, we will remove the entry if appropriate during
    1542             :      * ApplyPendingListenActions.  Note that this entry allows IsListeningOn()
    1543             :      * to return TRUE; we assume nothing is going to consult that before
    1544             :      * AtCommit_Notify/AtAbort_Notify.  However, if later actions attempt to
    1545             :      * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
    1546             :      * present to ensure they do the right things; see
    1547             :      * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
    1548             :      */
    1549         112 :     (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
    1550             : 
    1551             :     /* Pre-allocate entry in shared globalChannelTable with listening=false */
    1552         112 :     GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    1553         112 :     entry = dshash_find_or_insert(globalChannelTable, &key, &found);
    1554             : 
    1555         112 :     if (!found)
    1556             :     {
    1557             :         /* New channel entry, so initialize it to a safe state */
    1558          68 :         entry->listenersArray = InvalidDsaPointer;
    1559          68 :         entry->numListeners = 0;
    1560          68 :         entry->allocatedListeners = 0;
    1561             :     }
    1562             : 
    1563             :     /*
    1564             :      * Create listenersArray if entry doesn't have one.  It's tempting to fold
    1565             :      * this into the !found case, but this coding allows us to cope in case
    1566             :      * dsa_allocate() failed in an earlier attempt.
    1567             :      */
    1568         112 :     if (!DsaPointerIsValid(entry->listenersArray))
    1569             :     {
    1570          68 :         entry->listenersArray = dsa_allocate(globalChannelDSA,
    1571             :                                              sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
    1572          68 :         entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
    1573             :     }
    1574             : 
    1575             :     listeners = (ListenerEntry *)
    1576         112 :         dsa_get_address(globalChannelDSA, entry->listenersArray);
    1577             : 
    1578             :     /*
    1579             :      * Check if we already have a ListenerEntry (possibly from earlier in this
    1580             :      * transaction)
    1581             :      */
    1582         150 :     for (int i = 0; i < entry->numListeners; i++)
    1583             :     {
    1584          60 :         if (listeners[i].procNo == MyProcNumber)
    1585             :         {
    1586             :             /* Already have an entry; listening flag stays as-is until commit */
    1587          22 :             dshash_release_lock(globalChannelTable, entry);
    1588          22 :             return;
    1589             :         }
    1590             :     }
    1591             : 
    1592             :     /* Need to add a new entry; grow array if necessary */
    1593          90 :     if (entry->numListeners >= entry->allocatedListeners)
    1594             :     {
    1595           2 :         int         new_size = entry->allocatedListeners * 2;
    1596           2 :         dsa_pointer old_array = entry->listenersArray;
    1597           2 :         dsa_pointer new_array = dsa_allocate(globalChannelDSA,
    1598             :                                              sizeof(ListenerEntry) * new_size);
    1599           2 :         ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
    1600             : 
    1601           2 :         memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
    1602           2 :         entry->listenersArray = new_array;
    1603           2 :         entry->allocatedListeners = new_size;
    1604           2 :         dsa_free(globalChannelDSA, old_array);
    1605           2 :         listeners = new_listeners;
    1606             :     }
    1607             : 
    1608          90 :     listeners[entry->numListeners].procNo = MyProcNumber;
    1609          90 :     listeners[entry->numListeners].listening = false;    /* staged, not yet
    1610             :                                                          * committed */
    1611          90 :     entry->numListeners++;
    1612             : 
    1613          90 :     dshash_release_lock(globalChannelTable, entry);
    1614             : }
    1615             : 
    1616             : /*
    1617             :  * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
    1618             :  *
    1619             :  * Stage an UNLISTEN for "channel" by recording it in pendingListenActions,
    1620             :  * but only if localChannelTable shows we're listening on it (committed or
    1621             :  * staged).  We don't touch globalChannelTable yet - the listener keeps
    1622             :  * receiving signals until commit, when ApplyPendingListenActions removes it.
    1623             :  */
    1624             : static void
    1625           6 : PrepareTableEntriesForUnlisten(const char *channel)
    1626             : {
    1627             :     PendingListenEntry *pending;
    1628             : 
    1629             :     /*
    1630             :      * If the channel name is not in localChannelTable, then we are neither
    1631             :      * listening on it nor preparing to listen on it, so we don't need to
    1632             :      * record an UNLISTEN action.
    1633             :      */
    1634             :     Assert(localChannelTable != NULL);  /* table must already exist here */
    1635           6 :     if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
    1636           0 :         return;                 /* nothing to unlisten from */
    1637             : 
    1638             :     /*
    1639             :      * Record in local pending hash that we want to UNLISTEN, overwriting any
    1640             :      * earlier attempt to LISTEN.  Don't touch localChannelTable or
    1641             :      * globalChannelTable yet - we keep receiving signals until commit.
    1642             :      */
    1643             :     pending = (PendingListenEntry *)
    1644           6 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
    1645           6 :     pending->action = PENDING_UNLISTEN; /* acted on at commit/abort time */
    1646             : }
    1647             : 
    1648             : /*
    1649             :  * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
    1650             :  *
    1651             :  * Prepare UNLISTEN * by recording a PENDING_UNLISTEN action in pendingListenActions
    1652             :  * for every listened or about-to-be-listened channel; no channel table is changed here.
    1653             :  */
    1654             : static void
    1655          96 : PrepareTableEntriesForUnlistenAll(void)
    1656             : {
    1657             :     HASH_SEQ_STATUS seq;
    1658             :     ChannelName *channelEntry;
    1659             :     PendingListenEntry *pending;
    1660             : 
    1661             :     /*
    1662             :      * Scan localChannelTable, which will have the names of all channels that
    1663             :      * we are listening on or have prepared to listen on.  Record an UNLISTEN
    1664             :      * action for each one, overwriting any earlier attempt to LISTEN.
    1665             :      */
    1666          96 :     hash_seq_init(&seq, localChannelTable); /* NOTE(review): assumes table is non-NULL; not asserted here */
    1667         164 :     while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
    1668             :     {
    1669             :         pending = (PendingListenEntry *)
    1670          68 :             hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
    1671          68 :         pending->action = PENDING_UNLISTEN; /* acted on at commit/abort time */
    1672             :     }
    1673          96 : }
    1674             : 
    1675             : /*
    1676             :  * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
    1677             :  *
    1678             :  * Decrements numListeners and shifts later array elements down over slot idx.
    1679             :  * If no listeners remain, frees listenersArray, deletes the hash entry, and
    1680             :  * sets *entry_ptr to NULL; otherwise *entry_ptr is left unchanged.
    1681             :  * "listeners" is the array being compacted; we could get that pointer from
    1682             :  * the entry, but all callers already have it at hand.
    1683             :  */
    1684             : static void
    1685          74 : RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,   /* in/out: NULLed if entry deleted */
    1686             :                           ListenerEntry *listeners, /* the entry's listener array */
    1687             :                           int idx)  /* slot to remove; range is not checked here */
    1688             : {
    1689          74 :     GlobalChannelEntry *entry = *entry_ptr;
    1690             : 
    1691          74 :     entry->numListeners--;
    1692          74 :     if (idx < entry->numListeners)  /* removed slot was not the last one */
    1693          16 :         memmove(&listeners[idx], &listeners[idx + 1],
    1694          16 :                 sizeof(ListenerEntry) * (entry->numListeners - idx));
    1695             : 
    1696          74 :     if (entry->numListeners == 0)
    1697             :     {
    1698          52 :         dsa_free(globalChannelDSA, entry->listenersArray);
    1699          52 :         dshash_delete_entry(globalChannelTable, entry);
    1700             :         /* tells caller not to release the entry's lock: */
    1701          52 :         *entry_ptr = NULL;
    1702             :     }
    1703          74 : }
    1704             : 
    1705             : /*
    1706             :  * ApplyPendingListenActions
    1707             :  *
    1708             :  * Apply (isCommit = true) or revert (isCommit = false) the listen/unlisten
    1709             :  * changes staged in pendingListenActions, updating the local and global hash tables.
    1710             :  */
    1711             : static void
    1712       52498 : ApplyPendingListenActions(bool isCommit)
    1713             : {
    1714             :     HASH_SEQ_STATUS seq;
    1715             :     PendingListenEntry *pending;
    1716             : 
    1717             :     /* Quick exit if nothing to do */
    1718       52498 :     if (pendingListenActions == NULL)
    1719       52316 :         return;
    1720             : 
    1721             :     /* We made a globalChannelTable before building pendingListenActions */
    1722         182 :     if (globalChannelTable == NULL)
    1723           0 :         elog(PANIC, "global channel table missing post-commit/abort");
    1724             : 
    1725             :     /* For each staged action ... */
    1726         182 :     hash_seq_init(&seq, pendingListenActions);
    1727         366 :     while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
    1728             :     {
    1729             :         GlobalChannelKey key;
    1730             :         GlobalChannelEntry *entry;
    1731         184 :         bool        removeLocal = true; /* cleared only when we remain a listener */
    1732         184 :         bool        foundListener = false;  /* did we find our ListenerEntry? */
    1733             : 
    1734             :         /*
    1735             :          * Find the global entry for this channel.  If isCommit, it had better
    1736             :          * exist (it was created in PreCommit).  In an abort, it might not
    1737             :          * exist, in which case we are not listening and should discard any
    1738             :          * local entry that PreCommit may have managed to create.
    1739             :          */
    1740         184 :         GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
    1741         184 :         entry = dshash_find(globalChannelTable, &key, true);    /* presumably true = exclusive lock; confirm */
    1742         184 :         if (entry != NULL)
    1743             :         {
    1744             :             /* Scan entry to find the ListenerEntry for this backend */
    1745             :             ListenerEntry *listeners;
    1746             : 
    1747             :             listeners = (ListenerEntry *)
    1748         184 :                 dsa_get_address(globalChannelDSA, entry->listenersArray);
    1749             : 
    1750         232 :             for (int i = 0; i < entry->numListeners; i++)
    1751             :             {
    1752         232 :                 if (listeners[i].procNo != MyProcNumber)
    1753          48 :                     continue;   /* some other backend's entry */
    1754         184 :                 foundListener = true;
    1755         184 :                 if (isCommit)
    1756             :                 {
    1757         184 :                     if (pending->action == PENDING_LISTEN)
    1758             :                     {
    1759             :                         /*
    1760             :                          * LISTEN being committed: set listening=true.
    1761             :                          * localChannelTable entry was created during
    1762             :                          * PreCommit and should be kept.
    1763             :                          */
    1764         110 :                         listeners[i].listening = true;
    1765         110 :                         removeLocal = false;
    1766             :                     }
    1767             :                     else
    1768             :                     {
    1769             :                         /*
    1770             :                          * UNLISTEN being committed: remove our global entry
    1771             :                          * here; removeLocal stays true for the local one.
    1772             :                          */
    1773          74 :                         RemoveListenerFromChannel(&entry, listeners, i);
    1774             :                     }
    1775             :                 }
    1776             :                 else
    1777             :                 {
    1778             :                     /*
    1779             :                      * Note: this part is reachable only if the transaction
    1780             :                      * aborts after PreCommit_Notify() has made some
    1781             :                      * pendingListenActions entries, so it's pretty hard to
    1782             :                      * test.
    1783             :                      */
    1784           0 :                     if (!listeners[i].listening)
    1785             :                     {
    1786             :                         /*
    1787             :                          * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
    1788             :                          * and we weren't listening before, so remove the
    1789             :                          * pre-allocated global entry; the local one goes below.
    1790             :                          */
    1791           0 :                         RemoveListenerFromChannel(&entry, listeners, i);
    1792             :                     }
    1793             :                     else
    1794             :                     {
    1795             :                         /*
    1796             :                          * We're aborting, but the previous state was that
    1797             :                          * we're listening, so keep localChannelTable entry.
    1798             :                          */
    1799           0 :                         removeLocal = false;
    1800             :                     }
    1801             :                 }
    1802         184 :                 break;          /* there shouldn't be another match */
    1803             :             }
    1804             : 
    1805             :             /* We might have already released the entry by removing it */
    1806         184 :             if (entry != NULL)  /* RemoveListenerFromChannel NULLs it on delete */
    1807         132 :                 dshash_release_lock(globalChannelTable, entry);
    1808             :         }
    1809             : 
    1810             :         /*
    1811             :          * If we're committing a LISTEN action, we should have found a
    1812             :          * matching ListenerEntry, but otherwise it's okay if we didn't.
    1813             :          */
    1814         184 :         if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
    1815           0 :             elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
    1816             :                  pending->channel, MyProcNumber);
    1817             : 
    1818             :         /*
    1819             :          * If we did not find a globalChannelTable entry for our backend, or
    1820             :          * if we are unlistening or aborting a never-committed LISTEN, remove
    1821             :          * any localChannelTable entry that may exist.  (Note in particular
    1822             :          * that this cleans up if we created a localChannelTable entry and
    1823             :          * then failed while trying to create a globalChannelTable entry.)
    1824             :          */
    1825         184 :         if (removeLocal && localChannelTable != NULL)
    1826          74 :             (void) hash_search(localChannelTable, pending->channel,
    1827             :                                HASH_REMOVE, NULL);
    1828             :     }
    1829             : }
    1830             : 
    1831             : /*
    1832             :  * CleanupListenersOnExit --- called from Async_UnlistenOnExit
    1833             :  *
    1834             :  *      Destroy the local table, then remove this backend from all of our database's channels in the shared global table.
    1835             :  */
    1836             : static void
    1837          34 : CleanupListenersOnExit(void)
    1838             : {
    1839             :     dshash_seq_status status;
    1840             :     GlobalChannelEntry *entry;
    1841             : 
    1842          34 :     if (Trace_notify)
    1843           0 :         elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
    1844             : 
    1845             :     /* Clear our local cache (not really necessary, but be consistent) */
    1846          34 :     if (localChannelTable != NULL)
    1847             :     {
    1848          34 :         hash_destroy(localChannelTable);
    1849          34 :         localChannelTable = NULL;
    1850             :     }
    1851             : 
    1852             :     /* Now remove our entries from the shared globalChannelTable */
    1853          34 :     if (globalChannelTable == NULL)
    1854           0 :         return;                 /* no shared table, so nothing to remove */
    1855             : 
    1856          34 :     dshash_seq_init(&status, globalChannelTable, true); /* presumably true = exclusive scan; confirm */
    1857          66 :     while ((entry = dshash_seq_next(&status)) != NULL)
    1858             :     {
    1859             :         ListenerEntry *listeners;
    1860             : 
    1861          32 :         if (entry->key.dboid != MyDatabaseId)
    1862           0 :             continue;           /* not relevant */
    1863             : 
    1864             :         listeners = (ListenerEntry *)
    1865          32 :             dsa_get_address(globalChannelDSA, entry->listenersArray);
    1866             : 
    1867          48 :         for (int i = 0; i < entry->numListeners; i++)
    1868             :         {
    1869          32 :             if (listeners[i].procNo == MyProcNumber)
    1870             :             {
    1871             :                 /* same steps as RemoveListenerFromChannel, but deleting via the scan */
    1872          16 :                 entry->numListeners--;
    1872          16 :                 if (i < entry->numListeners)
    1873           0 :                     memmove(&listeners[i], &listeners[i + 1],
    1874           0 :                             sizeof(ListenerEntry) * (entry->numListeners - i));
    1875             : 
    1876          16 :                 if (entry->numListeners == 0)
    1877             :                 {
    1878          16 :                     dsa_free(globalChannelDSA, entry->listenersArray);
    1879          16 :                     dshash_delete_current(&status);
    1880             :                 }
    1881          16 :                 break;          /* stop after our first match in this channel */
    1882             :             }
    1883             :         }
    1884             :     }
    1885          34 :     dshash_seq_term(&status);
    1886             : }
    1887             : 
    1888             : /*
    1889             :  * Test whether we are actively listening on the given channel name: true iff
    1890             :  * localChannelTable exists and has an entry for it.
    1891             :  * Note: this function is executed for every notification found in the queue.
    1892             :  */
    1893             : static bool
    1894         110 : IsListeningOn(const char *channel)
    1895             : {
    1896         110 :     if (localChannelTable == NULL)
    1897           0 :         return false;           /* no table, so we listen on nothing */
    1898             : 
    1899         110 :     return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
    1900             : }
    1901             : 
    1902             : /*
    1903             :  * Reset our per-backend queue slot and unlink it from the listener list when we
    1904             :  * are no longer listening on any channel.  NB: must not fail if we're already not listening.
    1905             :  */
    1906             : static void
    1907          80 : asyncQueueUnregister(void)
    1908             : {
    1909             :     Assert(LocalChannelTableIsEmpty()); /* else caller error */
    1910             : 
    1911          80 :     if (!amRegisteredListener)  /* nothing to do */
    1912          22 :         return;
    1913             : 
    1914             :     /*
    1915             :      * Need exclusive lock here to manipulate list links.
    1916             :      */
    1917          58 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1918             :     /* Mark our entry as invalid */
    1919          58 :     QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
    1920          58 :     QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
    1921          58 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    1922          58 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    1923             :     /* and remove it from the list */
    1924          58 :     if (QUEUE_FIRST_LISTENER == MyProcNumber)   /* we are the list head */
    1925          44 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
    1926             :     else
    1927             :     {
    1928          14 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    1929             :         {
    1930          14 :             if (QUEUE_NEXT_LISTENER(i) == MyProcNumber) /* i is our predecessor */
    1931             :             {
    1932          14 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
    1933          14 :                 break;
    1934             :             }
    1935             :         }
    1936             :     }
    1937          58 :     QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;    /* clear our own link */
    1938          58 :     LWLockRelease(NotifyQueueLock);
    1939             : 
    1940             :     /* mark ourselves as no longer listed in the global array */
    1941          58 :     amRegisteredListener = false;
    1942             : }
    1943             : 
    1944             : /*
    1945             :  * Test whether the queue is full, i.e. the head page is at least
    1946             :  * max_notify_queue_pages pages ahead of the tail page (no room to insert).
    1947             :  * Caller must hold at least shared NotifyQueueLock.
    1948             :  */
    1949             : static bool
    1950         200 : asyncQueueIsFull(void)
    1951             : {
    1952         200 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1953         200 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1954         200 :     int64       occupied = headPage - tailPage; /* page distance, head to tail */
    1955             : 
    1956         200 :     return occupied >= max_notify_queue_pages;
    1957             : }
    1958             : 
    1959             : /*
    1960             :  * Advance *position (updated in place) to the next entry, assuming that the
    1961             :  * current entry is of length entryLength.  Returns true if that moved us to
    1962             :  * offset 0 of the next page, else false.
    1963             :  */
    1964             : static bool
    1965        4906 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1966             : {
    1967        4906 :     int64       pageno = QUEUE_POS_PAGE(*position);
    1968        4906 :     int         offset = QUEUE_POS_OFFSET(*position);
    1969        4906 :     bool        pageJump = false;
    1970             : 
    1971             :     /*
    1972             :      * Move to the next writing position: First jump over what we have just
    1973             :      * written or read.
    1974             :      */
    1975        4906 :     offset += entryLength;
    1976             :     Assert(offset <= QUEUE_PAGESIZE);   /* an entry never crosses a page end */
    1977             : 
    1978             :     /*
    1979             :      * In a second step check if another entry can possibly be written to the
    1980             :      * page. If so, stay here, we have reached the next position. If not, then
    1981             :      * we need to move on to the next page.
    1982             :      */
    1983        4906 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1984             :     {
    1985          76 :         pageno++;
    1986          76 :         offset = 0;
    1987          76 :         pageJump = true;
    1988             :     }
    1989             : 
    1990        4906 :     SET_QUEUE_POS(*position, pageno, offset);   /* store the new position */
    1991        4906 :     return pageJump;
    1992             : }
    1993             : 
    1994             : /*
    1995             :  * Fill the AsyncQueueEntry at *qe with outbound notification *n (length, dboid, xid, srcPid, data).
    1996             :  */
    1997             : static void
    1998        2284 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1999             : {
    2000        2284 :     size_t      channellen = n->channel_len;
    2001        2284 :     size_t      payloadlen = n->payload_len;
    2002             :     int         entryLength;
    2003             : 
    2004             :     Assert(channellen < NAMEDATALEN);
    2005             :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    2006             : 
    2007             :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    2008        2284 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    2009        2284 :     entryLength = QUEUEALIGN(entryLength);  /* stored length is the aligned size */
    2010        2284 :     qe->length = entryLength;
    2011        2284 :     qe->dboid = MyDatabaseId;
    2012        2284 :     qe->xid = GetCurrentTransactionId();
    2013        2284 :     qe->srcPid = MyProcPid;
    2014        2284 :     memcpy(qe->data, n->data, channellen + payloadlen + 2); /* +2: presumably the two terminators */
    2015        2284 : }
    2016             : 
    2017             : /*
    2018             :  * Add pending notifications to the queue.
    2019             :  *
    2020             :  * We go page by page here, i.e. we stop once we have to go to a new page but
    2021             :  * we will be called again and then fill that next page. If an entry does not
    2022             :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    2023             :  * database OID in order to fill the page. So every page is always used up to
    2024             :  * the last byte which simplifies reading the page later.
    2025             :  *
    2026             :  * We are passed the list cell (in pendingNotifies->events) containing the next
    2027             :  * notification to write and return the first still-unwritten cell back.
    2028             :  * Eventually we will return NULL indicating all is done.
    2029             :  *
    2030             :  * We are holding NotifyQueueLock already from the caller and grab the
    2031             :  * page-specific SLRU bank lock locally; it is released before a normal return.
    2032             :  */
    2033             : static ListCell *
    2034         200 : asyncQueueAddEntries(ListCell *nextNotify)
    2035             : {
    2036             :     AsyncQueueEntry qe;         /* each entry is built here, then copied out */
    2037             :     QueuePosition queue_head;   /* local copy of QUEUE_HEAD, see below */
    2038             :     int64       pageno;
    2039             :     int         offset;
    2040             :     int         slotno;
    2041             :     LWLock     *prevlock;       /* SLRU bank lock we currently hold */
    2042             : 
    2043             :     /*
    2044             :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    2045             :      * memory upon exiting.  The reason for this is that if we have to advance
    2046             :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    2047             :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    2048             :      * subsequent insertions would try to put entries into a page that slru.c
    2049             :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    2050             :      * that if we do fail, any already-inserted queue entries are forgotten;
    2051             :      * this is okay, since they'd be useless anyway after our transaction
    2052             :      * rolls back.
    2053             :      */
    2054         200 :     queue_head = QUEUE_HEAD;
    2055             : 
    2056             :     /*
    2057             :      * If this is the first write since the postmaster started, we need to
    2058             :      * initialize the first page of the async SLRU.  Otherwise, the current
    2059             :      * page should be initialized already, so just fetch it.
    2060             :      */
    2061         200 :     pageno = QUEUE_POS_PAGE(queue_head);
    2062         200 :     prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2063             : 
    2064             :     /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
    2065         200 :     LWLockAcquire(prevlock, LW_EXCLUSIVE);
    2066             : 
    2067         200 :     if (QUEUE_POS_IS_ZERO(queue_head))  /* the "first write" case described above */
    2068          16 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    2069             :     else
    2070         184 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    2071             :                                    InvalidTransactionId);
    2072             : 
    2073             :     /* Note we mark the page dirty before writing in it */
    2074         200 :     NotifyCtl->shared->page_dirty[slotno] = true;
    2075             : 
    2076        2414 :     while (nextNotify != NULL)
    2077             :     {
    2078        2284 :         Notification *n = (Notification *) lfirst(nextNotify);
    2079             : 
    2080             :         /* Construct a valid queue entry in local variable qe */
    2081        2284 :         asyncQueueNotificationToEntry(n, &qe);
    2082             : 
    2083        2284 :         offset = QUEUE_POS_OFFSET(queue_head);
    2084             : 
    2085             :         /* Check whether the entry really fits on the current page */
    2086        2284 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    2087             :         {
    2088             :             /* OK, so advance nextNotify past this item */
    2089        2220 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
    2090             :         }
    2091             :         else
    2092             :         {
    2093             :             /*
    2094             :              * Write a dummy entry to fill up the page. Actually readers will
    2095             :              * only check dboid and since it won't match any reader's database
    2096             :              * OID, they will ignore this entry and move on.  nextNotify is left alone, so the real entry is written later.
    2097             :              */
    2098          64 :             qe.length = QUEUE_PAGESIZE - offset;    /* exactly the rest of the page */
    2099          64 :             qe.dboid = InvalidOid;
    2100          64 :             qe.xid = InvalidTransactionId;
    2101          64 :             qe.data[0] = '\0';  /* empty channel */
    2102          64 :             qe.data[1] = '\0';  /* empty payload */
    2103             :         }
    2104             : 
    2105             :         /* Now copy qe into the shared buffer page */
    2106        2284 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    2107             :                &qe,
    2108        2284 :                qe.length);
    2109             : 
    2110             :         /* Advance queue_head appropriately, and detect if page is full */
    2111        2284 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    2112             :         {
    2113             :             LWLock     *lock;
    2114             : 
    2115          70 :             pageno = QUEUE_POS_PAGE(queue_head);
    2116          70 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2117          70 :             if (lock != prevlock)   /* new page has a different bank lock: swap locks */
    2118             :             {
    2119           0 :                 LWLockRelease(prevlock);
    2120           0 :                 LWLockAcquire(lock, LW_EXCLUSIVE);
    2121           0 :                 prevlock = lock;
    2122             :             }
    2123             : 
    2124             :             /*
    2125             :              * Page is full, so we're done here, but first fill the next page
    2126             :              * with zeroes.  The reason to do this is to ensure that slru.c's
    2127             :              * idea of the head page is always the same as ours, which avoids
    2128             :              * boundary problems in SimpleLruTruncate.  The test in
    2129             :              * asyncQueueIsFull() ensured that there is room to create this
    2130             :              * page without overrunning the queue.
    2131             :              */
    2132          70 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    2133             : 
    2134             :             /*
    2135             :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    2136             :              * set flag to remember that we should try to advance the tail
    2137             :              * pointer (we don't want to actually do that right here).
    2138             :              */
    2139          70 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    2140          16 :                 tryAdvanceTail = true;
    2141             : 
    2142             :             /* And exit the loop */
    2143          70 :             break;
    2144             :         }
    2145             :     }
    2146             : 
    2147             :     /* Success, so update the global QUEUE_HEAD */
    2148         200 :     QUEUE_HEAD = queue_head;
    2149             : 
    2150         200 :     LWLockRelease(prevlock);
    2151             : 
    2152         200 :     return nextNotify;          /* NULL once every notification is written */
    2153             : }
    2154             : 
    2155             : /*
    2156             :  * SQL function to return the fraction of the notification queue currently
    2157             :  * occupied.
    2158             :  */
    2159             : Datum
    2160          10 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    2161             : {
    2162             :     double      usage;
    2163             : 
    2164             :     /* Advance the queue tail so we don't report a too-large result */
    2165          10 :     asyncQueueAdvanceTail();
    2166             : 
    2167          10 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2168          10 :     usage = asyncQueueUsage();
    2169          10 :     LWLockRelease(NotifyQueueLock);
    2170             : 
    2171          10 :     PG_RETURN_FLOAT8(usage);
    2172             : }
    2173             : 
    2174             : /*
    2175             :  * Return the fraction of the queue that is currently occupied.
    2176             :  *
    2177             :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    2178             :  *
    2179             :  * Note: we measure the distance to the logical tail page, not the physical
    2180             :  * tail page.  In some sense that's wrong, but the relative position of the
    2181             :  * physical tail is affected by details such as SLRU segment boundaries,
    2182             :  * so that a result based on that is unpleasantly unstable.
    2183             :  */
    2184             : static double
    2185         210 : asyncQueueUsage(void)
    2186             : {
    2187         210 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    2188         210 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    2189         210 :     int64       occupied = headPage - tailPage;
    2190             : 
    2191         210 :     if (occupied == 0)
    2192         132 :         return (double) 0;      /* fast exit for common case */
    2193             : 
    2194          78 :     return (double) occupied / (double) max_notify_queue_pages;
    2195             : }
    2196             : 
    2197             : /*
    2198             :  * Check whether the queue is at least half full, and emit a warning if so.
    2199             :  *
    2200             :  * This is unlikely given the size of the queue, but possible.
    2201             :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    2202             :  *
    2203             :  * Caller must hold exclusive NotifyQueueLock.
    2204             :  */
    2205             : static void
    2206         200 : asyncQueueFillWarning(void)
    2207             : {
    2208             :     double      fillDegree;
    2209             :     TimestampTz t;
    2210             : 
    2211         200 :     fillDegree = asyncQueueUsage();
    2212         200 :     if (fillDegree < 0.5)
    2213         200 :         return;
    2214             : 
    2215           0 :     t = GetCurrentTimestamp();
    2216             : 
    2217           0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    2218             :                                    t, QUEUE_FULL_WARN_INTERVAL))
    2219             :     {
    2220           0 :         QueuePosition min = QUEUE_HEAD;
    2221           0 :         int32       minPid = InvalidPid;
    2222             : 
    2223           0 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2224             :         {
    2225             :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2226           0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2227           0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    2228           0 :                 minPid = QUEUE_BACKEND_PID(i);
    2229             :         }
    2230             : 
    2231           0 :         ereport(WARNING,
    2232             :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    2233             :                  (minPid != InvalidPid ?
    2234             :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    2235             :                   : 0),
    2236             :                  (minPid != InvalidPid ?
    2237             :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    2238             :                   : 0)));
    2239             : 
    2240           0 :         asyncQueueControl->lastQueueFillWarn = t;
    2241             :     }
    2242             : }
    2243             : 
    2244             : /*
    2245             :  * Send signals to listening backends.
    2246             :  *
    2247             :  * Normally we signal only backends that are interested in the notifies that
    2248             :  * we just sent.  However, that will leave idle listeners falling further and
    2249             :  * further behind.  Waken them anyway if they're far enough behind, so they'll
    2250             :  * advance their queue position pointers, allowing the global tail to advance.
    2251             :  *
    2252             :  * Since we know the ProcNumber and the Pid, the signaling is quite cheap.
    2253             :  *
    2254             :  * This is called during CommitTransaction(), so it's important for it
    2255             :  * to have very low probability of failure.
    2256             :  */
    2257             : static void
    2258         130 : SignalBackends(void)
    2259             : {
    2260             :     int         count;
    2261             : 
    2262             :     /* Can't get here without PreCommit_Notify having made the global table */
    2263             :     Assert(globalChannelTable != NULL);
    2264             : 
    2265             :     /* It should have set up these arrays, too */
    2266             :     Assert(signalPids != NULL && signalProcnos != NULL);
    2267             : 
    2268             :     /*
    2269             :      * Identify backends that we need to signal.  We don't want to send
    2270             :      * signals while holding the NotifyQueueLock, so this part just builds a
    2271             :      * list of target PIDs in signalPids[] and signalProcnos[].
    2272             :      */
    2273         130 :     count = 0;
    2274             : 
    2275         130 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2276             : 
    2277             :     /* Scan each channel name that we notified in this transaction */
    2278         396 :     foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
    2279             :     {
    2280             :         GlobalChannelKey key;
    2281             :         GlobalChannelEntry *entry;
    2282             :         ListenerEntry *listeners;
    2283             : 
    2284         136 :         GlobalChannelKeyInit(&key, MyDatabaseId, channel);
    2285         136 :         entry = dshash_find(globalChannelTable, &key, false);
    2286         136 :         if (entry == NULL)
    2287          56 :             continue;           /* nobody is listening */
    2288             : 
    2289          80 :         listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
    2290             :                                                       entry->listenersArray);
    2291             : 
    2292             :         /* Identify listeners that now need waking, add them to arrays */
    2293         170 :         for (int j = 0; j < entry->numListeners; j++)
    2294             :         {
    2295             :             ProcNumber  i;
    2296             :             int32       pid;
    2297             :             QueuePosition pos;
    2298             : 
    2299          90 :             if (!listeners[j].listening)
    2300          24 :                 continue;       /* ignore not-yet-committed listeners */
    2301             : 
    2302          90 :             i = listeners[j].procNo;
    2303             : 
    2304          90 :             if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2305          24 :                 continue;       /* already signaled, no need to repeat */
    2306             : 
    2307          66 :             pid = QUEUE_BACKEND_PID(i);
    2308          66 :             pos = QUEUE_BACKEND_POS(i);
    2309             : 
    2310          66 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    2311           0 :                 continue;       /* it's fully caught up already */
    2312             : 
    2313             :             Assert(pid != InvalidPid);
    2314             : 
    2315          66 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2316          66 :             signalPids[count] = pid;
    2317          66 :             signalProcnos[count] = i;
    2318          66 :             count++;
    2319             :         }
    2320             : 
    2321          80 :         dshash_release_lock(globalChannelTable, entry);
    2322             :     }
    2323             : 
    2324             :     /*
    2325             :      * Scan all listeners.  Any that are not already pending wakeup must not
    2326             :      * be interested in our notifications (else we'd have set their wakeup
    2327             :      * flags above).  Check to see if we can directly advance their queue
    2328             :      * pointers to save a wakeup.  Otherwise, if they are far behind, wake
    2329             :      * them anyway so they will catch up.
    2330             :      */
    2331         256 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2332             :     {
    2333             :         int32       pid;
    2334             :         QueuePosition pos;
    2335             : 
    2336         126 :         if (QUEUE_BACKEND_WAKEUP_PENDING(i))
    2337          84 :             continue;
    2338             : 
    2339             :         /* If it's currently advancing, we should not touch it */
    2340          42 :         if (QUEUE_BACKEND_IS_ADVANCING(i))
    2341           0 :             continue;
    2342             : 
    2343          42 :         pid = QUEUE_BACKEND_PID(i);
    2344          42 :         pos = QUEUE_BACKEND_POS(i);
    2345             : 
    2346             :         /*
    2347             :          * We can directly advance the other backend's queue pointer if it's
    2348             :          * not currently advancing (else there are race conditions), and its
    2349             :          * current pointer is not behind queueHeadBeforeWrite (else we'd make
    2350             :          * it miss some older messages), and we'd not be moving the pointer
    2351             :          * backward.
    2352             :          */
    2353          84 :         if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
    2354          62 :             QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
    2355             :         {
    2356             :             /* We can directly advance its pointer past what we wrote */
    2357          42 :             QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
    2358             :         }
    2359           0 :         else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    2360             :                                     QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
    2361             :         {
    2362             :             /* It's idle and far behind, so wake it up */
    2363             :             Assert(pid != InvalidPid);
    2364             : 
    2365           0 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
    2366           0 :             signalPids[count] = pid;
    2367           0 :             signalProcnos[count] = i;
    2368           0 :             count++;
    2369             :         }
    2370             :     }
    2371             : 
    2372         130 :     LWLockRelease(NotifyQueueLock);
    2373             : 
    2374             :     /* Now send signals */
    2375         196 :     for (int i = 0; i < count; i++)
    2376             :     {
    2377          66 :         int32       pid = signalPids[i];
    2378             : 
    2379             :         /*
    2380             :          * If we are signaling our own process, no need to involve the kernel;
    2381             :          * just set the flag directly.
    2382             :          */
    2383          66 :         if (pid == MyProcPid)
    2384             :         {
    2385          42 :             notifyInterruptPending = true;
    2386          42 :             continue;
    2387             :         }
    2388             : 
    2389             :         /*
    2390             :          * Note: assuming things aren't broken, a signal failure here could
    2391             :          * only occur if the target backend exited since we released
    2392             :          * NotifyQueueLock; which is unlikely but certainly possible. So we
    2393             :          * just log a low-level debug message if it happens.
    2394             :          */
    2395          24 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
    2396           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    2397             :     }
    2398         130 : }
    2399             : 
    2400             : /*
    2401             :  * AtAbort_Notify
    2402             :  *
    2403             :  *  This is called at transaction abort.
    2404             :  *
    2405             :  *  Revert any staged listen/unlisten changes and clean up transaction state.
    2406             :  *  This only does anything if we abort after PreCommit_Notify has staged
    2407             :  *  some entries.
    2408             :  */
    2409             : void
    2410       52186 : AtAbort_Notify(void)
    2411             : {
    2412             :     /* Revert staged listen/unlisten changes */
    2413       52186 :     ApplyPendingListenActions(false);
    2414             : 
    2415             :     /* If we're no longer listening on anything, unregister */
    2416       52186 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
    2417           0 :         asyncQueueUnregister();
    2418             : 
    2419             :     /* And clean up */
    2420       52186 :     ClearPendingActionsAndNotifies();
    2421       52186 : }
    2422             : 
    2423             : /*
    2424             :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    2425             :  *
    2426             :  * Reassign all items in the pending lists to the parent transaction.
    2427             :  */
    2428             : void
    2429       10760 : AtSubCommit_Notify(void)
    2430             : {
    2431       10760 :     int         my_level = GetCurrentTransactionNestLevel();
    2432             : 
    2433             :     /* If there are actions at our nesting level, we must reparent them. */
    2434       10760 :     if (pendingActions != NULL &&
    2435           4 :         pendingActions->nestingLevel >= my_level)
    2436             :     {
    2437           4 :         if (pendingActions->upper == NULL ||
    2438           2 :             pendingActions->upper->nestingLevel < my_level - 1)
    2439             :         {
    2440             :             /* nothing to merge; give the whole thing to the parent */
    2441           2 :             --pendingActions->nestingLevel;
    2442             :         }
    2443             :         else
    2444             :         {
    2445           2 :             ActionList *childPendingActions = pendingActions;
    2446             : 
    2447           2 :             pendingActions = pendingActions->upper;
    2448             : 
    2449             :             /*
    2450             :              * Mustn't try to eliminate duplicates here --- see queue_listen()
    2451             :              */
    2452           4 :             pendingActions->actions =
    2453           2 :                 list_concat(pendingActions->actions,
    2454           2 :                             childPendingActions->actions);
    2455           2 :             pfree(childPendingActions);
    2456             :         }
    2457             :     }
    2458             : 
    2459             :     /* If there are notifies at our nesting level, we must reparent them. */
    2460       10760 :     if (pendingNotifies != NULL &&
    2461           6 :         pendingNotifies->nestingLevel >= my_level)
    2462             :     {
    2463             :         Assert(pendingNotifies->nestingLevel == my_level);
    2464             : 
    2465           4 :         if (pendingNotifies->upper == NULL ||
    2466           2 :             pendingNotifies->upper->nestingLevel < my_level - 1)
    2467             :         {
    2468             :             /* nothing to merge; give the whole thing to the parent */
    2469           2 :             --pendingNotifies->nestingLevel;
    2470             :         }
    2471             :         else
    2472             :         {
    2473             :             /*
    2474             :              * Formerly, we didn't bother to eliminate duplicates here, but
    2475             :              * now we must, else we fall foul of "Assert(!found)", either here
    2476             :              * or during a later attempt to build the parent-level hashtable.
    2477             :              */
    2478           2 :             NotificationList *childPendingNotifies = pendingNotifies;
    2479             :             ListCell   *l;
    2480             : 
    2481           2 :             pendingNotifies = pendingNotifies->upper;
    2482             :             /* Insert all the subxact's events into parent, except for dups */
    2483          10 :             foreach(l, childPendingNotifies->events)
    2484             :             {
    2485           8 :                 Notification *childn = (Notification *) lfirst(l);
    2486             : 
    2487           8 :                 if (!AsyncExistsPendingNotify(childn))
    2488           4 :                     AddEventToPendingNotifies(childn);
    2489             :             }
    2490           2 :             pfree(childPendingNotifies);
    2491             :         }
    2492             :     }
    2493       10760 : }
    2494             : 
    2495             : /*
    2496             :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    2497             :  */
    2498             : void
    2499        9406 : AtSubAbort_Notify(void)
    2500             : {
    2501        9406 :     int         my_level = GetCurrentTransactionNestLevel();
    2502             : 
    2503             :     /*
    2504             :      * All we have to do is pop the stack --- the actions/notifies made in
    2505             :      * this subxact are no longer interesting, and the space will be freed
    2506             :      * when CurTransactionContext is recycled. We still have to free the
    2507             :      * ActionList and NotificationList objects themselves, though, because
    2508             :      * those are allocated in TopTransactionContext.
    2509             :      *
    2510             :      * Note that there might be no entries at all, or no entries for the
    2511             :      * current subtransaction level, either because none were ever created, or
    2512             :      * because we reentered this routine due to trouble during subxact abort.
    2513             :      */
    2514        9408 :     while (pendingActions != NULL &&
    2515           2 :            pendingActions->nestingLevel >= my_level)
    2516             :     {
    2517           2 :         ActionList *childPendingActions = pendingActions;
    2518             : 
    2519           2 :         pendingActions = pendingActions->upper;
    2520           2 :         pfree(childPendingActions);
    2521             :     }
    2522             : 
    2523        9408 :     while (pendingNotifies != NULL &&
    2524           4 :            pendingNotifies->nestingLevel >= my_level)
    2525             :     {
    2526           2 :         NotificationList *childPendingNotifies = pendingNotifies;
    2527             : 
    2528           2 :         pendingNotifies = pendingNotifies->upper;
    2529           2 :         pfree(childPendingNotifies);
    2530             :     }
    2531        9406 : }
    2532             : 
    2533             : /*
    2534             :  * HandleNotifyInterrupt
    2535             :  *
    2536             :  *      Signal handler portion of interrupt handling. Let the backend know
    2537             :  *      that there's a pending notify interrupt. If we're currently reading
    2538             :  *      from the client, this will interrupt the read and
    2539             :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    2540             :  */
    2541             : void
    2542          22 : HandleNotifyInterrupt(void)
    2543             : {
    2544             :     /*
    2545             :      * Note: this is called by a SIGNAL HANDLER. You must be very wary what
    2546             :      * you do here.
    2547             :      */
    2548             : 
    2549             :     /* signal that work needs to be done */
    2550          22 :     notifyInterruptPending = true;
    2551             : 
    2552             :     /* make sure the event is processed in due course */
    2553          22 :     SetLatch(MyLatch);
    2554          22 : }
    2555             : 
    2556             : /*
    2557             :  * ProcessNotifyInterrupt
    2558             :  *
    2559             :  *      This is called if we see notifyInterruptPending set, just before
    2560             :  *      transmitting ReadyForQuery at the end of a frontend command, and
    2561             :  *      also if a notify signal occurs while reading from the frontend.
    2562             :  *      HandleNotifyInterrupt() will cause the read to be interrupted
    2563             :  *      via the process's latch, and this routine will get called.
    2564             :  *      If we are truly idle (ie, *not* inside a transaction block),
    2565             :  *      process the incoming notifies.
    2566             :  *
    2567             :  *      If "flush" is true, force any frontend messages out immediately.
    2568             :  *      This can be false when being called at the end of a frontend command,
    2569             :  *      since we'll flush after sending ReadyForQuery.
    2570             :  */
    2571             : void
    2572          74 : ProcessNotifyInterrupt(bool flush)
    2573             : {
    2574          74 :     if (IsTransactionOrTransactionBlock())
    2575          12 :         return;                 /* not really idle */
    2576             : 
    2577             :     /* Loop in case another signal arrives while sending messages */
    2578         124 :     while (notifyInterruptPending)
    2579          62 :         ProcessIncomingNotify(flush);
    2580             : }
    2581             : 
    2582             : 
    2583             : /*
    2584             :  * Read all pending notifications from the queue, and deliver appropriate
    2585             :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    2586             :  * notification.
    2587             :  */
    2588             : static void
    2589          92 : asyncQueueReadAllNotifications(void)
    2590             : {
    2591             :     QueuePosition pos;
    2592             :     QueuePosition head;
    2593             :     Snapshot    snapshot;
    2594             : 
    2595             :     /*
    2596             :      * Fetch current state, indicate to others that we have woken up, and that
    2597             :      * we are in process of advancing our position.
    2598             :      */
    2599          92 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2600             :     /* Assert checks that we have a valid state entry */
    2601             :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
    2602          92 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
    2603          92 :     pos = QUEUE_BACKEND_POS(MyProcNumber);
    2604          92 :     head = QUEUE_HEAD;
    2605             : 
    2606          92 :     if (QUEUE_POS_EQUAL(pos, head))
    2607             :     {
    2608             :         /* Nothing to do, we have read all notifications already. */
    2609           0 :         LWLockRelease(NotifyQueueLock);
    2610           0 :         return;
    2611             :     }
    2612             : 
    2613          92 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
    2614          92 :     LWLockRelease(NotifyQueueLock);
    2615             : 
    2616             :     /*----------
    2617             :      * Get snapshot we'll use to decide which xacts are still in progress.
    2618             :      * This is trickier than it might seem, because of race conditions.
    2619             :      * Consider the following example:
    2620             :      *
    2621             :      * Backend 1:                    Backend 2:
    2622             :      *
    2623             :      * transaction starts
    2624             :      * UPDATE foo SET ...;
    2625             :      * NOTIFY foo;
    2626             :      * commit starts
    2627             :      * queue the notify message
    2628             :      *                               transaction starts
    2629             :      *                               LISTEN foo;  -- first LISTEN in session
    2630             :      *                               SELECT * FROM foo WHERE ...;
    2631             :      * commit to clog
    2632             :      *                               commit starts
    2633             :      *                               add backend 2 to array of listeners
    2634             :      *                               advance to queue head (this code)
    2635             :      *                               commit to clog
    2636             :      *
    2637             :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    2638             :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
    2639             :      * eventually get transaction 1's notify message, but there's no way
    2640             :      * to do that; until we're in the listener array, there's no guarantee
    2641             :      * that the notify message doesn't get removed from the queue.
    2642             :      *
    2643             :      * Therefore the coding technique transaction 2 is using is unsafe:
    2644             :      * applications must commit a LISTEN before inspecting database state,
    2645             :      * if they want to ensure they will see notifications about subsequent
    2646             :      * changes to that state.
    2647             :      *
    2648             :      * What we do guarantee is that we'll see all notifications from
    2649             :      * transactions committing after the snapshot we take here.
    2650             :      * BecomeRegisteredListener has already added us to the listener array,
    2651             :      * so no not-yet-committed messages can be removed from the queue
    2652             :      * before we see them.
    2653             :      *----------
    2654             :      */
    2655          92 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
    2656             : 
    2657             :     /*
    2658             :      * It is possible that we fail while trying to send a message to our
    2659             :      * frontend (for example, because of encoding conversion failure).  If
    2660             :      * that happens it is critical that we not try to send the same message
    2661             :      * over and over again.  Therefore, we set ExitOnAnyError to upgrade any
    2662             :      * ERRORs to FATAL, causing the client connection to be closed on error.
    2663             :      *
    2664             :      * We used to only skip over the offending message and try to soldier on,
    2665             :      * but it was somewhat questionable to lose a notification and give the
    2666             :      * client an ERROR instead.  A client application is not prepared for
    2667             :      * that and can't tell that a notification was missed.  It was also not
    2668             :      * very useful in practice because notifications are often processed while
    2669             :      * a connection is idle and reading a message from the client, and in that
    2670             :      * state, any error is upgraded to FATAL anyway.  Closing the connection
    2671             :      * is a clear signal to the application that it might have missed
    2672             :      * notifications.
    2673             :      */
    2674             :     {
    2675          92 :         bool        save_ExitOnAnyError = ExitOnAnyError;
    2676             :         bool        reachedStop;
    2677             : 
    2678          92 :         ExitOnAnyError = true;
    2679             : 
    2680             :         do
    2681             :         {
    2682             :             /*
    2683             :              * Process messages up to the stop position, end of page, or an
    2684             :              * uncommitted message.
    2685             :              *
    2686             :              * Our stop position is what we found to be the head's position
    2687             :              * when we entered this function. It might have changed already.
    2688             :              * But if it has, we will receive (or have already received and
    2689             :              * queued) another signal and come here again.
    2690             :              *
    2691             :              * We are not holding NotifyQueueLock here! The queue can only
    2692             :              * extend beyond the head pointer (see above) and we leave our
    2693             :              * backend's pointer where it is so nobody will truncate or
    2694             :              * rewrite pages under us. Especially we don't want to hold a lock
    2695             :              * while sending the notifications to the frontend.
    2696             :              */
    2697          98 :             reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
    2698          98 :         } while (!reachedStop);
    2699             : 
    2700             :         /* Update shared state */
    2701          92 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2702          92 :         QUEUE_BACKEND_POS(MyProcNumber) = pos;
    2703          92 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
    2704          92 :         LWLockRelease(NotifyQueueLock);
    2705             : 
    2706          92 :         ExitOnAnyError = save_ExitOnAnyError;
    2707             :     }
    2708             : 
    2709             :     /* Done with snapshot */
    2710          92 :     UnregisterSnapshot(snapshot);
    2711             : }
    2712             : 
    2713             : /*
    2714             :  * Fetch notifications from the shared queue, beginning at position current,
    2715             :  * and deliver relevant ones to my frontend.
    2716             :  *
    2717             :  * The function returns true once we have reached the stop position or an
    2718             :  * uncommitted notification, and false if we have finished with the page.
    2719             :  * In other words: once it returns true there is no need to look further.
    2720             :  * The QueuePosition *current is advanced past all processed messages.
                     :  *
                     :  * current:  in/out queue position.  Reading starts here; on return it
                     :  *           points just past the last entry consumed, or at an entry
                     :  *           whose source transaction is still in progress.
                     :  * stop:     position at which to stop reading.
                     :  * snapshot: snapshot passed to XidInMVCCSnapshot() to detect entries
                     :  *           whose source transaction is still in progress.
                     :  *
                     :  * Only the page containing *current is read in one call.  Qualifying
                     :  * entries are copied into a local buffer while the SLRU bank lock is
                     :  * held, and are sent to the frontend only after that lock is released.
    2721             :  */
    2722             : static bool
    2723          98 : asyncQueueProcessPageEntries(QueuePosition *current,
    2724             :                              QueuePosition stop,
    2725             :                              Snapshot snapshot)
    2726             : {
    2727          98 :     int64       curpage = QUEUE_POS_PAGE(*current);
    2728             :     int         slotno;
    2729             :     char       *page_buffer;
    2730          98 :     bool        reachedStop = false;
    2731             :     bool        reachedEndOfPage;
    2732             : 
    2733             :     /*
    2734             :      * We copy the entries into a local buffer to avoid holding the SLRU lock
    2735             :      * while we transmit them to our frontend.  The local buffer must be
    2736             :      * adequately aligned.
    2737             :      */
    2738             :     alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
    2739          98 :     char       *local_buf_end = local_buf;  /* next free byte in local_buf */
    2740             : 
    2741          98 :     slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
    2742             :                                         InvalidTransactionId);
    2743          98 :     page_buffer = NotifyCtl->shared->page_buffer[slotno];
    2744             : 
    2745             :     do
    2746             :     {
    2747        2654 :         QueuePosition thisentry = *current; /* remembered so we can back up */
    2748             :         AsyncQueueEntry *qe;
    2749             : 
    2750        2654 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    2751          92 :             break;
    2752             : 
    2753        2562 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2754             : 
    2755             :         /*
    2756             :          * Advance *current over this message, possibly to the next page. As
    2757             :          * noted in the comments for asyncQueueReadAllNotifications, we must
    2758             :          * do this before possibly failing while processing the message.
    2759             :          */
    2760        2562 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2761             : 
    2762             :         /* Ignore messages destined for other databases */
    2763        2562 :         if (qe->dboid == MyDatabaseId)
    2764             :         {
    2765        2562 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
    2766             :             {
    2767             :                 /*
    2768             :                  * The source transaction is still in progress, so we can't
    2769             :                  * process this message yet.  Break out of the loop, but first
    2770             :                  * back up *current so we will reprocess the message next
    2771             :                  * time.  (Note: it is unlikely but not impossible for
    2772             :                  * TransactionIdDidCommit to fail, so we can't really avoid
    2773             :                  * this advance-then-back-up behavior when dealing with an
    2774             :                  * uncommitted message.)
    2775             :                  *
    2776             :                  * Note that we must test XidInMVCCSnapshot before we test
    2777             :                  * TransactionIdDidCommit, else we might return a message from
    2778             :                  * a transaction that is not yet visible to snapshots; compare
    2779             :                  * the comments at the head of heapam_visibility.c.
    2780             :                  *
    2781             :                  * Also, while our own xact won't be listed in the snapshot,
    2782             :                  * we need not check for TransactionIdIsCurrentTransactionId
    2783             :                  * because our transaction cannot (yet) have queued any
    2784             :                  * messages.
    2785             :                  */
    2786           0 :                 *current = thisentry;
    2787           0 :                 reachedStop = true;
    2788           0 :                 break;
    2789             :             }
    2790             : 
    2791             :             /*
    2792             :              * Quick check for the case that we're not listening on any
    2793             :              * channels, before calling TransactionIdDidCommit().  This makes
    2794             :              * that case a little faster, but more importantly, it ensures
    2795             :              * that if there's a bad entry in the queue for which
    2796             :              * TransactionIdDidCommit() fails for some reason, we can skip
    2797             :              * over it on the first LISTEN in a session, and not get stuck on
    2798             :              * it indefinitely.  (This is a little trickier than it looks: it
    2799             :              * works because BecomeRegisteredListener runs this code before we
    2800             :              * have made the first entry in localChannelTable.)
    2801             :              */
    2802        2562 :             if (LocalChannelTableIsEmpty())
    2803        2452 :                 continue;   /* in a do/while this goes to the loop test below */
    2804             : 
    2805         110 :             if (TransactionIdDidCommit(qe->xid))
    2806             :             {
    2807         110 :                 memcpy(local_buf_end, qe, qe->length);
    2808         110 :                 local_buf_end += qe->length;
    2809             :             }
    2810             :             else
    2811             :             {
    2812             :                 /*
    2813             :                  * The source transaction aborted or crashed, so we just
    2814             :                  * ignore its notifications.
    2815             :                  */
    2816             :             }
    2817             :         }
    2818             : 
    2819             :         /* Loop back if we're not at end of page */
    2820        2562 :     } while (!reachedEndOfPage);
    2821             : 
    2822             :     /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2823          98 :     LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2824             : 
    2825             :     /*
    2826             :      * Now that we have let go of the SLRU bank lock, send the notifications
    2827             :      * to our frontend
    2828             :      */
    2829             :     Assert(local_buf_end - local_buf <= BLCKSZ); /* NOTE(review): local_buf is QUEUE_PAGESIZE bytes; presumably == BLCKSZ -- confirm */
    2830         208 :     for (char *p = local_buf; p < local_buf_end;)
    2831             :     {
    2832         110 :         AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
    2833             : 
    2834             :         /* qe->data is the null-terminated channel name */
    2835         110 :         char       *channel = qe->data;
    2836             : 
    2837         110 :         if (IsListeningOn(channel))
    2838             :         {
    2839             :             /* payload follows channel name */
    2840         110 :             char       *payload = qe->data + strlen(channel) + 1;
    2841             : 
    2842         110 :             NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2843             :         }
    2844             : 
    2845         110 :         p += qe->length;    /* step to the next copied entry */
    2846             :     }
    2847             : 
    2848          98 :     if (QUEUE_POS_EQUAL(*current, stop))
    2849          92 :         reachedStop = true;
    2850             : 
    2851          98 :     return reachedStop;
    2852             : }
    2853             : 
    2854             : /*
    2855             :  * Advance the shared queue tail variable to the minimum of all the
    2856             :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2857             :  *
    2858             :  * This is (usually) called during CommitTransaction(), so it's important for
    2859             :  * it to have very low probability of failure.
                     :  *
                     :  * Locking: NotifyQueueTailLock is held exclusively for the whole call.
                     :  * NotifyQueueLock is held exclusively while the new QUEUE_TAIL is computed
                     :  * and stored, is released around SimpleLruTruncate(), and is retaken
                     :  * briefly to update QUEUE_STOP_PAGE.
    2860             :  */
    2861             : static void
    2862          26 : asyncQueueAdvanceTail(void)
    2863             : {
    2864             :     QueuePosition min;
    2865             :     int64       oldtailpage;
    2866             :     int64       newtailpage;
    2867             :     int64       boundary;
    2868             : 
    2869             :     /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
    2870          26 :     LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
    2871             : 
    2872             :     /*
    2873             :      * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
    2874             :      * (ie, exactly match at least one backend's queue position), so it must
    2875             :      * be updated atomically with the actual computation.  Since v13, we could
    2876             :      * get away with not doing it like that, but it seems prudent to keep it
    2877             :      * so.
    2878             :      *
    2879             :      * Also, because incoming backends will scan forward from QUEUE_TAIL, that
    2880             :      * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
    2881             :      * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
    2882             :      * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
    2883             :      * there are pages we can truncate but haven't yet finished doing so.
    2884             :      *
    2885             :      * For concurrency's sake, we don't want to hold NotifyQueueLock while
    2886             :      * performing SimpleLruTruncate.  This is OK because no backend will try
    2887             :      * to access the pages we are in the midst of truncating.
    2888             :      */
    2889          26 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2890          26 :     min = QUEUE_HEAD;   /* with no listeners, the tail becomes the head */
    2891          46 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
    2892             :     {
    2893             :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2894          20 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2895             :     }
    2896          26 :     QUEUE_TAIL = min;
    2897          26 :     oldtailpage = QUEUE_STOP_PAGE;
    2898          26 :     LWLockRelease(NotifyQueueLock);
    2899             : 
    2900             :     /*
    2901             :      * We can truncate something if the global tail advanced across an SLRU
    2902             :      * segment boundary.
    2903             :      *
    2904             :      * XXX it might be better to truncate only once every several segments, to
    2905             :      * reduce the number of directory scans.
    2906             :      */
    2907          26 :     newtailpage = QUEUE_POS_PAGE(min);
    2908          26 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); /* newtailpage rounded down to a multiple of SLRU_PAGES_PER_SEGMENT */
    2909          26 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2910             :     {
    2911             :         /*
    2912             :          * SimpleLruTruncate() will ask for SLRU bank locks but will also
    2913             :          * release the lock again.
    2914             :          */
    2915           2 :         SimpleLruTruncate(NotifyCtl, newtailpage);
    2916             : 
    2917           2 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2918           2 :         QUEUE_STOP_PAGE = newtailpage;
    2919           2 :         LWLockRelease(NotifyQueueLock);
    2920             :     }
    2921             : 
    2922          26 :     LWLockRelease(NotifyQueueTailLock);
    2923          26 : }
    2924             : 
    2925             : /*
    2926             :  * AsyncNotifyFreezeXids
    2927             :  *
    2928             :  * Prepare the async notification queue for CLOG truncation by freezing
    2929             :  * transaction IDs that are about to become inaccessible.
    2930             :  *
    2931             :  * This function is called by VACUUM before advancing datfrozenxid. It scans
    2932             :  * the notification queue and replaces XIDs that would become inaccessible
    2933             :  * after CLOG truncation with special markers:
    2934             :  * - Committed transactions are set to FrozenTransactionId
    2935             :  * - Aborted/crashed transactions are set to InvalidTransactionId
    2936             :  *
    2937             :  * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
    2938             :  * pages will be truncated. If XID < newFrozenXid, it cannot still be running
    2939             :  * (or it would have held back newFrozenXid through ProcArray).
    2940             :  * Therefore, if TransactionIdDidCommit returns false, we know the transaction
    2941             :  * either aborted explicitly or crashed, and we can safely mark it invalid.
                     :  *
                     :  * newFrozenXid: only entries whose xid is a normal XID preceding this value
                     :  * are rewritten; all other entries are left untouched.
                     :  *
                     :  * Locking: NotifyQueueTailLock is held in shared mode for the whole scan.
                     :  * NotifyQueueLock is held in shared mode only while QUEUE_TAIL and
                     :  * QUEUE_HEAD are read.  Each page is read and modified under its SLRU bank
                     :  * lock in exclusive mode, and is marked dirty if any entry on it changed.
    2942             :  */
    2943             : void
    2944        1610 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
    2945             : {
    2946             :     QueuePosition pos;
    2947             :     QueuePosition head;
    2948        1610 :     int64       curpage = -1;   /* page whose bank lock we hold, if any */
    2949        1610 :     int         slotno = -1;    /* -1 until the first page has been read */
    2950        1610 :     char       *page_buffer = NULL;
    2951        1610 :     bool        page_dirty = false; /* did we modify the current page? */
    2952             : 
    2953             :     /*
    2954             :      * Acquire locks in the correct order to avoid deadlocks. As per the
    2955             :      * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
    2956             :      * bank locks.
    2957             :      *
    2958             :      * We only need SHARED mode since we're just reading the head/tail
    2959             :      * positions, not modifying them.
    2960             :      */
    2961        1610 :     LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
    2962        1610 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2963             : 
    2964        1610 :     pos = QUEUE_TAIL;
    2965        1610 :     head = QUEUE_HEAD;
    2966             : 
    2967             :     /* Release NotifyQueueLock early, we only needed to read the positions */
    2968        1610 :     LWLockRelease(NotifyQueueLock);
    2969             : 
    2970             :     /*
    2971             :      * Scan the queue from tail to head, freezing XIDs as needed. We hold
    2972             :      * NotifyQueueTailLock throughout to ensure the tail doesn't move while
    2973             :      * we're working.
    2974             :      */
    2975        1670 :     while (!QUEUE_POS_EQUAL(pos, head))
    2976             :     {
    2977             :         AsyncQueueEntry *qe;
    2978             :         TransactionId xid;
    2979          60 :         int64       pageno = QUEUE_POS_PAGE(pos);
    2980          60 :         int         offset = QUEUE_POS_OFFSET(pos);
    2981             : 
    2982             :         /* If we need a different page, release old lock and get new one */
    2983          60 :         if (pageno != curpage)
    2984             :         {
    2985             :             LWLock     *lock;
    2986             : 
    2987             :             /* Release previous page if any */
    2988           6 :             if (slotno >= 0)
    2989             :             {
    2990           0 :                 if (page_dirty)
    2991             :                 {
    2992           0 :                     NotifyCtl->shared->page_dirty[slotno] = true;
    2993           0 :                     page_dirty = false;
    2994             :                 }
    2995           0 :                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    2996             :             }
    2997             : 
    2998           6 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
    2999           6 :             LWLockAcquire(lock, LW_EXCLUSIVE);
    3000           6 :             slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    3001             :                                        InvalidTransactionId);
    3002           6 :             page_buffer = NotifyCtl->shared->page_buffer[slotno];
    3003           6 :             curpage = pageno;
    3004             :         }
    3005             : 
    3006          60 :         qe = (AsyncQueueEntry *) (page_buffer + offset);
    3007          60 :         xid = qe->xid;
    3008             : 
    3009         120 :         if (TransactionIdIsNormal(xid) &&
    3010          60 :             TransactionIdPrecedes(xid, newFrozenXid))
    3011             :         {
    3012          20 :             if (TransactionIdDidCommit(xid))
    3013             :             {
    3014          20 :                 qe->xid = FrozenTransactionId;
    3015          20 :                 page_dirty = true;
    3016             :             }
    3017             :             else
    3018             :             {
    3019           0 :                 qe->xid = InvalidTransactionId;
    3020           0 :                 page_dirty = true;
    3021             :             }
    3022             :         }
    3023             : 
    3024             :         /* Advance to next entry */
    3025          60 :         asyncQueueAdvance(&pos, qe->length);    /* result unused; page changes are detected at the top of the loop */
    3026             :     }
    3027             : 
    3028             :     /* Release final page lock if we acquired one */
    3029        1610 :     if (slotno >= 0)
    3030             :     {
    3031           6 :         if (page_dirty)
    3032           2 :             NotifyCtl->shared->page_dirty[slotno] = true;
    3033           6 :         LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
    3034             :     }
    3035             : 
    3036        1610 :     LWLockRelease(NotifyQueueTailLock);
    3037        1610 : }
    3038             : 
    3039             : /*
    3040             :  * ProcessIncomingNotify
    3041             :  *
    3042             :  *      Scan the queue for arriving notifications and report them to the front
    3043             :  *      end.  The notifications might be from other sessions, or our own;
    3044             :  *      there's no need to distinguish here.
    3045             :  *
    3046             :  *      If "flush" is true, force any frontend messages out immediately.
    3047             :  *
    3048             :  *      NOTE: since we are outside any transaction, we must create our own.
                     :  *
                     :  *      notifyInterruptPending is cleared unconditionally.  If the local
                     :  *      channel table is empty, nothing else is done.  While the queue is
                     :  *      being read the ps display shows "notify interrupt"; it is set to
                     :  *      "idle" afterwards.
    3049             :  */
    3050             : static void
    3051          62 : ProcessIncomingNotify(bool flush)
    3052             : {
    3053             :     /* We *must* reset the flag */
    3054          62 :     notifyInterruptPending = false;
    3055             : 
    3056             :     /* Do nothing else if we aren't actively listening */
    3057          62 :     if (LocalChannelTableIsEmpty())
    3058           0 :         return;
    3059             : 
    3060          62 :     if (Trace_notify)
    3061           0 :         elog(DEBUG1, "ProcessIncomingNotify");
    3062             : 
    3063          62 :     set_ps_display("notify interrupt");
    3064             : 
    3065             :     /*
    3066             :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    3067             :      * bad things happen if it gets an error.
    3068             :      */
    3069          62 :     StartTransactionCommand();
    3070             : 
    3071          62 :     asyncQueueReadAllNotifications();
    3072             : 
    3073          62 :     CommitTransactionCommand();
    3074             : 
    3075             :     /*
    3076             :      * If this isn't an end-of-command case, we must flush the notify messages
    3077             :      * to ensure frontend gets them promptly.
    3078             :      */
    3079          62 :     if (flush)
    3080          18 :         pq_flush();
    3081             : 
    3082          62 :     set_ps_display("idle");
    3083             : 
    3084          62 :     if (Trace_notify)
    3085           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    3086             : }
    3087             : 
    3088             : /*
    3089             :  * Send NOTIFY message to my front end.
                     :  *
                     :  * channel: channel name string.
                     :  * payload: payload string.
                     :  * srcPid:  value sent as the message's int32 field (presumably the PID of
                     :  *          the notifying backend -- confirm against callers).
                     :  *
                     :  * When whereToSendOutput is DestRemote, a PqMsg_NotificationResponse
                     :  * message is built containing srcPid, channel and payload, in that order,
                     :  * and finished with pq_endmessage(); no flush is performed here.  For any
                     :  * other destination the channel and payload are reported with elog(INFO),
                     :  * and srcPid is not included.
    3090             :  */
    3091             : void
    3092         110 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    3093             : {
    3094         110 :     if (whereToSendOutput == DestRemote)
    3095             :     {
    3096             :         StringInfoData buf;
    3097             : 
    3098         110 :         pq_beginmessage(&buf, PqMsg_NotificationResponse);
    3099         110 :         pq_sendint32(&buf, srcPid);
    3100         110 :         pq_sendstring(&buf, channel);
    3101         110 :         pq_sendstring(&buf, payload);
    3102         110 :         pq_endmessage(&buf);
    3103             : 
    3104             :         /*
    3105             :          * NOTE: we do not do pq_flush() here.  Some level of caller will
    3106             :          * handle it later, allowing this message to be combined into a packet
    3107             :          * with other ones.
    3108             :          */
    3109             :     }
    3110             :     else
    3111           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    3112         110 : }
    3113             : 
    3114             : /*
                     :  * Does pendingNotifies include a match for the given event?
                     :  *
                     :  * Returns false when pendingNotifies is NULL.  If pendingNotifies->hashtab
                     :  * exists it is probed with a pointer to n as the key; otherwise the events
                     :  * list is scanned linearly.  In the linear scan two events match when
                     :  * their channel_len and payload_len are equal and the first
                     :  * channel_len + payload_len + 2 bytes of their data are identical (the + 2
                     :  * presumably covers the two strings' terminating nulls -- confirm against
                     :  * the Notification layout).
                     :  */
    3115             : static bool
    3116        2132 : AsyncExistsPendingNotify(Notification *n)
    3117             : {
    3118        2132 :     if (pendingNotifies == NULL)
    3119           0 :         return false;
    3120             : 
    3121        2132 :     if (pendingNotifies->hashtab != NULL)
    3122             :     {
    3123             :         /* Use the hash table to probe for a match */
    3124        1968 :         if (hash_search(pendingNotifies->hashtab,
    3125             :                         &n,
    3126             :                         HASH_FIND,
    3127             :                         NULL))
    3128           2 :             return true;
    3129             :     }
    3130             :     else
    3131             :     {
    3132             :         /* Must scan the event list */
    3133             :         ListCell   *l;
    3134             : 
    3135         850 :         foreach(l, pendingNotifies->events)
    3136             :         {
    3137         714 :             Notification *oldn = (Notification *) lfirst(l);
    3138             : 
    3139         714 :             if (n->channel_len == oldn->channel_len &&
    3140         714 :                 n->payload_len == oldn->payload_len &&
    3141         380 :                 memcmp(n->data, oldn->data,
    3142         380 :                        n->channel_len + n->payload_len + 2) == 0)
    3143          28 :                 return true;
    3144             :         }
    3145             :     }
    3146             : 
    3147        2102 :     return false;
    3148             : }
    3149             : 
    3150             : /*
    3151             :  * Add a notification event to a pre-existing pendingNotifies list.
    3152             :  *
    3153             :  * Because pendingNotifies->events is already nonempty, this works
    3154             :  * correctly no matter what CurrentMemoryContext is.
                     :  *
                     :  * When the list already holds at least MIN_HASHABLE_NOTIFIES events and no
                     :  * hash table exists yet, two hash tables are created in
                     :  * CurTransactionContext and loaded with every existing event:
                     :  * - hashtab, keyed by Notification pointer and using notification_hash
                     :  *   and notification_match;
                     :  * - uniqueChannelHash, keyed by channel name string.
                     :  * Once the tables exist, each new event is appended to the list and also
                     :  * entered into both tables.  The new event must not already be present in
                     :  * hashtab; this is asserted.
    3155             :  */
    3156             : static void
    3157        2102 : AddEventToPendingNotifies(Notification *n)
    3158             : {
    3159             :     Assert(pendingNotifies->events != NIL);
    3160             : 
    3161             :     /* Create the hash tables if it's time to */
    3162        2102 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    3163        1970 :         pendingNotifies->hashtab == NULL)
    3164             :     {
    3165             :         HASHCTL     hash_ctl;
    3166             :         ListCell   *l;
    3167             : 
    3168             :         /* Create the hash table */
    3169           4 :         hash_ctl.keysize = sizeof(Notification *);
    3170           4 :         hash_ctl.entrysize = sizeof(struct NotificationHash);
    3171           4 :         hash_ctl.hash = notification_hash;
    3172           4 :         hash_ctl.match = notification_match;
    3173           4 :         hash_ctl.hcxt = CurTransactionContext;
    3174           8 :         pendingNotifies->hashtab =
    3175           4 :             hash_create("Pending Notifies",
    3176             :                         256L,
    3177             :                         &hash_ctl,
    3178             :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    3179             : 
    3180             :         /* Create the unique channel name table */
    3181             :         Assert(pendingNotifies->uniqueChannelHash == NULL);
    3182           4 :         hash_ctl.keysize = NAMEDATALEN;
    3183           4 :         hash_ctl.entrysize = sizeof(ChannelName);
    3184           4 :         hash_ctl.hcxt = CurTransactionContext;
    3185           8 :         pendingNotifies->uniqueChannelHash =
    3186           4 :             hash_create("Pending Notify Channel Names",
    3187             :                         64L,
    3188             :                         &hash_ctl,
    3189             :                         HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
    3190             : 
    3191             :         /* Insert all the already-existing events */
    3192          68 :         foreach(l, pendingNotifies->events)
    3193             :         {
    3194          64 :             Notification *oldn = (Notification *) lfirst(l);
    3195          64 :             char       *channel = oldn->data;   /* channel name starts the data */
    3196             :             bool        found;
    3197             : 
    3198          64 :             (void) hash_search(pendingNotifies->hashtab,
    3199             :                                &oldn,
    3200             :                                HASH_ENTER,
    3201             :                                &found);
    3202             :             Assert(!found);
    3203             : 
    3204             :             /* Add channel name to uniqueChannelHash; might be there already */
    3205          64 :             (void) hash_search(pendingNotifies->uniqueChannelHash,
    3206             :                                channel,
    3207             :                                HASH_ENTER,
    3208             :                                NULL);
    3209             :         }
    3210             :     }
    3211             : 
    3212             :     /* Add new event to the list, in order */
    3213        2102 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
    3214             : 
    3215             :     /* Add event to the hash tables if needed */
    3216        2102 :     if (pendingNotifies->hashtab != NULL)
    3217             :     {
    3218        1970 :         char       *channel = n->data;
    3219             :         bool        found;
    3220             : 
    3221        1970 :         (void) hash_search(pendingNotifies->hashtab,
    3222             :                            &n,
    3223             :                            HASH_ENTER,
    3224             :                            &found);
    3225             :         Assert(!found);
    3226             : 
    3227             :         /* Add channel name to uniqueChannelHash; might be there already */
    3228        1970 :         (void) hash_search(pendingNotifies->uniqueChannelHash,
    3229             :                            channel,
    3230             :                            HASH_ENTER,
    3231             :                            NULL);
    3232             :     }
    3233        2102 : }
    3234             : 
    3235             : /*
    3236             :  * notification_hash: hash function for notification hash table
    3237             :  *
    3238             :  * The hash "keys" are pointers to Notification structs.
                     :  *
                     :  * key points at a Notification pointer, and keysize is asserted to be
                     :  * sizeof(Notification *).  The hash is computed with hash_any() over the
                     :  * first channel_len + payload_len + 1 bytes of the struct's data field.
    3239             :  */
    3240             : static uint32
    3241        4002 : notification_hash(const void *key, Size keysize)
    3242             : {
    3243        4002 :     const Notification *k = *(const Notification *const *) key;
    3244             : 
    3245             :     Assert(keysize == sizeof(Notification *));
    3246             :     /* We don't bother to include the payload's trailing null in the hash */
    3247        4002 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    3248        4002 :                                    k->channel_len + k->payload_len + 1));
    3249             : }
    3250             : 
    3251             : /*
    3252             :  * notification_match: match function to use with notification_hash
                     :  *
                     :  * key1 and key2 each point at a Notification pointer, and keysize is
                     :  * asserted to be sizeof(Notification *).  Returns 0 when the two
                     :  * notifications have equal channel_len, equal payload_len and identical
                     :  * data over channel_len + payload_len + 2 bytes; returns 1 otherwise.
    3253             :  */
    3254             : static int
    3255           2 : notification_match(const void *key1, const void *key2, Size keysize)
    3256             : {
    3257           2 :     const Notification *k1 = *(const Notification *const *) key1;
    3258           2 :     const Notification *k2 = *(const Notification *const *) key2;
    3259             : 
    3260             :     Assert(keysize == sizeof(Notification *));
    3261           2 :     if (k1->channel_len == k2->channel_len &&
    3262           2 :         k1->payload_len == k2->payload_len &&
    3263           2 :         memcmp(k1->data, k2->data,
    3264           2 :                k1->channel_len + k1->payload_len + 2) == 0)
    3265           2 :         return 0;               /* equal */
    3266           0 :     return 1;                   /* not equal */
    3267             : }
    3268             : 
    3269             : /*
                     :  * Reset pendingActions, pendingNotifies and pendingListenActions to NULL.
                     :  *
                     :  * Takes no arguments and returns nothing.  No memory is freed here; the
                     :  * comment in the body explains why that is sufficient.
                     :  */
    3270             : static void
    3271       52498 : ClearPendingActionsAndNotifies(void)
    3272             : {
    3273             :     /*
    3274             :      * Everything's allocated in either TopTransactionContext or the context
    3275             :      * for the subtransaction to which it corresponds.  So, there's nothing to
    3276             :      * do here except reset the pointers; the space will be reclaimed when the
    3277             :      * contexts are deleted.
    3278             :      */
    3279       52498 :     pendingActions = NULL;
    3280       52498 :     pendingNotifies = NULL;
    3281             :     /* Also clear pendingListenActions, which is derived from pendingActions */
    3282       52498 :     pendingListenActions = NULL;
    3283       52498 : }
    3284             : 
    3285             : /*
    3286             :  * GUC check_hook for notify_buffers
                     :  *
                     :  * Delegates entirely to check_slru_buffers(), passing the setting's name
                     :  * ("notify_buffers") and the proposed value, and returns its result
                     :  * unchanged.  "extra" and "source" are not used here.
    3287             :  */
    3288             : bool
    3289        2354 : check_notify_buffers(int *newval, void **extra, GucSource source)
    3290             : {
    3291        2354 :     return check_slru_buffers("notify_buffers", newval);
    3292             : }

Generated by: LCOV version 1.16