Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * shm_mq.c
4 : * single-reader, single-writer shared memory message queue
5 : *
6 : * Both the sender and the receiver must have a PGPROC; their respective
7 : * process latches are used for synchronization. Only the sender may send,
8 : * and only the receiver may receive. This is intended to allow a user
9 : * backend to communicate with worker backends that it has registered.
10 : *
11 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : * src/backend/storage/ipc/shm_mq.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "miscadmin.h"
22 : #include "pgstat.h"
23 : #include "port/pg_bitutils.h"
24 : #include "postmaster/bgworker.h"
25 : #include "storage/proc.h"
26 : #include "storage/shm_mq.h"
27 : #include "storage/spin.h"
28 : #include "utils/memutils.h"
29 : #include "utils/wait_event.h"
30 :
31 : /*
32 : * This structure represents the actual queue, stored in shared memory.
33 : *
34 : * Some notes on synchronization:
35 : *
36 : * mq_receiver and mq_bytes_read can only be changed by the receiver; and
37 : * mq_sender and mq_bytes_written can only be changed by the sender.
38 : * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
39 : * they cannot change once set, and thus may be read without a lock once this
40 : * is known to be the case.
41 : *
42 : * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
43 : * they are written atomically using 8 byte loads and stores. Memory barriers
44 : * must be carefully used to synchronize reads and writes of these values with
45 : * reads and writes of the actual data in mq_ring.
46 : *
47 : * mq_detached needs no locking. It can be set by either the sender or the
48 : * receiver, but only ever from false to true, so redundant writes don't
49 : * matter. It is important that if we set mq_detached and then set the
50 : * counterparty's latch, the counterparty must be certain to see the change
51 : * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
52 : * ends with one, this should be OK.
53 : *
54 : * mq_ring_size and mq_ring_offset never change after initialization, and
55 : * can therefore be read without the lock.
56 : *
57 : * Importantly, mq_ring can be safely read and written without a lock.
58 : * At any given time, the difference between mq_bytes_read and
59 : * mq_bytes_written defines the number of bytes within mq_ring that contain
60 : * unread data, and mq_bytes_read defines the position where those bytes
61 : * begin. The sender can increase the number of unread bytes at any time,
62 : * but only the receiver can give license to overwrite those bytes, by
63 : * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
64 : * the unread bytes it knows to be present without the lock. Conversely,
65 : * the sender can write to the unused portion of the ring buffer without
66 : * the lock, because nobody else can be reading or writing those bytes. The
67 : * receiver could be making more bytes unused by incrementing mq_bytes_read,
68 : * but that's OK. Note that it would be unsafe for the receiver to read any
69 : * data it's already marked as read, or to write any data; and it would be
70 : * unsafe for the sender to reread any data after incrementing
71 : * mq_bytes_written, but fortunately there's no need for any of that.
72 : */
struct shm_mq
{
	slock_t		mq_mutex;		/* protects mq_receiver and mq_sender */
	PGPROC	   *mq_receiver;	/* receiving process; set once, by receiver */
	PGPROC	   *mq_sender;		/* sending process; set once, by sender */
	pg_atomic_uint64 mq_bytes_read; /* consumed bytes; advanced only by
									 * receiver, read without the mutex */
	pg_atomic_uint64 mq_bytes_written;	/* produced bytes; advanced only by
										 * sender, read without the mutex */
	Size		mq_ring_size;	/* usable bytes in mq_ring; fixed at create */
	bool		mq_detached;	/* goes false->true only; no locking needed */
	uint8		mq_ring_offset; /* pad before mq_ring so data is MAXALIGN'd */
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* the ring buffer itself */
};
85 :
86 : /*
87 : * This structure is a backend-private handle for access to a queue.
88 : *
89 : * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
90 : * an optional pointer to the dynamic shared memory segment that contains it.
91 : * (If mqh_segment is provided, we register an on_dsm_detach callback to
92 : * make sure we detach from the queue before detaching from DSM.)
93 : *
94 : * If this queue is intended to connect the current process with a background
95 : * worker that started it, the user can pass a pointer to the worker handle
96 : * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
97 : * is to allow us to begin sending to or receiving from that queue before the
98 : * process we'll be communicating with has even been started. If it fails
99 : * to start, the handle will allow us to notice that and fail cleanly, rather
100 : * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
101 : * simple cases - e.g. where there are just 2 processes communicating; in
102 : * more complex scenarios, every process may not have a BackgroundWorkerHandle
103 : * available, or may need to watch for the failure of more than one other
104 : * process at a time.
105 : *
106 : * When a message exists as a contiguous chunk of bytes in the queue - that is,
107 : * it is smaller than the size of the ring buffer and does not wrap around
108 : * the end - we return the message to the caller as a pointer into the buffer.
109 : * For messages that are larger or happen to wrap, we reassemble the message
110 : * locally by copying the chunks into a backend-local buffer. mqh_buffer is
111 : * the buffer, and mqh_buflen is the number of bytes allocated for it.
112 : *
113 : * mqh_send_pending, is number of bytes that is written to the queue but not
114 : * yet updated in the shared memory. We will not update it until the written
115 : * data is 1/4th of the ring size or the tuple queue is full. This will
116 : * prevent frequent CPU cache misses, and it will also avoid frequent
117 : * SetLatch() calls, which are quite expensive.
118 : *
119 : * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
120 : * are used to track the state of non-blocking operations. When the caller
121 : * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
122 : * are expected to retry the call at a later time with the same argument;
123 : * we need to retain enough state to pick up where we left off.
124 : * mqh_length_word_complete tracks whether we are done sending or receiving
125 : * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
126 : * the number of bytes read or written for either the length word or the
127 : * message itself, and mqh_expected_bytes - which is used only for reads -
128 : * tracks the expected total size of the payload.
129 : *
130 : * mqh_counterparty_attached tracks whether we know the counterparty to have
131 : * attached to the queue at some previous point. This lets us avoid some
132 : * mutex acquisitions.
133 : *
134 : * mqh_context is the memory context in effect at the time we attached to
135 : * the shm_mq. The shm_mq_handle itself is allocated in this context, and
136 : * we make sure any other allocations we do happen in this context as well,
137 : * to avoid nasty surprises.
138 : */
struct shm_mq_handle
{
	shm_mq	   *mqh_queue;		/* shared-memory queue we are attached to */
	dsm_segment *mqh_segment;	/* DSM segment containing it, or NULL */
	BackgroundWorkerHandle *mqh_handle; /* worker handle for failure
										 * detection, or NULL */
	char	   *mqh_buffer;		/* local reassembly buffer, lazily allocated */
	Size		mqh_buflen;		/* allocated size of mqh_buffer */
	Size		mqh_consume_pending;	/* bytes read locally but not yet
										 * reflected in mq_bytes_read */
	Size		mqh_send_pending;	/* bytes written locally but not yet
									 * reflected in mq_bytes_written */
	Size		mqh_partial_bytes;	/* bytes sent/received of the current
									 * length word or payload */
	Size		mqh_expected_bytes; /* total payload size (receive side) */
	bool		mqh_length_word_complete;	/* done with the length word? */
	bool		mqh_counterparty_attached;	/* other side known attached? */
	MemoryContext mqh_context;	/* context for handle and buffer allocations */
};
154 :
155 : static void shm_mq_detach_internal(shm_mq *mq);
156 : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
157 : const void *data, bool nowait, Size *bytes_written);
158 : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
159 : Size bytes_needed, bool nowait, Size *nbytesp,
160 : void **datap);
161 : static bool shm_mq_counterparty_gone(shm_mq *mq,
162 : BackgroundWorkerHandle *handle);
163 : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
164 : BackgroundWorkerHandle *handle);
165 : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
166 : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
167 : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
168 :
169 : /* Minimum queue size is enough for header and at least one chunk of data. */
170 : const Size shm_mq_minimum_size =
171 : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
172 :
173 : #define MQH_INITIAL_BUFSIZE 8192
174 :
175 : /*
176 : * Initialize a new shared message queue.
177 : */
178 : shm_mq *
179 3961 : shm_mq_create(void *address, Size size)
180 : {
181 3961 : shm_mq *mq = address;
182 3961 : Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
183 :
184 : /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
185 3961 : size = MAXALIGN_DOWN(size);
186 :
187 : /* Queue size must be large enough to hold some data. */
188 : Assert(size > data_offset);
189 :
190 : /* Initialize queue header. */
191 3961 : SpinLockInit(&mq->mq_mutex);
192 3961 : mq->mq_receiver = NULL;
193 3961 : mq->mq_sender = NULL;
194 3961 : pg_atomic_init_u64(&mq->mq_bytes_read, 0);
195 3961 : pg_atomic_init_u64(&mq->mq_bytes_written, 0);
196 3961 : mq->mq_ring_size = size - data_offset;
197 3961 : mq->mq_detached = false;
198 3961 : mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
199 :
200 3961 : return mq;
201 : }
202 :
203 : /*
204 : * Set the identity of the process that will receive from a shared message
205 : * queue.
206 : */
207 : void
208 3961 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
209 : {
210 : PGPROC *sender;
211 :
212 3961 : SpinLockAcquire(&mq->mq_mutex);
213 : Assert(mq->mq_receiver == NULL);
214 3961 : mq->mq_receiver = proc;
215 3961 : sender = mq->mq_sender;
216 3961 : SpinLockRelease(&mq->mq_mutex);
217 :
218 3961 : if (sender != NULL)
219 20 : SetLatch(&sender->procLatch);
220 3961 : }
221 :
222 : /*
223 : * Set the identity of the process that will send to a shared message queue.
224 : */
225 : void
226 3844 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
227 : {
228 : PGPROC *receiver;
229 :
230 3844 : SpinLockAcquire(&mq->mq_mutex);
231 : Assert(mq->mq_sender == NULL);
232 3844 : mq->mq_sender = proc;
233 3844 : receiver = mq->mq_receiver;
234 3844 : SpinLockRelease(&mq->mq_mutex);
235 :
236 3844 : if (receiver != NULL)
237 3824 : SetLatch(&receiver->procLatch);
238 3844 : }
239 :
240 : /*
241 : * Get the configured receiver.
242 : */
243 : PGPROC *
244 5 : shm_mq_get_receiver(shm_mq *mq)
245 : {
246 : PGPROC *receiver;
247 :
248 5 : SpinLockAcquire(&mq->mq_mutex);
249 5 : receiver = mq->mq_receiver;
250 5 : SpinLockRelease(&mq->mq_mutex);
251 :
252 5 : return receiver;
253 : }
254 :
255 : /*
256 : * Get the configured sender.
257 : */
258 : PGPROC *
259 11358181 : shm_mq_get_sender(shm_mq *mq)
260 : {
261 : PGPROC *sender;
262 :
263 11358181 : SpinLockAcquire(&mq->mq_mutex);
264 11358181 : sender = mq->mq_sender;
265 11358181 : SpinLockRelease(&mq->mq_mutex);
266 :
267 11358181 : return sender;
268 : }
269 :
270 : /*
271 : * Attach to a shared message queue so we can send or receive messages.
272 : *
273 : * The memory context in effect at the time this function is called should
274 : * be one which will last for at least as long as the message queue itself.
275 : * We'll allocate the handle in that context, and future allocations that
276 : * are needed to buffer incoming data will happen in that context as well.
277 : *
278 : * If seg != NULL, the queue will be automatically detached when that dynamic
279 : * shared memory segment is detached.
280 : *
281 : * If handle != NULL, the queue can be read or written even before the
282 : * other process has attached. We'll wait for it to do so if needed. The
283 : * handle must be for a background worker initialized with bgw_notify_pid
284 : * equal to our PID.
285 : *
286 : * shm_mq_detach() should be called when done. This will free the
287 : * shm_mq_handle and mark the queue itself as detached, so that our
288 : * counterpart won't get stuck waiting for us to fill or drain the queue
289 : * after we've already lost interest.
290 : */
291 : shm_mq_handle *
292 7805 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
293 : {
294 7805 : shm_mq_handle *mqh = palloc_object(shm_mq_handle);
295 :
296 : Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
297 7805 : mqh->mqh_queue = mq;
298 7805 : mqh->mqh_segment = seg;
299 7805 : mqh->mqh_handle = handle;
300 7805 : mqh->mqh_buffer = NULL;
301 7805 : mqh->mqh_buflen = 0;
302 7805 : mqh->mqh_consume_pending = 0;
303 7805 : mqh->mqh_send_pending = 0;
304 7805 : mqh->mqh_partial_bytes = 0;
305 7805 : mqh->mqh_expected_bytes = 0;
306 7805 : mqh->mqh_length_word_complete = false;
307 7805 : mqh->mqh_counterparty_attached = false;
308 7805 : mqh->mqh_context = CurrentMemoryContext;
309 :
310 7805 : if (seg != NULL)
311 7805 : on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
312 :
313 7805 : return mqh;
314 : }
315 :
316 : /*
317 : * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
318 : * been passed to shm_mq_attach.
319 : */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
	/* A worker handle may be installed only once. */
	Assert(mqh->mqh_handle == NULL);
	mqh->mqh_handle = handle;
}
326 :
327 : /*
328 : * Write a message into a shared message queue.
329 : */
330 : shm_mq_result
331 1156921 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
332 : bool force_flush)
333 : {
334 : shm_mq_iovec iov;
335 :
336 1156921 : iov.data = data;
337 1156921 : iov.len = nbytes;
338 :
339 1156921 : return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
340 : }
341 :
342 : /*
343 : * Write a message into a shared message queue, gathered from multiple
344 : * addresses.
345 : *
346 : * When nowait = false, we'll wait on our process latch when the ring buffer
347 : * fills up, and then continue writing once the receiver has drained some data.
348 : * The process latch is reset after each wait.
349 : *
350 : * When nowait = true, we do not manipulate the state of the process latch;
351 : * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
352 : * this case, the caller should call this function again, with the same
353 : * arguments, each time the process latch is set. (Once begun, the sending
354 : * of a message cannot be aborted except by detaching from the queue; changing
355 : * the length or payload will corrupt the queue.)
356 : *
357 : * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
358 : * and notify the receiver (if it is already attached). Otherwise, we don't
359 : * update it until we have written an amount of data greater than 1/4th of the
360 : * ring size.
361 : */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
			 bool force_flush)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	   *receiver;
	Size		nbytes = 0;
	Size		bytes_written;
	int			i;
	int			which_iov = 0;
	Size		offset;

	Assert(mq->mq_sender == MyProc);

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Prevent writing messages overwhelming the receiver. */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot send a message of size %zu via shared memory queue",
						nbytes)));

	/*
	 * Try to write, or finish writing, the length word into the buffer.  On
	 * a nowait retry, mqh_partial_bytes remembers how much of the word has
	 * already been sent.
	 */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;

		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			Assert(mqh->mqh_partial_bytes == sizeof(Size));

			/* Length word done; reset the counter for the payload. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}

		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}

	/*
	 * Write the actual data bytes into the buffer.  mqh_partial_bytes now
	 * counts payload bytes already sent (nonzero only on a nowait retry).
	 */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;

		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}

		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last.  Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;

			/* Gather up to MAXIMUM_ALIGNOF bytes spanning chunk boundaries. */
			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}

			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}

			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}

		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	/* If queue has been detached, let caller know. */
	if (mq->mq_detached)
		return SHM_MQ_DETACHED;

	/*
	 * If the counterparty is known to have attached, we can read mq_receiver
	 * without acquiring the spinlock.  Otherwise, more caution is needed.
	 */
	if (mqh->mqh_counterparty_attached)
		receiver = mq->mq_receiver;
	else
	{
		SpinLockAcquire(&mq->mq_mutex);
		receiver = mq->mq_receiver;
		SpinLockRelease(&mq->mq_mutex);
		if (receiver != NULL)
			mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If the caller has requested force flush or we have written more than
	 * 1/4 of the ring size, mark it as written in shared memory and notify
	 * the receiver.  Otherwise the written bytes stay accounted only in
	 * mqh_send_pending, to save SetLatch() calls and cache traffic.
	 */
	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
	{
		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
		if (receiver != NULL)
			SetLatch(&receiver->procLatch);
		mqh->mqh_send_pending = 0;
	}

	return SHM_MQ_SUCCESS;
}
550 :
551 : /*
552 : * Receive a message from a shared message queue.
553 : *
554 : * We set *nbytes to the message length and *data to point to the message
555 : * payload. If the entire message exists in the queue as a single,
556 : * contiguous chunk, *data will point directly into shared memory; otherwise,
557 : * it will point to a temporary buffer. This mostly avoids data copying in
558 : * the hoped-for case where messages are short compared to the buffer size,
559 : * while still allowing longer messages. In either case, the return value
560 : * remains valid until the next receive operation is performed on the queue.
561 : *
562 : * When nowait = false, we'll wait on our process latch when the ring buffer
563 : * is empty and we have not yet received a full message. The sender will
564 : * set our process latch after more data has been written, and we'll resume
565 : * processing. Each call will therefore return a complete message
566 : * (unless the sender detaches the queue).
567 : *
568 : * When nowait = true, we do not manipulate the state of the process latch;
569 : * instead, whenever the buffer is empty and we need to read from it, we
570 : * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
571 : * function again after the process latch has been set.
572 : */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
	shm_mq	   *mq = mqh->mqh_queue;
	shm_mq_result res;
	Size		rb = 0;
	Size		nbytes;
	void	   *rawdata;

	Assert(mq->mq_receiver == MyProc);

	/* We can't receive data until the sender has attached. */
	if (!mqh->mqh_counterparty_attached)
	{
		if (nowait)
		{
			int			counterparty_gone;

			/*
			 * We shouldn't return at this point at all unless the sender
			 * hasn't attached yet.  However, the correct return value depends
			 * on whether the sender is still attached.  If we first test
			 * whether the sender has ever attached and then test whether the
			 * sender has detached, there's a race condition: a sender that
			 * attaches and detaches very quickly might fool us into thinking
			 * the sender never attached at all.  So, test whether our
			 * counterparty is definitively gone first, and only afterwards
			 * check whether the sender ever attached in the first place.
			 */
			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
			if (shm_mq_get_sender(mq) == NULL)
			{
				if (counterparty_gone)
					return SHM_MQ_DETACHED;
				else
					return SHM_MQ_WOULD_BLOCK;
			}
		}
		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
				 && shm_mq_get_sender(mq) == NULL)
		{
			/* Sender died before attaching; mark the queue dead. */
			mq->mq_detached = true;
			return SHM_MQ_DETACHED;
		}
		mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If we've consumed an amount of data greater than 1/4th of the ring
	 * size, mark it consumed in shared memory.  We try to avoid doing this
	 * unnecessarily when only a small amount of data has been consumed,
	 * because SetLatch() is fairly expensive and we don't want to do it too
	 * often.
	 */
	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
	{
		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
		mqh->mqh_consume_pending = 0;
	}

	/* Try to read, or finish reading, the length word from the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		/* Try to receive the message length word. */
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								   nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/*
		 * Hopefully, we'll receive the entire message length word at once.
		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
		 * multiple reads.
		 */
		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
		{
			Size		needed;

			nbytes = *(Size *) rawdata;

			/* If we've already got the whole message, we're done. */
			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
			if (rb >= needed)
			{
				/* Return a pointer directly into the ring buffer. */
				mqh->mqh_consume_pending += needed;
				*nbytesp = nbytes;
				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
				return SHM_MQ_SUCCESS;
			}

			/*
			 * We don't have the whole message, but we at least have the whole
			 * length word.
			 */
			mqh->mqh_expected_bytes = nbytes;
			mqh->mqh_length_word_complete = true;
			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
			rb -= MAXALIGN(sizeof(Size));
		}
		else
		{
			Size		lengthbytes;

			/* Can't be split unless bigger than required alignment. */
			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

			/* Message word is split; need buffer to reassemble. */
			if (mqh->mqh_buffer == NULL)
			{
				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
													 MQH_INITIAL_BUFSIZE);
				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
			}
			Assert(mqh->mqh_buflen >= sizeof(Size));

			/* Copy partial length word; remember to consume it. */
			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
			else
				lengthbytes = rb;
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
				   lengthbytes);
			mqh->mqh_partial_bytes += lengthbytes;
			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
			rb -= lengthbytes;

			/* If we now have the whole word, we're ready to read payload. */
			if (mqh->mqh_partial_bytes >= sizeof(Size))
			{
				Assert(mqh->mqh_partial_bytes == sizeof(Size));
				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
				mqh->mqh_length_word_complete = true;
				mqh->mqh_partial_bytes = 0;
			}
		}
	}
	nbytes = mqh->mqh_expected_bytes;

	/*
	 * Should be disallowed on the sending side already, but better check and
	 * error out on the receiver side as well rather than trying to read a
	 * prohibitively large message.
	 */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("invalid message size %zu in shared memory queue",
						nbytes)));

	if (mqh->mqh_partial_bytes == 0)
	{
		/*
		 * Try to obtain the whole message in a single chunk.  If this works,
		 * we need not copy the data and can return a pointer directly into
		 * shared memory.
		 */
		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb >= nbytes)
		{
			mqh->mqh_length_word_complete = false;
			mqh->mqh_consume_pending += MAXALIGN(nbytes);
			*nbytesp = nbytes;
			*datap = rawdata;
			return SHM_MQ_SUCCESS;
		}

		/*
		 * The message has wrapped the buffer.  We'll need to copy it in order
		 * to return it to the client in one chunk.  First, make sure we have
		 * a large enough buffer available.
		 */
		if (mqh->mqh_buflen < nbytes)
		{
			Size		newbuflen;

			/*
			 * Increase size to the next power of 2 that's >= nbytes, but
			 * limit to MaxAllocSize.
			 */
			newbuflen = pg_nextpower2_size_t(nbytes);
			newbuflen = Min(newbuflen, MaxAllocSize);

			if (mqh->mqh_buffer != NULL)
			{
				pfree(mqh->mqh_buffer);
				mqh->mqh_buffer = NULL;
				mqh->mqh_buflen = 0;
			}
			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
			mqh->mqh_buflen = newbuflen;
		}
	}

	/* Loop until we've copied the entire message. */
	for (;;)
	{
		Size		still_needed;

		/* Copy as much as we can. */
		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
		if (rb > 0)
		{
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
			mqh->mqh_partial_bytes += rb;
		}

		/*
		 * Update count of bytes that can be consumed, accounting for
		 * alignment padding.  Note that this will never actually insert any
		 * padding except at the end of a message, because the buffer size is
		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
		 */
		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
		mqh->mqh_consume_pending += MAXALIGN(rb);

		/* If we got all the data, exit the loop. */
		if (mqh->mqh_partial_bytes >= nbytes)
			break;

		/* Wait for some more data. */
		still_needed = nbytes - mqh->mqh_partial_bytes;
		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb > still_needed)
			rb = still_needed;
	}

	/* Return the complete message, and reset for next message. */
	*nbytesp = nbytes;
	*datap = mqh->mqh_buffer;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_partial_bytes = 0;
	return SHM_MQ_SUCCESS;
}
811 :
812 : /*
813 : * Wait for the other process that's supposed to use this queue to attach
814 : * to it.
815 : *
816 : * The return value is SHM_MQ_DETACHED if the worker has already detached or
817 : * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
818 : * Note that we will only be able to detect that the worker has died before
819 : * attaching if a background worker handle was passed to shm_mq_attach().
820 : */
821 : shm_mq_result
822 0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
823 : {
824 0 : shm_mq *mq = mqh->mqh_queue;
825 : PGPROC **victim;
826 :
827 0 : if (shm_mq_get_receiver(mq) == MyProc)
828 0 : victim = &mq->mq_sender;
829 : else
830 : {
831 : Assert(shm_mq_get_sender(mq) == MyProc);
832 0 : victim = &mq->mq_receiver;
833 : }
834 :
835 0 : if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
836 0 : return SHM_MQ_SUCCESS;
837 : else
838 0 : return SHM_MQ_DETACHED;
839 : }
840 :
841 : /*
842 : * Detach from a shared message queue, and destroy the shm_mq_handle.
843 : */
void
shm_mq_detach(shm_mq_handle *mqh)
{
	/*
	 * Before detaching, notify the receiver about any already-written data.
	 * This must precede shm_mq_detach_internal so the receiver can still
	 * consume those bytes after seeing the detach.
	 */
	if (mqh->mqh_send_pending > 0)
	{
		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
		mqh->mqh_send_pending = 0;
	}

	/* Notify counterparty that we're outta here. */
	shm_mq_detach_internal(mqh->mqh_queue);

	/* Cancel on_dsm_detach callback, if any. */
	if (mqh->mqh_segment)
		cancel_on_dsm_detach(mqh->mqh_segment,
							 shm_mq_detach_callback,
							 PointerGetDatum(mqh->mqh_queue));

	/* Release local memory associated with handle. */
	if (mqh->mqh_buffer != NULL)
		pfree(mqh->mqh_buffer);
	pfree(mqh);
}
868 :
869 : /*
870 : * Notify counterparty that we're detaching from shared message queue.
871 : *
872 : * The purpose of this function is to make sure that the process
873 : * with which we're communicating doesn't block forever waiting for us to
874 : * fill or drain the queue once we've lost interest. When the sender
875 : * detaches, the receiver can read any messages remaining in the queue;
876 : * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
877 : * further attempts to send messages will likewise return SHM_MQ_DETACHED.
878 : *
879 : * This is separated out from shm_mq_detach() because if the on_dsm_detach
880 : * callback fires, we only want to do this much. We do not try to touch
881 : * the local shm_mq_handle, as it may have been pfree'd already.
882 : */
883 : static void
884 7805 : shm_mq_detach_internal(shm_mq *mq)
885 : {
886 : PGPROC *victim;
887 :
 : /* Identify the counterparty (NULL if it never attached) so we can wake it. */
888 7805 : SpinLockAcquire(&mq->mq_mutex);
889 7805 : if (mq->mq_sender == MyProc)
890 3844 : victim = mq->mq_receiver;
891 : else
892 : {
893 : Assert(mq->mq_receiver == MyProc);
894 3961 : victim = mq->mq_sender;
895 : }
896 7805 : mq->mq_detached = true;
897 7805 : SpinLockRelease(&mq->mq_mutex);
898 :
899 7805 : if (victim != NULL)
900 7692 : SetLatch(&victim->procLatch);
901 : }
902 :
903 : /*
904 : * Get the shm_mq from handle.
905 : */
906 : shm_mq *
907 8110470 : shm_mq_get_queue(shm_mq_handle *mqh)
908 : {
 : /* Plain accessor for the backend-local handle's queue pointer; no locking. */
909 8110470 : return mqh->mqh_queue;
910 : }
911 :
912 : /*
913 : * Write bytes into a shared message queue.
914 : */
915 : static shm_mq_result
916 2309781 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
917 : bool nowait, Size *bytes_written)
918 : {
919 2309781 : shm_mq *mq = mqh->mqh_queue;
920 2309781 : Size sent = 0;
921 : uint64 used;
922 2309781 : Size ringsize = mq->mq_ring_size;
923 : Size available;
924 :
925 4803184 : while (sent < nbytes)
926 : {
927 : uint64 rb;
928 : uint64 wb;
929 :
930 : /* Compute number of ring buffer bytes used and available. */
931 2497509 : rb = pg_atomic_read_u64(&mq->mq_bytes_read);
932 2497509 : wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
933 : Assert(wb >= rb);
934 2497509 : used = wb - rb;
935 : Assert(used <= ringsize);
936 2497509 : available = Min(ringsize - used, nbytes - sent);
937 :
938 : /*
939 : * Bail out if the queue has been detached. Note that we would be in
940 : * trouble if the compiler decided to cache the value of
941 : * mq->mq_detached in a register or on the stack across loop
942 : * iterations. It probably shouldn't do that anyway since we'll
943 : * always return, call an external function that performs a system
944 : * call, or reach a memory barrier at some point later in the loop,
945 : * but just to be sure, insert a compiler barrier here.
946 : */
947 2497509 : pg_compiler_barrier();
948 2497509 : if (mq->mq_detached)
949 : {
950 4 : *bytes_written = sent;
951 4 : return SHM_MQ_DETACHED;
952 : }
953 :
954 2497505 : if (available == 0 && !mqh->mqh_counterparty_attached)
955 : {
956 : /*
957 : * The queue is full, so if the receiver isn't yet known to be
958 : * attached, we must wait for that to happen.
959 : */
960 10 : if (nowait)
961 : {
962 5 : if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
963 : {
964 0 : *bytes_written = sent;
965 0 : return SHM_MQ_DETACHED;
966 : }
967 5 : if (shm_mq_get_receiver(mq) == NULL)
968 : {
969 0 : *bytes_written = sent;
970 0 : return SHM_MQ_WOULD_BLOCK;
971 : }
972 : }
973 5 : else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
974 : mqh->mqh_handle))
975 : {
976 0 : mq->mq_detached = true;
977 0 : *bytes_written = sent;
978 0 : return SHM_MQ_DETACHED;
979 : }
980 10 : mqh->mqh_counterparty_attached = true;
981 :
982 : /*
983 : * The receiver may have read some data after attaching, so we
984 : * must not wait without rechecking the queue state.
985 : */
986 : }
987 2497495 : else if (available == 0)
988 : {
989 : /* Update the pending send bytes in the shared memory. */
990 97352 : shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
991 :
992 : /*
993 : * Since mq->mqh_counterparty_attached is known to be true at this
994 : * point, mq_receiver has been set, and it can't change once set.
995 : * Therefore, we can read it without acquiring the spinlock.
996 : */
997 : Assert(mqh->mqh_counterparty_attached);
998 97352 : SetLatch(&mq->mq_receiver->procLatch);
999 :
1000 : /*
1001 : * We have just updated the mqh_send_pending bytes in the shared
1002 : * memory so reset it.
1003 : */
1004 97352 : mqh->mqh_send_pending = 0;
1005 :
1006 : /* Skip manipulation of our latch if nowait = true. */
1007 97352 : if (nowait)
1008 : {
1009 4102 : *bytes_written = sent;
1010 4102 : return SHM_MQ_WOULD_BLOCK;
1011 : }
1012 :
1013 : /*
1014 : * Wait for our latch to be set. It might already be set for some
1015 : * unrelated reason, but that'll just result in one extra trip
1016 : * through the loop. It's worth it to avoid resetting the latch
1017 : * at top of loop, because setting an already-set latch is much
1018 : * cheaper than setting one that has been reset.
1019 : */
1020 93250 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1021 : WAIT_EVENT_MESSAGE_QUEUE_SEND);
1022 :
1023 : /* Reset the latch so we don't spin. */
1024 93250 : ResetLatch(MyLatch);
1025 :
1026 : /* An interrupt may have occurred while we were waiting. */
1027 93250 : CHECK_FOR_INTERRUPTS();
1028 : }
1029 : else
1030 : {
1031 : Size offset;
1032 : Size sendnow;
1033 :
1034 2400143 : offset = wb % (uint64) ringsize;
1035 2400143 : sendnow = Min(available, ringsize - offset);
1036 :
1037 : /*
1038 : * Write as much data as we can via a single memcpy(). Make sure
1039 : * these writes happen after the read of mq_bytes_read, above.
1040 : * This barrier pairs with the one in shm_mq_inc_bytes_read.
1041 : * (Since we're separating the read of mq_bytes_read from a
1042 : * subsequent write to mq_ring, we need a full barrier here.)
1043 : */
1044 2400143 : pg_memory_barrier();
1045 2400143 : memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1046 : (const char *) data + sent, sendnow);
1047 2400143 : sent += sendnow;
1048 :
1049 : /*
1050 : * Update count of bytes written, with alignment padding. Note
1051 : * that this will never actually insert any padding except at the
1052 : * end of a run of bytes, because the buffer size is a multiple of
1053 : * MAXIMUM_ALIGNOF, and each read is as well.
1054 : */
1055 : Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1056 :
1057 : /*
1058 : * For efficiency, we don't update the bytes written in the shared
1059 : * memory and also don't set the reader's latch here. Refer to
1060 : * the comments atop the shm_mq_handle structure for more
1061 : * information.
1062 : */
1063 2400143 : mqh->mqh_send_pending += MAXALIGN(sendnow);
1064 : }
1065 : }
1066 :
 : /*
 : * All bytes have been copied into the ring, but note that some of them
 : * may still be recorded only in mqh_send_pending and hence not yet
 : * visible to the receiver.
 : */
