LCOV - code coverage report
Current view: top level - src/backend/storage/ipc - shm_mq.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 309 367 84.2 %
Date: 2019-06-18 07:06:57 Functions: 20 21 95.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * shm_mq.c
       4             :  *    single-reader, single-writer shared memory message queue
       5             :  *
       6             :  * Both the sender and the receiver must have a PGPROC; their respective
       7             :  * process latches are used for synchronization.  Only the sender may send,
       8             :  * and only the receiver may receive.  This is intended to allow a user
       9             :  * backend to communicate with worker backends that it has registered.
      10             :  *
      11             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      12             :  * Portions Copyright (c) 1994, Regents of the University of California
      13             :  *
      14             :  * src/backend/storage/ipc/shm_mq.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "miscadmin.h"
      22             : #include "pgstat.h"
      23             : #include "postmaster/bgworker.h"
      24             : #include "storage/procsignal.h"
      25             : #include "storage/shm_mq.h"
      26             : #include "storage/spin.h"
      27             : 
      28             : /*
      29             :  * This structure represents the actual queue, stored in shared memory.
      30             :  *
      31             :  * Some notes on synchronization:
      32             :  *
      33             :  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
      34             :  * mq_sender and mq_bytes_written can only be changed by the sender.
      35             :  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
      36             :  * they cannot change once set, and thus may be read without a lock once this
      37             :  * is known to be the case.
      38             :  *
      39             :  * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
      40             :  * they are written atomically using 8 byte loads and stores.  Memory barriers
      41             :  * must be carefully used to synchronize reads and writes of these values with
      42             :  * reads and writes of the actual data in mq_ring.
      43             :  *
      44             :  * mq_detached needs no locking.  It can be set by either the sender or the
      45             :  * receiver, but only ever from false to true, so redundant writes don't
      46             :  * matter.  It is important that if we set mq_detached and then set the
      47             :  * counterparty's latch, the counterparty must be certain to see the change
      48             :  * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
      49             :  * ends with one, this should be OK.
      50             :  *
      51             :  * mq_ring_size and mq_ring_offset never change after initialization, and
      52             :  * can therefore be read without the lock.
      53             :  *
      54             :  * Importantly, mq_ring can be safely read and written without a lock.
      55             :  * At any given time, the difference between mq_bytes_read and
      56             :  * mq_bytes_written defines the number of bytes within mq_ring that contain
      57             :  * unread data, and mq_bytes_read defines the position where those bytes
      58             :  * begin.  The sender can increase the number of unread bytes at any time,
      59             :  * but only the receiver can give license to overwrite those bytes, by
      60             :  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
      61             :  * the unread bytes it knows to be present without the lock.  Conversely,
      62             :  * the sender can write to the unused portion of the ring buffer without
      63             :  * the lock, because nobody else can be reading or writing those bytes.  The
      64             :  * receiver could be making more bytes unused by incrementing mq_bytes_read,
      65             :  * but that's OK.  Note that it would be unsafe for the receiver to read any
      66             :  * data it's already marked as read, or to write any data; and it would be
      67             :  * unsafe for the sender to reread any data after incrementing
      68             :  * mq_bytes_written, but fortunately there's no need for any of that.
      69             :  */
/*
 * The shared-memory queue itself.  See the synchronization notes above for
 * which fields each side may touch, and under what protection.
 */
struct shm_mq
{
    slock_t     mq_mutex;       /* protects mq_receiver and mq_sender */
    PGPROC     *mq_receiver;    /* set once by receiver, never changes after */
    PGPROC     *mq_sender;      /* set once by sender, never changes after */
    pg_atomic_uint64 mq_bytes_read;     /* advanced only by the receiver */
    pg_atomic_uint64 mq_bytes_written;  /* advanced only by the sender */
    Size        mq_ring_size;   /* usable bytes in mq_ring; fixed at create */
    bool        mq_detached;    /* goes false->true only; no lock needed */
    uint8       mq_ring_offset; /* pad from mq_ring to first MAXALIGN boundary */
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];     /* the ring buffer data */
};
      82             : 
      83             : /*
      84             :  * This structure is a backend-private handle for access to a queue.
      85             :  *
      86             :  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
      87             :  * an optional pointer to the dynamic shared memory segment that contains it.
      88             :  * (If mqh_segment is provided, we register an on_dsm_detach callback to
      89             :  * make sure we detach from the queue before detaching from DSM.)
      90             :  *
      91             :  * If this queue is intended to connect the current process with a background
      92             :  * worker that started it, the user can pass a pointer to the worker handle
      93             :  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
      94             :  * is to allow us to begin sending to or receiving from that queue before the
      95             :  * process we'll be communicating with has even been started.  If it fails
      96             :  * to start, the handle will allow us to notice that and fail cleanly, rather
      97             :  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
      98             :  * simple cases - e.g. where there are just 2 processes communicating; in
      99             :  * more complex scenarios, every process may not have a BackgroundWorkerHandle
     100             :  * available, or may need to watch for the failure of more than one other
     101             :  * process at a time.
     102             :  *
     103             :  * When a message exists as a contiguous chunk of bytes in the queue - that is,
     104             :  * it is smaller than the size of the ring buffer and does not wrap around
     105             :  * the end - we return the message to the caller as a pointer into the buffer.
     106             :  * For messages that are larger or happen to wrap, we reassemble the message
     107             :  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
     108             :  * the buffer, and mqh_buflen is the number of bytes allocated for it.
     109             :  *
     110             :  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
     111             :  * are used to track the state of non-blocking operations.  When the caller
     112             :  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
     113             :  * are expected to retry the call at a later time with the same argument;
     114             :  * we need to retain enough state to pick up where we left off.
     115             :  * mqh_length_word_complete tracks whether we are done sending or receiving
     116             :  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
     117             :  * the number of bytes read or written for either the length word or the
     118             :  * message itself, and mqh_expected_bytes - which is used only for reads -
     119             :  * tracks the expected total size of the payload.
     120             :  *
     121             :  * mqh_counterparty_attached tracks whether we know the counterparty to have
     122             :  * attached to the queue at some previous point.  This lets us avoid some
     123             :  * mutex acquisitions.
     124             :  *
     125             :  * mqh_context is the memory context in effect at the time we attached to
     126             :  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
     127             :  * we make sure any other allocations we do happen in this context as well,
     128             :  * to avoid nasty surprises.
     129             :  */
