LCOV - code coverage report
Current view: top level - src/backend/commands - async.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 495 576 85.9 %
Date: 2020-06-01 09:07:10 Functions: 44 45 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * async.c
       4             :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/commands/async.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : /*-------------------------------------------------------------------------
      16             :  * Async Notification Model as of 9.0:
      17             :  *
       18             :  * 1. Multiple backends on the same machine. Multiple backends listening on
      19             :  *    several channels. (Channels are also called "conditions" in other
      20             :  *    parts of the code.)
      21             :  *
      22             :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      23             :  *    with actively-used pages mapped into shared memory by the slru.c module.
      24             :  *    All notification messages are placed in the queue and later read out
      25             :  *    by listening backends.
      26             :  *
      27             :  *    There is no central knowledge of which backend listens on which channel;
      28             :  *    every backend has its own list of interesting channels.
      29             :  *
      30             :  *    Although there is only one queue, notifications are treated as being
      31             :  *    database-local; this is done by including the sender's database OID
      32             :  *    in each notification message.  Listening backends ignore messages
      33             :  *    that don't match their database OID.  This is important because it
      34             :  *    ensures senders and receivers have the same database encoding and won't
      35             :  *    misinterpret non-ASCII text in the channel name or payload string.
      36             :  *
      37             :  *    Since notifications are not expected to survive database crashes,
      38             :  *    we can simply clean out the pg_notify data at any reboot, and there
      39             :  *    is no need for WAL support or fsync'ing.
      40             :  *
      41             :  * 3. Every backend that is listening on at least one channel registers by
      42             :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      43             :  *    incoming notifications in the central queue and first compares the
      44             :  *    database OID of the notification with its own database OID and then
      45             :  *    compares the notified channel with the list of channels that it listens
      46             :  *    to. In case there is a match it delivers the notification event to its
      47             :  *    frontend.  Non-matching events are simply skipped.
      48             :  *
      49             :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      50             :  *    a backend-local list which will not be processed until transaction end.
      51             :  *
      52             :  *    Duplicate notifications from the same transaction are sent out as one
      53             :  *    notification only. This is done to save work when for example a trigger
      54             :  *    on a 2 million row table fires a notification for each row that has been
      55             :  *    changed. If the application needs to receive every single notification
      56             :  *    that has been sent, it can easily add some unique string into the extra
      57             :  *    payload parameter.
      58             :  *
      59             :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      60             :  *    pending notifications to the head of the queue. The head pointer of the
      61             :  *    queue always points to the next free position and a position is just a
      62             :  *    page number and the offset in that page. This is done before marking the
      63             :  *    transaction as committed in clog. If we run into problems writing the
      64             :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      65             :  *    will roll back.
      66             :  *
      67             :  *    Once we have put all of the notifications into the queue, we return to
      68             :  *    CommitTransaction() which will then do the actual transaction commit.
      69             :  *
      70             :  *    After commit we are called another time (AtCommit_Notify()). Here we
      71             :  *    make the actual updates to the effective listen state (listenChannels).
      72             :  *
      73             :  *    Finally, after we are out of the transaction altogether, we check if
      74             :  *    we need to signal listening backends.  In SignalBackends() we scan the
      75             :  *    list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
      76             :  *    to every listening backend (we don't know which backend is listening on
      77             :  *    which channel so we must signal them all). We can exclude backends that
      78             :  *    are already up to date, though, and we can also exclude backends that
      79             :  *    are in other databases (unless they are way behind and should be kicked
      80             :  *    to make them advance their pointers).  We don't bother with a
      81             :  *    self-signal either, but just process the queue directly.
      82             :  *
      83             :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      84             :  *    sets the process's latch, which triggers the event to be processed
      85             :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
       86             :  *    command and is not within a transaction block. Cf.
      87             :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      88             :  *    flag, which will cause the processing to occur just before we next go
      89             :  *    idle.
      90             :  *
      91             :  *    Inbound-notify processing consists of reading all of the notifications
      92             :  *    that have arrived since scanning last time. We read every notification
      93             :  *    until we reach either a notification from an uncommitted transaction or
      94             :  *    the head pointer's position.
      95             :  *
      96             :  * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
      97             :  *    pointer needs to be advanced so that old pages can be truncated.
      98             :  *    This is relatively expensive (notably, it requires an exclusive lock),
      99             :  *    so we don't want to do it often.  We make sending backends do this work
     100             :  *    if they advanced the queue head into a new page, but only once every
     101             :  *    QUEUE_CLEANUP_DELAY pages.
     102             :  *
     103             :  * An application that listens on the same channel it notifies will get
     104             :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     105             :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     106             :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     107             :  * frontend during startup.)  The above design guarantees that notifies from
     108             :  * other backends will never be missed by ignoring self-notifies.
     109             :  *
     110             :  * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
     111             :  * can be varied without affecting anything but performance.  The maximum
     112             :  * amount of notification data that can be queued at one time is determined
     113             :  * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
     114             :  *-------------------------------------------------------------------------
     115             :  */
     116             : 
     117             : #include "postgres.h"
     118             : 
     119             : #include <limits.h>
     120             : #include <unistd.h>
     121             : #include <signal.h>
     122             : 
     123             : #include "access/parallel.h"
     124             : #include "access/slru.h"
     125             : #include "access/transam.h"
     126             : #include "access/xact.h"
     127             : #include "catalog/pg_database.h"
     128             : #include "commands/async.h"
     129             : #include "common/hashfn.h"
     130             : #include "funcapi.h"
     131             : #include "libpq/libpq.h"
     132             : #include "libpq/pqformat.h"
     133             : #include "miscadmin.h"
     134             : #include "storage/ipc.h"
     135             : #include "storage/lmgr.h"
     136             : #include "storage/proc.h"
     137             : #include "storage/procarray.h"
     138             : #include "storage/procsignal.h"
     139             : #include "storage/sinval.h"
     140             : #include "tcop/tcopprot.h"
     141             : #include "utils/builtins.h"
     142             : #include "utils/memutils.h"
     143             : #include "utils/ps_status.h"
     144             : #include "utils/snapmgr.h"
     145             : #include "utils/timestamp.h"
     146             : 
     147             : 
     148             : /*
      149             :  * Maximum size of a NOTIFY payload, including terminating NUL.  This
     150             :  * must be kept small enough so that a notification message fits on one
     151             :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     152             :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     153             :  * than that, so changes in that data structure won't affect user-visible
     154             :  * restrictions.
     155             :  */
     156             : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
     157             : 
     158             : /*
     159             :  * Struct representing an entry in the global notify queue
     160             :  *
     161             :  * This struct declaration has the maximal length, but in a real queue entry
     162             :  * the data area is only big enough for the actual channel and payload strings
     163             :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     164             :  * entry size, if both channel and payload strings are empty (but note it
     165             :  * doesn't include alignment padding).
     166             :  *
     167             :  * The "length" field should always be rounded up to the next QUEUEALIGN
     168             :  * multiple so that all fields are properly aligned.
     169             :  */
     170             : typedef struct AsyncQueueEntry
     171             : {
     172             :     int         length;         /* total allocated length of entry */
     173             :     Oid         dboid;          /* sender's database OID */
     174             :     TransactionId xid;          /* sender's XID */
     175             :     int32       srcPid;         /* sender's PID */
     176             :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     177             : } AsyncQueueEntry;
     178             : 
     179             : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     180             : #define QUEUEALIGN(len)     INTALIGN(len)
     181             : 
     182             : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
     183             : 
     184             : /*
     185             :  * Struct describing a queue position, and assorted macros for working with it
     186             :  */
     187             : typedef struct QueuePosition
     188             : {
     189             :     int         page;           /* SLRU page number */
     190             :     int         offset;         /* byte offset within page */
     191             : } QueuePosition;
     192             : 
     193             : #define QUEUE_POS_PAGE(x)       ((x).page)
     194             : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     195             : 
     196             : #define SET_QUEUE_POS(x,y,z) \
     197             :     do { \
     198             :         (x).page = (y); \
     199             :         (x).offset = (z); \
     200             :     } while (0)
     201             : 
     202             : #define QUEUE_POS_EQUAL(x,y) \
     203             :     ((x).page == (y).page && (x).offset == (y).offset)
     204             : 
     205             : #define QUEUE_POS_IS_ZERO(x) \
     206             :     ((x).page == 0 && (x).offset == 0)
     207             : 
     208             : /* choose logically smaller QueuePosition */
     209             : #define QUEUE_POS_MIN(x,y) \
     210             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     211             :      (x).page != (y).page ? (y) : \
     212             :      (x).offset < (y).offset ? (x) : (y))
     213             : 
     214             : /* choose logically larger QueuePosition */
     215             : #define QUEUE_POS_MAX(x,y) \
     216             :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     217             :      (x).page != (y).page ? (x) : \
     218             :      (x).offset > (y).offset ? (x) : (y))
     219             : 
     220             : /*
     221             :  * Parameter determining how often we try to advance the tail pointer:
     222             :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     223             :  * also the distance by which a backend in another database needs to be
     224             :  * behind before we'll decide we need to wake it up to advance its pointer.
     225             :  *
     226             :  * Resist the temptation to make this really large.  While that would save
     227             :  * work in some places, it would add cost in others.  In particular, this
     228             :  * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
     229             :  * catch up before the pages they'll need to read fall out of SLRU cache.
     230             :  */
     231             : #define QUEUE_CLEANUP_DELAY 4
     232             : 
     233             : /*
     234             :  * Struct describing a listening backend's status
     235             :  */
     236             : typedef struct QueueBackendStatus
     237             : {
     238             :     int32       pid;            /* either a PID or InvalidPid */
     239             :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     240             :     BackendId   nextListener;   /* id of next listener, or InvalidBackendId */
     241             :     QueuePosition pos;          /* backend has read queue up to here */
     242             : } QueueBackendStatus;
     243             : 
     244             : /*
     245             :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     246             :  *
     247             :  * The AsyncQueueControl structure is protected by the NotifyQueueLock.
     248             :  *
     249             :  * When holding the lock in SHARED mode, backends may only inspect their own
     250             :  * entries as well as the head and tail pointers. Consequently we can allow a
     251             :  * backend to update its own record while holding only SHARED lock (since no
     252             :  * other backend will inspect it).
     253             :  *
     254             :  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
     255             :  * of other backends and also change the head and tail pointers.
     256             :  *
     257             :  * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
     258             :  * In order to avoid deadlocks, whenever we need both locks, we always first
     259             :  * get NotifyQueueLock and then NotifySLRULock.
     260             :  *
     261             :  * Each backend uses the backend[] array entry with index equal to its
     262             :  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
     263             :  * SendProcSignal fast.
     264             :  *
     265             :  * The backend[] array entries for actively-listening backends are threaded
     266             :  * together using firstListener and the nextListener links, so that we can
     267             :  * scan them without having to iterate over inactive entries.  We keep this
     268             :  * list in order by BackendId so that the scan is cache-friendly when there
     269             :  * are many active entries.
     270             :  */
     271             : typedef struct AsyncQueueControl
     272             : {
     273             :     QueuePosition head;         /* head points to the next free location */
     274             :     QueuePosition tail;         /* tail must be <= the queue position of every
     275             :                                  * listening backend */
     276             :     BackendId   firstListener;  /* id of first listener, or InvalidBackendId */
     277             :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     278             :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     279             :     /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
     280             : } AsyncQueueControl;
     281             : 
     282             : static AsyncQueueControl *asyncQueueControl;
     283             : 
     284             : #define QUEUE_HEAD                  (asyncQueueControl->head)
     285             : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     286             : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
     287             : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     288             : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     289             : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
     290             : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
     291             : 
     292             : /*
     293             :  * The SLRU buffer area through which we access the notification queue
     294             :  */
     295             : static SlruCtlData NotifyCtlData;
     296             : 
     297             : #define NotifyCtl                   (&NotifyCtlData)
     298             : #define QUEUE_PAGESIZE              BLCKSZ
     299             : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     300             : 
     301             : /*
     302             :  * slru.c currently assumes that all filenames are four characters of hex
     303             :  * digits. That means that we can use segments 0000 through FFFF.
     304             :  * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
     305             :  * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
     306             :  *
     307             :  * It's of course possible to enhance slru.c, but this gives us so much
     308             :  * space already that it doesn't seem worth the trouble.
     309             :  *
     310             :  * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
     311             :  * pages, because more than that would confuse slru.c into thinking there
     312             :  * was a wraparound condition.  With the default BLCKSZ this means there
     313             :  * can be up to 8GB of queued-and-not-read data.
     314             :  *
     315             :  * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
     316             :  * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
     317             :  */
     318             : #define QUEUE_MAX_PAGE          (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
     319             : 
     320             : /*
     321             :  * listenChannels identifies the channels we are actually listening to
     322             :  * (ie, have committed a LISTEN on).  It is a simple list of channel names,
     323             :  * allocated in TopMemoryContext.
     324             :  */
     325             : static List *listenChannels = NIL;  /* list of C strings */
     326             : 
     327             : /*
     328             :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     329             :  * all actions requested in the current transaction.  As explained above,
     330             :  * we don't actually change listenChannels until we reach transaction commit.
     331             :  *
     332             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     333             :  * subtransaction has its own list in its own CurTransactionContext, but
     334             :  * successful subtransactions attach their lists to their parent's list.
     335             :  * Failed subtransactions simply discard their lists.
     336             :  */
     337             : typedef enum
     338             : {
     339             :     LISTEN_LISTEN,
     340             :     LISTEN_UNLISTEN,
     341             :     LISTEN_UNLISTEN_ALL
     342             : } ListenActionKind;
     343             : 
     344             : typedef struct
     345             : {
     346             :     ListenActionKind action;
     347             :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     348             : } ListenAction;
     349             : 
     350             : typedef struct ActionList
     351             : {
     352             :     int         nestingLevel;   /* current transaction nesting depth */
     353             :     List       *actions;        /* list of ListenAction structs */
     354             :     struct ActionList *upper;   /* details for upper transaction levels */
     355             : } ActionList;
     356             : 
     357             : static ActionList *pendingActions = NULL;
     358             : 
     359             : /*
     360             :  * State for outbound notifies consists of a list of all channels+payloads
     361             :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     362             :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     363             :  * NOTIFYs have been done in the current (sub) transaction.
     364             :  *
     365             :  * We discard duplicate notify events issued in the same transaction.
     366             :  * Hence, in addition to the list proper (which we need to track the order
     367             :  * of the events, since we guarantee to deliver them in order), we build a
     368             :  * hash table which we can probe to detect duplicates.  Since building the
     369             :  * hash table is somewhat expensive, we do so only once we have at least
     370             :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     371             :  * before that we just scan the events linearly.
     372             :  *
     373             :  * The list is kept in CurTransactionContext.  In subtransactions, each
     374             :  * subtransaction has its own list in its own CurTransactionContext, but
     375             :  * successful subtransactions add their entries to their parent's list.
     376             :  * Failed subtransactions simply discard their lists.  Since these lists
     377             :  * are independent, there may be notify events in a subtransaction's list
     378             :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     379             :  * the dups when merging the subtransaction's list into its parent's.
     380             :  *
     381             :  * Note: the action and notify lists do not interact within a transaction.
     382             :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     383             :  * condition name, it will get a self-notify at commit.  This is a bit odd
     384             :  * but is consistent with our historical behavior.
     385             :  */
     386             : typedef struct Notification
     387             : {
     388             :     uint16      channel_len;    /* length of channel-name string */
     389             :     uint16      payload_len;    /* length of payload string */
     390             :     /* null-terminated channel name, then null-terminated payload follow */
     391             :     char        data[FLEXIBLE_ARRAY_MEMBER];
     392             : } Notification;
     393             : 
     394             : typedef struct NotificationList
     395             : {
     396             :     int         nestingLevel;   /* current transaction nesting depth */
     397             :     List       *events;         /* list of Notification structs */
     398             :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
     399             :     struct NotificationList *upper; /* details for upper transaction levels */
     400             : } NotificationList;
     401             : 
     402             : #define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
     403             : 
     404             : typedef struct NotificationHash
     405             : {
     406             :     Notification *event;        /* => the actual Notification struct */
     407             : } NotificationHash;
     408             : 
     409             : static NotificationList *pendingNotifies = NULL;
     410             : 
     411             : /*
     412             :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     413             :  * called from inside a signal handler. That just sets the
     414             :  * notifyInterruptPending flag and sets the process
     415             :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     416             :  * actually deal with the interrupt.
     417             :  */
     418             : volatile sig_atomic_t notifyInterruptPending = false;
     419             : 
     420             : /* True if we've registered an on_shmem_exit cleanup */
     421             : static bool unlistenExitRegistered = false;
     422             : 
     423             : /* True if we're currently registered as a listener in asyncQueueControl */
     424             : static bool amRegisteredListener = false;
     425             : 
     426             : /* has this backend sent notifications in the current transaction? */
     427             : static bool backendHasSentNotifications = false;
     428             : 
     429             : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     430             : static bool backendTryAdvanceTail = false;
     431             : 
     432             : /* GUC parameter */
     433             : bool        Trace_notify = false;
     434             : 
     435             : /* local function prototypes */
     436             : static int  asyncQueuePageDiff(int p, int q);
     437             : static bool asyncQueuePagePrecedes(int p, int q);
     438             : static void queue_listen(ListenActionKind action, const char *channel);
     439             : static void Async_UnlistenOnExit(int code, Datum arg);
     440             : static void Exec_ListenPreCommit(void);
     441             : static void Exec_ListenCommit(const char *channel);
     442             : static void Exec_UnlistenCommit(const char *channel);
     443             : static void Exec_UnlistenAllCommit(void);
     444             : static bool IsListeningOn(const char *channel);
     445             : static void asyncQueueUnregister(void);
     446             : static bool asyncQueueIsFull(void);
     447             : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     448             : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     449             : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     450             : static double asyncQueueUsage(void);
     451             : static void asyncQueueFillWarning(void);
     452             : static void SignalBackends(void);
     453             : static void asyncQueueReadAllNotifications(void);
     454             : static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
     455             :                                          QueuePosition stop,
     456             :                                          char *page_buffer,
     457             :                                          Snapshot snapshot);
     458             : static void asyncQueueAdvanceTail(void);
     459             : static void ProcessIncomingNotify(void);
     460             : static bool AsyncExistsPendingNotify(Notification *n);
     461             : static void AddEventToPendingNotifies(Notification *n);
     462             : static uint32 notification_hash(const void *key, Size keysize);
     463             : static int  notification_match(const void *key1, const void *key2, Size keysize);
     464             : static void ClearPendingActionsAndNotifies(void);
     465             : 
     466             : /*
     467             :  * Compute the difference between two queue page numbers (i.e., p - q),
     468             :  * accounting for wraparound.
     469             :  */
     470             : static int
     471          74 : asyncQueuePageDiff(int p, int q)
     472             : {
     473             :     int         diff;
     474             : 
     475             :     /*
     476             :      * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
     477             :      * in the range 0..QUEUE_MAX_PAGE.
     478             :      */
     479             :     Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
     480             :     Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
     481             : 
     482          74 :     diff = p - q;
     483          74 :     if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
     484           0 :         diff -= QUEUE_MAX_PAGE + 1;
     485          74 :     else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
     486           0 :         diff += QUEUE_MAX_PAGE + 1;
     487          74 :     return diff;
     488             : }
     489             : 
     490             : /* Is p < q, accounting for wraparound? */
     491             : static bool
     492          74 : asyncQueuePagePrecedes(int p, int q)
     493             : {
     494          74 :     return asyncQueuePageDiff(p, q) < 0;
     495             : }
     496             : 
     497             : /*
     498             :  * Report space needed for our shared memory area
     499             :  */
     500             : Size
     501        2174 : AsyncShmemSize(void)
     502             : {
     503             :     Size        size;
     504             : 
     505             :     /* This had better match AsyncShmemInit */
     506        2174 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     507        2174 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     508             : 
     509        2174 :     size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
     510             : 
     511        2174 :     return size;
     512             : }
     513             : 
     514             : /*
     515             :  * Initialize our shared memory area
     516             :  */
     517             : void
     518        2170 : AsyncShmemInit(void)
     519             : {
     520             :     bool        found;
     521             :     Size        size;
     522             : 
     523             :     /*
     524             :      * Create or attach to the AsyncQueueControl structure.
     525             :      *
     526             :      * The used entries in the backend[] array run from 1 to MaxBackends; the
     527             :      * zero'th entry is unused but must be allocated.
     528             :      */
     529        2170 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     530        2170 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     531             : 
     532        2170 :     asyncQueueControl = (AsyncQueueControl *)
     533        2170 :         ShmemInitStruct("Async Queue Control", size, &found);
     534             : 
     535        2170 :     if (!found)
     536             :     {
     537             :         /* First time through, so initialize it */
     538        2170 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     539        2170 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     540        2170 :         QUEUE_FIRST_LISTENER = InvalidBackendId;
     541        2170 :         asyncQueueControl->lastQueueFillWarn = 0;
     542             :         /* zero'th entry won't be used, but let's initialize it anyway */
     543      232536 :         for (int i = 0; i <= MaxBackends; i++)
     544             :         {
     545      230366 :             QUEUE_BACKEND_PID(i) = InvalidPid;
     546      230366 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
     547      230366 :             QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
     548      230366 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     549             :         }
     550             :     }
     551             : 
     552             :     /*
     553             :      * Set up SLRU management of the pg_notify data.
     554             :      */
     555        2170 :     NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
     556        2170 :     SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
     557        2170 :                   NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
     558             :     /* Override default assumption that writes should be fsync'd */
     559        2170 :     NotifyCtl->do_fsync = false;
     560             : 
     561        2170 :     if (!found)
     562             :     {
     563             :         /*
     564             :          * During start or reboot, clean out the pg_notify directory.
     565             :          */
     566        2170 :         (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     567             :     }
     568        2170 : }
     569             : 
     570             : 
     571             : /*
     572             :  * pg_notify -
     573             :  *    SQL function to send a notification event
     574             :  */
     575             : Datum
     576        2030 : pg_notify(PG_FUNCTION_ARGS)
     577             : {
     578             :     const char *channel;
     579             :     const char *payload;
     580             : 
     581        2030 :     if (PG_ARGISNULL(0))
     582           4 :         channel = "";
     583             :     else
     584        2026 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     585             : 
     586        2030 :     if (PG_ARGISNULL(1))
     587          10 :         payload = "";
     588             :     else
     589        2020 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     590             : 
     591             :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     592        2030 :     PreventCommandDuringRecovery("NOTIFY");
     593             : 
     594        2030 :     Async_Notify(channel, payload);
     595             : 
     596        2018 :     PG_RETURN_VOID();
     597             : }
     598             : 
     599             : 
     600             : /*
     601             :  * Async_Notify
     602             :  *
     603             :  *      This is executed by the SQL notify command.
     604             :  *
     605             :  *      Adds the message to the list of pending notifies.
     606             :  *      Actual notification happens during transaction commit.
     607             :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     608             :  */
     609             : void
     610        2112 : Async_Notify(const char *channel, const char *payload)
     611             : {
     612        2112 :     int         my_level = GetCurrentTransactionNestLevel();
     613             :     size_t      channel_len;
     614             :     size_t      payload_len;
     615             :     Notification *n;
     616             :     MemoryContext oldcontext;
     617             : 
     618        2112 :     if (IsParallelWorker())
     619           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     620             : 
     621        2112 :     if (Trace_notify)
     622           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     623             : 
     624        2112 :     channel_len = channel ? strlen(channel) : 0;
     625        2112 :     payload_len = payload ? strlen(payload) : 0;
     626             : 
     627             :     /* a channel name must be specified */
     628        2112 :     if (channel_len == 0)
     629           8 :         ereport(ERROR,
     630             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     631             :                  errmsg("channel name cannot be empty")));
     632             : 
     633             :     /* enforce length limits */
     634        2104 :     if (channel_len >= NAMEDATALEN)
     635           4 :         ereport(ERROR,
     636             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     637             :                  errmsg("channel name too long")));
     638             : 
     639        2100 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     640           0 :         ereport(ERROR,
     641             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     642             :                  errmsg("payload string too long")));
     643             : 
     644             :     /*
     645             :      * We must construct the Notification entry, even if we end up not using
     646             :      * it, in order to compare it cheaply to existing list entries.
     647             :      *
     648             :      * The notification list needs to live until end of transaction, so store
     649             :      * it in the transaction context.
     650             :      */
     651        2100 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     652             : 
     653        2100 :     n = (Notification *) palloc(offsetof(Notification, data) +
     654        2100 :                                 channel_len + payload_len + 2);
     655        2100 :     n->channel_len = channel_len;
     656        2100 :     n->payload_len = payload_len;
     657        2100 :     strcpy(n->data, channel);
     658        2100 :     if (payload)
     659        2078 :         strcpy(n->data + channel_len + 1, payload);
     660             :     else
     661          22 :         n->data[channel_len + 1] = '\0';
     662             : 
     663        2100 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     664          58 :     {
     665             :         NotificationList *notifies;
     666             : 
     667             :         /*
     668             :          * First notify event in current (sub)xact. Note that we allocate the
     669             :          * NotificationList in TopTransactionContext; the nestingLevel might
     670             :          * get changed later by AtSubCommit_Notify.
     671             :          */
     672             :         notifies = (NotificationList *)
     673          58 :             MemoryContextAlloc(TopTransactionContext,
     674             :                                sizeof(NotificationList));
     675          58 :         notifies->nestingLevel = my_level;
     676          58 :         notifies->events = list_make1(n);
     677             :         /* We certainly don't need a hashtable yet */
     678          58 :         notifies->hashtab = NULL;
     679          58 :         notifies->upper = pendingNotifies;
     680          58 :         pendingNotifies = notifies;
     681             :     }
     682             :     else
     683             :     {
     684             :         /* Now check for duplicates */
     685        2042 :         if (AsyncExistsPendingNotify(n))
     686             :         {
     687             :             /* It's a dup, so forget it */
     688          24 :             pfree(n);
     689          24 :             MemoryContextSwitchTo(oldcontext);
     690          24 :             return;
     691             :         }
     692             : 
     693             :         /* Append more events to existing list */
     694        2018 :         AddEventToPendingNotifies(n);
     695             :     }
     696             : 
     697        2076 :     MemoryContextSwitchTo(oldcontext);
     698             : }
     699             : 
     700             : /*
     701             :  * queue_listen
     702             :  *      Common code for listen, unlisten, unlisten all commands.
     703             :  *
     704             :  *      Adds the request to the list of pending actions.
     705             :  *      Actual update of the listenChannels list happens during transaction
     706             :  *      commit.
     707             :  */
     708             : static void
     709          76 : queue_listen(ListenActionKind action, const char *channel)
     710             : {
     711             :     MemoryContext oldcontext;
     712             :     ListenAction *actrec;
     713          76 :     int         my_level = GetCurrentTransactionNestLevel();
     714             : 
     715             :     /*
     716             :      * Unlike Async_Notify, we don't try to collapse out duplicates. It would
     717             :      * be too complicated to ensure we get the right interactions of
     718             :      * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
     719             :      * would be any performance benefit anyway in sane applications.
     720             :      */
     721          76 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     722             : 
     723             :     /* space for terminating null is included in sizeof(ListenAction) */
     724          76 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
     725          76 :                                      strlen(channel) + 1);
     726          76 :     actrec->action = action;
     727          76 :     strcpy(actrec->channel, channel);
     728             : 
     729          76 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
     730          62 :     {
     731             :         ActionList *actions;
     732             : 
     733             :         /*
     734             :          * First action in current sub(xact). Note that we allocate the
     735             :          * ActionList in TopTransactionContext; the nestingLevel might get
     736             :          * changed later by AtSubCommit_Notify.
     737             :          */
     738             :         actions = (ActionList *)
     739          62 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
     740          62 :         actions->nestingLevel = my_level;
     741          62 :         actions->actions = list_make1(actrec);
     742          62 :         actions->upper = pendingActions;
     743          62 :         pendingActions = actions;
     744             :     }
     745             :     else
     746          14 :         pendingActions->actions = lappend(pendingActions->actions, actrec);
     747             : 
     748          76 :     MemoryContextSwitchTo(oldcontext);
     749          76 : }
     750             : 
     751             : /*
     752             :  * Async_Listen
     753             :  *
     754             :  *      This is executed by the SQL listen command.
     755             :  */
     756             : void
     757          42 : Async_Listen(const char *channel)
     758             : {
     759          42 :     if (Trace_notify)
     760           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
     761             : 
     762          42 :     queue_listen(LISTEN_LISTEN, channel);
     763          42 : }
     764             : 
     765             : /*
     766             :  * Async_Unlisten
     767             :  *
     768             :  *      This is executed by the SQL unlisten command.
     769             :  */
     770             : void
     771           4 : Async_Unlisten(const char *channel)
     772             : {
     773           4 :     if (Trace_notify)
     774           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
     775             : 
     776             :     /* If we couldn't possibly be listening, no need to queue anything */
     777           4 :     if (pendingActions == NULL && !unlistenExitRegistered)
     778           0 :         return;
     779             : 
     780           4 :     queue_listen(LISTEN_UNLISTEN, channel);
     781             : }
     782             : 
     783             : /*
     784             :  * Async_UnlistenAll
     785             :  *
     786             :  *      This is invoked by UNLISTEN * command, and also at backend exit.
     787             :  */
     788             : void
     789          34 : Async_UnlistenAll(void)
     790             : {
     791          34 :     if (Trace_notify)
     792           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
     793             : 
     794             :     /* If we couldn't possibly be listening, no need to queue anything */
     795          34 :     if (pendingActions == NULL && !unlistenExitRegistered)
     796           4 :         return;
     797             : 
     798          30 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
     799             : }
     800             : 
     801             : /*
     802             :  * SQL function: return a set of the channel names this backend is actively
     803             :  * listening to.
     804             :  *
     805             :  * Note: this coding relies on the fact that the listenChannels list cannot
     806             :  * change within a transaction.
     807             :  */
     808             : Datum
     809          12 : pg_listening_channels(PG_FUNCTION_ARGS)
     810             : {
     811             :     FuncCallContext *funcctx;
     812             : 
     813             :     /* stuff done only on the first call of the function */
     814          12 :     if (SRF_IS_FIRSTCALL())
     815             :     {
     816             :         /* create a function context for cross-call persistence */
     817           8 :         funcctx = SRF_FIRSTCALL_INIT();
     818             :     }
     819             : 
     820             :     /* stuff done on every call of the function */
     821          12 :     funcctx = SRF_PERCALL_SETUP();
     822             : 
     823          12 :     if (funcctx->call_cntr < list_length(listenChannels))
     824             :     {
     825           4 :         char       *channel = (char *) list_nth(listenChannels,
     826           4 :                                                 funcctx->call_cntr);
     827             : 
     828           4 :         SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
     829             :     }
     830             : 
     831           8 :     SRF_RETURN_DONE(funcctx);
     832             : }
     833             : 
     834             : /*
     835             :  * Async_UnlistenOnExit
     836             :  *
     837             :  * This is executed at backend exit if we have done any LISTENs in this
     838             :  * backend.  It might not be necessary anymore, if the user UNLISTENed
     839             :  * everything, but we don't try to detect that case.
     840             :  */
     841             : static void
     842          16 : Async_UnlistenOnExit(int code, Datum arg)
     843             : {
     844          16 :     Exec_UnlistenAllCommit();
     845          16 :     asyncQueueUnregister();
     846          16 : }
     847             : 
     848             : /*
     849             :  * AtPrepare_Notify
     850             :  *
     851             :  *      This is called at the prepare phase of a two-phase
     852             :  *      transaction.  Save the state for possible commit later.
     853             :  */
     854             : void
     855          60 : AtPrepare_Notify(void)
     856             : {
     857             :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
     858          60 :     if (pendingActions || pendingNotifies)
     859           0 :         ereport(ERROR,
     860             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     861             :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
     862          60 : }
     863             : 
     864             : /*
     865             :  * PreCommit_Notify
     866             :  *
     867             :  *      This is called at transaction commit, before actually committing to
     868             :  *      clog.
     869             :  *
     870             :  *      If there are pending LISTEN actions, make sure we are listed in the
     871             :  *      shared-memory listener array.  This must happen before commit to
     872             :  *      ensure we don't miss any notifies from transactions that commit
     873             :  *      just after ours.
     874             :  *
     875             :  *      If there are outbound notify requests in the pendingNotifies list,
     876             :  *      add them to the global queue.  We do that before commit so that
     877             :  *      we can still throw error if we run out of queue space.
     878             :  */
     879             : void
     880      474904 : PreCommit_Notify(void)
     881             : {
     882             :     ListCell   *p;
     883             : 
     884      474904 :     if (!pendingActions && !pendingNotifies)
     885      474790 :         return;                 /* no relevant statements in this xact */
     886             : 
     887         114 :     if (Trace_notify)
     888           0 :         elog(DEBUG1, "PreCommit_Notify");
     889             : 
     890             :     /* Preflight for any pending listen/unlisten actions */
     891         114 :     if (pendingActions != NULL)
     892             :     {
     893         134 :         foreach(p, pendingActions->actions)
     894             :         {
     895          74 :             ListenAction *actrec = (ListenAction *) lfirst(p);
     896             : 
     897          74 :             switch (actrec->action)
     898             :             {
     899          42 :                 case LISTEN_LISTEN:
     900          42 :                     Exec_ListenPreCommit();
     901          42 :                     break;
     902           4 :                 case LISTEN_UNLISTEN:
     903             :                     /* there is no Exec_UnlistenPreCommit() */
     904           4 :                     break;
     905          28 :                 case LISTEN_UNLISTEN_ALL:
     906             :                     /* there is no Exec_UnlistenAllPreCommit() */
     907          28 :                     break;
     908             :             }
     909          74 :         }
     910             :     }
     911             : 
     912             :     /* Queue any pending notifies (must happen after the above) */
     913         114 :     if (pendingNotifies)
     914             :     {
     915             :         ListCell   *nextNotify;
     916             : 
     917             :         /*
     918             :          * Make sure that we have an XID assigned to the current transaction.
     919             :          * GetCurrentTransactionId is cheap if we already have an XID, but not
     920             :          * so cheap if we don't, and we'd prefer not to do that work while
     921             :          * holding NotifyQueueLock.
     922             :          */
     923          54 :         (void) GetCurrentTransactionId();
     924             : 
     925             :         /*
     926             :          * Serialize writers by acquiring a special lock that we hold till
     927             :          * after commit.  This ensures that queue entries appear in commit
     928             :          * order, and in particular that there are never uncommitted queue
     929             :          * entries ahead of committed ones, so an uncommitted transaction
     930             :          * can't block delivery of deliverable notifications.
     931             :          *
     932             :          * We use a heavyweight lock so that it'll automatically be released
     933             :          * after either commit or abort.  This also allows deadlocks to be
     934             :          * detected, though really a deadlock shouldn't be possible here.
     935             :          *
     936             :          * The lock is on "database 0", which is pretty ugly but it doesn't
     937             :          * seem worth inventing a special locktag category just for this.
     938             :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
     939             :          * used by the flatfiles mechanism.)
     940             :          */
     941          54 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
     942             :                          AccessExclusiveLock);
     943             : 
     944             :         /* Now push the notifications into the queue */
     945          54 :         backendHasSentNotifications = true;
     946             : 
     947          54 :         nextNotify = list_head(pendingNotifies->events);
     948         112 :         while (nextNotify != NULL)
     949             :         {
     950             :             /*
     951             :              * Add the pending notifications to the queue.  We acquire and
     952             :              * release NotifyQueueLock once per page, which might be overkill
     953             :              * but it does allow readers to get in while we're doing this.
     954             :              *
     955             :              * A full queue is very uncommon and should really not happen,
     956             :              * given that we have so much space available in the SLRU pages.
     957             :              * Nevertheless we need to deal with this possibility. Note that
     958             :              * when we get here we are in the process of committing our
     959             :              * transaction, but we have not yet committed to clog, so at this
     960             :              * point in time we can still roll the transaction back.
     961             :              */
     962          58 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     963          58 :             asyncQueueFillWarning();
     964          58 :             if (asyncQueueIsFull())
     965           0 :                 ereport(ERROR,
     966             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     967             :                          errmsg("too many notifications in the NOTIFY queue")));
     968          58 :             nextNotify = asyncQueueAddEntries(nextNotify);
     969          58 :             LWLockRelease(NotifyQueueLock);
     970             :         }
     971             :     }
     972             : }
     973             : 
     974             : /*
     975             :  * AtCommit_Notify
     976             :  *
     977             :  *      This is called at transaction commit, after committing to clog.
     978             :  *
     979             :  *      Update listenChannels and clear transaction-local state.
     980             :  */
     981             : void
     982      474610 : AtCommit_Notify(void)
     983             : {
     984             :     ListCell   *p;
     985             : 
     986             :     /*
     987             :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     988             :      * return as soon as possible
     989             :      */
     990      474610 :     if (!pendingActions && !pendingNotifies)
     991      474496 :         return;
     992             : 
     993         114 :     if (Trace_notify)
     994           0 :         elog(DEBUG1, "AtCommit_Notify");
     995             : 
     996             :     /* Perform any pending listen/unlisten actions */
     997         114 :     if (pendingActions != NULL)
     998             :     {
     999         134 :         foreach(p, pendingActions->actions)
    1000             :         {
    1001          74 :             ListenAction *actrec = (ListenAction *) lfirst(p);
    1002             : 
    1003          74 :             switch (actrec->action)
    1004             :             {
    1005          42 :                 case LISTEN_LISTEN:
    1006          42 :                     Exec_ListenCommit(actrec->channel);
    1007          42 :                     break;
    1008           4 :                 case LISTEN_UNLISTEN:
    1009           4 :                     Exec_UnlistenCommit(actrec->channel);
    1010           4 :                     break;
    1011          28 :                 case LISTEN_UNLISTEN_ALL:
    1012          28 :                     Exec_UnlistenAllCommit();
    1013          28 :                     break;
    1014             :             }
    1015          74 :         }
    1016             :     }
    1017             : 
    1018             :     /* If no longer listening to anything, get out of listener array */
    1019         114 :     if (amRegisteredListener && listenChannels == NIL)
    1020          22 :         asyncQueueUnregister();
    1021             : 
    1022             :     /* And clean up */
    1023         114 :     ClearPendingActionsAndNotifies();
    1024             : }
    1025             : 
    1026             : /*
    1027             :  * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
    1028             :  *
    1029             :  * This function must make sure we are ready to catch any incoming messages.
    1030             :  */
    1031             : static void
    1032          42 : Exec_ListenPreCommit(void)
    1033             : {
    1034             :     QueuePosition head;
    1035             :     QueuePosition max;
    1036             :     BackendId   prevListener;
    1037             : 
    1038             :     /*
    1039             :      * Nothing to do if we are already listening to something, nor if we
    1040             :      * already ran this routine in this transaction.
    1041             :      */
    1042          42 :     if (amRegisteredListener)
    1043          16 :         return;
    1044             : 
    1045          26 :     if (Trace_notify)
    1046           0 :         elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
    1047             : 
    1048             :     /*
    1049             :      * Before registering, make sure we will unlisten before dying. (Note:
    1050             :      * this action does not get undone if we abort later.)
    1051             :      */
    1052          26 :     if (!unlistenExitRegistered)
    1053             :     {
    1054          16 :         before_shmem_exit(Async_UnlistenOnExit, 0);
    1055          16 :         unlistenExitRegistered = true;
    1056             :     }
    1057             : 
    1058             :     /*
    1059             :      * This is our first LISTEN, so establish our pointer.
    1060             :      *
    1061             :      * We set our pointer to the global tail pointer and then move it forward
    1062             :      * over already-committed notifications.  This ensures we cannot miss any
    1063             :      * not-yet-committed notifications.  We might get a few more but that
    1064             :      * doesn't hurt.
    1065             :      *
    1066             :      * In some scenarios there might be a lot of committed notifications that
    1067             :      * have not yet been pruned away (because some backend is being lazy about
    1068             :      * reading them).  To reduce our startup time, we can look at other
    1069             :      * backends and adopt the maximum "pos" pointer of any backend that's in
    1070             :      * our database; any notifications it's already advanced over are surely
    1071             :      * committed and need not be re-examined by us.  (We must consider only
    1072             :      * backends connected to our DB, because others will not have bothered to
    1073             :      * check committed-ness of notifications in our DB.)
    1074             :      *
    1075             :      * We need exclusive lock here so we can look at other backends' entries
    1076             :      * and manipulate the list links.
    1077             :      */
    1078          26 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1079          26 :     head = QUEUE_HEAD;
    1080          26 :     max = QUEUE_TAIL;
    1081          26 :     prevListener = InvalidBackendId;
    1082          30 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1083             :     {
    1084           4 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1085           4 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1086             :         /* Also find last listening backend before this one */
    1087           4 :         if (i < MyBackendId)
    1088           2 :             prevListener = i;
    1089             :     }
    1090          26 :     QUEUE_BACKEND_POS(MyBackendId) = max;
    1091          26 :     QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
    1092          26 :     QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    1093             :     /* Insert backend into list of listeners at correct position */
    1094          26 :     if (prevListener > 0)
    1095             :     {
    1096           2 :         QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
    1097           2 :         QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
    1098             :     }
    1099             :     else
    1100             :     {
    1101          24 :         QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
    1102          24 :         QUEUE_FIRST_LISTENER = MyBackendId;
    1103             :     }
    1104          26 :     LWLockRelease(NotifyQueueLock);
    1105             : 
    1106             :     /* Now we are listed in the global array, so remember we're listening */
    1107          26 :     amRegisteredListener = true;
    1108             : 
    1109             :     /*
    1110             :      * Try to move our pointer forward as far as possible.  This will skip
    1111             :      * over already-committed notifications, which we want to do because they
    1112             :      * might be quite stale.  Note that we are not yet listening on anything,
    1113             :      * so we won't deliver such notifications to our frontend.  Also, although
    1114             :      * our transaction might have executed NOTIFY, those message(s) aren't
    1115             :      * queued yet so we won't skip them here.
    1116             :      */
    1117          26 :     if (!QUEUE_POS_EQUAL(max, head))
    1118          16 :         asyncQueueReadAllNotifications();
    1119             : }
    1120             : 
    1121             : /*
    1122             :  * Exec_ListenCommit --- subroutine for AtCommit_Notify
    1123             :  *
    1124             :  * Add the channel to the list of channels we are listening on.
    1125             :  */
    1126             : static void
    1127          42 : Exec_ListenCommit(const char *channel)
    1128             : {
    1129             :     MemoryContext oldcontext;
    1130             : 
    1131             :     /* Do nothing if we are already listening on this channel */
    1132          42 :     if (IsListeningOn(channel))
    1133           0 :         return;
    1134             : 
    1135             :     /*
    1136             :      * Add the new channel name to listenChannels.
    1137             :      *
    1138             :      * XXX It is theoretically possible to get an out-of-memory failure here,
    1139             :      * which would be bad because we already committed.  For the moment it
    1140             :      * doesn't seem worth trying to guard against that, but maybe improve this
    1141             :      * later.
    1142             :      */
    1143          42 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1144          42 :     listenChannels = lappend(listenChannels, pstrdup(channel));
    1145          42 :     MemoryContextSwitchTo(oldcontext);
    1146             : }
    1147             : 
    1148             : /*
    1149             :  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
    1150             :  *
    1151             :  * Remove the specified channel name from listenChannels.
    1152             :  */
    1153             : static void
    1154           4 : Exec_UnlistenCommit(const char *channel)
    1155             : {
    1156             :     ListCell   *q;
    1157             : 
    1158           4 :     if (Trace_notify)
    1159           0 :         elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
    1160             : 
    1161           4 :     foreach(q, listenChannels)
    1162             :     {
    1163           4 :         char       *lchan = (char *) lfirst(q);
    1164             : 
    1165           4 :         if (strcmp(lchan, channel) == 0)
    1166             :         {
    1167           4 :             listenChannels = foreach_delete_current(listenChannels, q);
    1168           4 :             pfree(lchan);
    1169           4 :             break;
    1170             :         }
    1171             :     }
    1172             : 
    1173             :     /*
    1174             :      * We do not complain about unlistening something not being listened;
    1175             :      * should we?
    1176             :      */
    1177           4 : }
    1178             : 
    1179             : /*
    1180             :  * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
    1181             :  *
    1182             :  *      Unlisten on all channels for this backend.
    1183             :  */
    1184             : static void
    1185          44 : Exec_UnlistenAllCommit(void)
    1186             : {
    1187          44 :     if (Trace_notify)
    1188           0 :         elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
    1189             : 
    1190          44 :     list_free_deep(listenChannels);
    1191          44 :     listenChannels = NIL;
    1192          44 : }
    1193             : 
    1194             : /*
    1195             :  * ProcessCompletedNotifies --- send out signals and self-notifies
    1196             :  *
    1197             :  * This is called from postgres.c just before going idle at the completion
    1198             :  * of a transaction.  If we issued any notifications in the just-completed
    1199             :  * transaction, send signals to other backends to process them, and also
    1200             :  * process the queue ourselves to send messages to our own frontend.
    1201             :  * Also, if we filled enough queue pages with new notifies, try to advance
    1202             :  * the queue tail pointer.
    1203             :  *
    1204             :  * The reason that this is not done in AtCommit_Notify is that there is
    1205             :  * a nonzero chance of errors here (for example, encoding conversion errors
    1206             :  * while trying to format messages to our frontend).  An error during
    1207             :  * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
    1208             :  * to ensure that a transaction's self-notifies are delivered to the frontend
    1209             :  * before it gets the terminating ReadyForQuery message.
    1210             :  *
    1211             :  * Note that we send signals and process the queue even if the transaction
    1212             :  * eventually aborted.  This is because we need to clean out whatever got
    1213             :  * added to the queue.
    1214             :  *
    1215             :  * NOTE: we are outside of any transaction here.
    1216             :  */
    1217             : void
    1218      401090 : ProcessCompletedNotifies(void)
    1219             : {
    1220             :     MemoryContext caller_context;
    1221             : 
    1222             :     /* Nothing to do if we didn't send any notifications */
    1223      401090 :     if (!backendHasSentNotifications)
    1224      401036 :         return;
    1225             : 
    1226             :     /*
    1227             :      * We reset the flag immediately; otherwise, if any sort of error occurs
    1228             :      * below, we'd be locked up in an infinite loop, because control will come
    1229             :      * right back here after error cleanup.
    1230             :      */
    1231          54 :     backendHasSentNotifications = false;
    1232             : 
    1233             :     /*
    1234             :      * We must preserve the caller's memory context (probably MessageContext)
    1235             :      * across the transaction we do here.
    1236             :      */
    1237          54 :     caller_context = CurrentMemoryContext;
    1238             : 
    1239          54 :     if (Trace_notify)
    1240           0 :         elog(DEBUG1, "ProcessCompletedNotifies");
    1241             : 
    1242             :     /*
    1243             :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    1244             :      * bad things happen if it gets an error.
    1245             :      */
    1246          54 :     StartTransactionCommand();
    1247             : 
    1248             :     /* Send signals to other backends */
    1249          54 :     SignalBackends();
    1250             : 
    1251          54 :     if (listenChannels != NIL)
    1252             :     {
    1253             :         /* Read the queue ourselves, and send relevant stuff to the frontend */
    1254          26 :         asyncQueueReadAllNotifications();
    1255             :     }
    1256             : 
    1257             :     /*
    1258             :      * If it's time to try to advance the global tail pointer, do that.
    1259             :      */
    1260          54 :     if (backendTryAdvanceTail)
    1261             :     {
    1262           0 :         backendTryAdvanceTail = false;
    1263           0 :         asyncQueueAdvanceTail();
    1264             :     }
    1265             : 
    1266          54 :     CommitTransactionCommand();
    1267             : 
    1268          54 :     MemoryContextSwitchTo(caller_context);
    1269             : 
    1270             :     /* We don't need pq_flush() here since postgres.c will do one shortly */
    1271             : }
    1272             : 
    1273             : /*
    1274             :  * Test whether we are actively listening on the given channel name.
    1275             :  *
    1276             :  * Note: this function is executed for every notification found in the queue.
    1277             :  * Perhaps it is worth further optimization, eg convert the list to a sorted
    1278             :  * array so we can binary-search it.  In practice the list is likely to be
    1279             :  * fairly short, though.
    1280             :  */
    1281             : static bool
    1282         270 : IsListeningOn(const char *channel)
    1283             : {
    1284             :     ListCell   *p;
    1285             : 
    1286         326 :     foreach(p, listenChannels)
    1287             :     {
    1288         104 :         char       *lchan = (char *) lfirst(p);
    1289             : 
    1290         104 :         if (strcmp(lchan, channel) == 0)
    1291          48 :             return true;
    1292             :     }
    1293         222 :     return false;
    1294             : }
    1295             : 
    1296             : /*
    1297             :  * Remove our entry from the listeners array when we are no longer listening
    1298             :  * on any channel.  NB: must not fail if we're already not listening.
    1299             :  */
    1300             : static void
    1301          38 : asyncQueueUnregister(void)
    1302             : {
    1303             :     Assert(listenChannels == NIL);  /* else caller error */
    1304             : 
    1305          38 :     if (!amRegisteredListener)  /* nothing to do */
    1306          12 :         return;
    1307             : 
    1308             :     /*
    1309             :      * Need exclusive lock here to manipulate list links.
    1310             :      */
    1311          26 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1312             :     /* Mark our entry as invalid */
    1313          26 :     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
    1314          26 :     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
    1315             :     /* and remove it from the list */
    1316          26 :     if (QUEUE_FIRST_LISTENER == MyBackendId)
    1317          24 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
    1318             :     else
    1319             :     {
    1320           2 :         for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1321             :         {
    1322           2 :             if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
    1323             :             {
    1324           2 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
    1325           2 :                 break;
    1326             :             }
    1327             :         }
    1328             :     }
    1329          26 :     QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
    1330          26 :     LWLockRelease(NotifyQueueLock);
    1331             : 
    1332             :     /* mark ourselves as no longer listed in the global array */
    1333          26 :     amRegisteredListener = false;
    1334             : }
    1335             : 
    1336             : /*
    1337             :  * Test whether there is room to insert more notification messages.
    1338             :  *
    1339             :  * Caller must hold at least shared NotifyQueueLock.
    1340             :  */
    1341             : static bool
    1342          58 : asyncQueueIsFull(void)
    1343             : {
    1344             :     int         nexthead;
    1345             :     int         boundary;
    1346             : 
    1347             :     /*
    1348             :      * The queue is full if creating a new head page would create a page that
    1349             :      * logically precedes the current global tail pointer, ie, the head
    1350             :      * pointer would wrap around compared to the tail.  We cannot create such
    1351             :      * a head page for fear of confusing slru.c.  For safety we round the tail
    1352             :      * pointer back to a segment boundary (compare the truncation logic in
    1353             :      * asyncQueueAdvanceTail).
    1354             :      *
    1355             :      * Note that this test is *not* dependent on how much space there is on
    1356             :      * the current head page.  This is necessary because asyncQueueAddEntries
    1357             :      * might try to create the next head page in any case.
    1358             :      */
    1359          58 :     nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
    1360          58 :     if (nexthead > QUEUE_MAX_PAGE)
    1361           0 :         nexthead = 0;           /* wrap around */
    1362          58 :     boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
    1363          58 :     boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
    1364          58 :     return asyncQueuePagePrecedes(nexthead, boundary);
    1365             : }
    1366             : 
    1367             : /*
    1368             :  * Advance the QueuePosition to the next entry, assuming that the current
    1369             :  * entry is of length entryLength.  If we jump to a new page the function
    1370             :  * returns true, else false.
    1371             :  */
    1372             : static bool
    1373        2292 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1374             : {
    1375        2292 :     int         pageno = QUEUE_POS_PAGE(*position);
    1376        2292 :     int         offset = QUEUE_POS_OFFSET(*position);
    1377        2292 :     bool        pageJump = false;
    1378             : 
    1379             :     /*
    1380             :      * Move to the next writing position: First jump over what we have just
    1381             :      * written or read.
    1382             :      */
    1383        2292 :     offset += entryLength;
    1384             :     Assert(offset <= QUEUE_PAGESIZE);
    1385             : 
    1386             :     /*
    1387             :      * In a second step check if another entry can possibly be written to the
    1388             :      * page. If so, stay here, we have reached the next position. If not, then
    1389             :      * we need to move on to the next page.
    1390             :      */
    1391        2292 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1392             :     {
    1393           4 :         pageno++;
    1394           4 :         if (pageno > QUEUE_MAX_PAGE)
    1395           0 :             pageno = 0;         /* wrap around */
    1396           4 :         offset = 0;
    1397           4 :         pageJump = true;
    1398             :     }
    1399             : 
    1400        2292 :     SET_QUEUE_POS(*position, pageno, offset);
    1401        2292 :     return pageJump;
    1402             : }
    1403             : 
    1404             : /*
    1405             :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    1406             :  */
    1407             : static void
    1408        2064 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1409             : {
    1410        2064 :     size_t      channellen = n->channel_len;
    1411        2064 :     size_t      payloadlen = n->payload_len;
    1412             :     int         entryLength;
    1413             : 
    1414             :     Assert(channellen < NAMEDATALEN);
    1415             :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    1416             : 
    1417             :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    1418        2064 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    1419        2064 :     entryLength = QUEUEALIGN(entryLength);
    1420        2064 :     qe->length = entryLength;
    1421        2064 :     qe->dboid = MyDatabaseId;
    1422        2064 :     qe->xid = GetCurrentTransactionId();
    1423        2064 :     qe->srcPid = MyProcPid;
    1424        2064 :     memcpy(qe->data, n->data, channellen + payloadlen + 2);
    1425        2064 : }
    1426             : 
    1427             : /*
    1428             :  * Add pending notifications to the queue.
    1429             :  *
    1430             :  * We go page by page here, i.e. we stop once we have to go to a new page but
    1431             :  * we will be called again and then fill that next page. If an entry does not
    1432             :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    1433             :  * database OID in order to fill the page. So every page is always used up to
    1434             :  * the last byte which simplifies reading the page later.
    1435             :  *
    1436             :  * We are passed the list cell (in pendingNotifies->events) containing the next
    1437             :  * notification to write and return the first still-unwritten cell back.
    1438             :  * Eventually we will return NULL indicating all is done.
    1439             :  *
    1440             :  * We are holding NotifyQueueLock already from the caller and grab
    1441             :  * NotifySLRULock locally in this function.
    1442             :  */
    1443             : static ListCell *
    1444          58 : asyncQueueAddEntries(ListCell *nextNotify)
    1445             : {
    1446             :     AsyncQueueEntry qe;
    1447             :     QueuePosition queue_head;
    1448             :     int         pageno;
    1449             :     int         offset;
    1450             :     int         slotno;
    1451             : 
    1452             :     /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
    1453          58 :     LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
    1454             : 
    1455             :     /*
    1456             :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    1457             :      * memory upon exiting.  The reason for this is that if we have to advance
    1458             :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    1459             :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    1460             :      * subsequent insertions would try to put entries into a page that slru.c
    1461             :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    1462             :      * that if we do fail, any already-inserted queue entries are forgotten;
    1463             :      * this is okay, since they'd be useless anyway after our transaction
    1464             :      * rolls back.
    1465             :      */
    1466          58 :     queue_head = QUEUE_HEAD;
    1467             : 
    1468             :     /*
    1469             :      * If this is the first write since the postmaster started, we need to
    1470             :      * initialize the first page of the async SLRU.  Otherwise, the current
    1471             :      * page should be initialized already, so just fetch it.
    1472             :      *
    1473             :      * (We could also take the first path when the SLRU position has just
    1474             :      * wrapped around, but re-zeroing the page is harmless in that case.)
    1475             :      */
    1476          58 :     pageno = QUEUE_POS_PAGE(queue_head);
    1477          58 :     if (QUEUE_POS_IS_ZERO(queue_head))
    1478           8 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    1479             :     else
    1480          50 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    1481             :                                    InvalidTransactionId);
    1482             : 
    1483             :     /* Note we mark the page dirty before writing in it */
    1484          58 :     NotifyCtl->shared->page_dirty[slotno] = true;
    1485             : 
    1486        2118 :     while (nextNotify != NULL)
    1487             :     {
    1488        2064 :         Notification *n = (Notification *) lfirst(nextNotify);
    1489             : 
    1490             :         /* Construct a valid queue entry in local variable qe */
    1491        2064 :         asyncQueueNotificationToEntry(n, &qe);
    1492             : 
    1493        2064 :         offset = QUEUE_POS_OFFSET(queue_head);
    1494             : 
    1495             :         /* Check whether the entry really fits on the current page */
    1496        2064 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    1497             :         {
    1498             :             /* OK, so advance nextNotify past this item */
    1499        2064 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
    1500             :         }
    1501             :         else
    1502             :         {
    1503             :             /*
    1504             :              * Write a dummy entry to fill up the page. Actually readers will
    1505             :              * only check dboid and since it won't match any reader's database
    1506             :              * OID, they will ignore this entry and move on.
    1507             :              */
    1508           0 :             qe.length = QUEUE_PAGESIZE - offset;
    1509           0 :             qe.dboid = InvalidOid;
    1510           0 :             qe.data[0] = '\0';  /* empty channel */
    1511           0 :             qe.data[1] = '\0';  /* empty payload */
    1512             :         }
    1513             : 
    1514             :         /* Now copy qe into the shared buffer page */
    1515        2064 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    1516             :                &qe,
    1517        2064 :                qe.length);
    1518             : 
    1519             :         /* Advance queue_head appropriately, and detect if page is full */
    1520        2064 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    1521             :         {
    1522             :             /*
    1523             :              * Page is full, so we're done here, but first fill the next page
    1524             :              * with zeroes.  The reason to do this is to ensure that slru.c's
    1525             :              * idea of the head page is always the same as ours, which avoids
    1526             :              * boundary problems in SimpleLruTruncate.  The test in
    1527             :              * asyncQueueIsFull() ensured that there is room to create this
    1528             :              * page without overrunning the queue.
    1529             :              */
    1530           4 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    1531             : 
    1532             :             /*
    1533             :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    1534             :              * set flag to remember that we should try to advance the tail
    1535             :              * pointer (we don't want to actually do that right here).
    1536             :              */
    1537           4 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    1538           0 :                 backendTryAdvanceTail = true;
    1539             : 
    1540             :             /* And exit the loop */
    1541           4 :             break;
    1542             :         }
    1543             :     }
    1544             : 
    1545             :     /* Success, so update the global QUEUE_HEAD */
    1546          58 :     QUEUE_HEAD = queue_head;
    1547             : 
    1548          58 :     LWLockRelease(NotifySLRULock);
    1549             : 
    1550          58 :     return nextNotify;
    1551             : }
    1552             : 
    1553             : /*
    1554             :  * SQL function to return the fraction of the notification queue currently
    1555             :  * occupied.
    1556             :  */
    1557             : Datum
    1558           8 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    1559             : {
    1560             :     double      usage;
    1561             : 
    1562             :     /* Advance the queue tail so we don't report a too-large result */
    1563           8 :     asyncQueueAdvanceTail();
    1564             : 
    1565           8 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1566           8 :     usage = asyncQueueUsage();
    1567           8 :     LWLockRelease(NotifyQueueLock);
    1568             : 
    1569           8 :     PG_RETURN_FLOAT8(usage);
    1570             : }
    1571             : 
    1572             : /*
    1573             :  * Return the fraction of the queue that is currently occupied.
    1574             :  *
    1575             :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    1576             :  */
    1577             : static double
    1578          66 : asyncQueueUsage(void)
    1579             : {
    1580          66 :     int         headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1581          66 :     int         tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1582             :     int         occupied;
    1583             : 
    1584          66 :     occupied = headPage - tailPage;
    1585             : 
    1586          66 :     if (occupied == 0)
    1587          60 :         return (double) 0;      /* fast exit for common case */
    1588             : 
    1589           6 :     if (occupied < 0)
    1590             :     {
    1591             :         /* head has wrapped around, tail not yet */
    1592           0 :         occupied += QUEUE_MAX_PAGE + 1;
    1593             :     }
    1594             : 
    1595           6 :     return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
    1596             : }
    1597             : 
    1598             : /*
    1599             :  * Check whether the queue is at least half full, and emit a warning if so.
    1600             :  *
    1601             :  * This is unlikely given the size of the queue, but possible.
    1602             :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    1603             :  *
    1604             :  * Caller must hold exclusive NotifyQueueLock.
    1605             :  */
    1606             : static void
    1607          58 : asyncQueueFillWarning(void)
    1608             : {
    1609             :     double      fillDegree;
    1610             :     TimestampTz t;
    1611             : 
    1612          58 :     fillDegree = asyncQueueUsage();
    1613          58 :     if (fillDegree < 0.5)
    1614          58 :         return;
    1615             : 
    1616           0 :     t = GetCurrentTimestamp();
    1617             : 
    1618           0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    1619             :                                    t, QUEUE_FULL_WARN_INTERVAL))
    1620             :     {
    1621           0 :         QueuePosition min = QUEUE_HEAD;
    1622           0 :         int32       minPid = InvalidPid;
    1623             : 
    1624           0 :         for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1625             :         {
    1626             :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    1627           0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    1628           0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    1629           0 :                 minPid = QUEUE_BACKEND_PID(i);
    1630             :         }
    1631             : 
    1632           0 :         ereport(WARNING,
    1633             :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    1634             :                  (minPid != InvalidPid ?
    1635             :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    1636             :                   : 0),
    1637             :                  (minPid != InvalidPid ?
    1638             :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    1639             :                   : 0)));
    1640             : 
    1641           0 :         asyncQueueControl->lastQueueFillWarn = t;
    1642             :     }
    1643             : }
    1644             : 
    1645             : /*
    1646             :  * Send signals to listening backends.
    1647             :  *
    1648             :  * We never signal our own process; that should be handled by our caller.
    1649             :  *
    1650             :  * Normally we signal only backends in our own database, since only those
    1651             :  * backends could be interested in notifies we send.  However, if there's
    1652             :  * notify traffic in our database but no traffic in another database that
    1653             :  * does have listener(s), those listeners will fall further and further
    1654             :  * behind.  Waken them anyway if they're far enough behind, so that they'll
    1655             :  * advance their queue position pointers, allowing the global tail to advance.
    1656             :  *
    1657             :  * Since we know the BackendId and the Pid the signalling is quite cheap.
    1658             :  */
    1659             : static void
    1660          54 : SignalBackends(void)
    1661             : {
    1662             :     int32      *pids;
    1663             :     BackendId  *ids;
    1664             :     int         count;
    1665             : 
    1666             :     /*
    1667             :      * Identify backends that we need to signal.  We don't want to send
    1668             :      * signals while holding the NotifyQueueLock, so this loop just builds a
    1669             :      * list of target PIDs.
    1670             :      *
    1671             :      * XXX in principle these pallocs could fail, which would be bad. Maybe
    1672             :      * preallocate the arrays?  But in practice this is only run in trivial
    1673             :      * transactions, so there should surely be space available.
    1674             :      */
    1675          54 :     pids = (int32 *) palloc(MaxBackends * sizeof(int32));
    1676          54 :     ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
    1677          54 :     count = 0;
    1678             : 
    1679          54 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1680         100 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1681             :     {
    1682          46 :         int32       pid = QUEUE_BACKEND_PID(i);
    1683             :         QueuePosition pos;
    1684             : 
    1685             :         Assert(pid != InvalidPid);
    1686          46 :         if (pid == MyProcPid)
    1687          26 :             continue;           /* never signal self */
    1688          20 :         pos = QUEUE_BACKEND_POS(i);
    1689          20 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1690             :         {
    1691             :             /*
    1692             :              * Always signal listeners in our own database, unless they're
    1693             :              * already caught up (unlikely, but possible).
    1694             :              */
    1695          20 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    1696           0 :                 continue;
    1697             :         }
    1698             :         else
    1699             :         {
    1700             :             /*
    1701             :              * Listeners in other databases should be signaled only if they
    1702             :              * are far behind.
    1703             :              */
    1704           0 :             if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    1705             :                                    QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
    1706           0 :                 continue;
    1707             :         }
    1708             :         /* OK, need to signal this one */
    1709          20 :         pids[count] = pid;
    1710          20 :         ids[count] = i;
    1711          20 :         count++;
    1712             :     }
    1713          54 :     LWLockRelease(NotifyQueueLock);
    1714             : 
    1715             :     /* Now send signals */
    1716          74 :     for (int i = 0; i < count; i++)
    1717             :     {
    1718          20 :         int32       pid = pids[i];
    1719             : 
    1720             :         /*
    1721             :          * Note: assuming things aren't broken, a signal failure here could
    1722             :          * only occur if the target backend exited since we released
    1723             :          * NotifyQueueLock; which is unlikely but certainly possible. So we
    1724             :          * just log a low-level debug message if it happens.
    1725             :          */
    1726          20 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
    1727           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    1728             :     }
    1729             : 
    1730          54 :     pfree(pids);
    1731          54 :     pfree(ids);
    1732          54 : }
    1733             : 
    1734             : /*
    1735             :  * AtAbort_Notify
    1736             :  *
    1737             :  *  This is called at transaction abort.
    1738             :  *
    1739             :  *  Gets rid of pending actions and outbound notifies that we would have
    1740             :  *  executed if the transaction got committed.
    1741             :  */
    1742             : void
    1743       20388 : AtAbort_Notify(void)
    1744             : {
    1745             :     /*
    1746             :      * If we LISTEN but then roll back the transaction after PreCommit_Notify,
    1747             :      * we have registered as a listener but have not made any entry in
    1748             :      * listenChannels.  In that case, deregister again.
    1749             :      */
    1750       20388 :     if (amRegisteredListener && listenChannels == NIL)
    1751           0 :         asyncQueueUnregister();
    1752             : 
    1753             :     /* And clean up */
    1754       20388 :     ClearPendingActionsAndNotifies();
    1755       20388 : }
    1756             : 
    1757             : /*
    1758             :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    1759             :  *
    1760             :  * Reassign all items in the pending lists to the parent transaction.
    1761             :  */
    1762             : void
    1763        4690 : AtSubCommit_Notify(void)
    1764             : {
    1765        4690 :     int         my_level = GetCurrentTransactionNestLevel();
    1766             : 
    1767             :     /* If there are actions at our nesting level, we must reparent them. */
    1768        4690 :     if (pendingActions != NULL &&
    1769           0 :         pendingActions->nestingLevel >= my_level)
    1770             :     {
    1771           0 :         if (pendingActions->upper == NULL ||
    1772           0 :             pendingActions->upper->nestingLevel < my_level - 1)
    1773             :         {
    1774             :             /* nothing to merge; give the whole thing to the parent */
    1775           0 :             --pendingActions->nestingLevel;
    1776             :         }
    1777             :         else
    1778             :         {
    1779           0 :             ActionList *childPendingActions = pendingActions;
    1780             : 
    1781           0 :             pendingActions = pendingActions->upper;
    1782             : 
    1783             :             /*
    1784             :              * Mustn't try to eliminate duplicates here --- see queue_listen()
    1785             :              */
    1786           0 :             pendingActions->actions =
    1787           0 :                 list_concat(pendingActions->actions,
    1788           0 :                             childPendingActions->actions);
    1789           0 :             pfree(childPendingActions);
    1790             :         }
    1791             :     }
    1792             : 
    1793             :     /* If there are notifies at our nesting level, we must reparent them. */
    1794        4690 :     if (pendingNotifies != NULL &&
    1795           4 :         pendingNotifies->nestingLevel >= my_level)
    1796             :     {
    1797             :         Assert(pendingNotifies->nestingLevel == my_level);
    1798             : 
    1799           2 :         if (pendingNotifies->upper == NULL ||
    1800           2 :             pendingNotifies->upper->nestingLevel < my_level - 1)
    1801             :         {
    1802             :             /* nothing to merge; give the whole thing to the parent */
    1803           0 :             --pendingNotifies->nestingLevel;
    1804             :         }
    1805             :         else
    1806             :         {
    1807             :             /*
    1808             :              * Formerly, we didn't bother to eliminate duplicates here, but
    1809             :              * now we must, else we fall foul of "Assert(!found)", either here
    1810             :              * or during a later attempt to build the parent-level hashtable.
    1811             :              */
    1812           2 :             NotificationList *childPendingNotifies = pendingNotifies;
    1813             :             ListCell   *l;
    1814             : 
    1815           2 :             pendingNotifies = pendingNotifies->upper;
    1816             :             /* Insert all the subxact's events into parent, except for dups */
    1817          10 :             foreach(l, childPendingNotifies->events)
    1818             :             {
    1819           8 :                 Notification *childn = (Notification *) lfirst(l);
    1820             : 
    1821           8 :                 if (!AsyncExistsPendingNotify(childn))
    1822           4 :                     AddEventToPendingNotifies(childn);
    1823             :             }
    1824           2 :             pfree(childPendingNotifies);
    1825             :         }
    1826             :     }
    1827        4690 : }
    1828             : 
    1829             : /*
    1830             :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    1831             :  */
    1832             : void
    1833        2950 : AtSubAbort_Notify(void)
    1834             : {
    1835        2950 :     int         my_level = GetCurrentTransactionNestLevel();
    1836             : 
    1837             :     /*
    1838             :      * All we have to do is pop the stack --- the actions/notifies made in
    1839             :      * this subxact are no longer interesting, and the space will be freed
    1840             :      * when CurTransactionContext is recycled. We still have to free the
    1841             :      * ActionList and NotificationList objects themselves, though, because
    1842             :      * those are allocated in TopTransactionContext.
    1843             :      *
    1844             :      * Note that there might be no entries at all, or no entries for the
    1845             :      * current subtransaction level, either because none were ever created, or
    1846             :      * because we reentered this routine due to trouble during subxact abort.
    1847             :      */
    1848        2950 :     while (pendingActions != NULL &&
    1849           0 :            pendingActions->nestingLevel >= my_level)
    1850             :     {
    1851           0 :         ActionList *childPendingActions = pendingActions;
    1852             : 
    1853           0 :         pendingActions = pendingActions->upper;
    1854           0 :         pfree(childPendingActions);
    1855             :     }
    1856             : 
    1857        2952 :     while (pendingNotifies != NULL &&
    1858           4 :            pendingNotifies->nestingLevel >= my_level)
    1859             :     {
    1860           2 :         NotificationList *childPendingNotifies = pendingNotifies;
    1861             : 
    1862           2 :         pendingNotifies = pendingNotifies->upper;
    1863           2 :         pfree(childPendingNotifies);
    1864             :     }
    1865        2950 : }
    1866             : 
    1867             : /*
    1868             :  * HandleNotifyInterrupt
    1869             :  *
    1870             :  *      Signal handler portion of interrupt handling. Let the backend know
    1871             :  *      that there's a pending notify interrupt. If we're currently reading
    1872             :  *      from the client, this will interrupt the read and
    1873             :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    1874             :  */
    1875             : void
    1876          20 : HandleNotifyInterrupt(void)
    1877             : {
    1878             :     /*
    1879             :      * Note: this is called by a SIGNAL HANDLER. You must be very wary what
    1880             :      * you do here.
    1881             :      */
    1882             : 
    1883             :     /* signal that work needs to be done */
    1884          20 :     notifyInterruptPending = true;
    1885             : 
    1886             :     /* make sure the event is processed in due course */
    1887          20 :     SetLatch(MyLatch);
    1888          20 : }
    1889             : 
    1890             : /*
    1891             :  * ProcessNotifyInterrupt
    1892             :  *
    1893             :  *      This is called if we see notifyInterruptPending set, just before
    1894             :  *      transmitting ReadyForQuery at the end of a frontend command, and
    1895             :  *      also if a notify signal occurs while reading from the frontend.
    1896             :  *      HandleNotifyInterrupt() will cause the read to be interrupted
    1897             :  *      via the process's latch, and this routine will get called.
    1898             :  *      If we are truly idle (ie, *not* inside a transaction block),
    1899             :  *      process the incoming notifies.
    1900             :  */
    1901             : void
    1902          30 : ProcessNotifyInterrupt(void)
    1903             : {
    1904          30 :     if (IsTransactionOrTransactionBlock())
    1905          12 :         return;                 /* not really idle */
    1906             : 
    1907          36 :     while (notifyInterruptPending)
    1908          18 :         ProcessIncomingNotify();
    1909             : }
    1910             : 
    1911             : 
    1912             : /*
    1913             :  * Read all pending notifications from the queue, and deliver appropriate
    1914             :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    1915             :  * notification.
    1916             :  */
    1917             : static void
    1918          60 : asyncQueueReadAllNotifications(void)
    1919             : {
    1920             :     volatile QueuePosition pos;
    1921             :     QueuePosition oldpos;
    1922             :     QueuePosition head;
    1923             :     Snapshot    snapshot;
    1924             : 
    1925             :     /* page_buffer must be adequately aligned, so use a union */
    1926             :     union
    1927             :     {
    1928             :         char        buf[QUEUE_PAGESIZE];
    1929             :         AsyncQueueEntry align;
    1930             :     }           page_buffer;
    1931             : 
    1932             :     /* Fetch current state */
    1933          60 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1934             :     /* Assert checks that we have a valid state entry */
    1935             :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
    1936          60 :     pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
    1937          60 :     head = QUEUE_HEAD;
    1938          60 :     LWLockRelease(NotifyQueueLock);
    1939             : 
    1940          60 :     if (QUEUE_POS_EQUAL(pos, head))
    1941             :     {
    1942             :         /* Nothing to do, we have read all notifications already. */
    1943           0 :         return;
    1944             :     }
    1945             : 
    1946             :     /*----------
    1947             :      * Get snapshot we'll use to decide which xacts are still in progress.
    1948             :      * This is trickier than it might seem, because of race conditions.
    1949             :      * Consider the following example:
    1950             :      *
    1951             :      * Backend 1:                    Backend 2:
    1952             :      *
    1953             :      * transaction starts
    1954             :      * UPDATE foo SET ...;
    1955             :      * NOTIFY foo;
    1956             :      * commit starts
    1957             :      * queue the notify message
    1958             :      *                               transaction starts
    1959             :      *                               LISTEN foo;  -- first LISTEN in session
    1960             :      *                               SELECT * FROM foo WHERE ...;
    1961             :      * commit to clog
    1962             :      *                               commit starts
    1963             :      *                               add backend 2 to array of listeners
    1964             :      *                               advance to queue head (this code)
    1965             :      *                               commit to clog
    1966             :      *
    1967             :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    1968             :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
    1969             :      * eventually get transaction 1's notify message, but there's no way
    1970             :      * to do that; until we're in the listener array, there's no guarantee
    1971             :      * that the notify message doesn't get removed from the queue.
    1972             :      *
    1973             :      * Therefore the coding technique transaction 2 is using is unsafe:
    1974             :      * applications must commit a LISTEN before inspecting database state,
    1975             :      * if they want to ensure they will see notifications about subsequent
    1976             :      * changes to that state.
    1977             :      *
    1978             :      * What we do guarantee is that we'll see all notifications from
    1979             :      * transactions committing after the snapshot we take here.
    1980             :      * Exec_ListenPreCommit has already added us to the listener array,
    1981             :      * so no not-yet-committed messages can be removed from the queue
    1982             :      * before we see them.
    1983             :      *----------
    1984             :      */
    1985          60 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
    1986             : 
    1987             :     /*
    1988             :      * It is possible that we fail while trying to send a message to our
    1989             :      * frontend (for example, because of encoding conversion failure).  If
    1990             :      * that happens it is critical that we not try to send the same message
    1991             :      * over and over again.  Therefore, we place a PG_TRY block here that will
    1992             :      * forcibly advance our queue position before we lose control to an error.
    1993             :      * (We could alternatively retake NotifyQueueLock and move the position
    1994             :      * before handling each individual message, but that seems like too much
    1995             :      * lock traffic.)
    1996             :      */
    1997          60 :     PG_TRY();
    1998             :     {
    1999             :         bool        reachedStop;
    2000             : 
    2001             :         do
    2002             :         {
    2003          60 :             int         curpage = QUEUE_POS_PAGE(pos);
    2004          60 :             int         curoffset = QUEUE_POS_OFFSET(pos);
    2005             :             int         slotno;
    2006             :             int         copysize;
    2007             : 
    2008             :             /*
    2009             :              * We copy the data from SLRU into a local buffer, so as to avoid
    2010             :              * holding the NotifySLRULock while we are examining the entries
    2011             :              * and possibly transmitting them to our frontend.  Copy only the
    2012             :              * part of the page we will actually inspect.
    2013             :              */
    2014          60 :             slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
    2015             :                                                 InvalidTransactionId);
    2016          60 :             if (curpage == QUEUE_POS_PAGE(head))
    2017             :             {
    2018             :                 /* we only want to read as far as head */
    2019          60 :                 copysize = QUEUE_POS_OFFSET(head) - curoffset;
    2020          60 :                 if (copysize < 0)
    2021           0 :                     copysize = 0;   /* just for safety */
    2022             :             }
    2023             :             else
    2024             :             {
    2025             :                 /* fetch all the rest of the page */
    2026           0 :                 copysize = QUEUE_PAGESIZE - curoffset;
    2027             :             }
    2028         180 :             memcpy(page_buffer.buf + curoffset,
    2029         120 :                    NotifyCtl->shared->page_buffer[slotno] + curoffset,
    2030             :                    copysize);
    2031             :             /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2032          60 :             LWLockRelease(NotifySLRULock);
    2033             : 
    2034             :             /*
    2035             :              * Process messages up to the stop position, end of page, or an
    2036             :              * uncommitted message.
    2037             :              *
    2038             :              * Our stop position is what we found to be the head's position
    2039             :              * when we entered this function. It might have changed already.
    2040             :              * But if it has, we will receive (or have already received and
    2041             :              * queued) another signal and come here again.
    2042             :              *
    2043             :              * We are not holding NotifyQueueLock here! The queue can only
    2044             :              * extend beyond the head pointer (see above) and we leave our
    2045             :              * backend's pointer where it is so nobody will truncate or
    2046             :              * rewrite pages under us. Especially we don't want to hold a lock
    2047             :              * while sending the notifications to the frontend.
    2048             :              */
    2049          60 :             reachedStop = asyncQueueProcessPageEntries(&pos, head,
    2050             :                                                        page_buffer.buf,
    2051             :                                                        snapshot);
    2052          60 :         } while (!reachedStop);
    2053             :     }
    2054           0 :     PG_FINALLY();
    2055             :     {
    2056             :         /* Update shared state */
    2057          60 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2058          60 :         QUEUE_BACKEND_POS(MyBackendId) = pos;
    2059          60 :         LWLockRelease(NotifyQueueLock);
    2060             :     }
    2061          60 :     PG_END_TRY();
    2062             : 
    2063             :     /* Done with snapshot */
    2064          60 :     UnregisterSnapshot(snapshot);
    2065             : }
    2066             : 
    2067             : /*
    2068             :  * Fetch notifications from the shared queue, beginning at position current,
    2069             :  * and deliver relevant ones to my frontend.
    2070             :  *
    2071             :  * The current page must have been fetched into page_buffer from shared
    2072             :  * memory.  (We could access the page right in shared memory, but that
    2073             :  * would imply holding the NotifySLRULock throughout this routine.)
    2074             :  *
    2075             :  * We stop if we reach the "stop" position, or reach a notification from an
    2076             :  * uncommitted transaction, or reach the end of the page.
    2077             :  *
    2078             :  * The function returns true once we have reached the stop position or an
    2079             :  * uncommitted notification, and false if we have finished with the page.
    2080             :  * In other words: once it returns true there is no need to look further.
    2081             :  * The QueuePosition *current is advanced past all processed messages.
    2082             :  */
    2083             : static bool
    2084          60 : asyncQueueProcessPageEntries(volatile QueuePosition *current,
    2085             :                              QueuePosition stop,
    2086             :                              char *page_buffer,
    2087             :                              Snapshot snapshot)
    2088             : {
    2089          60 :     bool        reachedStop = false;
    2090             :     bool        reachedEndOfPage;
    2091             :     AsyncQueueEntry *qe;
    2092             : 
    2093             :     do
    2094             :     {
    2095         288 :         QueuePosition thisentry = *current;
    2096             : 
    2097         288 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    2098          60 :             break;
    2099             : 
    2100         228 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2101             : 
    2102             :         /*
    2103             :          * Advance *current over this message, possibly to the next page. As
    2104             :          * noted in the comments for asyncQueueReadAllNotifications, we must
    2105             :          * do this before possibly failing while processing the message.
    2106             :          */
    2107         228 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2108             : 
    2109             :         /* Ignore messages destined for other databases */
    2110         228 :         if (qe->dboid == MyDatabaseId)
    2111             :         {
    2112         228 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
    2113             :             {
    2114             :                 /*
    2115             :                  * The source transaction is still in progress, so we can't
    2116             :                  * process this message yet.  Break out of the loop, but first
    2117             :                  * back up *current so we will reprocess the message next
    2118             :                  * time.  (Note: it is unlikely but not impossible for
    2119             :                  * TransactionIdDidCommit to fail, so we can't really avoid
    2120             :                  * this advance-then-back-up behavior when dealing with an
    2121             :                  * uncommitted message.)
    2122             :                  *
    2123             :                  * Note that we must test XidInMVCCSnapshot before we test
    2124             :                  * TransactionIdDidCommit, else we might return a message from
    2125             :                  * a transaction that is not yet visible to snapshots; compare
    2126             :                  * the comments at the head of heapam_visibility.c.
    2127             :                  *
    2128             :                  * Also, while our own xact won't be listed in the snapshot,
    2129             :                  * we need not check for TransactionIdIsCurrentTransactionId
    2130             :                  * because our transaction cannot (yet) have queued any
    2131             :                  * messages.
    2132             :                  */
    2133           0 :                 *current = thisentry;
    2134           0 :                 reachedStop = true;
    2135           0 :                 break;
    2136             :             }
    2137         228 :             else if (TransactionIdDidCommit(qe->xid))
    2138             :             {
    2139             :                 /* qe->data is the null-terminated channel name */
    2140         228 :                 char       *channel = qe->data;
    2141             : 
    2142         228 :                 if (IsListeningOn(channel))
    2143             :                 {
    2144             :                     /* payload follows channel name */
    2145          48 :                     char       *payload = qe->data + strlen(channel) + 1;
    2146             : 
    2147          48 :                     NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2148             :                 }
    2149             :             }
    2150             :             else
    2151             :             {
    2152             :                 /*
    2153             :                  * The source transaction aborted or crashed, so we just
    2154             :                  * ignore its notifications.
    2155             :                  */
    2156             :             }
    2157             :         }
    2158             : 
    2159             :         /* Loop back if we're not at end of page */
    2160         228 :     } while (!reachedEndOfPage);
    2161             : 
    2162          60 :     if (QUEUE_POS_EQUAL(*current, stop))
    2163          60 :         reachedStop = true;
    2164             : 
    2165          60 :     return reachedStop;
    2166             : }
    2167             : 
    2168             : /*
    2169             :  * Advance the shared queue tail variable to the minimum of all the
    2170             :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2171             :  */
    2172             : static void
    2173           8 : asyncQueueAdvanceTail(void)
    2174             : {
    2175             :     QueuePosition min;
    2176             :     int         oldtailpage;
    2177             :     int         newtailpage;
    2178             :     int         boundary;
    2179             : 
    2180           8 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2181           8 :     min = QUEUE_HEAD;
    2182          12 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    2183             :     {
    2184             :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2185           4 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2186             :     }
    2187           8 :     oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
    2188           8 :     QUEUE_TAIL = min;
    2189           8 :     LWLockRelease(NotifyQueueLock);
    2190             : 
    2191             :     /*
    2192             :      * We can truncate something if the global tail advanced across an SLRU
    2193             :      * segment boundary.
    2194             :      *
    2195             :      * XXX it might be better to truncate only once every several segments, to
    2196             :      * reduce the number of directory scans.
    2197             :      */
    2198           8 :     newtailpage = QUEUE_POS_PAGE(min);
    2199           8 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2200           8 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2201             :     {
    2202             :         /*
    2203             :          * SimpleLruTruncate() will ask for NotifySLRULock but will also
    2204             :          * release the lock again.
    2205             :          */
    2206           0 :         SimpleLruTruncate(NotifyCtl, newtailpage);
    2207             :     }
    2208           8 : }
    2209             : 
    2210             : /*
    2211             :  * ProcessIncomingNotify
    2212             :  *
    2213             :  *      Deal with arriving NOTIFYs from other backends as soon as it's safe to
    2214             :  *      do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
    2215             :  *      signal handler, but isn't anymore.
    2216             :  *
    2217             :  *      Scan the queue for arriving notifications and report them to my front
    2218             :  *      end.
    2219             :  *
    2220             :  *      NOTE: since we are outside any transaction, we must create our own.
    2221             :  */
    2222             : static void
    2223          18 : ProcessIncomingNotify(void)
    2224             : {
    2225             :     /* We *must* reset the flag */
    2226          18 :     notifyInterruptPending = false;
    2227             : 
    2228             :     /* Do nothing else if we aren't actively listening */
    2229          18 :     if (listenChannels == NIL)
    2230           0 :         return;
    2231             : 
    2232          18 :     if (Trace_notify)
    2233           0 :         elog(DEBUG1, "ProcessIncomingNotify");
    2234             : 
    2235          18 :     set_ps_display("notify interrupt");
    2236             : 
    2237             :     /*
    2238             :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    2239             :      * bad things happen if it gets an error.
    2240             :      */
    2241          18 :     StartTransactionCommand();
    2242             : 
    2243          18 :     asyncQueueReadAllNotifications();
    2244             : 
    2245          18 :     CommitTransactionCommand();
    2246             : 
    2247             :     /*
    2248             :      * Must flush the notify messages to ensure frontend gets them promptly.
    2249             :      */
    2250          18 :     pq_flush();
    2251             : 
    2252          18 :     set_ps_display("idle");
    2253             : 
    2254          18 :     if (Trace_notify)
    2255           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    2256             : }
    2257             : 
    2258             : /*
    2259             :  * Send NOTIFY message to my front end.
    2260             :  */
    2261             : void
    2262          48 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    2263             : {
    2264          48 :     if (whereToSendOutput == DestRemote)
    2265             :     {
    2266             :         StringInfoData buf;
    2267             : 
    2268          48 :         pq_beginmessage(&buf, 'A');
    2269          48 :         pq_sendint32(&buf, srcPid);
    2270          48 :         pq_sendstring(&buf, channel);
    2271          48 :         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
    2272          48 :             pq_sendstring(&buf, payload);
    2273          48 :         pq_endmessage(&buf);
    2274             : 
    2275             :         /*
    2276             :          * NOTE: we do not do pq_flush() here.  For a self-notify, it will
    2277             :          * happen at the end of the transaction, and for incoming notifies
    2278             :          * ProcessIncomingNotify will do it after finding all the notifies.
    2279             :          */
    2280             :     }
    2281             :     else
    2282           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    2283          48 : }
    2284             : 
    2285             : /* Does pendingNotifies include a match for the given event? */
    2286             : static bool
    2287        2050 : AsyncExistsPendingNotify(Notification *n)
    2288             : {
    2289        2050 :     if (pendingNotifies == NULL)
    2290           0 :         return false;
    2291             : 
    2292        2050 :     if (pendingNotifies->hashtab != NULL)
    2293             :     {
    2294             :         /* Use the hash table to probe for a match */
    2295        1966 :         if (hash_search(pendingNotifies->hashtab,
    2296             :                         &n,
    2297             :                         HASH_FIND,
    2298             :                         NULL))
    2299           0 :             return true;
    2300             :     }
    2301             :     else
    2302             :     {
    2303             :         /* Must scan the event list */
    2304             :         ListCell   *l;
    2305             : 
    2306         428 :         foreach(l, pendingNotifies->events)
    2307             :         {
    2308         372 :             Notification *oldn = (Notification *) lfirst(l);
    2309             : 
    2310         372 :             if (n->channel_len == oldn->channel_len &&
    2311         372 :                 n->payload_len == oldn->payload_len &&
    2312         182 :                 memcmp(n->data, oldn->data,
    2313         182 :                        n->channel_len + n->payload_len + 2) == 0)
    2314          28 :                 return true;
    2315             :         }
    2316             :     }
    2317             : 
    2318        2022 :     return false;
    2319             : }
    2320             : 
    2321             : /*
    2322             :  * Add a notification event to a pre-existing pendingNotifies list.
    2323             :  *
    2324             :  * Because pendingNotifies->events is already nonempty, this works
    2325             :  * correctly no matter what CurrentMemoryContext is.
    2326             :  */
    2327             : static void
    2328        2022 : AddEventToPendingNotifies(Notification *n)
    2329             : {
    2330             :     Assert(pendingNotifies->events != NIL);
    2331             : 
    2332             :     /* Create the hash table if it's time to */
    2333        2022 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    2334        1968 :         pendingNotifies->hashtab == NULL)
    2335             :     {
    2336             :         HASHCTL     hash_ctl;
    2337             :         ListCell   *l;
    2338             : 
    2339             :         /* Create the hash table */
    2340          28 :         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
    2341           2 :         hash_ctl.keysize = sizeof(Notification *);
    2342           2 :         hash_ctl.entrysize = sizeof(NotificationHash);
    2343           2 :         hash_ctl.hash = notification_hash;
    2344           2 :         hash_ctl.match = notification_match;
    2345           2 :         hash_ctl.hcxt = CurTransactionContext;
    2346           4 :         pendingNotifies->hashtab =
    2347           2 :             hash_create("Pending Notifies",
    2348             :                         256L,
    2349             :                         &hash_ctl,
    2350             :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    2351             : 
    2352             :         /* Insert all the already-existing events */
    2353          34 :         foreach(l, pendingNotifies->events)
    2354             :         {
    2355          32 :             Notification *oldn = (Notification *) lfirst(l);
    2356             :             NotificationHash *hentry;
    2357             :             bool        found;
    2358             : 
    2359          32 :             hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
    2360             :                                                       &oldn,
    2361             :                                                       HASH_ENTER,
    2362             :                                                       &found);
    2363             :             Assert(!found);
    2364          32 :             hentry->event = oldn;
    2365             :         }
    2366             :     }
    2367             : 
    2368             :     /* Add new event to the list, in order */
    2369        2022 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
    2370             : 
    2371             :     /* Add event to the hash table if needed */
    2372        2022 :     if (pendingNotifies->hashtab != NULL)
    2373             :     {
    2374             :         NotificationHash *hentry;
    2375             :         bool        found;
    2376             : 
    2377        1968 :         hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
    2378             :                                                   &n,
    2379             :                                                   HASH_ENTER,
    2380             :                                                   &found);
    2381             :         Assert(!found);
    2382        1968 :         hentry->event = n;
    2383             :     }
    2384        2022 : }
    2385             : 
    2386             : /*
    2387             :  * notification_hash: hash function for notification hash table
    2388             :  *
    2389             :  * The hash "keys" are pointers to Notification structs.
    2390             :  */
    2391             : static uint32
    2392        3966 : notification_hash(const void *key, Size keysize)
    2393             : {
    2394        3966 :     const Notification *k = *(const Notification *const *) key;
    2395             : 
    2396             :     Assert(keysize == sizeof(Notification *));
    2397             :     /* We don't bother to include the payload's trailing null in the hash */
    2398        3966 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    2399             :                                    k->channel_len + k->payload_len + 1));
    2400             : }
    2401             : 
    2402             : /*
    2403             :  * notification_match: match function to use with notification_hash
    2404             :  */
    2405             : static int
    2406           0 : notification_match(const void *key1, const void *key2, Size keysize)
    2407             : {
    2408           0 :     const Notification *k1 = *(const Notification *const *) key1;
    2409           0 :     const Notification *k2 = *(const Notification *const *) key2;
    2410             : 
    2411             :     Assert(keysize == sizeof(Notification *));
    2412           0 :     if (k1->channel_len == k2->channel_len &&
    2413           0 :         k1->payload_len == k2->payload_len &&
    2414           0 :         memcmp(k1->data, k2->data,
    2415           0 :                k1->channel_len + k1->payload_len + 2) == 0)
    2416           0 :         return 0;               /* equal */
    2417           0 :     return 1;                   /* not equal */
    2418             : }
    2419             : 
    2420             : /* Clear the pendingActions and pendingNotifies lists. */
    2421             : static void
    2422       20502 : ClearPendingActionsAndNotifies(void)
    2423             : {
    2424             :     /*
    2425             :      * Everything's allocated in either TopTransactionContext or the context
    2426             :      * for the subtransaction to which it corresponds.  So, there's nothing to
    2427             :      * do here except reset the pointers; the space will be reclaimed when the
    2428             :      * contexts are deleted.
    2429             :      */
    2430       20502 :     pendingActions = NULL;
    2431       20502 :     pendingNotifies = NULL;
    2432       20502 : }

Generated by: LCOV version 1.13