LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - shm_mq.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.4 % 381 329
Test Date: 2026-03-24 01:16:09 Functions: 95.2 % 21 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * shm_mq.c
       4              :  *    single-reader, single-writer shared memory message queue
       5              :  *
       6              :  * Both the sender and the receiver must have a PGPROC; their respective
       7              :  * process latches are used for synchronization.  Only the sender may send,
       8              :  * and only the receiver may receive.  This is intended to allow a user
       9              :  * backend to communicate with worker backends that it has registered.
      10              :  *
      11              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      12              :  * Portions Copyright (c) 1994, Regents of the University of California
      13              :  *
      14              :  * src/backend/storage/ipc/shm_mq.c
      15              :  *
      16              :  *-------------------------------------------------------------------------
      17              :  */
      18              : 
      19              : #include "postgres.h"
      20              : 
      21              : #include "miscadmin.h"
      22              : #include "pgstat.h"
      23              : #include "port/pg_bitutils.h"
      24              : #include "postmaster/bgworker.h"
      25              : #include "storage/proc.h"
      26              : #include "storage/shm_mq.h"
      27              : #include "storage/spin.h"
      28              : #include "utils/memutils.h"
      29              : #include "utils/wait_event.h"
      30              : 
      31              : /*
      32              :  * This structure represents the actual queue, stored in shared memory.
      33              :  *
      34              :  * Some notes on synchronization:
      35              :  *
      36              :  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
      37              :  * mq_sender and mq_bytes_written can only be changed by the sender.
      38              :  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
      39              :  * they cannot change once set, and thus may be read without a lock once this
      40              :  * is known to be the case.
      41              :  *
      42              :  * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
      43              :  * they are written atomically using 8 byte loads and stores.  Memory barriers
      44              :  * must be carefully used to synchronize reads and writes of these values with
      45              :  * reads and writes of the actual data in mq_ring.
      46              :  *
      47              :  * mq_detached needs no locking.  It can be set by either the sender or the
      48              :  * receiver, but only ever from false to true, so redundant writes don't
      49              :  * matter.  It is important that if we set mq_detached and then set the
      50              :  * counterparty's latch, the counterparty must be certain to see the change
      51              :  * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
      52              :  * ends with one, this should be OK.
      53              :  *
      54              :  * mq_ring_size and mq_ring_offset never change after initialization, and
      55              :  * can therefore be read without the lock.
      56              :  *
      57              :  * Importantly, mq_ring can be safely read and written without a lock.
      58              :  * At any given time, the difference between mq_bytes_read and
      59              :  * mq_bytes_written defines the number of bytes within mq_ring that contain
      60              :  * unread data, and mq_bytes_read defines the position where those bytes
      61              :  * begin.  The sender can increase the number of unread bytes at any time,
      62              :  * but only the receiver can give license to overwrite those bytes, by
      63              :  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
      64              :  * the unread bytes it knows to be present without the lock.  Conversely,
      65              :  * the sender can write to the unused portion of the ring buffer without
      66              :  * the lock, because nobody else can be reading or writing those bytes.  The
      67              :  * receiver could be making more bytes unused by incrementing mq_bytes_read,
      68              :  * but that's OK.  Note that it would be unsafe for the receiver to read any
      69              :  * data it's already marked as read, or to write any data; and it would be
      70              :  * unsafe for the sender to reread any data after incrementing
      71              :  * mq_bytes_written, but fortunately there's no need for any of that.
      72              :  */
struct shm_mq
{
    slock_t     mq_mutex;       /* protects mq_receiver and mq_sender */
    PGPROC     *mq_receiver;    /* receiving process; set once, under mutex */
    PGPROC     *mq_sender;      /* sending process; set once, under mutex */
    pg_atomic_uint64 mq_bytes_read;     /* advanced only by the receiver */
    pg_atomic_uint64 mq_bytes_written;  /* advanced only by the sender */
    Size        mq_ring_size;   /* usable ring bytes; fixed after init */
    bool        mq_detached;    /* set (false -> true only) by either side */
    uint8       mq_ring_offset; /* pad before ring data; fixed after init */
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];     /* the ring buffer */
};
      85              : 
      86              : /*
      87              :  * This structure is a backend-private handle for access to a queue.
      88              :  *
      89              :  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
      90              :  * an optional pointer to the dynamic shared memory segment that contains it.
      91              :  * (If mqh_segment is provided, we register an on_dsm_detach callback to
      92              :  * make sure we detach from the queue before detaching from DSM.)
      93              :  *
      94              :  * If this queue is intended to connect the current process with a background
      95              :  * worker that started it, the user can pass a pointer to the worker handle
      96              :  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
      97              :  * is to allow us to begin sending to or receiving from that queue before the
      98              :  * process we'll be communicating with has even been started.  If it fails
      99              :  * to start, the handle will allow us to notice that and fail cleanly, rather
     100              :  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
     101              :  * simple cases - e.g. where there are just 2 processes communicating; in
     102              :  * more complex scenarios, every process may not have a BackgroundWorkerHandle
     103              :  * available, or may need to watch for the failure of more than one other
     104              :  * process at a time.
     105              :  *
     106              :  * When a message exists as a contiguous chunk of bytes in the queue - that is,
     107              :  * it is smaller than the size of the ring buffer and does not wrap around
     108              :  * the end - we return the message to the caller as a pointer into the buffer.
     109              :  * For messages that are larger or happen to wrap, we reassemble the message
     110              :  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
     111              :  * the buffer, and mqh_buflen is the number of bytes allocated for it.
     112              :  *
     113              :  * mqh_send_pending, is number of bytes that is written to the queue but not
     114              :  * yet updated in the shared memory.  We will not update it until the written
     115              :  * data is 1/4th of the ring size or the tuple queue is full.  This will
     116              :  * prevent frequent CPU cache misses, and it will also avoid frequent
     117              :  * SetLatch() calls, which are quite expensive.
     118              :  *
     119              :  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
     120              :  * are used to track the state of non-blocking operations.  When the caller
     121              :  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
     122              :  * are expected to retry the call at a later time with the same argument;
     123              :  * we need to retain enough state to pick up where we left off.
     124              :  * mqh_length_word_complete tracks whether we are done sending or receiving
     125              :  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
     126              :  * the number of bytes read or written for either the length word or the
     127              :  * message itself, and mqh_expected_bytes - which is used only for reads -
     128              :  * tracks the expected total size of the payload.
     129              :  *
     130              :  * mqh_counterparty_attached tracks whether we know the counterparty to have
     131              :  * attached to the queue at some previous point.  This lets us avoid some
     132              :  * mutex acquisitions.
     133              :  *
     134              :  * mqh_context is the memory context in effect at the time we attached to
     135              :  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
     136              :  * we make sure any other allocations we do happen in this context as well,
     137              :  * to avoid nasty surprises.
     138              :  */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;      /* shared queue we're attached to */
    dsm_segment *mqh_segment;   /* optional DSM segment containing the queue */
    BackgroundWorkerHandle *mqh_handle; /* optional handle for detecting
                                         * counterparty startup failure */
    char       *mqh_buffer;     /* local buffer for reassembling messages that
                                 * wrap around the ring, or NULL if none yet */
    Size        mqh_buflen;     /* bytes allocated for mqh_buffer */
    Size        mqh_consume_pending;    /* NOTE(review): inferred from name and
                                         * symmetry with mqh_send_pending: bytes
                                         * consumed locally but not yet reflected
                                         * in mq_bytes_read — confirm */
    Size        mqh_send_pending;   /* bytes written locally but not yet
                                     * published via mq_bytes_written */
    Size        mqh_partial_bytes;  /* progress within length word or payload,
                                     * for resuming non-blocking operations */
    Size        mqh_expected_bytes; /* expected total payload size (reads only) */
    bool        mqh_length_word_complete;   /* finished with the length word? */
    bool        mqh_counterparty_attached;  /* counterparty known attached, so
                                             * mq_receiver/mq_sender can be read
                                             * without the spinlock */
    MemoryContext mqh_context;  /* context holding this handle and mqh_buffer */
};
     154              : 
     155              : static void shm_mq_detach_internal(shm_mq *mq);
     156              : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
     157              :                                        const void *data, bool nowait, Size *bytes_written);
     158              : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
     159              :                                           Size bytes_needed, bool nowait, Size *nbytesp,
     160              :                                           void **datap);
     161              : static bool shm_mq_counterparty_gone(shm_mq *mq,
     162              :                                      BackgroundWorkerHandle *handle);
     163              : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
     164              :                                  BackgroundWorkerHandle *handle);
     165              : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
     166              : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
     167              : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
     168              : 