/*
 * Backend-private handle for one attached queue; see the long comment above
 * for the role of each field.
 */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;      /* the shared queue we are attached to */
    dsm_segment *mqh_segment;   /* optional DSM segment containing the queue */
    BackgroundWorkerHandle *mqh_handle; /* optional worker to watch for death */
    char       *mqh_buffer;     /* local reassembly buffer for wrapped msgs */
    Size        mqh_buflen;     /* allocated size of mqh_buffer */
    Size        mqh_consume_pending;    /* bytes read but not yet acked */
    Size        mqh_partial_bytes;      /* progress within length word/payload */
    Size        mqh_expected_bytes;     /* total payload size (reads only) */
    bool        mqh_length_word_complete;   /* done with the length word? */
    bool        mqh_counterparty_attached;  /* peer known attached: skip mutex */
    MemoryContext mqh_context;  /* context for handle + buffer allocations */
};
     144             : 
     145             : static void shm_mq_detach_internal(shm_mq *mq);
     146             : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
     147             :                                        const void *data, bool nowait, Size *bytes_written);
     148             : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
     149             :                                           Size bytes_needed, bool nowait, Size *nbytesp,
     150             :                                           void **datap);
     151             : static bool shm_mq_counterparty_gone(shm_mq *mq,
     152             :                                      BackgroundWorkerHandle *handle);
     153             : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
     154             :                                  BackgroundWorkerHandle *handle);
     155             : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
     156             : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
     157             : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
     158             : 
