Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * shm_mq.c
4 : * single-reader, single-writer shared memory message queue
5 : *
6 : * Both the sender and the receiver must have a PGPROC; their respective
7 : * process latches are used for synchronization. Only the sender may send,
8 : * and only the receiver may receive. This is intended to allow a user
9 : * backend to communicate with worker backends that it has registered.
10 : *
11 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : * src/backend/storage/ipc/shm_mq.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "miscadmin.h"
22 : #include "pgstat.h"
23 : #include "port/pg_bitutils.h"
24 : #include "postmaster/bgworker.h"
25 : #include "storage/shm_mq.h"
26 : #include "storage/spin.h"
27 : #include "utils/memutils.h"
28 :
29 : /*
30 : * This structure represents the actual queue, stored in shared memory.
31 : *
32 : * Some notes on synchronization:
33 : *
34 : * mq_receiver and mq_bytes_read can only be changed by the receiver; and
35 : * mq_sender and mq_bytes_written can only be changed by the sender.
36 : * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
37 : * they cannot change once set, and thus may be read without a lock once this
38 : * is known to be the case.
39 : *
40 : * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
41 : * they are written atomically using 8 byte loads and stores. Memory barriers
42 : * must be carefully used to synchronize reads and writes of these values with
43 : * reads and writes of the actual data in mq_ring.
44 : *
45 : * mq_detached needs no locking. It can be set by either the sender or the
46 : * receiver, but only ever from false to true, so redundant writes don't
47 : * matter. It is important that if we set mq_detached and then set the
48 : * counterparty's latch, the counterparty must be certain to see the change
49 : * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
50 : * ends with one, this should be OK.
51 : *
52 : * mq_ring_size and mq_ring_offset never change after initialization, and
53 : * can therefore be read without the lock.
54 : *
55 : * Importantly, mq_ring can be safely read and written without a lock.
56 : * At any given time, the difference between mq_bytes_read and
57 : * mq_bytes_written defines the number of bytes within mq_ring that contain
58 : * unread data, and mq_bytes_read defines the position where those bytes
59 : * begin. The sender can increase the number of unread bytes at any time,
60 : * but only the receiver can give license to overwrite those bytes, by
61 : * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
62 : * the unread bytes it knows to be present without the lock. Conversely,
63 : * the sender can write to the unused portion of the ring buffer without
64 : * the lock, because nobody else can be reading or writing those bytes. The
65 : * receiver could be making more bytes unused by incrementing mq_bytes_read,
66 : * but that's OK. Note that it would be unsafe for the receiver to read any
67 : * data it's already marked as read, or to write any data; and it would be
68 : * unsafe for the sender to reread any data after incrementing
69 : * mq_bytes_written, but fortunately there's no need for any of that.
70 : */
71 : struct shm_mq
72 : {
73 : slock_t mq_mutex; /* spinlock protecting mq_receiver and mq_sender */
74 : PGPROC *mq_receiver; /* receiving process; NULL until shm_mq_set_receiver() */
75 : PGPROC *mq_sender; /* sending process; NULL until shm_mq_set_sender() */
76 : pg_atomic_uint64 mq_bytes_read; /* advanced only by the receiver; starts at 0 */
77 : pg_atomic_uint64 mq_bytes_written; /* advanced only by the sender; starts at 0 */
78 : Size mq_ring_size; /* bytes usable for data; fixed by shm_mq_create() */
79 : bool mq_detached; /* only ever changes from false to true */
80 : uint8 mq_ring_offset; /* MAXALIGN padding between mq_ring and the data area */
81 : char mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* ring buffer storage */
82 : };
83 :
84 : /*
85 : * This structure is a backend-private handle for access to a queue.
86 : *
87 : * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
88 : * an optional pointer to the dynamic shared memory segment that contains it.
89 : * (If mqh_segment is provided, we register an on_dsm_detach callback to
90 : * make sure we detach from the queue before detaching from DSM.)
91 : *
92 : * If this queue is intended to connect the current process with a background
93 : * worker that started it, the user can pass a pointer to the worker handle
94 : * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
95 : * is to allow us to begin sending to or receiving from that queue before the
96 : * process we'll be communicating with has even been started. If it fails
97 : * to start, the handle will allow us to notice that and fail cleanly, rather
98 : * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
99 : * simple cases - e.g. where there are just 2 processes communicating; in
100 : * more complex scenarios, every process may not have a BackgroundWorkerHandle
101 : * available, or may need to watch for the failure of more than one other
102 : * process at a time.
103 : *
104 : * When a message exists as a contiguous chunk of bytes in the queue - that is,
105 : * it is smaller than the size of the ring buffer and does not wrap around
106 : * the end - we return the message to the caller as a pointer into the buffer.
107 : * For messages that are larger or happen to wrap, we reassemble the message
108 : * locally by copying the chunks into a backend-local buffer. mqh_buffer is
109 : * the buffer, and mqh_buflen is the number of bytes allocated for it.
110 : *
111 : * mqh_send_pending is the number of bytes that have been written to the
112 : * queue but not yet reflected in shared memory. We do not update the shared
113 : * counter until the pending data exceeds 1/4th of the ring size or the tuple
114 : * queue is full. This prevents frequent CPU cache misses, and it also avoids
115 : * frequent SetLatch() calls, which are quite expensive.
116 : *
117 : * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
118 : * are used to track the state of non-blocking operations. When the caller
119 : * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
120 : * are expected to retry the call at a later time with the same argument;
121 : * we need to retain enough state to pick up where we left off.
122 : * mqh_length_word_complete tracks whether we are done sending or receiving
123 : * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
124 : * the number of bytes read or written for either the length word or the
125 : * message itself, and mqh_expected_bytes - which is used only for reads -
126 : * tracks the expected total size of the payload.
127 : *
128 : * mqh_counterparty_attached tracks whether we know the counterparty to have
129 : * attached to the queue at some previous point. This lets us avoid some
130 : * mutex acquisitions.
131 : *
132 : * mqh_context is the memory context in effect at the time we attached to
133 : * the shm_mq. The shm_mq_handle itself is allocated in this context, and
134 : * we make sure any other allocations we do happen in this context as well,
135 : * to avoid nasty surprises.
136 : */
137 : struct shm_mq_handle
138 : {
139 : shm_mq *mqh_queue; /* the queue this handle is attached to */
140 : dsm_segment *mqh_segment; /* segment containing the queue, or NULL */
141 : BackgroundWorkerHandle *mqh_handle; /* counterparty's worker handle, or NULL */
142 : char *mqh_buffer; /* backend-local reassembly buffer; NULL until needed */
143 : Size mqh_buflen; /* bytes allocated for mqh_buffer */
144 : Size mqh_consume_pending; /* bytes received but not yet added to mq_bytes_read */
145 : Size mqh_send_pending; /* bytes sent but not yet added to mq_bytes_written */
146 : Size mqh_partial_bytes; /* progress within the length word or the payload */
147 : Size mqh_expected_bytes; /* payload size of the message being received */
148 : bool mqh_length_word_complete; /* length word fully sent/received? */
149 : bool mqh_counterparty_attached; /* counterparty known to have attached? */
150 : MemoryContext mqh_context; /* context current at shm_mq_attach() time */
151 : };
152 :
153 : static void shm_mq_detach_internal(shm_mq *mq);
154 : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
155 : const void *data, bool nowait, Size *bytes_written);
156 : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
157 : Size bytes_needed, bool nowait, Size *nbytesp,
158 : void **datap);
159 : static bool shm_mq_counterparty_gone(shm_mq *mq,
160 : BackgroundWorkerHandle *handle);
161 : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
162 : BackgroundWorkerHandle *handle);
163 : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
164 : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
165 : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
166 :
167 : /* Minimum queue size is enough for header and at least one chunk of data. */
168 : const Size shm_mq_minimum_size =
169 : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
170 :
171 : #define MQH_INITIAL_BUFSIZE 8192
172 :
173 : /*
174 : * Initialize a new shared message queue.
175 : */
176 : shm_mq *
177 5464 : shm_mq_create(void *address, Size size)
178 : {
179 5464 : shm_mq *mq = address;
180 5464 : Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
181 :
182 : /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
183 5464 : size = MAXALIGN_DOWN(size);
184 :
185 : /* Queue size must be large enough to hold some data. */
186 : Assert(size > data_offset);
187 :
188 : /* Initialize queue header. */
189 5464 : SpinLockInit(&mq->mq_mutex);
190 5464 : mq->mq_receiver = NULL;
191 5464 : mq->mq_sender = NULL;
192 5464 : pg_atomic_init_u64(&mq->mq_bytes_read, 0);
193 5464 : pg_atomic_init_u64(&mq->mq_bytes_written, 0);
194 5464 : mq->mq_ring_size = size - data_offset;
195 5464 : mq->mq_detached = false;
196 5464 : mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
197 :
198 5464 : return mq;
199 : }
200 :
201 : /*
202 : * Set the identity of the process that will receive from a shared message
203 : * queue.
204 : */
205 : void
206 5464 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
207 : {
208 : PGPROC *sender;
209 :
210 5464 : SpinLockAcquire(&mq->mq_mutex);
211 : Assert(mq->mq_receiver == NULL);
212 5464 : mq->mq_receiver = proc;
213 5464 : sender = mq->mq_sender;
214 5464 : SpinLockRelease(&mq->mq_mutex);
215 :
216 5464 : if (sender != NULL)
217 34 : SetLatch(&sender->procLatch);
218 5464 : }
219 :
220 : /*
221 : * Set the identity of the process that will send to a shared message queue.
222 : */
223 : void
224 5298 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
225 : {
226 : PGPROC *receiver;
227 :
228 5298 : SpinLockAcquire(&mq->mq_mutex);
229 : Assert(mq->mq_sender == NULL);
230 5298 : mq->mq_sender = proc;
231 5298 : receiver = mq->mq_receiver;
232 5298 : SpinLockRelease(&mq->mq_mutex);
233 :
234 5298 : if (receiver != NULL)
235 5264 : SetLatch(&receiver->procLatch);
236 5298 : }
237 :
238 : /*
239 : * Get the configured receiver.
240 : */
241 : PGPROC *
242 8 : shm_mq_get_receiver(shm_mq *mq)
243 : {
244 : PGPROC *receiver;
245 :
246 8 : SpinLockAcquire(&mq->mq_mutex);
247 8 : receiver = mq->mq_receiver;
248 8 : SpinLockRelease(&mq->mq_mutex);
249 :
250 8 : return receiver;
251 : }
252 :
253 : /*
254 : * Get the configured sender.
255 : */
256 : PGPROC *
257 15283704 : shm_mq_get_sender(shm_mq *mq)
258 : {
259 : PGPROC *sender;
260 :
261 15283704 : SpinLockAcquire(&mq->mq_mutex);
262 15283704 : sender = mq->mq_sender;
263 15283704 : SpinLockRelease(&mq->mq_mutex);
264 :
265 15283704 : return sender;
266 : }
267 :
268 : /*
269 : * Attach to a shared message queue so we can send or receive messages.
270 : *
271 : * The memory context in effect at the time this function is called should
272 : * be one which will last for at least as long as the message queue itself.
273 : * We'll allocate the handle in that context, and future allocations that
274 : * are needed to buffer incoming data will happen in that context as well.
275 : *
276 : * If seg != NULL, the queue will be automatically detached when that dynamic
277 : * shared memory segment is detached.
278 : *
279 : * If handle != NULL, the queue can be read or written even before the
280 : * other process has attached. We'll wait for it to do so if needed. The
281 : * handle must be for a background worker initialized with bgw_notify_pid
282 : * equal to our PID.
283 : *
284 : * shm_mq_detach() should be called when done. This will free the
285 : * shm_mq_handle and mark the queue itself as detached, so that our
286 : * counterpart won't get stuck waiting for us to fill or drain the queue
287 : * after we've already lost interest.
288 : */
289 : shm_mq_handle *
290 10762 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
291 : {
292 10762 : shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
293 :
294 : Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
295 10762 : mqh->mqh_queue = mq;
296 10762 : mqh->mqh_segment = seg;
297 10762 : mqh->mqh_handle = handle;
298 10762 : mqh->mqh_buffer = NULL;
299 10762 : mqh->mqh_buflen = 0;
300 10762 : mqh->mqh_consume_pending = 0;
301 10762 : mqh->mqh_send_pending = 0;
302 10762 : mqh->mqh_partial_bytes = 0;
303 10762 : mqh->mqh_expected_bytes = 0;
304 10762 : mqh->mqh_length_word_complete = false;
305 10762 : mqh->mqh_counterparty_attached = false;
306 10762 : mqh->mqh_context = CurrentMemoryContext;
307 :
308 10762 : if (seg != NULL)
309 10762 : on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
310 :
311 10762 : return mqh;
312 : }
313 :
314 : /*
315 : * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
316 : * been passed to shm_mq_attach.
317 : */
318 : void
319 5234 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
320 : {
321 : Assert(mqh->mqh_handle == NULL);
322 5234 : mqh->mqh_handle = handle;
323 5234 : }
324 :
325 : /*
326 : * Write a message into a shared message queue.
327 : */
328 : shm_mq_result
329 1728314 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
330 : bool force_flush)
331 : {
332 : shm_mq_iovec iov;
333 :
334 1728314 : iov.data = data;
335 1728314 : iov.len = nbytes;
336 :
337 1728314 : return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
338 : }
339 :
340 : /*
341 : * Write a message into a shared message queue, gathered from multiple
342 : * addresses.
343 : *
344 : * When nowait = false, we'll wait on our process latch when the ring buffer
345 : * fills up, and then continue writing once the receiver has drained some data.
346 : * The process latch is reset after each wait.
347 : *
348 : * When nowait = true, we do not manipulate the state of the process latch;
349 : * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
350 : * this case, the caller should call this function again, with the same
351 : * arguments, each time the process latch is set. (Once begun, the sending
352 : * of a message cannot be aborted except by detaching from the queue; changing
353 : * the length or payload will corrupt the queue.)
354 : *
355 : * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
356 : * and notify the receiver (if it is already attached). Otherwise, we don't
357 : * update it until we have written an amount of data greater than 1/4th of the
358 : * ring size.
359 : */
360 : shm_mq_result
361 1731050 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
362 : bool force_flush)
363 : {
364 : shm_mq_result res;
365 1731050 : shm_mq *mq = mqh->mqh_queue;
366 : PGPROC *receiver;
367 1731050 : Size nbytes = 0;
368 : Size bytes_written;
369 : int i;
370 1731050 : int which_iov = 0;
371 : Size offset;
372 :
373 : Assert(mq->mq_sender == MyProc);
374 :
375 : /* Compute total size of write. */
376 3464836 : for (i = 0; i < iovcnt; ++i)
377 1733786 : nbytes += iov[i].len;
378 :
379 : /* Prevent writing messages overwhelming the receiver. */
380 1731050 : if (nbytes > MaxAllocSize)
381 0 : ereport(ERROR,
382 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
383 : errmsg("cannot send a message of size %zu via shared memory queue",
384 : nbytes)));
385 :
386 : /* Try to write, or finish writing, the length word into the buffer. */
387 3450514 : while (!mqh->mqh_length_word_complete)
388 : {
389 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
390 1719470 : res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
391 1719470 : ((char *) &nbytes) + mqh->mqh_partial_bytes,
392 : nowait, &bytes_written);
393 :
394 1719470 : if (res == SHM_MQ_DETACHED)
395 : {
396 : /* Reset state in case caller tries to send another message. */
397 6 : mqh->mqh_partial_bytes = 0;
398 6 : mqh->mqh_length_word_complete = false;
399 6 : return res;
400 : }
401 1719464 : mqh->mqh_partial_bytes += bytes_written;
402 :
403 1719464 : if (mqh->mqh_partial_bytes >= sizeof(Size))
404 : {
405 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
406 :
407 1719464 : mqh->mqh_partial_bytes = 0;
408 1719464 : mqh->mqh_length_word_complete = true;
409 : }
410 :
411 1719464 : if (res != SHM_MQ_SUCCESS)
412 0 : return res;
413 :
414 : /* Length word can't be split unless bigger than required alignment. */
415 : Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
416 : }
417 :
418 : /* Write the actual data bytes into the buffer. */
419 : Assert(mqh->mqh_partial_bytes <= nbytes);
420 1731044 : offset = mqh->mqh_partial_bytes;
421 : do
422 : {
423 : Size chunksize;
424 :
425 : /* Figure out which bytes need to be sent next. */
426 1731072 : if (offset >= iov[which_iov].len)
427 : {
428 8012 : offset -= iov[which_iov].len;
429 8012 : ++which_iov;
430 8012 : if (which_iov >= iovcnt)
431 8000 : break;
432 12 : continue;
433 : }
434 :
435 : /*
436 : * We want to avoid copying the data if at all possible, but every
437 : * chunk of bytes we write into the queue has to be MAXALIGN'd, except
438 : * the last. Thus, if a chunk other than the last one ends on a
439 : * non-MAXALIGN'd boundary, we have to combine the tail end of its
440 : * data with data from one or more following chunks until we either
441 : * reach the last chunk or accumulate a number of bytes which is
442 : * MAXALIGN'd.
443 : */
444 1723060 : if (which_iov + 1 < iovcnt &&
445 2718 : offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
446 : {
447 : char tmpbuf[MAXIMUM_ALIGNOF];
448 2718 : int j = 0;
449 :
450 : for (;;)
451 : {
452 8250 : if (offset < iov[which_iov].len)
453 : {
454 2830 : tmpbuf[j] = iov[which_iov].data[offset];
455 2830 : j++;
456 2830 : offset++;
457 2830 : if (j == MAXIMUM_ALIGNOF)
458 16 : break;
459 : }
460 : else
461 : {
462 5420 : offset -= iov[which_iov].len;
463 5420 : which_iov++;
464 5420 : if (which_iov >= iovcnt)
465 2702 : break;
466 : }
467 : }
468 :
469 2718 : res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
470 :
471 2718 : if (res == SHM_MQ_DETACHED)
472 : {
473 : /* Reset state in case caller tries to send another message. */
474 0 : mqh->mqh_partial_bytes = 0;
475 0 : mqh->mqh_length_word_complete = false;
476 0 : return res;
477 : }
478 :
479 2718 : mqh->mqh_partial_bytes += bytes_written;
480 2718 : if (res != SHM_MQ_SUCCESS)
481 0 : return res;
482 2718 : continue;
483 : }
484 :
485 : /*
486 : * If this is the last chunk, we can write all the data, even if it
487 : * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
488 : * MAXALIGN_DOWN the write size.
489 : */
490 1720342 : chunksize = iov[which_iov].len - offset;
491 1720342 : if (which_iov + 1 < iovcnt)
492 0 : chunksize = MAXALIGN_DOWN(chunksize);
493 1720342 : res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
494 : nowait, &bytes_written);
495 :
496 1720342 : if (res == SHM_MQ_DETACHED)
497 : {
498 : /* Reset state in case caller tries to send another message. */
499 0 : mqh->mqh_length_word_complete = false;
500 0 : mqh->mqh_partial_bytes = 0;
501 0 : return res;
502 : }
503 :
504 1720342 : mqh->mqh_partial_bytes += bytes_written;
505 1720342 : offset += bytes_written;
506 1720342 : if (res != SHM_MQ_SUCCESS)
507 11580 : return res;
508 1711492 : } while (mqh->mqh_partial_bytes < nbytes);
509 :
510 : /* Reset for next message. */
511 1719464 : mqh->mqh_partial_bytes = 0;
512 1719464 : mqh->mqh_length_word_complete = false;
513 :
514 : /* If queue has been detached, let caller know. */
515 1719464 : if (mq->mq_detached)
516 0 : return SHM_MQ_DETACHED;
517 :
518 : /*
519 : * If the counterparty is known to have attached, we can read mq_receiver
520 : * without acquiring the spinlock. Otherwise, more caution is needed.
521 : */
522 1719464 : if (mqh->mqh_counterparty_attached)
523 1715456 : receiver = mq->mq_receiver;
524 : else
525 : {
526 4008 : SpinLockAcquire(&mq->mq_mutex);
527 4008 : receiver = mq->mq_receiver;
528 4008 : SpinLockRelease(&mq->mq_mutex);
529 4008 : if (receiver != NULL)
530 4008 : mqh->mqh_counterparty_attached = true;
531 : }
532 :
533 : /*
534 : * If the caller has requested force flush or we have written more than
535 : * 1/4 of the ring size, mark it as written in shared memory and notify
536 : * the receiver.
537 : */
538 1719464 : if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
539 : {
540 240714 : shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
541 240714 : if (receiver != NULL)
542 240714 : SetLatch(&receiver->procLatch);
543 240714 : mqh->mqh_send_pending = 0;
544 : }
545 :
546 1719464 : return SHM_MQ_SUCCESS;
547 : }
548 :
549 : /*
550 : * Receive a message from a shared message queue.
551 : *
552 : * We set *nbytes to the message length and *data to point to the message
553 : * payload. If the entire message exists in the queue as a single,
554 : * contiguous chunk, *data will point directly into shared memory; otherwise,
555 : * it will point to a temporary buffer. This mostly avoids data copying in
556 : * the hoped-for case where messages are short compared to the buffer size,
557 : * while still allowing longer messages. In either case, the return value
558 : * remains valid until the next receive operation is performed on the queue.
559 : *
560 : * When nowait = false, we'll wait on our process latch when the ring buffer
561 : * is empty and we have not yet received a full message. The sender will
562 : * set our process latch after more data has been written, and we'll resume
563 : * processing. Each call will therefore return a complete message
564 : * (unless the sender detaches the queue).
565 : *
566 : * When nowait = true, we do not manipulate the state of the process latch;
567 : * instead, whenever the buffer is empty and we need to read from it, we
568 : * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
569 : * function again after the process latch has been set.
570 : */
571 : shm_mq_result
572 6995794 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
573 : {
574 6995794 : shm_mq *mq = mqh->mqh_queue;
575 : shm_mq_result res;
576 6995794 : Size rb = 0;
577 : Size nbytes;
578 : void *rawdata;
579 :
580 : Assert(mq->mq_receiver == MyProc);
581 :
582 : /* We can't receive data until the sender has attached. */
583 6995794 : if (!mqh->mqh_counterparty_attached)
584 : {
585 5159862 : if (nowait)
586 : {
587 : int counterparty_gone;
588 :
589 : /*
590 : * We shouldn't return at this point at all unless the sender
591 : * hasn't attached yet. However, the correct return value depends
592 : * on whether the sender is still attached. If we first test
593 : * whether the sender has ever attached and then test whether the
594 : * sender has detached, there's a race condition: a sender that
595 : * attaches and detaches very quickly might fool us into thinking
596 : * the sender never attached at all. So, test whether our
597 : * counterparty is definitively gone first, and only afterwards
598 : * check whether the sender ever attached in the first place.
599 : */
600 5159444 : counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
601 5159444 : if (shm_mq_get_sender(mq) == NULL)
602 : {
603 5154580 : if (counterparty_gone)
604 0 : return SHM_MQ_DETACHED;
605 : else
606 5154580 : return SHM_MQ_WOULD_BLOCK;
607 : }
608 : }
609 418 : else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
610 232 : && shm_mq_get_sender(mq) == NULL)
611 : {
612 0 : mq->mq_detached = true;
613 0 : return SHM_MQ_DETACHED;
614 : }
615 5282 : mqh->mqh_counterparty_attached = true;
616 : }
617 :
618 : /*
619 : * If we've consumed an amount of data greater than 1/4th of the ring
620 : * size, mark it consumed in shared memory. We try to avoid doing this
621 : * unnecessarily when only a small amount of data has been consumed,
622 : * because SetLatch() is fairly expensive and we don't want to do it too
623 : * often.
624 : */
625 1841214 : if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
626 : {
627 37780 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
628 37780 : mqh->mqh_consume_pending = 0;
629 : }
630 :
631 : /* Try to read, or finish reading, the length word from the buffer. */
632 1884370 : while (!mqh->mqh_length_word_complete)
633 : {
634 : /* Try to receive the message length word. */
635 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
636 1829562 : res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
637 : nowait, &rb, &rawdata);
638 1829562 : if (res != SHM_MQ_SUCCESS)
639 119082 : return res;
640 :
641 : /*
642 : * Hopefully, we'll receive the entire message length word at once.
643 : * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
644 : * multiple reads.
645 : */
646 1710480 : if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
647 43156 : {
648 : Size needed;
649 :
650 1710480 : nbytes = *(Size *) rawdata;
651 :
652 : /* If we've already got the whole message, we're done. */
653 1710480 : needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
654 1710480 : if (rb >= needed)
655 : {
656 1667324 : mqh->mqh_consume_pending += needed;
657 1667324 : *nbytesp = nbytes;
658 1667324 : *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
659 1667324 : return SHM_MQ_SUCCESS;
660 : }
661 :
662 : /*
663 : * We don't have the whole message, but we at least have the whole
664 : * length word.
665 : */
666 43156 : mqh->mqh_expected_bytes = nbytes;
667 43156 : mqh->mqh_length_word_complete = true;
668 43156 : mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
669 43156 : rb -= MAXALIGN(sizeof(Size));
670 : }
671 : else
672 : {
673 : Size lengthbytes;
674 :
675 : /* Can't be split unless bigger than required alignment. */
676 : Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
677 :
678 : /* Message word is split; need buffer to reassemble. */
679 0 : if (mqh->mqh_buffer == NULL)
680 : {
681 0 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
682 : MQH_INITIAL_BUFSIZE);
683 0 : mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
684 : }
685 : Assert(mqh->mqh_buflen >= sizeof(Size));
686 :
687 : /* Copy partial length word; remember to consume it. */
688 0 : if (mqh->mqh_partial_bytes + rb > sizeof(Size))
689 0 : lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
690 : else
691 0 : lengthbytes = rb;
692 0 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
693 : lengthbytes);
694 0 : mqh->mqh_partial_bytes += lengthbytes;
695 0 : mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
696 0 : rb -= lengthbytes;
697 :
698 : /* If we now have the whole word, we're ready to read payload. */
699 0 : if (mqh->mqh_partial_bytes >= sizeof(Size))
700 : {
701 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
702 0 : mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
703 0 : mqh->mqh_length_word_complete = true;
704 0 : mqh->mqh_partial_bytes = 0;
705 : }
706 : }
707 : }
708 54808 : nbytes = mqh->mqh_expected_bytes;
709 :
710 : /*
711 : * Should be disallowed on the sending side already, but better check and
712 : * error out on the receiver side as well rather than trying to read a
713 : * prohibitively large message.
714 : */
715 54808 : if (nbytes > MaxAllocSize)
716 0 : ereport(ERROR,
717 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
718 : errmsg("invalid message size %zu in shared memory queue",
719 : nbytes)));
720 :
721 54808 : if (mqh->mqh_partial_bytes == 0)
722 : {
723 : /*
724 : * Try to obtain the whole message in a single chunk. If this works,
725 : * we need not copy the data and can return a pointer directly into
726 : * shared memory.
727 : */
728 43356 : res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
729 43356 : if (res != SHM_MQ_SUCCESS)
730 200 : return res;
731 43156 : if (rb >= nbytes)
732 : {
733 332 : mqh->mqh_length_word_complete = false;
734 332 : mqh->mqh_consume_pending += MAXALIGN(nbytes);
735 332 : *nbytesp = nbytes;
736 332 : *datap = rawdata;
737 332 : return SHM_MQ_SUCCESS;
738 : }
739 :
740 : /*
741 : * The message has wrapped the buffer. We'll need to copy it in order
742 : * to return it to the client in one chunk. First, make sure we have
743 : * a large enough buffer available.
744 : */
745 42824 : if (mqh->mqh_buflen < nbytes)
746 : {
747 : Size newbuflen;
748 :
749 : /*
750 : * Increase size to the next power of 2 that's >= nbytes, but
751 : * limit to MaxAllocSize.
752 : */
753 232 : newbuflen = pg_nextpower2_size_t(nbytes);
754 232 : newbuflen = Min(newbuflen, MaxAllocSize);
755 :
756 232 : if (mqh->mqh_buffer != NULL)
757 : {
758 0 : pfree(mqh->mqh_buffer);
759 0 : mqh->mqh_buffer = NULL;
760 0 : mqh->mqh_buflen = 0;
761 : }
762 232 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
763 232 : mqh->mqh_buflen = newbuflen;
764 : }
765 : }
766 :
767 : /* Loop until we've copied the entire message. */
768 : for (;;)
769 203680 : {
770 : Size still_needed;
771 :
772 : /* Copy as much as we can. */
773 : Assert(mqh->mqh_partial_bytes + rb <= nbytes);
774 257956 : if (rb > 0)
775 : {
776 246504 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
777 246504 : mqh->mqh_partial_bytes += rb;
778 : }
779 :
780 : /*
781 : * Update count of bytes that can be consumed, accounting for
782 : * alignment padding. Note that this will never actually insert any
783 : * padding except at the end of a message, because the buffer size is
784 : * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
785 : */
786 : Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
787 257956 : mqh->mqh_consume_pending += MAXALIGN(rb);
788 :
789 : /* If we got all the data, exit the loop. */
790 257956 : if (mqh->mqh_partial_bytes >= nbytes)
791 42824 : break;
792 :
793 : /* Wait for some more data. */
794 215132 : still_needed = nbytes - mqh->mqh_partial_bytes;
795 215132 : res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
796 215132 : if (res != SHM_MQ_SUCCESS)
797 11452 : return res;
798 203680 : if (rb > still_needed)
799 41632 : rb = still_needed;
800 : }
801 :
802 : /* Return the complete message, and reset for next message. */
803 42824 : *nbytesp = nbytes;
804 42824 : *datap = mqh->mqh_buffer;
805 42824 : mqh->mqh_length_word_complete = false;
806 42824 : mqh->mqh_partial_bytes = 0;
807 42824 : return SHM_MQ_SUCCESS;
808 : }
809 :
810 : /*
811 : * Wait for the other process that's supposed to use this queue to attach
812 : * to it.
813 : *
814 : * The return value is SHM_MQ_DETACHED if the worker has already detached or
815 : * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
816 : * Note that we will only be able to detect that the worker has died before
817 : * attaching if a background worker handle was passed to shm_mq_attach().
818 : */
819 : shm_mq_result
820 0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
821 : {
822 0 : shm_mq *mq = mqh->mqh_queue;
823 : PGPROC **victim;
824 :
825 0 : if (shm_mq_get_receiver(mq) == MyProc)
826 0 : victim = &mq->mq_sender;
827 : else
828 : {
829 : Assert(shm_mq_get_sender(mq) == MyProc);
830 0 : victim = &mq->mq_receiver;
831 : }
832 :
833 0 : if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
834 0 : return SHM_MQ_SUCCESS;
835 : else
836 0 : return SHM_MQ_DETACHED;
837 : }
838 :
839 : /*
840 : * Detach from a shared message queue, and destroy the shm_mq_handle. The handle (and its buffer, if any) is pfree'd here, so mqh must not be used afterwards.
841 : */
842 : void
843 7846 : shm_mq_detach(shm_mq_handle *mqh)
844 : {
845 : /* Before detaching, publish any written-but-not-yet-advertised bytes so the receiver can still see them. */
846 7846 : if (mqh->mqh_send_pending > 0)
847 : {
848 1266 : shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
849 1266 : mqh->mqh_send_pending = 0;
850 : }
851 :
852 : /* Notify counterparty that we're outta here. */
853 7846 : shm_mq_detach_internal(mqh->mqh_queue);
854 :
855 : /* Cancel on_dsm_detach callback, if any, since we have just detached explicitly. */
856 7846 : if (mqh->mqh_segment)
857 7846 : cancel_on_dsm_detach(mqh->mqh_segment,
858 : shm_mq_detach_callback,
859 7846 : PointerGetDatum(mqh->mqh_queue));
860 :
861 : /* Release local memory associated with handle. */
862 7846 : if (mqh->mqh_buffer != NULL)
863 216 : pfree(mqh->mqh_buffer);
864 7846 : pfree(mqh);
865 7846 : }
866 :
867 : /*
868 : * Notify counterparty that we're detaching from shared message queue.
869 : *
870 : * The purpose of this function is to make sure that the process
871 : * with which we're communicating doesn't block forever waiting for us to
872 : * fill or drain the queue once we've lost interest. When the sender
873 : * detaches, the receiver can read any messages remaining in the queue;
874 : * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
875 : * further attempts to send messages will likewise return SHM_MQ_DETACHED.
876 : *
877 : * This is separated out from shm_mq_detach() because if the on_dsm_detach
878 : * callback fires, we only want to do this much. We do not try to touch
879 : * the local shm_mq_handle, as it may have been pfree'd already.
880 : */
881 : static void
882 10762 : shm_mq_detach_internal(shm_mq *mq)
883 : {
884 : PGPROC *victim;
885 : /* Under the spinlock: pick the counterparty (the role we are not) and mark the queue detached. */
886 10762 : SpinLockAcquire(&mq->mq_mutex);
887 10762 : if (mq->mq_sender == MyProc)
888 5298 : victim = mq->mq_receiver;
889 : else
890 : {
891 : Assert(mq->mq_receiver == MyProc);
892 5464 : victim = mq->mq_sender;
893 : }
894 10762 : mq->mq_detached = true;
895 10762 : SpinLockRelease(&mq->mq_mutex);
896 : /* Wake the counterparty, if its PGPROC has been registered, so that it notices mq_detached. */
897 10762 : if (victim != NULL)
898 10602 : SetLatch(&victim->procLatch);
899 10762 : }
900 :
901 : /*
902 : * Return the shm_mq that the given handle refers to (mqh->mqh_queue).
903 : */
904 : shm_mq *
905 10124028 : shm_mq_get_queue(shm_mq_handle *mqh)
906 : {
907 10124028 : return mqh->mqh_queue;
908 : }
909 :
910 : /*
911 : * Write bytes into a shared message queue. On every return path *bytes_written is set to the number of bytes of data copied into the ring so far; the result is SHM_MQ_SUCCESS once all nbytes have been copied, SHM_MQ_DETACHED if the queue is (or becomes) detached, or SHM_MQ_WOULD_BLOCK if nowait is true and we would otherwise have to wait.
912 : */
913 : static shm_mq_result
914 3442530 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
915 : bool nowait, Size *bytes_written)
916 : {
917 3442530 : shm_mq *mq = mqh->mqh_queue;
918 3442530 : Size sent = 0;
919 : uint64 used;
920 3442530 : Size ringsize = mq->mq_ring_size;
921 : Size available;
922 :
923 7320354 : while (sent < nbytes)
924 : {
925 : uint64 rb;
926 : uint64 wb;
927 :
928 : /* Compute number of ring buffer bytes used and available. Bytes we have written locally but not yet published (mqh_send_pending) count as used. */
929 3889410 : rb = pg_atomic_read_u64(&mq->mq_bytes_read);
930 3889410 : wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
931 : Assert(wb >= rb);
932 3889410 : used = wb - rb;
933 : Assert(used <= ringsize);
934 3889410 : available = Min(ringsize - used, nbytes - sent);
935 :
936 : /*
937 : * Bail out if the queue has been detached. Note that we would be in
938 : * trouble if the compiler decided to cache the value of
939 : * mq->mq_detached in a register or on the stack across loop
940 : * iterations. It probably shouldn't do that anyway since we'll
941 : * always return, call an external function that performs a system
942 : * call, or reach a memory barrier at some point later in the loop,
943 : * but just to be sure, insert a compiler barrier here.
944 : */
945 3889410 : pg_compiler_barrier();
946 3889410 : if (mq->mq_detached)
947 : {
948 6 : *bytes_written = sent;
949 6 : return SHM_MQ_DETACHED;
950 : }
951 :
952 3889404 : if (available == 0 && !mqh->mqh_counterparty_attached)
953 : {
954 : /*
955 : * The queue is full, so if the receiver isn't yet known to be
956 : * attached, we must wait for that to happen.
957 : */
958 18 : if (nowait)
959 : {
960 8 : if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
961 : {
962 0 : *bytes_written = sent;
963 0 : return SHM_MQ_DETACHED;
964 : }
965 8 : if (shm_mq_get_receiver(mq) == NULL)
966 : {
967 0 : *bytes_written = sent;
968 0 : return SHM_MQ_WOULD_BLOCK;
969 : }
970 : }
971 10 : else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
972 : mqh->mqh_handle))
973 : {
974 0 : mq->mq_detached = true;
975 0 : *bytes_written = sent;
976 0 : return SHM_MQ_DETACHED;
977 : }
978 18 : mqh->mqh_counterparty_attached = true;
979 :
980 : /*
981 : * The receiver may have read some data after attaching, so we
982 : * must not wait without rechecking the queue state.
983 : */
984 : }
985 3889386 : else if (available == 0)
986 : {
987 : /* Publish our locally pending send bytes to shared memory so the receiver can consume them. */
988 224380 : shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
989 :
990 : /*
991 : * Since mqh->mqh_counterparty_attached is known to be true at this
992 : * point, mq_receiver has been set, and it can't change once set.
993 : * Therefore, we can read it without acquiring the spinlock.
994 : */
995 : Assert(mqh->mqh_counterparty_attached);
996 224380 : SetLatch(&mq->mq_receiver->procLatch);
997 :
998 : /*
999 : * We have just updated the mqh_send_pending bytes in the shared
1000 : * memory so reset it.
1001 : */
1002 224380 : mqh->mqh_send_pending = 0;
1003 :
1004 : /* Skip manipulation of our latch if nowait = true. */
1005 224380 : if (nowait)
1006 : {
1007 11580 : *bytes_written = sent;
1008 11580 : return SHM_MQ_WOULD_BLOCK;
1009 : }
1010 :
1011 : /*
1012 : * Wait for our latch to be set. It might already be set for some
1013 : * unrelated reason, but that'll just result in one extra trip
1014 : * through the loop. It's worth it to avoid resetting the latch
1015 : * at top of loop, because setting an already-set latch is much
1016 : * cheaper than setting one that has been reset.
1017 : */
1018 212800 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1019 : WAIT_EVENT_MESSAGE_QUEUE_SEND);
1020 :
1021 : /* Reset the latch so we don't spin. */
1022 212800 : ResetLatch(MyLatch);
1023 :
1024 : /* An interrupt may have occurred while we were waiting. */
1025 212800 : CHECK_FOR_INTERRUPTS();
1026 : }
1027 : else
1028 : {
1029 : Size offset;
1030 : Size sendnow;
1031 : /* Copy at most up to the physical end of the ring; any remainder is handled by the next loop iteration. */
1032 3665006 : offset = wb % (uint64) ringsize;
1033 3665006 : sendnow = Min(available, ringsize - offset);
1034 :
1035 : /*
1036 : * Write as much data as we can via a single memcpy(). Make sure
1037 : * these writes happen after the read of mq_bytes_read, above.
1038 : * This barrier pairs with the one in shm_mq_inc_bytes_read.
1039 : * (Since we're separating the read of mq_bytes_read from a
1040 : * subsequent write to mq_ring, we need a full barrier here.)
1041 : */
1042 3665006 : pg_memory_barrier();
1043 3665006 : memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1044 : (char *) data + sent, sendnow);
1045 3665006 : sent += sendnow;
1046 :
1047 : /*
1048 : * Update count of bytes written, with alignment padding. Note
1049 : * that this will never actually insert any padding except at the
1050 : * end of a run of bytes, because the buffer size is a multiple of
1051 : * MAXIMUM_ALIGNOF, and each read is as well.
1052 : */
1053 : Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1054 :
1055 : /*
1056 : * For efficiency, we don't update the bytes written in the shared
1057 : * memory and also don't set the reader's latch here. Refer to
1058 : * the comments atop the shm_mq_handle structure for more
1059 : * information.
1060 : */
1061 3665006 : mqh->mqh_send_pending += MAXALIGN(sendnow);
1062 : }
1063 : }
1064 :
1065 3430944 : *bytes_written = sent;
1066 3430944 : return SHM_MQ_SUCCESS;
1067 : }
1068 :
1069 : /*
1070 : * Wait until at least bytes_needed bytes are available to be read from the
1071 : * shared message queue, or until the buffer wraps around. If the queue is
1072 : * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1073 : * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1074 : * to the location at which data bytes can be read, *nbytesp is set to the
1075 : * number of bytes which can be read at that address, and the return value
1076 : * is SHM_MQ_SUCCESS.
1077 : */
1078 : static shm_mq_result
1079 2088050 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1080 : Size *nbytesp, void **datap)
1081 : {
1082 2088050 : shm_mq *mq = mqh->mqh_queue;
1083 2088050 : Size ringsize = mq->mq_ring_size;
1084 : uint64 used;
1085 : uint64 written;
1086 :
1087 : for (;;)
1088 268530 : {
1089 : Size offset;
1090 : uint64 read;
1091 :
1092 : /* Get bytes written, so we can compute what's available to read. */
1093 2356580 : written = pg_atomic_read_u64(&mq->mq_bytes_written);
1094 :
1095 : /*
1096 : * Get bytes read. Include bytes we could consume but have not yet
1097 : * consumed.
1098 : */
1099 2356580 : read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1100 2356580 : mqh->mqh_consume_pending;
1101 2356580 : used = written - read;
1102 : Assert(used <= ringsize);
1103 2356580 : offset = read % (uint64) ringsize;
1104 :
1105 : /* If we have enough data or buffer has wrapped, we're done. Only the contiguous part up to the end of the ring is returned. */
1106 2356580 : if (used >= bytes_needed || offset + used >= ringsize)
1107 : {
1108 1957316 : *nbytesp = Min(used, ringsize - offset);
1109 1957316 : *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1110 :
1111 : /*
1112 : * Separate the read of mq_bytes_written, above, from caller's
1113 : * attempt to read the data itself. Pairs with the barrier in
1114 : * shm_mq_inc_bytes_written.
1115 : */
1116 1957316 : pg_read_barrier();
1117 1957316 : return SHM_MQ_SUCCESS;
1118 : }
1119 :
1120 : /*
1121 : * Fall out before waiting if the queue has been detached.
1122 : *
1123 : * Note that we don't check for this until *after* considering whether
1124 : * the data already available is enough, since the receiver can finish
1125 : * receiving a message stored in the buffer even after the sender has
1126 : * detached.
1127 : */
1128 399264 : if (mq->mq_detached)
1129 : {
1130 : /*
1131 : * If the writer advanced mq_bytes_written and then set
1132 : * mq_detached, we might not have read the final value of
1133 : * mq_bytes_written above. Insert a read barrier and then check
1134 : * again if mq_bytes_written has advanced.
1135 : */
1136 2522 : pg_read_barrier();
1137 2522 : if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1138 0 : continue;
1139 :
1140 2522 : return SHM_MQ_DETACHED;
1141 : }
1142 :
1143 : /*
1144 : * We didn't get enough data to satisfy the request, so mark any data
1145 : * previously-consumed as read to make more buffer space.
1146 : */
1147 396742 : if (mqh->mqh_consume_pending > 0)
1148 : {
1149 247110 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1150 247110 : mqh->mqh_consume_pending = 0;
1151 : }
1152 :
1153 : /* Skip manipulation of our latch if nowait = true. */
1154 396742 : if (nowait)
1155 128212 : return SHM_MQ_WOULD_BLOCK;
1156 :
1157 : /*
1158 : * Wait for our latch to be set. It might already be set for some
1159 : * unrelated reason, but that'll just result in one extra trip through
1160 : * the loop. It's worth it to avoid resetting the latch at top of
1161 : * loop, because setting an already-set latch is much cheaper than
1162 : * setting one that has been reset.
1163 : */
1164 268530 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1165 : WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
1166 :
1167 : /* Reset the latch so we don't spin. */
1168 268530 : ResetLatch(MyLatch);
1169 :
1170 : /* An interrupt may have occurred while we were waiting. */
1171 268530 : CHECK_FOR_INTERRUPTS();
1172 : }
1173 : }
1174 :
1175 : /*
1176 : * Test whether a counterparty who may not even be alive yet is definitely gone. Returns true if the queue is already marked detached, or if a handle was supplied and the worker's status is neither BGWH_STARTED nor BGWH_NOT_YET_STARTED (in which case the queue is also marked detached); otherwise returns false.
1177 : */
1178 : static bool
1179 5159452 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1180 : {
1181 : pid_t pid;
1182 :
1183 : /* If the queue has been detached, counterparty is definitely gone. */
1184 5159452 : if (mq->mq_detached)
1185 1468 : return true;
1186 :
1187 : /* If there's a handle, check worker status. */
1188 5157984 : if (handle != NULL)
1189 : {
1190 : BgwHandleStatus status;
1191 :
1192 : /* Check for unexpected worker death. The pid output is not used here. */
1193 5157954 : status = GetBackgroundWorkerPid(handle, &pid);
1194 5157954 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1195 : {
1196 : /* Mark it detached, just to make it official. */
1197 0 : mq->mq_detached = true;
1198 0 : return true;
1199 : }
1200 : }
1201 :
1202 : /* Counterparty is not definitely gone. */
1203 5157984 : return false;
1204 : }
1205 :
1206 : /*
1207 : * This is used when a process is waiting for its counterpart to attach to the
1208 : * queue. We exit when the other process attaches as expected, or, if
1209 : * handle != NULL, when the referenced background process or the postmaster
1210 : * dies. Note that if handle == NULL, and the process fails to attach, we'll
1211 : * potentially get stuck here forever waiting for a process that may never
1212 : * start. We do check for interrupts, though.
1213 : *
1214 : * ptr is a pointer to the memory address that we're expecting to become
1215 : * non-NULL when our counterpart attaches to the queue. Returns true once *ptr is non-NULL and the queue is not detached; returns false if the queue is detached or the worker is found to be neither started nor pending.
1216 : */
1217 : static bool
1218 428 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1219 : {
1220 428 : bool result = false;
1221 :
1222 : for (;;)
1223 1986 : {
1224 : BgwHandleStatus status;
1225 : pid_t pid;
1226 :
1227 : /* Acquire the lock just long enough to check the pointer. */
1228 2414 : SpinLockAcquire(&mq->mq_mutex);
1229 2414 : result = (*ptr != NULL);
1230 2414 : SpinLockRelease(&mq->mq_mutex);
1231 :
1232 : /* Fail if detached, even if the pointer was set; else succeed if initialized. */
1233 2414 : if (mq->mq_detached)
1234 : {
1235 232 : result = false;
1236 232 : break;
1237 : }
1238 2182 : if (result)
1239 196 : break;
1240 :
1241 1986 : if (handle != NULL)
1242 : {
1243 : /* Check for unexpected worker death. */
1244 1986 : status = GetBackgroundWorkerPid(handle, &pid);
1245 1986 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1246 : {
1247 0 : result = false;
1248 0 : break;
1249 : }
1250 : }
1251 :
1252 : /* Wait to be signaled. */
1253 1986 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1254 : WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
1255 :
1256 : /* Reset the latch so we don't spin. */
1257 1986 : ResetLatch(MyLatch);
1258 :
1259 : /* An interrupt may have occurred while we were waiting. */
1260 1986 : CHECK_FOR_INTERRUPTS();
1261 : }
1262 :
1263 428 : return result;
1264 : }
1265 :
1266 : /*
1267 : * Increment the number of bytes read by n, and set the sender's latch so that it can notice the freed buffer space.
1268 : */
1269 : static void
1270 284890 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1271 : {
1272 : PGPROC *sender;
1273 :
1274 : /*
1275 : * Separate prior reads of mq_ring from the increment of mq_bytes_read
1276 : * which follows. This pairs with the full barrier in
1277 : * shm_mq_send_bytes(). We only need a read barrier here because the
1278 : * increment of mq_bytes_read is actually a read followed by a dependent
1279 : * write.
1280 : */
1281 284890 : pg_read_barrier();
1282 :
1283 : /*
1284 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1285 : * else can be changing this value. This method should be cheaper.
1286 : */
1287 284890 : pg_atomic_write_u64(&mq->mq_bytes_read,
1288 284890 : pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1289 :
1290 : /*
1291 : * We shouldn't have any bytes to read without a sender, so we can read
1292 : * mq_sender here without a lock. Once it's initialized, it can't change.
1293 : */
1294 284890 : sender = mq->mq_sender;
1295 : Assert(sender != NULL);
1296 284890 : SetLatch(&sender->procLatch);
1297 284890 : }
1298 :
1299 : /*
1300 : * Increment the number of bytes written by n. This does not set the receiver's latch; that is left to the caller.
1301 : */
1302 : static void
1303 466360 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1304 : {
1305 : /*
1306 : * Separate prior writes to mq_ring from the write of mq_bytes_written
1307 : * which we're about to do. Pairs with the read barrier found in
1308 : * shm_mq_receive_bytes.
1309 : */
1310 466360 : pg_write_barrier();
1311 :
1312 : /*
1313 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1314 : * else can be changing this value. This method avoids taking the bus
1315 : * lock unnecessarily.
1316 : */
1317 466360 : pg_atomic_write_u64(&mq->mq_bytes_written,
1318 466360 : pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1319 466360 : }
1320 :
1321 : /* Shim for on_dsm_detach callback: arg carries the shm_mq pointer as a Datum; seg is unused. */
1322 : static void
1323 2916 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1324 : {
1325 2916 : shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1326 :
1327 2916 : shm_mq_detach_internal(mq);
1328 2916 : }
|