/* Minimum queue size is enough for header and at least one chunk of data. */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

/* Initial allocation for the backend-local reassembly buffer (mqh_buffer). */
#define MQH_INITIAL_BUFSIZE             8192
     174              : 
     175              : /*
     176              :  * Initialize a new shared message queue.
     177              :  */
     178              : shm_mq *
     179         3961 : shm_mq_create(void *address, Size size)
     180              : {
     181         3961 :     shm_mq     *mq = address;
     182         3961 :     Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
     183              : 
     184              :     /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
     185         3961 :     size = MAXALIGN_DOWN(size);
     186              : 
     187              :     /* Queue size must be large enough to hold some data. */
     188              :     Assert(size > data_offset);
     189              : 
     190              :     /* Initialize queue header. */
     191         3961 :     SpinLockInit(&mq->mq_mutex);
     192         3961 :     mq->mq_receiver = NULL;
     193         3961 :     mq->mq_sender = NULL;
     194         3961 :     pg_atomic_init_u64(&mq->mq_bytes_read, 0);
     195         3961 :     pg_atomic_init_u64(&mq->mq_bytes_written, 0);
     196         3961 :     mq->mq_ring_size = size - data_offset;
     197         3961 :     mq->mq_detached = false;
     198         3961 :     mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
     199              : 
     200         3961 :     return mq;
     201              : }
     202              : 
     203              : /*
     204              :  * Set the identity of the process that will receive from a shared message
     205              :  * queue.
     206              :  */
     207              : void
     208         3961 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
     209              : {
     210              :     PGPROC     *sender;
     211              : 
     212         3961 :     SpinLockAcquire(&mq->mq_mutex);
     213              :     Assert(mq->mq_receiver == NULL);
     214         3961 :     mq->mq_receiver = proc;
     215         3961 :     sender = mq->mq_sender;
     216         3961 :     SpinLockRelease(&mq->mq_mutex);
     217              : 
     218         3961 :     if (sender != NULL)
     219           20 :         SetLatch(&sender->procLatch);
     220         3961 : }
     221              : 
     222              : /*
     223              :  * Set the identity of the process that will send to a shared message queue.
     224              :  */
     225              : void
     226         3844 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
     227              : {
     228              :     PGPROC     *receiver;
     229              : 
     230         3844 :     SpinLockAcquire(&mq->mq_mutex);
     231              :     Assert(mq->mq_sender == NULL);
     232         3844 :     mq->mq_sender = proc;
     233         3844 :     receiver = mq->mq_receiver;
     234         3844 :     SpinLockRelease(&mq->mq_mutex);
     235              : 
     236         3844 :     if (receiver != NULL)
     237         3824 :         SetLatch(&receiver->procLatch);
     238         3844 : }
     239              : 
     240              : /*
     241              :  * Get the configured receiver.
     242              :  */
     243              : PGPROC *
     244            5 : shm_mq_get_receiver(shm_mq *mq)
     245              : {
     246              :     PGPROC     *receiver;
     247              : 
     248            5 :     SpinLockAcquire(&mq->mq_mutex);
     249            5 :     receiver = mq->mq_receiver;
     250            5 :     SpinLockRelease(&mq->mq_mutex);
     251              : 
     252            5 :     return receiver;
     253              : }
     254              : 
     255              : /*
     256              :  * Get the configured sender.
     257              :  */
     258              : PGPROC *
     259     11358181 : shm_mq_get_sender(shm_mq *mq)
     260              : {
     261              :     PGPROC     *sender;
     262              : 
     263     11358181 :     SpinLockAcquire(&mq->mq_mutex);
     264     11358181 :     sender = mq->mq_sender;
     265     11358181 :     SpinLockRelease(&mq->mq_mutex);
     266              : 
     267     11358181 :     return sender;
     268              : }
     269              : 
     270              : /*
     271              :  * Attach to a shared message queue so we can send or receive messages.
     272              :  *
     273              :  * The memory context in effect at the time this function is called should
     274              :  * be one which will last for at least as long as the message queue itself.
     275              :  * We'll allocate the handle in that context, and future allocations that
     276              :  * are needed to buffer incoming data will happen in that context as well.
     277              :  *
     278              :  * If seg != NULL, the queue will be automatically detached when that dynamic
     279              :  * shared memory segment is detached.
     280              :  *
     281              :  * If handle != NULL, the queue can be read or written even before the
     282              :  * other process has attached.  We'll wait for it to do so if needed.  The
     283              :  * handle must be for a background worker initialized with bgw_notify_pid
     284              :  * equal to our PID.
     285              :  *
     286              :  * shm_mq_detach() should be called when done.  This will free the
     287              :  * shm_mq_handle and mark the queue itself as detached, so that our
     288              :  * counterpart won't get stuck waiting for us to fill or drain the queue
     289              :  * after we've already lost interest.
     290              :  */
     291              : shm_mq_handle *
     292         7805 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     293              : {
     294         7805 :     shm_mq_handle *mqh = palloc_object(shm_mq_handle);
     295              : 
     296              :     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
     297         7805 :     mqh->mqh_queue = mq;
     298         7805 :     mqh->mqh_segment = seg;
     299         7805 :     mqh->mqh_handle = handle;
     300         7805 :     mqh->mqh_buffer = NULL;
     301         7805 :     mqh->mqh_buflen = 0;
     302         7805 :     mqh->mqh_consume_pending = 0;
     303         7805 :     mqh->mqh_send_pending = 0;
     304         7805 :     mqh->mqh_partial_bytes = 0;
     305         7805 :     mqh->mqh_expected_bytes = 0;
     306         7805 :     mqh->mqh_length_word_complete = false;
     307         7805 :     mqh->mqh_counterparty_attached = false;
     308         7805 :     mqh->mqh_context = CurrentMemoryContext;
     309              : 
     310         7805 :     if (seg != NULL)
     311         7805 :         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
     312              : 
     313         7805 :     return mqh;
     314              : }
     315              : 
     316              : /*
     317              :  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
     318              :  * been passed to shm_mq_attach.
     319              :  */
     320              : void
     321         3804 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
     322              : {
     323              :     Assert(mqh->mqh_handle == NULL);
     324         3804 :     mqh->mqh_handle = handle;
     325         3804 : }
     326              : 
     327              : /*
     328              :  * Write a message into a shared message queue.
     329              :  */
     330              : shm_mq_result
     331      1156921 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
     332              :             bool force_flush)
     333              : {
     334              :     shm_mq_iovec iov;
     335              : 
     336      1156921 :     iov.data = data;
     337      1156921 :     iov.len = nbytes;
     338              : 
     339      1156921 :     return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
     340              : }
     341              : 
     342              : /*
     343              :  * Write a message into a shared message queue, gathered from multiple
     344              :  * addresses.
     345              :  *
     346              :  * When nowait = false, we'll wait on our process latch when the ring buffer
     347              :  * fills up, and then continue writing once the receiver has drained some data.
     348              :  * The process latch is reset after each wait.
     349              :  *
     350              :  * When nowait = true, we do not manipulate the state of the process latch;
     351              :  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
     352              :  * this case, the caller should call this function again, with the same
     353              :  * arguments, each time the process latch is set.  (Once begun, the sending
     354              :  * of a message cannot be aborted except by detaching from the queue; changing
     355              :  * the length or payload will corrupt the queue.)
     356              :  *
     357              :  * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
     358              :  * and notify the receiver (if it is already attached).  Otherwise, we don't
     359              :  * update it until we have written an amount of data greater than 1/4th of the
     360              :  * ring size.
     361              :  */
     362              : shm_mq_result
     363      1158937 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
     364              :              bool force_flush)
     365              : {
     366              :     shm_mq_result res;
     367      1158937 :     shm_mq     *mq = mqh->mqh_queue;
     368              :     PGPROC     *receiver;
     369      1158937 :     Size        nbytes = 0;
     370              :     Size        bytes_written;
     371              :     int         i;
     372      1158937 :     int         which_iov = 0;
     373              :     Size        offset;
     374              : 
     375              :     Assert(mq->mq_sender == MyProc);
     376              : 
     377              :     /* Compute total size of write. */
     378      2319890 :     for (i = 0; i < iovcnt; ++i)
     379      1160953 :         nbytes += iov[i].len;
     380              : 
     381              :     /* Prevent writing messages overwhelming the receiver. */
     382      1158937 :     if (nbytes > MaxAllocSize)
     383            0 :         ereport(ERROR,
     384              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     385              :                  errmsg("cannot send a message of size %zu via shared memory queue",
     386              :                         nbytes)));
     387              : 
     388              :     /* Try to write, or finish writing, the length word into the buffer. */
     389      2313768 :     while (!mqh->mqh_length_word_complete)
     390              :     {
     391              :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
     392      1154835 :         res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
     393      1154835 :                                 ((char *) &nbytes) + mqh->mqh_partial_bytes,
     394              :                                 nowait, &bytes_written);
     395              : 
     396      1154835 :         if (res == SHM_MQ_DETACHED)
     397              :         {
     398              :             /* Reset state in case caller tries to send another message. */
     399            4 :             mqh->mqh_partial_bytes = 0;
     400            4 :             mqh->mqh_length_word_complete = false;
     401            4 :             return res;
     402              :         }
     403      1154831 :         mqh->mqh_partial_bytes += bytes_written;
     404              : 
     405      1154831 :         if (mqh->mqh_partial_bytes >= sizeof(Size))
     406              :         {
     407              :             Assert(mqh->mqh_partial_bytes == sizeof(Size));
     408              : 
     409      1154831 :             mqh->mqh_partial_bytes = 0;
     410      1154831 :             mqh->mqh_length_word_complete = true;
     411              :         }
     412              : 
     413      1154831 :         if (res != SHM_MQ_SUCCESS)
     414            0 :             return res;
     415              : 
     416              :         /* Length word can't be split unless bigger than required alignment. */
     417              :         Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
     418              :     }
     419              : 
     420              :     /* Write the actual data bytes into the buffer. */
     421              :     Assert(mqh->mqh_partial_bytes <= nbytes);
     422      1158933 :     offset = mqh->mqh_partial_bytes;
     423              :     do
     424              :     {
     425              :         Size        chunksize;
     426              : 
     427              :         /* Figure out which bytes need to be sent next. */
     428      1158954 :         if (offset >= iov[which_iov].len)
     429              :         {
     430         4008 :             offset -= iov[which_iov].len;
     431         4008 :             ++which_iov;
     432         4008 :             if (which_iov >= iovcnt)
     433         4000 :                 break;
     434            8 :             continue;
     435              :         }
     436              : 
     437              :         /*
     438              :          * We want to avoid copying the data if at all possible, but every
     439              :          * chunk of bytes we write into the queue has to be MAXALIGN'd, except
     440              :          * the last.  Thus, if a chunk other than the last one ends on a
     441              :          * non-MAXALIGN'd boundary, we have to combine the tail end of its
     442              :          * data with data from one or more following chunks until we either
     443              :          * reach the last chunk or accumulate a number of bytes which is
     444              :          * MAXALIGN'd.
     445              :          */
     446      1154946 :         if (which_iov + 1 < iovcnt &&
     447         2004 :             offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
     448         2004 :         {
     449              :             char        tmpbuf[MAXIMUM_ALIGNOF];
     450         2004 :             int         j = 0;
     451              : 
     452              :             for (;;)
     453              :             {
     454         6090 :                 if (offset < iov[which_iov].len)
     455              :                 {
     456         2095 :                     tmpbuf[j] = iov[which_iov].data[offset];
     457         2095 :                     j++;
     458         2095 :                     offset++;
     459         2095 :                     if (j == MAXIMUM_ALIGNOF)
     460           13 :                         break;
     461              :                 }
     462              :                 else
     463              :                 {
     464         3995 :                     offset -= iov[which_iov].len;
     465         3995 :                     which_iov++;
     466         3995 :                     if (which_iov >= iovcnt)
     467         1991 :                         break;
     468              :                 }
     469              :             }
     470              : 
     471         2004 :             res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
     472              : 
     473         2004 :             if (res == SHM_MQ_DETACHED)
     474              :             {
     475              :                 /* Reset state in case caller tries to send another message. */
     476            0 :                 mqh->mqh_partial_bytes = 0;
     477            0 :                 mqh->mqh_length_word_complete = false;
     478            0 :                 return res;
     479              :             }
     480              : 
     481         2004 :             mqh->mqh_partial_bytes += bytes_written;
     482         2004 :             if (res != SHM_MQ_SUCCESS)
     483            0 :                 return res;
     484         2004 :             continue;
     485              :         }
     486              : 
     487              :         /*
     488              :          * If this is the last chunk, we can write all the data, even if it
     489              :          * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
     490              :          * MAXALIGN_DOWN the write size.
     491              :          */
     492      1152942 :         chunksize = iov[which_iov].len - offset;
     493      1152942 :         if (which_iov + 1 < iovcnt)
     494            0 :             chunksize = MAXALIGN_DOWN(chunksize);
     495      1152942 :         res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
     496              :                                 nowait, &bytes_written);
     497              : 
     498      1152942 :         if (res == SHM_MQ_DETACHED)
     499              :         {
     500              :             /* Reset state in case caller tries to send another message. */
     501            0 :             mqh->mqh_length_word_complete = false;
     502            0 :             mqh->mqh_partial_bytes = 0;
     503            0 :             return res;
     504              :         }
     505              : 
     506      1152942 :         mqh->mqh_partial_bytes += bytes_written;
     507      1152942 :         offset += bytes_written;
     508      1152942 :         if (res != SHM_MQ_SUCCESS)
     509         4102 :             return res;
     510      1150852 :     } while (mqh->mqh_partial_bytes < nbytes);
     511              : 
     512              :     /* Reset for next message. */
     513      1154831 :     mqh->mqh_partial_bytes = 0;
     514      1154831 :     mqh->mqh_length_word_complete = false;
     515              : 
     516              :     /* If queue has been detached, let caller know. */
     517      1154831 :     if (mq->mq_detached)
     518            0 :         return SHM_MQ_DETACHED;
     519              : 
     520              :     /*
     521              :      * If the counterparty is known to have attached, we can read mq_receiver
     522              :      * without acquiring the spinlock.  Otherwise, more caution is needed.
     523              :      */
     524      1154831 :     if (mqh->mqh_counterparty_attached)
     525      1151798 :         receiver = mq->mq_receiver;
     526              :     else
     527              :     {
     528         3033 :         SpinLockAcquire(&mq->mq_mutex);
     529         3033 :         receiver = mq->mq_receiver;
     530         3033 :         SpinLockRelease(&mq->mq_mutex);
     531         3033 :         if (receiver != NULL)
     532         3033 :             mqh->mqh_counterparty_attached = true;
     533              :     }
     534              : 
     535              :     /*
     536              :      * If the caller has requested force flush or we have written more than
     537              :      * 1/4 of the ring size, mark it as written in shared memory and notify
     538              :      * the receiver.
     539              :      */
     540      1154831 :     if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
     541              :     {
     542       121536 :         shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
     543       121536 :         if (receiver != NULL)
     544       121536 :             SetLatch(&receiver->procLatch);
     545       121536 :         mqh->mqh_send_pending = 0;
     546              :     }
     547              : 
     548      1154831 :     return SHM_MQ_SUCCESS;
     549              : }
     550              : 
     551              : /*
     552              :  * Receive a message from a shared message queue.
     553              :  *
     554              :  * We set *nbytes to the message length and *data to point to the message
     555              :  * payload.  If the entire message exists in the queue as a single,
     556              :  * contiguous chunk, *data will point directly into shared memory; otherwise,
     557              :  * it will point to a temporary buffer.  This mostly avoids data copying in
     558              :  * the hoped-for case where messages are short compared to the buffer size,
     559              :  * while still allowing longer messages.  In either case, the return value
     560              :  * remains valid until the next receive operation is performed on the queue.
     561              :  *
     562              :  * When nowait = false, we'll wait on our process latch when the ring buffer
     563              :  * is empty and we have not yet received a full message.  The sender will
     564              :  * set our process latch after more data has been written, and we'll resume
     565              :  * processing.  Each call will therefore return a complete message
     566              :  * (unless the sender detaches the queue).
     567              :  *
     568              :  * When nowait = true, we do not manipulate the state of the process latch;
     569              :  * instead, whenever the buffer is empty and we need to read from it, we
     570              :  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
     571              :  * function again after the process latch has been set.
     572              :  */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
    shm_mq     *mq = mqh->mqh_queue;
    shm_mq_result res;
    Size        rb = 0;
    Size        nbytes;
    void       *rawdata;

    Assert(mq->mq_receiver == MyProc);

    /* We can't receive data until the sender has attached. */
    if (!mqh->mqh_counterparty_attached)
    {
        if (nowait)
        {
            int         counterparty_gone;

            /*
             * We shouldn't return at this point at all unless the sender
             * hasn't attached yet.  However, the correct return value depends
             * on whether the sender is still attached.  If we first test
             * whether the sender has ever attached and then test whether the
             * sender has detached, there's a race condition: a sender that
             * attaches and detaches very quickly might fool us into thinking
             * the sender never attached at all.  So, test whether our
             * counterparty is definitively gone first, and only afterwards
             * check whether the sender ever attached in the first place.
             */
            counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
            if (shm_mq_get_sender(mq) == NULL)
            {
                if (counterparty_gone)
                    return SHM_MQ_DETACHED;
                else
                    return SHM_MQ_WOULD_BLOCK;
            }
        }
        else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                 && shm_mq_get_sender(mq) == NULL)
        {
            /* Sender never attached and never will; mark queue dead. */
            mq->mq_detached = true;
            return SHM_MQ_DETACHED;
        }
        mqh->mqh_counterparty_attached = true;
    }

    /*
     * If we've consumed an amount of data greater than 1/4th of the ring
     * size, mark it consumed in shared memory.  We try to avoid doing this
     * unnecessarily when only a small amount of data has been consumed,
     * because SetLatch() is fairly expensive and we don't want to do it too
     * often.
     */
    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
    {
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }

    /* Try to read, or finish reading, the length word from the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                   nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;

        /*
         * Hopefully, we'll receive the entire message length word at once.
         * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
         * multiple reads.
         */
        if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
            Size        needed;

            nbytes = *(Size *) rawdata;

            /* If we've already got the whole message, we're done. */
            needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
            if (rb >= needed)
            {
                /*
                 * Fast path: the entire message is contiguous in the ring,
                 * so hand back a pointer directly into shared memory.
                 */
                mqh->mqh_consume_pending += needed;
                *nbytesp = nbytes;
                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                return SHM_MQ_SUCCESS;
            }

            /*
             * We don't have the whole message, but we at least have the whole
             * length word.
             */
            mqh->mqh_expected_bytes = nbytes;
            mqh->mqh_length_word_complete = true;
            mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
            rb -= MAXALIGN(sizeof(Size));
        }
        else
        {
            Size        lengthbytes;

            /* Can't be split unless bigger than required alignment. */
            Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

            /* Message word is split; need buffer to reassemble. */
            if (mqh->mqh_buffer == NULL)
            {
                mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                                     MQH_INITIAL_BUFSIZE);
                mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
            }
            Assert(mqh->mqh_buflen >= sizeof(Size));

            /* Copy partial length word; remember to consume it. */
            if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
            else
                lengthbytes = rb;
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                   lengthbytes);
            mqh->mqh_partial_bytes += lengthbytes;
            mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
            rb -= lengthbytes;

            /* If we now have the whole word, we're ready to read payload. */
            if (mqh->mqh_partial_bytes >= sizeof(Size))
            {
                Assert(mqh->mqh_partial_bytes == sizeof(Size));
                mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                mqh->mqh_length_word_complete = true;
                mqh->mqh_partial_bytes = 0;
            }
        }
    }
    /* Length word is complete; this is the payload size we expect to read. */
    nbytes = mqh->mqh_expected_bytes;

    /*
     * Should be disallowed on the sending side already, but better check and
     * error out on the receiver side as well rather than trying to read a
     * prohibitively large message.
     */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk.  If this works,
         * we need not copy the data and can return a pointer directly into
         * shared memory.
         */
        res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb >= nbytes)
        {
            /* Whole payload is contiguous; reset state for next message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_consume_pending += MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
        }

        /*
         * The message has wrapped the buffer.  We'll need to copy it in order
         * to return it to the client in one chunk.  First, make sure we have
         * a large enough buffer available.
         */
        if (mqh->mqh_buflen < nbytes)
        {
            Size        newbuflen;

            /*
             * Increase size to the next power of 2 that's >= nbytes, but
             * limit to MaxAllocSize.
             */
            newbuflen = pg_nextpower2_size_t(nbytes);
            newbuflen = Min(newbuflen, MaxAllocSize);

            if (mqh->mqh_buffer != NULL)
            {
                pfree(mqh->mqh_buffer);
                mqh->mqh_buffer = NULL;
                mqh->mqh_buflen = 0;
            }
            mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
            mqh->mqh_buflen = newbuflen;
        }
    }

    /* Loop until we've copied the entire message. */
    for (;;)
    {
        Size        still_needed;

        /* Copy as much as we can. */
        Assert(mqh->mqh_partial_bytes + rb <= nbytes);
        if (rb > 0)
        {
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
            mqh->mqh_partial_bytes += rb;
        }

        /*
         * Update count of bytes that can be consumed, accounting for
         * alignment padding.  Note that this will never actually insert any
         * padding except at the end of a message, because the buffer size is
         * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
         */
        Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
        mqh->mqh_consume_pending += MAXALIGN(rb);

        /* If we got all the data, exit the loop. */
        if (mqh->mqh_partial_bytes >= nbytes)
            break;

        /* Wait for some more data. */
        still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        /* Don't absorb bytes beyond this message's boundary. */
        if (rb > still_needed)
            rb = still_needed;
    }

    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
}
     811              : 
     812              : /*
     813              :  * Wait for the other process that's supposed to use this queue to attach
     814              :  * to it.
     815              :  *
     816              :  * The return value is SHM_MQ_DETACHED if the worker has already detached or
     817              :  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
     818              :  * Note that we will only be able to detect that the worker has died before
     819              :  * attaching if a background worker handle was passed to shm_mq_attach().
     820              :  */
     821              : shm_mq_result
     822            0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
     823              : {
     824            0 :     shm_mq     *mq = mqh->mqh_queue;
     825              :     PGPROC    **victim;
     826              : 
     827            0 :     if (shm_mq_get_receiver(mq) == MyProc)
     828            0 :         victim = &mq->mq_sender;
     829              :     else
     830              :     {
     831              :         Assert(shm_mq_get_sender(mq) == MyProc);
     832            0 :         victim = &mq->mq_receiver;
     833              :     }
     834              : 
     835            0 :     if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
     836            0 :         return SHM_MQ_SUCCESS;
     837              :     else
     838            0 :         return SHM_MQ_DETACHED;
     839              : }
     840              : 
     841              : /*
     842              :  * Detach from a shared message queue, and destroy the shm_mq_handle.
     843              :  */
     844              : void
     845         5671 : shm_mq_detach(shm_mq_handle *mqh)
     846              : {
     847              :     /* Before detaching, notify the receiver about any already-written data. */
     848         5671 :     if (mqh->mqh_send_pending > 0)
     849              :     {
     850         1014 :         shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
     851         1014 :         mqh->mqh_send_pending = 0;
     852              :     }
     853              : 
     854              :     /* Notify counterparty that we're outta here. */
     855         5671 :     shm_mq_detach_internal(mqh->mqh_queue);
     856              : 
     857              :     /* Cancel on_dsm_detach callback, if any. */
     858         5671 :     if (mqh->mqh_segment)
     859         5671 :         cancel_on_dsm_detach(mqh->mqh_segment,
     860              :                              shm_mq_detach_callback,
     861         5671 :                              PointerGetDatum(mqh->mqh_queue));
     862              : 
     863              :     /* Release local memory associated with handle. */
     864         5671 :     if (mqh->mqh_buffer != NULL)
     865          142 :         pfree(mqh->mqh_buffer);
     866         5671 :     pfree(mqh);
     867         5671 : }
     868              : 
     869              : /*
     870              :  * Notify counterparty that we're detaching from shared message queue.
     871              :  *
     872              :  * The purpose of this function is to make sure that the process
     873              :  * with which we're communicating doesn't block forever waiting for us to
     874              :  * fill or drain the queue once we've lost interest.  When the sender
     875              :  * detaches, the receiver can read any messages remaining in the queue;
     876              :  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
     877              :  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
     878              :  *
     879              :  * This is separated out from shm_mq_detach() because if the on_dsm_detach
     880              :  * callback fires, we only want to do this much.  We do not try to touch
     881              :  * the local shm_mq_handle, as it may have been pfree'd already.
     882              :  */
     883              : static void
     884         7805 : shm_mq_detach_internal(shm_mq *mq)
     885              : {
     886              :     PGPROC     *victim;
     887              : 
     888         7805 :     SpinLockAcquire(&mq->mq_mutex);
     889         7805 :     if (mq->mq_sender == MyProc)
     890         3844 :         victim = mq->mq_receiver;
     891              :     else
     892              :     {
     893              :         Assert(mq->mq_receiver == MyProc);
     894         3961 :         victim = mq->mq_sender;
     895              :     }
     896         7805 :     mq->mq_detached = true;
     897         7805 :     SpinLockRelease(&mq->mq_mutex);
     898              : 
     899         7805 :     if (victim != NULL)
     900         7692 :         SetLatch(&victim->procLatch);
     901         7805 : }
     902              : 
     903              : /*
     904              :  * Get the shm_mq from handle.
     905              :  */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
    /* Plain accessor: expose the underlying queue stored in the handle. */
    return mqh->mqh_queue;
}
     911              : 