/*
 * Minimum queue size is enough for header and at least one chunk of data.
 * (MAXIMUM_ALIGNOF is the smallest useful ring size, since every chunk
 * written into the ring except the last is MAXALIGN'd.)
 */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
     162             : 
     163             : #define MQH_INITIAL_BUFSIZE             8192
     164             : 
     165             : /*
     166             :  * Initialize a new shared message queue.
     167             :  */
     168             : shm_mq *
     169        3320 : shm_mq_create(void *address, Size size)
     170             : {
     171        3320 :     shm_mq     *mq = address;
     172        3320 :     Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
     173             : 
     174             :     /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
     175        3320 :     size = MAXALIGN_DOWN(size);
     176             : 
     177             :     /* Queue size must be large enough to hold some data. */
     178             :     Assert(size > data_offset);
     179             : 
     180             :     /* Initialize queue header. */
     181        3320 :     SpinLockInit(&mq->mq_mutex);
     182        3320 :     mq->mq_receiver = NULL;
     183        3320 :     mq->mq_sender = NULL;
     184        3320 :     pg_atomic_init_u64(&mq->mq_bytes_read, 0);
     185        3320 :     pg_atomic_init_u64(&mq->mq_bytes_written, 0);
     186        3320 :     mq->mq_ring_size = size - data_offset;
     187        3320 :     mq->mq_detached = false;
     188        3320 :     mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
     189             : 
     190        3320 :     return mq;
     191             : }
     192             : 
     193             : /*
     194             :  * Set the identity of the process that will receive from a shared message
     195             :  * queue.
     196             :  */
     197             : void
     198        3320 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
     199             : {
     200             :     PGPROC     *sender;
     201             : 
     202        3320 :     SpinLockAcquire(&mq->mq_mutex);
     203             :     Assert(mq->mq_receiver == NULL);
     204        3320 :     mq->mq_receiver = proc;
     205        3320 :     sender = mq->mq_sender;
     206        3320 :     SpinLockRelease(&mq->mq_mutex);
     207             : 
     208        3320 :     if (sender != NULL)
     209          14 :         SetLatch(&sender->procLatch);
     210        3320 : }
     211             : 
     212             : /*
     213             :  * Set the identity of the process that will send to a shared message queue.
     214             :  */
     215             : void
     216        3224 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
     217             : {
     218             :     PGPROC     *receiver;
     219             : 
     220        3224 :     SpinLockAcquire(&mq->mq_mutex);
     221             :     Assert(mq->mq_sender == NULL);
     222        3224 :     mq->mq_sender = proc;
     223        3224 :     receiver = mq->mq_receiver;
     224        3224 :     SpinLockRelease(&mq->mq_mutex);
     225             : 
     226        3224 :     if (receiver != NULL)
     227        3210 :         SetLatch(&receiver->procLatch);
     228        3224 : }
     229             : 
     230             : /*
     231             :  * Get the configured receiver.
     232             :  */
     233             : PGPROC *
     234           2 : shm_mq_get_receiver(shm_mq *mq)
     235             : {
     236             :     PGPROC     *receiver;
     237             : 
     238           2 :     SpinLockAcquire(&mq->mq_mutex);
     239           2 :     receiver = mq->mq_receiver;
     240           2 :     SpinLockRelease(&mq->mq_mutex);
     241             : 
     242           2 :     return receiver;
     243             : }
     244             : 
     245             : /*
     246             :  * Get the configured sender.
     247             :  */
     248             : PGPROC *
     249     9312790 : shm_mq_get_sender(shm_mq *mq)
     250             : {
     251             :     PGPROC     *sender;
     252             : 
     253     9312790 :     SpinLockAcquire(&mq->mq_mutex);
     254     9312790 :     sender = mq->mq_sender;
     255     9312790 :     SpinLockRelease(&mq->mq_mutex);
     256             : 
     257     9312790 :     return sender;
     258             : }
     259             : 
     260             : /*
     261             :  * Attach to a shared message queue so we can send or receive messages.
     262             :  *
     263             :  * The memory context in effect at the time this function is called should
     264             :  * be one which will last for at least as long as the message queue itself.
     265             :  * We'll allocate the handle in that context, and future allocations that
     266             :  * are needed to buffer incoming data will happen in that context as well.
     267             :  *
     268             :  * If seg != NULL, the queue will be automatically detached when that dynamic
     269             :  * shared memory segment is detached.
     270             :  *
     271             :  * If handle != NULL, the queue can be read or written even before the
     272             :  * other process has attached.  We'll wait for it to do so if needed.  The
     273             :  * handle must be for a background worker initialized with bgw_notify_pid
     274             :  * equal to our PID.
     275             :  *
     276             :  * shm_mq_detach() should be called when done.  This will free the
     277             :  * shm_mq_handle and mark the queue itself as detached, so that our
     278             :  * counterpart won't get stuck waiting for us to fill or drain the queue
     279             :  * after we've already lost interest.
     280             :  */
     281             : shm_mq_handle *
     282        6544 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     283             : {
     284        6544 :     shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
     285             : 
     286             :     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
     287        6544 :     mqh->mqh_queue = mq;
     288        6544 :     mqh->mqh_segment = seg;
     289        6544 :     mqh->mqh_handle = handle;
     290        6544 :     mqh->mqh_buffer = NULL;
     291        6544 :     mqh->mqh_buflen = 0;
     292        6544 :     mqh->mqh_consume_pending = 0;
     293        6544 :     mqh->mqh_partial_bytes = 0;
     294        6544 :     mqh->mqh_expected_bytes = 0;
     295        6544 :     mqh->mqh_length_word_complete = false;
     296        6544 :     mqh->mqh_counterparty_attached = false;
     297        6544 :     mqh->mqh_context = CurrentMemoryContext;
     298             : 
     299        6544 :     if (seg != NULL)
     300        6544 :         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
     301             : 
     302        6544 :     return mqh;
     303             : }
     304             : 
