Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * tqueue.c 4 : * Use shm_mq to send & receive tuples between parallel backends 5 : * 6 : * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver 7 : * under the hood, writes tuples from the executor to a shm_mq. 8 : * 9 : * A TupleQueueReader reads tuples from a shm_mq and returns the tuples. 10 : * 11 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group 12 : * Portions Copyright (c) 1994, Regents of the University of California 13 : * 14 : * IDENTIFICATION 15 : * src/backend/executor/tqueue.c 16 : * 17 : *------------------------------------------------------------------------- 18 : */ 19 : 20 : #include "postgres.h" 21 : 22 : #include "access/htup_details.h" 23 : #include "executor/tqueue.h" 24 : 25 : /* 26 : * DestReceiver object's private contents 27 : * 28 : * queue is a pointer to data supplied by DestReceiver's caller. 29 : */ 30 : typedef struct TQueueDestReceiver 31 : { 32 : DestReceiver pub; /* public fields */ 33 : shm_mq_handle *queue; /* shm_mq to send to */ 34 : } TQueueDestReceiver; 35 : 36 : /* 37 : * TupleQueueReader object's private contents 38 : * 39 : * queue is a pointer to data supplied by reader's caller. 40 : * 41 : * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h 42 : */ 43 : struct TupleQueueReader 44 : { 45 : shm_mq_handle *queue; /* shm_mq to receive from */ 46 : }; 47 : 48 : /* 49 : * Receive a tuple from a query, and send it to the designated shm_mq. 50 : * 51 : * Returns true if successful, false if shm_mq has been detached. 52 : */ 53 : static bool 54 1441326 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) 55 : { 56 1441326 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; 57 : MinimalTuple tuple; 58 : shm_mq_result result; 59 : bool should_free; 60 : 61 : /* Send the tuple itself. */ 62 1441326 : tuple = ExecFetchSlotMinimalTuple(slot, &should_free); 63 1441326 : result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false); 64 : 65 1441326 : if (should_free) 66 1441222 : pfree(tuple); 67 : 68 : /* Check for failure. */ 69 1441326 : if (result == SHM_MQ_DETACHED) 70 0 : return false; 71 1441326 : else if (result != SHM_MQ_SUCCESS) 72 0 : ereport(ERROR, 73 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 74 : errmsg("could not send tuple to shared-memory queue"))); 75 : 76 1441326 : return true; 77 : } 78 : 79 : /* 80 : * Prepare to receive tuples from executor. 81 : */ 82 : static void 83 2518 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) 84 : { 85 : /* do nothing */ 86 2518 : } 87 : 88 : /* 89 : * Clean up at end of an executor run 90 : */ 91 : static void 92 2506 : tqueueShutdownReceiver(DestReceiver *self) 93 : { 94 2506 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; 95 : 96 2506 : if (tqueue->queue != NULL) 97 2506 : shm_mq_detach(tqueue->queue); 98 2506 : tqueue->queue = NULL; 99 2506 : } 100 : 101 : /* 102 : * Destroy receiver when done with it 103 : */ 104 : static void 105 2506 : tqueueDestroyReceiver(DestReceiver *self) 106 : { 107 2506 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; 108 : 109 : /* We probably already detached from queue, but let's be sure */ 110 2506 : if (tqueue->queue != NULL) 111 0 : shm_mq_detach(tqueue->queue); 112 2506 : pfree(self); 113 2506 : } 114 : 115 : /* 116 : * Create a DestReceiver that writes tuples to a tuple queue. 117 : */ 118 : DestReceiver * 119 2518 : CreateTupleQueueDestReceiver(shm_mq_handle *handle) 120 : { 121 : TQueueDestReceiver *self; 122 : 123 2518 : self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); 124 : 125 2518 : self->pub.receiveSlot = tqueueReceiveSlot; 126 2518 : self->pub.rStartup = tqueueStartupReceiver; 127 2518 : self->pub.rShutdown = tqueueShutdownReceiver; 128 2518 : self->pub.rDestroy = tqueueDestroyReceiver; 129 2518 : self->pub.mydest = DestTupleQueue; 130 2518 : self->queue = handle; 131 : 132 2518 : return (DestReceiver *) self; 133 : } 134 : 135 : /* 136 : * Create a tuple queue reader. 137 : */ 138 : TupleQueueReader * 139 2518 : CreateTupleQueueReader(shm_mq_handle *handle) 140 : { 141 2518 : TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); 142 : 143 2518 : reader->queue = handle; 144 : 145 2518 : return reader; 146 : } 147 : 148 : /* 149 : * Destroy a tuple queue reader. 150 : * 151 : * Note: cleaning up the underlying shm_mq is the caller's responsibility. 152 : * We won't access it here, as it may be detached already. 153 : */ 154 : void 155 2506 : DestroyTupleQueueReader(TupleQueueReader *reader) 156 : { 157 2506 : pfree(reader); 158 2506 : } 159 : 160 : /* 161 : * Fetch a tuple from a tuple queue reader. 162 : * 163 : * The return value is NULL if there are no remaining tuples or if 164 : * nowait = true and no tuple is ready to return. *done, if not NULL, 165 : * is set to true when there are no remaining tuples and otherwise to false. 166 : * 167 : * The returned tuple, if any, is either in shared memory or a private buffer 168 : * and should not be freed. The pointer is invalid after the next call to 169 : * TupleQueueReaderNext(). 170 : * 171 : * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still 172 : * accumulate bytes from a partially-read message, so it's useful to call 173 : * this with nowait = true even if nothing is returned. 174 : */ 175 : MinimalTuple 176 6761024 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) 177 : { 178 : MinimalTuple tuple; 179 : shm_mq_result result; 180 : Size nbytes; 181 : void *data; 182 : 183 6761024 : if (done != NULL) 184 6761024 : *done = false; 185 : 186 : /* Attempt to read a message. */ 187 6761024 : result = shm_mq_receive(reader->queue, &nbytes, &data, nowait); 188 : 189 : /* If queue is detached, set *done and return NULL. */ 190 6761024 : if (result == SHM_MQ_DETACHED) 191 : { 192 2506 : if (done != NULL) 193 2506 : *done = true; 194 2506 : return NULL; 195 : } 196 : 197 : /* In non-blocking mode, bail out if no message ready yet. */ 198 6758518 : if (result == SHM_MQ_WOULD_BLOCK) 199 5317192 : return NULL; 200 : Assert(result == SHM_MQ_SUCCESS); 201 : 202 : /* 203 : * Return a pointer to the queue memory directly (which had better be 204 : * sufficiently aligned). 205 : */ 206 1441326 : tuple = (MinimalTuple) data; 207 : Assert(tuple->t_len == nbytes); 208 : 209 1441326 : return tuple; 210 : }