/*
 * Write bytes into a shared message queue.
 *
 * Copies up to nbytes bytes from data into the ring buffer, looping until
 * everything has been staged or we must stop.  Written bytes are first
 * accumulated in mqh->mqh_send_pending and only published to shared memory
 * (and the receiver's latch set) when the ring fills up; see the loop body
 * below.  On return, *bytes_written is the number of bytes staged so far,
 * including any still pending in mqh_send_pending.
 *
 * Returns SHM_MQ_SUCCESS once all nbytes are written, SHM_MQ_DETACHED if
 * the queue was detached, or SHM_MQ_WOULD_BLOCK when nowait is true and
 * the ring is full (or the receiver has not yet attached).
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        sent = 0;
    uint64      used;
    Size        ringsize = mq->mq_ring_size;
    Size        available;

    while (sent < nbytes)
    {
        uint64      rb;
        uint64      wb;

        /* Compute number of ring buffer bytes used and available. */
        rb = pg_atomic_read_u64(&mq->mq_bytes_read);
        wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
        Assert(wb >= rb);
        used = wb - rb;
        Assert(used <= ringsize);
        available = Min(ringsize - used, nbytes - sent);

        /*
         * Bail out if the queue has been detached.  Note that we would be in
         * trouble if the compiler decided to cache the value of
         * mq->mq_detached in a register or on the stack across loop
         * iterations.  It probably shouldn't do that anyway since we'll
         * always return, call an external function that performs a system
         * call, or reach a memory barrier at some point later in the loop,
         * but just to be sure, insert a compiler barrier here.
         */
        pg_compiler_barrier();
        if (mq->mq_detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }

        if (available == 0 && !mqh->mqh_counterparty_attached)
        {
            /*
             * The queue is full, so if the receiver isn't yet known to be
             * attached, we must wait for that to happen.
             */
            if (nowait)
            {
                if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                {
                    *bytes_written = sent;
                    return SHM_MQ_DETACHED;
                }
                if (shm_mq_get_receiver(mq) == NULL)
                {
                    *bytes_written = sent;
                    return SHM_MQ_WOULD_BLOCK;
                }
            }
            else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                           mqh->mqh_handle))
            {
                mq->mq_detached = true;
                *bytes_written = sent;
                return SHM_MQ_DETACHED;
            }
            mqh->mqh_counterparty_attached = true;

            /*
             * The receiver may have read some data after attaching, so we
             * must not wait without rechecking the queue state.
             */
        }
        else if (available == 0)
        {
            /* Update the pending send bytes in the shared memory. */
            shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);

            /*
             * Since mq->mqh_counterparty_attached is known to be true at this
             * point, mq_receiver has been set, and it can't change once set.
             * Therefore, we can read it without acquiring the spinlock.
             */
            Assert(mqh->mqh_counterparty_attached);
            SetLatch(&mq->mq_receiver->procLatch);

            /*
             * We have just updated the mqh_send_pending bytes in the shared
             * memory so reset it.
             */
            mqh->mqh_send_pending = 0;

            /* Skip manipulation of our latch if nowait = true. */
            if (nowait)
            {
                *bytes_written = sent;
                return SHM_MQ_WOULD_BLOCK;
            }

            /*
             * Wait for our latch to be set.  It might already be set for some
             * unrelated reason, but that'll just result in one extra trip
             * through the loop.  It's worth it to avoid resetting the latch
             * at top of loop, because setting an already-set latch is much
             * cheaper than setting one that has been reset.
             */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_MESSAGE_QUEUE_SEND);

            /* Reset the latch so we don't spin. */
            ResetLatch(MyLatch);

            /* An interrupt may have occurred while we were waiting. */
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            Size        offset;
            Size        sendnow;

            offset = wb % (uint64) ringsize;
            sendnow = Min(available, ringsize - offset);

            /*
             * Write as much data as we can via a single memcpy(). Make sure
             * these writes happen after the read of mq_bytes_read, above.
             * This barrier pairs with the one in shm_mq_inc_bytes_read.
             * (Since we're separating the read of mq_bytes_read from a
             * subsequent write to mq_ring, we need a full barrier here.)
             */
            pg_memory_barrier();
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                   (const char *) data + sent, sendnow);
            sent += sendnow;

            /*
             * Update count of bytes written, with alignment padding.  Note
             * that this will never actually insert any padding except at the
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
            Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));

            /*
             * For efficiency, we don't update the bytes written in the shared
             * memory and also don't set the reader's latch here.  Refer to
             * the comments atop the shm_mq_handle structure for more
             * information.
             */
            mqh->mqh_send_pending += MAXALIGN(sendnow);
        }
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}
    1070              : 
    1071              : /*
    1072              :  * Wait until at least *nbytesp bytes are available to be read from the
    1073              :  * shared message queue, or until the buffer wraps around.  If the queue is
    1074              :  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
    1075              :  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
    1076              :  * to the location at which data bytes can be read, *nbytesp is set to the
    1077              :  * number of bytes which can be read at that address, and the return value
    1078              :  * is SHM_MQ_SUCCESS.
    1079              :  */
    1080              : static shm_mq_result
    1081      1412353 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
    1082              :                      Size *nbytesp, void **datap)
    1083              : {
    1084      1412353 :     shm_mq     *mq = mqh->mqh_queue;
    1085      1412353 :     Size        ringsize = mq->mq_ring_size;
    1086              :     uint64      used;
    1087              :     uint64      written;
    1088              : 
    1089              :     for (;;)
    1090       128920 :     {
    1091              :         Size        offset;
    1092              :         uint64      read;
    1093              : 
    1094              :         /* Get bytes written, so we can compute what's available to read. */
    1095      1541273 :         written = pg_atomic_read_u64(&mq->mq_bytes_written);
    1096              : 
    1097              :         /*
    1098              :          * Get bytes read.  Include bytes we could consume but have not yet
    1099              :          * consumed.
    1100              :          */
    1101      1541273 :         read = pg_atomic_read_u64(&mq->mq_bytes_read) +
    1102      1541273 :             mqh->mqh_consume_pending;
    1103      1541273 :         used = written - read;
    1104              :         Assert(used <= ringsize);
    1105      1541273 :         offset = read % (uint64) ringsize;
    1106              : 
    1107              :         /* If we have enough data or buffer has wrapped, we're done. */
    1108      1541273 :         if (used >= bytes_needed || offset + used >= ringsize)
    1109              :         {
    1110      1265501 :             *nbytesp = Min(used, ringsize - offset);
    1111      1265501 :             *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
    1112              : 
    1113              :             /*
    1114              :              * Separate the read of mq_bytes_written, above, from caller's
    1115              :              * attempt to read the data itself.  Pairs with the barrier in
    1116              :              * shm_mq_inc_bytes_written.
    1117              :              */
    1118      1265501 :             pg_read_barrier();
    1119      1265501 :             return SHM_MQ_SUCCESS;
    1120              :         }
    1121              : 
    1122              :         /*
    1123              :          * Fall out before waiting if the queue has been detached.
    1124              :          *
    1125              :          * Note that we don't check for this until *after* considering whether
    1126              :          * the data already available is enough, since the receiver can finish
    1127              :          * receiving a message stored in the buffer even after the sender has
    1128              :          * detached.
    1129              :          */
    1130       275772 :         if (mq->mq_detached)
    1131              :         {
    1132              :             /*
    1133              :              * If the writer advanced mq_bytes_written and then set
    1134              :              * mq_detached, we might not have read the final value of
    1135              :              * mq_bytes_written above.  Insert a read barrier and then check
    1136              :              * again if mq_bytes_written has advanced.
    1137              :              */
    1138         1804 :             pg_read_barrier();
    1139         1804 :             if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
    1140            0 :                 continue;
    1141              : 
    1142         1804 :             return SHM_MQ_DETACHED;
    1143              :         }
    1144              : 
    1145              :         /*
    1146              :          * We didn't get enough data to satisfy the request, so mark any data
    1147              :          * previously-consumed as read to make more buffer space.
    1148              :          */
    1149       273968 :         if (mqh->mqh_consume_pending > 0)
    1150              :         {
    1151       109908 :             shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
    1152       109908 :             mqh->mqh_consume_pending = 0;
    1153              :         }
    1154              : 
    1155              :         /* Skip manipulation of our latch if nowait = true. */
    1156       273968 :         if (nowait)
    1157       145048 :             return SHM_MQ_WOULD_BLOCK;
    1158              : 
    1159              :         /*
    1160              :          * Wait for our latch to be set.  It might already be set for some
    1161              :          * unrelated reason, but that'll just result in one extra trip through
    1162              :          * the loop.  It's worth it to avoid resetting the latch at top of
    1163              :          * loop, because setting an already-set latch is much cheaper than
    1164              :          * setting one that has been reset.
    1165              :          */
    1166       128920 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
    1167              :                          WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
    1168              : 
    1169              :         /* Reset the latch so we don't spin. */
    1170       128920 :         ResetLatch(MyLatch);
    1171              : 
    1172              :         /* An interrupt may have occurred while we were waiting. */
    1173       128920 :         CHECK_FOR_INTERRUPTS();
    1174              :     }
    1175              : }
    1176              : 
    1177              : /*
    1178              :  * Test whether a counterparty who may not even be alive yet is definitely gone.
    1179              :  */
    1180              : static bool
    1181      3247602 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
    1182              : {
    1183              :     pid_t       pid;
    1184              : 
    1185              :     /* If the queue has been detached, counterparty is definitely gone. */
    1186      3247602 :     if (mq->mq_detached)
    1187         1232 :         return true;
    1188              : 
    1189              :     /* If there's a handle, check worker status. */
    1190      3246370 :     if (handle != NULL)
    1191              :     {
    1192              :         BgwHandleStatus status;
    1193              : 
    1194              :         /* Check for unexpected worker death. */
    1195      3246347 :         status = GetBackgroundWorkerPid(handle, &pid);
    1196      3246347 :         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1197              :         {
    1198              :             /* Mark it detached, just to make it official. */
    1199            0 :             mq->mq_detached = true;
    1200            0 :             return true;
    1201              :         }
    1202              :     }
    1203              : 
    1204              :     /* Counterparty is not definitively gone. */
    1205      3246370 :     return false;
    1206              : }
    1207              : 
    1208              : /*
    1209              :  * This is used when a process is waiting for its counterpart to attach to the
    1210              :  * queue.  We exit when the other process attaches as expected, or, if
    1211              :  * handle != NULL, when the referenced background process or the postmaster
    1212              :  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
    1213              :  * potentially get stuck here forever waiting for a process that may never
    1214              :  * start.  We do check for interrupts, though.
    1215              :  *
    1216              :  * ptr is a pointer to the memory address that we're expecting to become
    1217              :  * non-NULL when our counterpart attaches to the queue.
    1218              :  */
    1219              : static bool
    1220          269 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
    1221              : {
    1222          269 :     bool        result = false;
    1223              : 
    1224              :     for (;;)
    1225          797 :     {
    1226              :         BgwHandleStatus status;
    1227              :         pid_t       pid;
    1228              : 
    1229              :         /* Acquire the lock just long enough to check the pointer. */
    1230         1066 :         SpinLockAcquire(&mq->mq_mutex);
    1231         1066 :         result = (*ptr != NULL);
    1232         1066 :         SpinLockRelease(&mq->mq_mutex);
    1233              : 
    1234              :         /* Fail if detached; else succeed if initialized. */
    1235         1066 :         if (mq->mq_detached)
    1236              :         {
    1237          114 :             result = false;
    1238          114 :             break;
    1239              :         }
    1240          952 :         if (result)
    1241          155 :             break;
    1242              : 
    1243          797 :         if (handle != NULL)
    1244              :         {
    1245              :             /* Check for unexpected worker death. */
    1246          795 :             status = GetBackgroundWorkerPid(handle, &pid);
    1247          795 :             if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1248              :             {
    1249            0 :                 result = false;
    1250            0 :                 break;
    1251              :             }
    1252              :         }
    1253              : 
    1254              :         /* Wait to be signaled. */
    1255          797 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
    1256              :                          WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
    1257              : 
    1258              :         /* Reset the latch so we don't spin. */
    1259          797 :         ResetLatch(MyLatch);
    1260              : 
    1261              :         /* An interrupt may have occurred while we were waiting. */
    1262          797 :         CHECK_FOR_INTERRUPTS();
    1263              :     }
    1264              : 
    1265          269 :     return result;
    1266              : }
    1267              : 
    1268              : /*
    1269              :  * Increment the number of bytes read.
    1270              :  */
    1271              : static void
    1272       132629 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
    1273              : {
    1274              :     PGPROC     *sender;
    1275              : 
    1276              :     /*
    1277              :      * Separate prior reads of mq_ring from the increment of mq_bytes_read
    1278              :      * which follows.  This pairs with the full barrier in
    1279              :      * shm_mq_send_bytes(). We only need a read barrier here because the
    1280              :      * increment of mq_bytes_read is actually a read followed by a dependent
    1281              :      * write.
    1282              :      */
    1283       132629 :     pg_read_barrier();
    1284              : 
    1285              :     /*
    1286              :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
    1287              :      * else can be changing this value.  This method should be cheaper.
    1288              :      */
    1289       132629 :     pg_atomic_write_u64(&mq->mq_bytes_read,
    1290       132629 :                         pg_atomic_read_u64(&mq->mq_bytes_read) + n);
    1291              : 
    1292              :     /*
    1293              :      * We shouldn't have any bytes to read without a sender, so we can read
    1294              :      * mq_sender here without a lock.  Once it's initialized, it can't change.
    1295              :      */
    1296       132629 :     sender = mq->mq_sender;
    1297              :     Assert(sender != NULL);
    1298       132629 :     SetLatch(&sender->procLatch);
    1299       132629 : }
    1300              : 
    1301              : /*
    1302              :  * Increment the number of bytes written.
    1303              :  */
    1304              : static void
    1305       219902 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
    1306              : {
    1307              :     /*
    1308              :      * Separate prior reads of mq_ring from the write of mq_bytes_written
    1309              :      * which we're about to do.  Pairs with the read barrier found in
    1310              :      * shm_mq_receive_bytes.
    1311              :      */
    1312       219902 :     pg_write_barrier();
    1313              : 
    1314              :     /*
    1315              :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
    1316              :      * else can be changing this value.  This method avoids taking the bus
    1317              :      * lock unnecessarily.
    1318              :      */
    1319       219902 :     pg_atomic_write_u64(&mq->mq_bytes_written,
    1320       219902 :                         pg_atomic_read_u64(&mq->mq_bytes_written) + n);
    1321       219902 : }
    1322              : 
    1323              : /* Shim for on_dsm_detach callback. */
    1324              : static void
    1325         2134 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
    1326              : {
    1327         2134 :     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
    1328              : 
    1329         2134 :     shm_mq_detach_internal(mq);
    1330         2134 : }
        

Generated by: LCOV version 2.0-1