/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 *
 * Useful when the worker handle was not yet available at attach time; once
 * set, shm_mq_wait_internal can use it to notice if the worker fails to
 * start, instead of waiting forever.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
    /* Only legal if no handle was supplied previously. */
    Assert(mqh->mqh_handle == NULL);
    mqh->mqh_handle = handle;
}
     315             : 
     316             : /*
     317             :  * Write a message into a shared message queue.
     318             :  */
     319             : shm_mq_result
     320     1068308 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
     321             : {
     322             :     shm_mq_iovec iov;
     323             : 
     324     1068308 :     iov.data = data;
     325     1068308 :     iov.len = nbytes;
     326             : 
     327     1068308 :     return shm_mq_sendv(mqh, &iov, 1, nowait);
     328             : }
     329             : 
     330             : /*
     331             :  * Write a message into a shared message queue, gathered from multiple
     332             :  * addresses.
     333             :  *
     334             :  * When nowait = false, we'll wait on our process latch when the ring buffer
     335             :  * fills up, and then continue writing once the receiver has drained some data.
     336             :  * The process latch is reset after each wait.
     337             :  *
     338             :  * When nowait = true, we do not manipulate the state of the process latch;
     339             :  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
     340             :  * this case, the caller should call this function again, with the same
     341             :  * arguments, each time the process latch is set.  (Once begun, the sending
     342             :  * of a message cannot be aborted except by detaching from the queue; changing
     343             :  * the length or payload will corrupt the queue.)
     344             :  */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
    shm_mq_result res;
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC     *receiver;
    Size        nbytes = 0;     /* total payload length, summed over iovecs */
    Size        bytes_written;
    int         i;
    int         which_iov = 0;  /* index of the iovec currently being sent */
    Size        offset;         /* progress within iov[which_iov] */

    Assert(mq->mq_sender == MyProc);

    /* Compute total size of write. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /*
     * Try to write, or finish writing, the length word into the buffer.
     * This is a loop because a previous nowait call may have left us partway
     * through the length word (mqh_partial_bytes tracks how far we got).
     */
    while (!mqh->mqh_length_word_complete)
    {
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = false;
            return res;
        }
        mqh->mqh_partial_bytes += bytes_written;

        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
            Assert(mqh->mqh_partial_bytes == sizeof(Size));

            /* Length word done; mqh_partial_bytes now counts payload bytes. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = true;
        }

        if (res != SHM_MQ_SUCCESS)
            return res;

        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_bytes <= nbytes);
    offset = mqh->mqh_partial_bytes;
    do
    {
        Size        chunksize;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            /* Current iovec exhausted; advance to the next, if any. */
            offset -= iov[which_iov].len;
            ++which_iov;
            if (which_iov >= iovcnt)
                break;
            continue;
        }

        /*
         * We want to avoid copying the data if at all possible, but every
         * chunk of bytes we write into the queue has to be MAXALIGN'd, except
         * the last.  Thus, if a chunk other than the last one ends on a
         * non-MAXALIGN'd boundary, we have to combine the tail end of its
         * data with data from one or more following chunks until we either
         * reach the last chunk or accumulate a number of bytes which is
         * MAXALIGN'd.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            int         j = 0;

            /*
             * Gather bytes across one or more iovec boundaries into tmpbuf,
             * stopping at MAXIMUM_ALIGNOF bytes or when the iovecs run out.
             */
            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j] = iov[which_iov].data[offset];
                    j++;
                    offset++;
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    which_iov++;
                    if (which_iov >= iovcnt)
                        break;
                }
            }

            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

            if (res == SHM_MQ_DETACHED)
            {
                /* Reset state in case caller tries to send another message. */
                mqh->mqh_partial_bytes = 0;
                mqh->mqh_length_word_complete = false;
                return res;
            }

            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_partial_bytes = 0;
            return res;
        }

        /* On SHM_MQ_WOULD_BLOCK, this partial progress is kept for retry. */
        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
    } while (mqh->mqh_partial_bytes < nbytes);

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    /* If queue has been detached, let caller know. */
    if (mq->mq_detached)
        return SHM_MQ_DETACHED;

    /*
     * If the counterparty is known to have attached, we can read mq_receiver
     * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
     * more caution is needed.
     */
    if (mqh->mqh_counterparty_attached)
        receiver = mq->mq_receiver;
    else
    {
        SpinLockAcquire(&mq->mq_mutex);
        receiver = mq->mq_receiver;
        SpinLockRelease(&mq->mq_mutex);
        if (receiver == NULL)
            return SHM_MQ_SUCCESS;
        mqh->mqh_counterparty_attached = true;
    }

    /* Notify receiver of the newly-written data, and return. */
    SetLatch(&receiver->procLatch);
    return SHM_MQ_SUCCESS;
}
     516             : 
     517             : /*
     518             :  * Receive a message from a shared message queue.
     519             :  *
     520             :  * We set *nbytes to the message length and *data to point to the message
     521             :  * payload.  If the entire message exists in the queue as a single,
     522             :  * contiguous chunk, *data will point directly into shared memory; otherwise,
     523             :  * it will point to a temporary buffer.  This mostly avoids data copying in
     524             :  * the hoped-for case where messages are short compared to the buffer size,
     525             :  * while still allowing longer messages.  In either case, the return value
     526             :  * remains valid until the next receive operation is performed on the queue.
     527             :  *
     528             :  * When nowait = false, we'll wait on our process latch when the ring buffer
     529             :  * is empty and we have not yet received a full message.  The sender will
     530             :  * set our process latch after more data has been written, and we'll resume
     531             :  * processing.  Each call will therefore return a complete message
     532             :  * (unless the sender detaches the queue).
     533             :  *
     534             :  * When nowait = true, we do not manipulate the state of the process latch;
     535             :  * instead, whenever the buffer is empty and we need to read from it, we
     536             :  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
     537             :  * function again after the process latch has been set.
     538             :  */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
	shm_mq	   *mq = mqh->mqh_queue;
	shm_mq_result res;
	Size		rb = 0;			/* bytes available at *rawdata */
	Size		nbytes;			/* total payload length of current message */
	void	   *rawdata;

	Assert(mq->mq_receiver == MyProc);

	/* We can't receive data until the sender has attached. */
	if (!mqh->mqh_counterparty_attached)
	{
		if (nowait)
		{
			int			counterparty_gone;

			/*
			 * We shouldn't return at this point at all unless the sender
			 * hasn't attached yet.  However, the correct return value depends
			 * on whether the sender is still attached.  If we first test
			 * whether the sender has ever attached and then test whether the
			 * sender has detached, there's a race condition: a sender that
			 * attaches and detaches very quickly might fool us into thinking
			 * the sender never attached at all.  So, test whether our
			 * counterparty is definitively gone first, and only afterwards
			 * check whether the sender ever attached in the first place.
			 */
			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
			if (shm_mq_get_sender(mq) == NULL)
			{
				if (counterparty_gone)
					return SHM_MQ_DETACHED;
				else
					return SHM_MQ_WOULD_BLOCK;
			}
		}
		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
				 && shm_mq_get_sender(mq) == NULL)
		{
			/* Sender died before attaching; mark queue dead for our side. */
			mq->mq_detached = true;
			return SHM_MQ_DETACHED;
		}
		mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If we've consumed an amount of data greater than 1/4th of the ring
	 * size, mark it consumed in shared memory.  We try to avoid doing this
	 * unnecessarily when only a small amount of data has been consumed,
	 * because SetLatch() is fairly expensive and we don't want to do it too
	 * often.
	 */
	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
	{
		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
		mqh->mqh_consume_pending = 0;
	}

	/* Try to read, or finish reading, the length word from the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		/* Try to receive the message length word. */
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								   nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/*
		 * Hopefully, we'll receive the entire message length word at once.
		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
		 * multiple reads.
		 */
		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
		{
			Size		needed;

			nbytes = *(Size *) rawdata;

			/* If we've already got the whole message, we're done. */
			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
			if (rb >= needed)
			{
				/*
				 * Fast path: the entire message is contiguous in the ring,
				 * so return a pointer directly into shared memory, skipping
				 * the length word.
				 */
				mqh->mqh_consume_pending += needed;
				*nbytesp = nbytes;
				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
				return SHM_MQ_SUCCESS;
			}

			/*
			 * We don't have the whole message, but we at least have the whole
			 * length word.
			 */
			mqh->mqh_expected_bytes = nbytes;
			mqh->mqh_length_word_complete = true;
			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
			rb -= MAXALIGN(sizeof(Size));
		}
		else
		{
			Size		lengthbytes;

			/* Can't be split unless bigger than required alignment. */
			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

			/* Message word is split; need buffer to reassemble. */
			if (mqh->mqh_buffer == NULL)
			{
				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
													 MQH_INITIAL_BUFSIZE);
				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
			}
			Assert(mqh->mqh_buflen >= sizeof(Size));

			/* Copy partial length word; remember to consume it. */
			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
			else
				lengthbytes = rb;
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
				   lengthbytes);
			mqh->mqh_partial_bytes += lengthbytes;
			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
			rb -= lengthbytes;

			/* If we now have the whole word, we're ready to read payload. */
			if (mqh->mqh_partial_bytes >= sizeof(Size))
			{
				Assert(mqh->mqh_partial_bytes == sizeof(Size));
				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
				mqh->mqh_length_word_complete = true;
				mqh->mqh_partial_bytes = 0;
			}
		}
	}

	/* Length word is complete; nbytes is the full payload length. */
	nbytes = mqh->mqh_expected_bytes;

	if (mqh->mqh_partial_bytes == 0)
	{
		/*
		 * Try to obtain the whole message in a single chunk.  If this works,
		 * we need not copy the data and can return a pointer directly into
		 * shared memory.
		 */
		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb >= nbytes)
		{
			mqh->mqh_length_word_complete = false;
			mqh->mqh_consume_pending += MAXALIGN(nbytes);
			*nbytesp = nbytes;
			*datap = rawdata;
			return SHM_MQ_SUCCESS;
		}

		/*
		 * The message has wrapped the buffer.  We'll need to copy it in order
		 * to return it to the client in one chunk.  First, make sure we have
		 * a large enough buffer available.
		 */
		if (mqh->mqh_buflen < nbytes)
		{
			/* Grow by doubling, starting from at least the initial size. */
			Size		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

			while (newbuflen < nbytes)
				newbuflen *= 2;

			/* Old contents (if any) are irrelevant here; just reallocate. */
			if (mqh->mqh_buffer != NULL)
			{
				pfree(mqh->mqh_buffer);
				mqh->mqh_buffer = NULL;
				mqh->mqh_buflen = 0;
			}
			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
			mqh->mqh_buflen = newbuflen;
		}
	}

	/* Loop until we've copied the entire message. */
	for (;;)
	{
		Size		still_needed;

		/* Copy as much as we can. */
		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
		mqh->mqh_partial_bytes += rb;

		/*
		 * Update count of bytes that can be consumed, accounting for
		 * alignment padding.  Note that this will never actually insert any
		 * padding except at the end of a message, because the buffer size is
		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
		 */
		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
		mqh->mqh_consume_pending += MAXALIGN(rb);

		/* If we got all the data, exit the loop. */
		if (mqh->mqh_partial_bytes >= nbytes)
			break;

		/* Wait for some more data. */
		still_needed = nbytes - mqh->mqh_partial_bytes;
		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		/* Don't copy past the end of this message into our buffer. */
		if (rb > still_needed)
			rb = still_needed;
	}

	/* Return the complete message, and reset for next message. */
	*nbytesp = nbytes;
	*datap = mqh->mqh_buffer;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_partial_bytes = 0;
	return SHM_MQ_SUCCESS;
}
     759             : 
     760             : /*
     761             :  * Wait for the other process that's supposed to use this queue to attach
     762             :  * to it.
     763             :  *
     764             :  * The return value is SHM_MQ_DETACHED if the worker has already detached or
     765             :  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
     766             :  * Note that we will only be able to detect that the worker has died before
     767             :  * attaching if a background worker handle was passed to shm_mq_attach().
     768             :  */
     769             : shm_mq_result
     770           0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
     771             : {
     772           0 :     shm_mq     *mq = mqh->mqh_queue;
     773             :     PGPROC    **victim;
     774             : 
     775           0 :     if (shm_mq_get_receiver(mq) == MyProc)
     776           0 :         victim = &mq->mq_sender;
     777             :     else
     778             :     {
     779             :         Assert(shm_mq_get_sender(mq) == MyProc);
     780           0 :         victim = &mq->mq_receiver;
     781             :     }
     782             : 
     783           0 :     if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
     784           0 :         return SHM_MQ_SUCCESS;
     785             :     else
     786           0 :         return SHM_MQ_DETACHED;
     787             : }
     788             : 
     789             : /*
     790             :  * Detach from a shared message queue, and destroy the shm_mq_handle.
     791             :  */
     792             : void
     793        4798 : shm_mq_detach(shm_mq_handle *mqh)
     794             : {
     795             :     /* Notify counterparty that we're outta here. */
     796        4798 :     shm_mq_detach_internal(mqh->mqh_queue);
     797             : 
     798             :     /* Cancel on_dsm_detach callback, if any. */
     799        4798 :     if (mqh->mqh_segment)
     800        4798 :         cancel_on_dsm_detach(mqh->mqh_segment,
     801             :                              shm_mq_detach_callback,
     802        4798 :                              PointerGetDatum(mqh->mqh_queue));
     803             : 
     804             :     /* Release local memory associated with handle. */
     805        4798 :     if (mqh->mqh_buffer != NULL)
     806           0 :         pfree(mqh->mqh_buffer);
     807        4798 :     pfree(mqh);
     808        4798 : }
     809             : 
     810             : /*
     811             :  * Notify counterparty that we're detaching from shared message queue.
     812             :  *
     813             :  * The purpose of this function is to make sure that the process
     814             :  * with which we're communicating doesn't block forever waiting for us to
     815             :  * fill or drain the queue once we've lost interest.  When the sender
     816             :  * detaches, the receiver can read any messages remaining in the queue;
     817             :  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
     818             :  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
     819             :  *
     820             :  * This is separated out from shm_mq_detach() because if the on_dsm_detach
     821             :  * callback fires, we only want to do this much.  We do not try to touch
     822             :  * the local shm_mq_handle, as it may have been pfree'd already.
     823             :  */
     824             : static void
     825        6544 : shm_mq_detach_internal(shm_mq *mq)
     826             : {
     827             :     PGPROC     *victim;
     828             : 
     829        6544 :     SpinLockAcquire(&mq->mq_mutex);
     830        6544 :     if (mq->mq_sender == MyProc)
     831        3224 :         victim = mq->mq_receiver;
     832             :     else
     833             :     {
     834             :         Assert(mq->mq_receiver == MyProc);
     835        3320 :         victim = mq->mq_sender;
     836             :     }
     837        6544 :     mq->mq_detached = true;
     838        6544 :     SpinLockRelease(&mq->mq_mutex);
     839             : 
     840        6544 :     if (victim != NULL)
     841        6424 :         SetLatch(&victim->procLatch);
     842        6544 : }
     843             : 
     844             : /*
     845             :  * Get the shm_mq from handle.
     846             :  */
     847             : shm_mq *
     848     5868124 : shm_mq_get_queue(shm_mq_handle *mqh)
     849             : {
     850     5868124 :     return mqh->mqh_queue;
     851             : }
     852             : 
