LCOV - code coverage report
Current view: top level - src/backend/executor - tqueue.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.3 % 53 50
Test Date: 2026-03-01 00:15:48 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * tqueue.c
       4              :  *    Use shm_mq to send & receive tuples between parallel backends
       5              :  *
       6              :  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
       7              :  * under the hood, writes tuples from the executor to a shm_mq.
       8              :  *
       9              :  * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
      10              :  *
      11              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      12              :  * Portions Copyright (c) 1994, Regents of the University of California
      13              :  *
      14              :  * IDENTIFICATION
      15              :  *    src/backend/executor/tqueue.c
      16              :  *
      17              :  *-------------------------------------------------------------------------
      18              :  */
      19              : 
      20              : #include "postgres.h"
      21              : 
      22              : #include "access/htup_details.h"
      23              : #include "executor/tqueue.h"
      24              : 
      25              : /*
      26              :  * DestReceiver object's private contents
      27              :  *
      28              :  * queue is a pointer to data supplied by DestReceiver's caller.
                      :  *
                      :  * pub is the first member, which is what allows the callbacks in this file
                      :  * to cast the DestReceiver pointer they are handed back to a
                      :  * TQueueDestReceiver pointer.  queue is reset to NULL once the shutdown
                      :  * callback has detached from it, so NULL means "already detached".
      29              :  */
      30              : typedef struct TQueueDestReceiver
      31              : {
      32              :     DestReceiver pub;           /* public fields; must be first */
      33              :     shm_mq_handle *queue;       /* shm_mq to send to, or NULL once detached */
      34              : } TQueueDestReceiver;
      35              : 
      36              : /*
      37              :  * TupleQueueReader object's private contents
      38              :  *
      39              :  * queue is a pointer to data supplied by reader's caller.
                      :  * It is stored by CreateTupleQueueReader() and only ever read from by
                      :  * TupleQueueReaderNext(); the reader never detaches from it or frees it.
      40              :  *
      41              :  * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
      42              :  */
      43              : struct TupleQueueReader
      44              : {
      45              :     shm_mq_handle *queue;       /* shm_mq to receive from (not owned) */
      46              : };
      47              : 
      48              : /*
      49              :  * Receive a tuple from a query, and send it to the designated shm_mq.
      50              :  *
                      :  * slot holds the tuple to send.  self is the DestReceiver on which this
                      :  * function was installed as receiveSlot, i.e. really a TQueueDestReceiver.
                      :  *
                      :  * The tuple is fetched from the slot as a MinimalTuple and sent as a single
                      :  * message of t_len bytes.  If ExecFetchSlotMinimalTuple() sets should_free,
                      :  * the tuple is released right after the send and before the result is
                      :  * examined, so it is freed on the success, detached and error paths alike.
                      :  *
      51              :  * Returns true if successful, false if shm_mq has been detached.
                      :  * Any other result from shm_mq_send() is reported with ereport(ERROR).
      52              :  */
      53              : static bool
      54       802161 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
      55              : {
      56       802161 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
      57              :     MinimalTuple tuple;
      58              :     shm_mq_result result;
      59              :     bool        should_free;
      60              : 
      61              :     /*
                      :      * Send the tuple itself.  NOTE(review): the two trailing false
                      :      * arguments are presumably the nowait and force_flush flags of
                      :      * shm_mq_send(), which would make this a blocking send -- confirm
                      :      * against storage/shm_mq.h.
                      :      */
      62       802161 :     tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
      63       802161 :     result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
      64              : 
      65       802161 :     if (should_free)
      66       799363 :         pfree(tuple);
      67              : 
      68              :     /* Check for failure: detached is a soft failure, anything else is an error. */
      69       802161 :     if (result == SHM_MQ_DETACHED)
      70            0 :         return false;
      71       802161 :     else if (result != SHM_MQ_SUCCESS)
      72            0 :         ereport(ERROR,
      73              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      74              :                  errmsg("could not send tuple to shared-memory queue")));
      75              : 
      76       802161 :     return true;
      77              : }
      78              : 
      79              : /*
      80              :  * Prepare to receive tuples from executor.
                      :  *
                      :  * Installed as the rStartup callback.  Nothing needs to be set up here,
                      :  * so self, operation and typeinfo are all ignored.
      81              :  */
      82              : static void
      83         1342 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
      84              : {
      85              :     /* do nothing */
      86         1342 : }
      87              : 
      88              : /*
      89              :  * Clean up at end of an executor run
                      :  *
                      :  * Installed as the rShutdown callback.  Detaches from the queue if we are
                      :  * still attached, then clears the handle so that neither a repeated
                      :  * shutdown nor tqueueDestroyReceiver() will try to detach a second time.
      90              :  */
      91              : static void
      92         1336 : tqueueShutdownReceiver(DestReceiver *self)
      93              : {
      94         1336 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
      95              : 
      96         1336 :     if (tqueue->queue != NULL)
      97         1336 :         shm_mq_detach(tqueue->queue);
      98         1336 :     tqueue->queue = NULL;
      99         1336 : }
     100              : 
     101              : /*
     102              :  * Destroy receiver when done with it
                      :  *
                      :  * Installed as the rDestroy callback.  Detaches from the queue if the
                      :  * handle is still set, then frees the receiver itself; self must not be
                      :  * used after this returns.
     103              :  */
     104              : static void
     105         1336 : tqueueDestroyReceiver(DestReceiver *self)
     106              : {
     107         1336 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
     108              : 
     109              :     /*
                      :      * We probably already detached from queue, but let's be sure.  A
                      :      * non-NULL handle here means tqueueShutdownReceiver() has not run for
                      :      * this receiver, since that function clears the handle.
                      :      */
     110         1336 :     if (tqueue->queue != NULL)
     111            0 :         shm_mq_detach(tqueue->queue);
     112         1336 :     pfree(self);
     113         1336 : }
     114              : 
     115              : /*
     116              :  * Create a DestReceiver that writes tuples to a tuple queue.
                      :  *
                      :  * handle is the shm_mq handle that tuples will be sent to; it is stored in
                      :  * the receiver as-is, and the receiver detaches from it in its rShutdown
                      :  * (or, failing that, rDestroy) callback.
                      :  *
                      :  * The result is a newly allocated TQueueDestReceiver, returned as a
                      :  * DestReceiver pointer with mydest set to DestTupleQueue.  It is freed by
                      :  * its own rDestroy callback.
     117              :  */
     118              : DestReceiver *
     119         1342 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
     120              : {
     121              :     TQueueDestReceiver *self;
     122              : 
     123         1342 :     self = palloc0_object(TQueueDestReceiver);
     124              : 
     125         1342 :     self->pub.receiveSlot = tqueueReceiveSlot;
     126         1342 :     self->pub.rStartup = tqueueStartupReceiver;
     127         1342 :     self->pub.rShutdown = tqueueShutdownReceiver;
     128         1342 :     self->pub.rDestroy = tqueueDestroyReceiver;
     129         1342 :     self->pub.mydest = DestTupleQueue;
     130         1342 :     self->queue = handle;
     131              : 
     132         1342 :     return (DestReceiver *) self;
     133              : }
     134              : 
     135              : /*
     136              :  * Create a tuple queue reader.
                      :  *
                      :  * handle is the shm_mq handle to read tuples from.  Only the pointer is
                      :  * stored; nothing is read from the queue here.  Release the returned
                      :  * reader with DestroyTupleQueueReader().
     137              :  */
     138              : TupleQueueReader *
     139         1342 : CreateTupleQueueReader(shm_mq_handle *handle)
     140              : {
     141         1342 :     TupleQueueReader *reader = palloc0_object(TupleQueueReader);
     142              : 
     143         1342 :     reader->queue = handle;
     144              : 
     145         1342 :     return reader;
     146              : }
     147              : 
     148              : /*
     149              :  * Destroy a tuple queue reader.
     150              :  *
     151              :  * Note: cleaning up the underlying shm_mq is the caller's responsibility.
     152              :  * We won't access it here, as it may be detached already.
                      :  *
                      :  * Only the TupleQueueReader struct itself is freed; reader must not be
                      :  * used after this returns.
     153              :  */
     154              : void
     155         1336 : DestroyTupleQueueReader(TupleQueueReader *reader)
     156              : {
     157         1336 :     pfree(reader);
     158         1336 : }
     159              : 
     160              : /*
     161              :  * Fetch a tuple from a tuple queue reader.
     162              :  *
                      :  * reader is the reader to fetch from.  nowait is passed straight through
                      :  * to shm_mq_receive().  done is an optional output flag and may be NULL.
                      :  *
     163              :  * The return value is NULL if there are no remaining tuples or if
     164              :  * nowait = true and no tuple is ready to return.  *done, if not NULL,
     165              :  * is set to true when there are no remaining tuples and otherwise to false.
                      :  * Because *done is reset to false on entry, a NULL result with *done still
                      :  * false means only that no complete message was available yet; callers
                      :  * that pass done = NULL cannot tell these two NULL cases apart.
     166              :  *
     167              :  * The returned tuple, if any, is either in shared memory or a private buffer
     168              :  * and should not be freed.  The pointer is invalid after the next call to
     169              :  * TupleQueueReaderNext().
     170              :  *
     171              :  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
     172              :  * accumulate bytes from a partially-read message, so it's useful to call
     173              :  * this with nowait = true even if nothing is returned.
     174              :  */
     175              : MinimalTuple
     176      3375582 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
     177              : {
     178              :     MinimalTuple tuple;
     179              :     shm_mq_result result;
     180              :     Size        nbytes;
     181              :     void       *data;
     182              : 
     183      3375582 :     if (done != NULL)
     184      3375582 :         *done = false;
     185              : 
     186              :     /* Attempt to read a message. */
     187      3375582 :     result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
     188              : 
     189              :     /* If queue is detached, set *done and return NULL. */
     190      3375582 :     if (result == SHM_MQ_DETACHED)
     191              :     {
     192         1336 :         if (done != NULL)
     193         1336 :             *done = true;
     194         1336 :         return NULL;
     195              :     }
     196              : 
     197              :     /* In non-blocking mode, bail out if no message ready yet; *done stays false. */
     198      3374246 :     if (result == SHM_MQ_WOULD_BLOCK)
     199      2572085 :         return NULL;
     200              :     Assert(result == SHM_MQ_SUCCESS);
     201              : 
     202              :     /*
     203              :      * Return a pointer to the queue memory directly (which had better be
     204              :      * sufficiently aligned).  No copy is made here.  In assert-enabled
                      :      * builds we also cross-check that the length stored in the tuple header
                      :      * matches the message size reported by shm_mq_receive().
     205              :      */
     206       802161 :     tuple = (MinimalTuple) data;
     207              :     Assert(tuple->t_len == nbytes);
     208              : 
     209       802161 :     return tuple;
     210              : }
        

Generated by: LCOV version 2.0-1