1067 2305675 : *bytes_written = sent;
1068 2305675 : return SHM_MQ_SUCCESS;
1069 : }
1070 :
1071 : /*
1072 : * Wait until at least *nbytesp bytes are available to be read from the
1073 : * shared message queue, or until the buffer wraps around. If the queue is
1074 : * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1075 : * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1076 : * to the location at which data bytes can be read, *nbytesp is set to the
1077 : * number of bytes which can be read at that address, and the return value
1078 : * is SHM_MQ_SUCCESS.
1079 : */
1080 : static shm_mq_result
1081 1412353 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1082 : Size *nbytesp, void **datap)
1083 : {
1084 1412353 : shm_mq *mq = mqh->mqh_queue;
1085 1412353 : Size ringsize = mq->mq_ring_size;
1086 : uint64 used;
1087 : uint64 written;
1088 :
 : /* Loop until enough data arrives, the buffer wraps, the sender detaches, or (if nowait) we'd have to block. */
1089 : for (;;)
1090 128920 : {
1091 : Size offset;
1092 : uint64 read;
1093 :
1094 : /* Get bytes written, so we can compute what's available to read. */
1095 1541273 : written = pg_atomic_read_u64(&mq->mq_bytes_written);
1096 :
1097 : /*
1098 : * Get bytes read. Include bytes we could consume but have not yet
1099 : * consumed.
1100 : */
1101 1541273 : read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1102 1541273 : mqh->mqh_consume_pending;
1103 1541273 : used = written - read;
1104 : Assert(used <= ringsize);
1105 1541273 : offset = read % (uint64) ringsize;
1106 :
1107 : /* If we have enough data or buffer has wrapped, we're done. */
1108 1541273 : if (used >= bytes_needed || offset + used >= ringsize)
1109 : {
1110 1265501 : *nbytesp = Min(used, ringsize - offset);
1111 1265501 : *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1112 :
1113 : /*
1114 : * Separate the read of mq_bytes_written, above, from caller's
1115 : * attempt to read the data itself. Pairs with the barrier in
1116 : * shm_mq_inc_bytes_written.
1117 : */
1118 1265501 : pg_read_barrier();
1119 1265501 : return SHM_MQ_SUCCESS;
1120 : }
1121 :
1122 : /*
1123 : * Fall out before waiting if the queue has been detached.
1124 : *
1125 : * Note that we don't check for this until *after* considering whether
1126 : * the data already available is enough, since the receiver can finish
1127 : * receiving a message stored in the buffer even after the sender has
1128 : * detached.
1129 : */
1130 275772 : if (mq->mq_detached)
1131 : {
1132 : /*
1133 : * If the writer advanced mq_bytes_written and then set
1134 : * mq_detached, we might not have read the final value of
1135 : * mq_bytes_written above. Insert a read barrier and then check
1136 : * again if mq_bytes_written has advanced.
1137 : */
1138 1804 : pg_read_barrier();
1139 1804 : if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1140 0 : continue;
1141 :
1142 1804 : return SHM_MQ_DETACHED;
1143 : }
1144 :
1145 : /*
1146 : * We didn't get enough data to satisfy the request, so mark any data
1147 : * previously-consumed as read to make more buffer space.
1148 : */
1149 273968 : if (mqh->mqh_consume_pending > 0)
1150 : {
1151 109908 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1152 109908 : mqh->mqh_consume_pending = 0;
1153 : }
1154 :
1155 : /* Skip manipulation of our latch if nowait = true. */
1156 273968 : if (nowait)
1157 145048 : return SHM_MQ_WOULD_BLOCK;
1158 :
1159 : /*
1160 : * Wait for our latch to be set. It might already be set for some
1161 : * unrelated reason, but that'll just result in one extra trip through
1162 : * the loop. It's worth it to avoid resetting the latch at top of
1163 : * loop, because setting an already-set latch is much cheaper than
1164 : * setting one that has been reset.
1165 : */
1166 128920 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1167 : WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
1168 :
1169 : /* Reset the latch so we don't spin. */
1170 128920 : ResetLatch(MyLatch);
1171 :
1172 : /* An interrupt may have occurred while we were waiting. */
1173 128920 : CHECK_FOR_INTERRUPTS();
1174 : }
1175 : }
1176 :
1177 : /*
1178 : * Test whether a counterparty who may not even be alive yet is definitely gone.
1179 : */
1180 : static bool
1181 3247602 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1182 : {
1183 : pid_t pid;
1184 :
1185 : /* If the queue has been detached, counterparty is definitely gone. */
1186 3247602 : if (mq->mq_detached)
1187 1232 : return true;
1188 :
1189 : /* If there's a handle, check worker status. */
1190 3246370 : if (handle != NULL)
1191 : {
1192 : BgwHandleStatus status;
1193 :
1194 : /* Check for unexpected worker death. */
1195 3246347 : status = GetBackgroundWorkerPid(handle, &pid);
 : /* Any status other than started or not-yet-started means it's gone. */
1196 3246347 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1197 : {
1198 : /* Mark it detached, just to make it official. */
1199 0 : mq->mq_detached = true;
1200 0 : return true;
1201 : }
1202 : }
1203 :
1204 : /* Counterparty is not definitively gone. */
1205 3246370 : return false;
1206 : }
1207 :
1208 : /*
1209 : * This is used when a process is waiting for its counterpart to attach to the
1210 : * queue. We exit when the other process attaches as expected, or, if
1211 : * handle != NULL, when the referenced background process or the postmaster
1212 : * dies. Note that if handle == NULL, and the process fails to attach, we'll
1213 : * potentially get stuck here forever waiting for a process that may never
1214 : * start. We do check for interrupts, though.
1215 : *
1216 : * ptr is a pointer to the memory address that we're expecting to become
1217 : * non-NULL when our counterpart attaches to the queue.
1218 : */
1219 : static bool
1220 269 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1221 : {
1222 269 : bool result = false;
1223 :
1224 : for (;;)
1225 797 : {
1226 : BgwHandleStatus status;
1227 : pid_t pid;
1228 :
1229 : /* Acquire the lock just long enough to check the pointer. */
1230 1066 : SpinLockAcquire(&mq->mq_mutex);
1231 1066 : result = (*ptr != NULL);
1232 1066 : SpinLockRelease(&mq->mq_mutex);
1233 :
1234 : /* Fail if detached; else succeed if initialized. */
1235 1066 : if (mq->mq_detached)
1236 : {
1237 114 : result = false;
1238 114 : break;
1239 : }
1240 952 : if (result)
1241 155 : break;
1242 :
1243 797 : if (handle != NULL)
1244 : {
1245 : /* Check for unexpected worker death. */
1246 795 : status = GetBackgroundWorkerPid(handle, &pid);
1247 795 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1248 : {
1249 0 : result = false;
1250 0 : break;
1251 : }
1252 : }
1253 :
1254 : /* Wait to be signaled. */
1255 797 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1256 : WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
1257 :
1258 : /* Reset the latch so we don't spin. */
1259 797 : ResetLatch(MyLatch);
1260 :
1261 : /* An interrupt may have occurred while we were waiting. */
1262 797 : CHECK_FOR_INTERRUPTS();
1263 : }
1264 :
 : /* true: *ptr became non-NULL; false: queue detached or worker died. */
1265 269 : return result;
1266 : }
1267 :
1268 : /*
1269 : * Increment the number of bytes read.
1270 : */
1271 : static void
1272 132629 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1273 : {
1274 : PGPROC *sender;
1275 :
1276 : /*
1277 : * Separate prior reads of mq_ring from the increment of mq_bytes_read
1278 : * which follows. This pairs with the full barrier in
1279 : * shm_mq_send_bytes(). We only need a read barrier here because the
1280 : * increment of mq_bytes_read is actually a read followed by a dependent
1281 : * write.
1282 : */
1283 132629 : pg_read_barrier();
1284 :
1285 : /*
1286 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1287 : * else can be changing this value. This method should be cheaper.
1288 : */
1289 132629 : pg_atomic_write_u64(&mq->mq_bytes_read,
1290 132629 : pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1291 :
1292 : /*
1293 : * We shouldn't have any bytes to read without a sender, so we can read
1294 : * mq_sender here without a lock. Once it's initialized, it can't change.
1295 : */
1296 132629 : sender = mq->mq_sender;
1297 : Assert(sender != NULL);
 : /* Wake the sender, which may be waiting for ring space to free up. */
1298 132629 : SetLatch(&sender->procLatch);
1299 : }
1300 :
1301 : /*
1302 : * Increment the number of bytes written.
1303 : */
1304 : static void
1305 219902 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1306 : {
1307 : /*
1308 : * Separate prior writes to mq_ring from the write of mq_bytes_written
1309 : * which we're about to do. Pairs with the read barrier found in
1310 : * shm_mq_receive_bytes. (A write barrier orders stores, not loads, so
1311 : * "writes" is what this barrier actually constrains.)
1312 : */
1313 219902 : pg_write_barrier();
1314 :
1315 : /*
1316 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1317 : * else can be changing this value. This method avoids taking the bus
1318 : * lock unnecessarily.
1319 : */
1320 219902 : pg_atomic_write_u64(&mq->mq_bytes_written,
1321 219902 : pg_atomic_read_u64(&mq->mq_bytes_written) + n);
 : /* Note: the receiver's latch is not set here; callers do that as needed. */
1322 219902 : }
1322 :
1323 : /* Shim for on_dsm_detach callback. */
1324 : static void
1325 2134 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1326 : {
 : /* Recover the queue pointer packed into the callback argument. */
1327 2134 : shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1328 :
1329 2134 : shm_mq_detach_internal(mq);
1330 : }
|