/*
 * Write bytes into a shared message queue.
 *
 * Copies up to nbytes bytes from data into the ring buffer.  When the ring
 * is full, either waits for the receiver to drain it (nowait = false) or
 * returns SHM_MQ_WOULD_BLOCK (nowait = true).  On any return, *bytes_written
 * is set to the number of bytes actually transferred so far.  Returns
 * SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, or SHM_MQ_DETACHED.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
				  bool nowait, Size *bytes_written)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		sent = 0;
	uint64		used;
	Size		ringsize = mq->mq_ring_size;
	Size		available;

	while (sent < nbytes)
	{
		uint64		rb;
		uint64		wb;

		/* Compute number of ring buffer bytes used and available. */
		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
		Assert(wb >= rb);
		used = wb - rb;
		Assert(used <= ringsize);
		available = Min(ringsize - used, nbytes - sent);

		/*
		 * Bail out if the queue has been detached.  Note that we would be in
		 * trouble if the compiler decided to cache the value of
		 * mq->mq_detached in a register or on the stack across loop
		 * iterations.  It probably shouldn't do that anyway since we'll
		 * always return, call an external function that performs a system
		 * call, or reach a memory barrier at some point later in the loop,
		 * but just to be sure, insert a compiler barrier here.
		 */
		pg_compiler_barrier();
		if (mq->mq_detached)
		{
			*bytes_written = sent;
			return SHM_MQ_DETACHED;
		}

		if (available == 0 && !mqh->mqh_counterparty_attached)
		{
			/*
			 * The queue is full, so if the receiver isn't yet known to be
			 * attached, we must wait for that to happen.
			 */
			if (nowait)
			{
				/* Check "definitely gone" before "never attached"; see
				 * the race-condition note in shm_mq_receive. */
				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
				{
					*bytes_written = sent;
					return SHM_MQ_DETACHED;
				}
				if (shm_mq_get_receiver(mq) == NULL)
				{
					*bytes_written = sent;
					return SHM_MQ_WOULD_BLOCK;
				}
			}
			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
										   mqh->mqh_handle))
			{
				/* Receiver died before attaching. */
				mq->mq_detached = true;
				*bytes_written = sent;
				return SHM_MQ_DETACHED;
			}
			mqh->mqh_counterparty_attached = true;

			/*
			 * The receiver may have read some data after attaching, so we
			 * must not wait without rechecking the queue state.
			 */
		}
		else if (available == 0)
		{
			/*
			 * Since mq->mqh_counterparty_attached is known to be true at this
			 * point, mq_receiver has been set, and it can't change once set.
			 * Therefore, we can read it without acquiring the spinlock.
			 */
			Assert(mqh->mqh_counterparty_attached);
			SetLatch(&mq->mq_receiver->procLatch);

			/* Skip manipulation of our latch if nowait = true. */
			if (nowait)
			{
				*bytes_written = sent;
				return SHM_MQ_WOULD_BLOCK;
			}

			/*
			 * Wait for our latch to be set.  It might already be set for some
			 * unrelated reason, but that'll just result in one extra trip
			 * through the loop.  It's worth it to avoid resetting the latch
			 * at top of loop, because setting an already-set latch is much
			 * cheaper than setting one that has been reset.
			 */
			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
							 WAIT_EVENT_MQ_SEND);

			/* Reset the latch so we don't spin. */
			ResetLatch(MyLatch);

			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			Size		offset;
			Size		sendnow;

			/* Copy at most up to the physical end of the ring. */
			offset = wb % (uint64) ringsize;
			sendnow = Min(available, ringsize - offset);

			/*
			 * Write as much data as we can via a single memcpy(). Make sure
			 * these writes happen after the read of mq_bytes_read, above.
			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
			 * (Since we're separating the read of mq_bytes_read from a
			 * subsequent write to mq_ring, we need a full barrier here.)
			 */
			pg_memory_barrier();
			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
				   (char *) data + sent, sendnow);
			sent += sendnow;

			/*
			 * Update count of bytes written, with alignment padding.  Note
			 * that this will never actually insert any padding except at the
			 * end of a run of bytes, because the buffer size is a multiple of
			 * MAXIMUM_ALIGNOF, and each read is as well.
			 */
			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));

			/*
			 * For efficiency, we don't set the reader's latch here.  We'll do
			 * that only when the buffer fills up or after writing an entire
			 * message.
			 */
		}
	}

	*bytes_written = sent;
	return SHM_MQ_SUCCESS;
}
    1001             : 
    1002             : /*
    1003             :  * Wait until at least *nbytesp bytes are available to be read from the
    1004             :  * shared message queue, or until the buffer wraps around.  If the queue is
    1005             :  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
    1006             :  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
    1007             :  * to the location at which data bytes can be read, *nbytesp is set to the
    1008             :  * number of bytes which can be read at that address, and the return value
    1009             :  * is SHM_MQ_SUCCESS.
    1010             :  */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
					 Size *nbytesp, void **datap)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		ringsize = mq->mq_ring_size;
	uint64		used;
	uint64		written;

	for (;;)
	{
		Size		offset;
		uint64		read;

		/* Get bytes written, so we can compute what's available to read. */
		written = pg_atomic_read_u64(&mq->mq_bytes_written);

		/*
		 * Get bytes read.  Include bytes we could consume but have not yet
		 * consumed.
		 */
		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
			mqh->mqh_consume_pending;
		used = written - read;
		Assert(used <= ringsize);
		offset = read % (uint64) ringsize;

		/* If we have enough data or buffer has wrapped, we're done. */
		if (used >= bytes_needed || offset + used >= ringsize)
		{
			/* Hand back the contiguous run starting at our read position. */
			*nbytesp = Min(used, ringsize - offset);
			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];

			/*
			 * Separate the read of mq_bytes_written, above, from caller's
			 * attempt to read the data itself.  Pairs with the barrier in
			 * shm_mq_inc_bytes_written.
			 */
			pg_read_barrier();
			return SHM_MQ_SUCCESS;
		}

		/*
		 * Fall out before waiting if the queue has been detached.
		 *
		 * Note that we don't check for this until *after* considering whether
		 * the data already available is enough, since the receiver can finish
		 * receiving a message stored in the buffer even after the sender has
		 * detached.
		 */
		if (mq->mq_detached)
		{
			/*
			 * If the writer advanced mq_bytes_written and then set
			 * mq_detached, we might not have read the final value of
			 * mq_bytes_written above.  Insert a read barrier and then check
			 * again if mq_bytes_written has advanced.
			 */
			pg_read_barrier();
			if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
				continue;

			return SHM_MQ_DETACHED;
		}

		/*
		 * We didn't get enough data to satisfy the request, so mark any data
		 * previously-consumed as read to make more buffer space.
		 */
		if (mqh->mqh_consume_pending > 0)
		{
			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
			mqh->mqh_consume_pending = 0;
		}

		/* Skip manipulation of our latch if nowait = true. */
		if (nowait)
			return SHM_MQ_WOULD_BLOCK;

		/*
		 * Wait for our latch to be set.  It might already be set for some
		 * unrelated reason, but that'll just result in one extra trip through
		 * the loop.  It's worth it to avoid resetting the latch at top of
		 * loop, because setting an already-set latch is much cheaper than
		 * setting one that has been reset.
		 */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MQ_RECEIVE);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}
    1107             : 
    1108             : /*
    1109             :  * Test whether a counterparty who may not even be alive yet is definitely gone.
    1110             :  */
    1111             : static bool
    1112     3444516 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
    1113             : {
    1114             :     pid_t       pid;
    1115             : 
    1116             :     /* If the queue has been detached, counterparty is definitely gone. */
    1117     3444516 :     if (mq->mq_detached)
    1118          78 :         return true;
    1119             : 
    1120             :     /* If there's a handle, check worker status. */
    1121     3444438 :     if (handle != NULL)
    1122             :     {
    1123             :         BgwHandleStatus status;
    1124             : 
    1125             :         /* Check for unexpected worker death. */
    1126     3444438 :         status = GetBackgroundWorkerPid(handle, &pid);
    1127     3444438 :         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1128             :         {
    1129             :             /* Mark it detached, just to make it official. */
    1130           0 :             mq->mq_detached = true;
    1131           0 :             return true;
    1132             :         }
    1133             :     }
    1134             : 
    1135             :     /* Counterparty is not definitively gone. */
    1136     3444438 :     return false;
    1137             : }
    1138             : 
/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 *
 * Returns true once *ptr is observed non-NULL; returns false if the queue
 * was detached or (when handle != NULL) the worker died before attaching.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
	bool		result = false;

	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;

		/* Acquire the lock just long enough to check the pointer. */
		SpinLockAcquire(&mq->mq_mutex);
		result = (*ptr != NULL);
		SpinLockRelease(&mq->mq_mutex);

		/*
		 * Fail if detached; else succeed if initialized.
		 *
		 * NOTE(review): mq_detached is read here without holding mq_mutex;
		 * presumably this is safe because it only ever transitions from
		 * false to true — confirm against the struct's synchronization
		 * comments at the top of this file.
		 */
		if (mq->mq_detached)
		{
			result = false;
			break;
		}
		if (result)
			break;

		if (handle != NULL)
		{
			/* Check for unexpected worker death. */
			status = GetBackgroundWorkerPid(handle, &pid);
			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
			{
				result = false;
				break;
			}
		}

		/* Wait to be signalled. */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MQ_INTERNAL);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}

	return result;
}
    1198             : 
/*
 * Increment the number of bytes read.
 *
 * Only one process advances mq_bytes_read (see the comment on the atomic
 * write below), so a plain read-modify-write suffices.  After updating the
 * counter we set the sender's latch so it notices the newly-freed ring
 * space.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
	PGPROC	   *sender;

	/*
	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
	 * which follows.  This pairs with the full barrier in
	 * shm_mq_send_bytes(). We only need a read barrier here because the
	 * increment of mq_bytes_read is actually a read followed by a dependent
	 * write.
	 */
	pg_read_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method should be cheaper.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_read,
						pg_atomic_read_u64(&mq->mq_bytes_read) + n);

	/*
	 * We shouldn't have any bytes to read without a sender, so we can read
	 * mq_sender here without a lock.  Once it's initialized, it can't change.
	 */
	sender = mq->mq_sender;
	Assert(sender != NULL);
	SetLatch(&sender->procLatch);
}
    1231             : 
/*
 * Increment the number of bytes written.
 *
 * NOTE(review): unlike shm_mq_inc_bytes_read(), this does not set the
 * counterparty's latch; presumably the caller is responsible for waking the
 * receiver when appropriate — confirm against the send path.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
	/*
	 * Separate prior reads of mq_ring from the write of mq_bytes_written
	 * which we're about to do.  Pairs with the read barrier found in
	 * shm_mq_receive_bytes.
	 */
	pg_write_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method avoids taking the bus
	 * lock unnecessarily.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_written,
						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}
    1253             : 
    1254             : /* Shim for on_dsm_callback. */
    1255             : static void
    1256        1746 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
    1257             : {
    1258        1746 :     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
    1259             : 
    1260        1746 :     shm_mq_detach_internal(mq);
    1261        1746 : }

Generated by: LCOV version 1.13