Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * execAsync.c 4 : * Support routines for asynchronous execution 5 : * 6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group 7 : * Portions Copyright (c) 1994, Regents of the University of California 8 : * 9 : * IDENTIFICATION 10 : * src/backend/executor/execAsync.c 11 : * 12 : *------------------------------------------------------------------------- 13 : */ 14 : 15 : #include "postgres.h" 16 : 17 : #include "executor/execAsync.h" 18 : #include "executor/executor.h" 19 : #include "executor/nodeAppend.h" 20 : #include "executor/nodeForeignscan.h" 21 : 22 : /* 23 : * Asynchronously request a tuple from a designed async-capable node. 24 : */ 25 : void 26 12350 : ExecAsyncRequest(AsyncRequest *areq) 27 : { 28 12350 : if (areq->requestee->chgParam != NULL) /* something changed? */ 29 4 : ExecReScan(areq->requestee); /* let ReScan handle this */ 30 : 31 : /* must provide our own instrumentation support */ 32 12350 : if (areq->requestee->instrument) 33 1620 : InstrStartNode(areq->requestee->instrument); 34 : 35 12350 : switch (nodeTag(areq->requestee)) 36 : { 37 12350 : case T_ForeignScanState: 38 12350 : ExecAsyncForeignScanRequest(areq); 39 12350 : break; 40 0 : default: 41 : /* If the node doesn't support async, caller messed up. */ 42 0 : elog(ERROR, "unrecognized node type: %d", 43 : (int) nodeTag(areq->requestee)); 44 : } 45 : 46 12350 : ExecAsyncResponse(areq); 47 : 48 : /* must provide our own instrumentation support */ 49 12350 : if (areq->requestee->instrument) 50 1620 : InstrStopNode(areq->requestee->instrument, 51 1620 : TupIsNull(areq->result) ? 0.0 : 1.0); 52 12350 : } 53 : 54 : /* 55 : * Give the asynchronous node a chance to configure the file descriptor event 56 : * for which it wishes to wait. We expect the node-type specific callback to 57 : * make a single call of the following form: 58 : * 59 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); 60 : */ 61 : void 62 368 : ExecAsyncConfigureWait(AsyncRequest *areq) 63 : { 64 : /* must provide our own instrumentation support */ 65 368 : if (areq->requestee->instrument) 66 42 : InstrStartNode(areq->requestee->instrument); 67 : 68 368 : switch (nodeTag(areq->requestee)) 69 : { 70 368 : case T_ForeignScanState: 71 368 : ExecAsyncForeignScanConfigureWait(areq); 72 366 : break; 73 0 : default: 74 : /* If the node doesn't support async, caller messed up. */ 75 0 : elog(ERROR, "unrecognized node type: %d", 76 : (int) nodeTag(areq->requestee)); 77 : } 78 : 79 : /* must provide our own instrumentation support */ 80 366 : if (areq->requestee->instrument) 81 42 : InstrStopNode(areq->requestee->instrument, 0.0); 82 366 : } 83 : 84 : /* 85 : * Call the asynchronous node back when a relevant event has occurred. 86 : */ 87 : void 88 302 : ExecAsyncNotify(AsyncRequest *areq) 89 : { 90 : /* must provide our own instrumentation support */ 91 302 : if (areq->requestee->instrument) 92 30 : InstrStartNode(areq->requestee->instrument); 93 : 94 302 : switch (nodeTag(areq->requestee)) 95 : { 96 302 : case T_ForeignScanState: 97 302 : ExecAsyncForeignScanNotify(areq); 98 302 : break; 99 0 : default: 100 : /* If the node doesn't support async, caller messed up. */ 101 0 : elog(ERROR, "unrecognized node type: %d", 102 : (int) nodeTag(areq->requestee)); 103 : } 104 : 105 302 : ExecAsyncResponse(areq); 106 : 107 : /* must provide our own instrumentation support */ 108 302 : if (areq->requestee->instrument) 109 30 : InstrStopNode(areq->requestee->instrument, 110 30 : TupIsNull(areq->result) ? 0.0 : 1.0); 111 302 : } 112 : 113 : /* 114 : * Call the requestor back when an asynchronous node has produced a result. 115 : */ 116 : void 117 12658 : ExecAsyncResponse(AsyncRequest *areq) 118 : { 119 12658 : switch (nodeTag(areq->requestor)) 120 : { 121 12658 : case T_AppendState: 122 12658 : ExecAsyncAppendResponse(areq); 123 12658 : break; 124 0 : default: 125 : /* If the node doesn't support async, caller messed up. */ 126 0 : elog(ERROR, "unrecognized node type: %d", 127 : (int) nodeTag(areq->requestor)); 128 : } 129 12658 : } 130 : 131 : /* 132 : * A requestee node should call this function to deliver the tuple to its 133 : * requestor node. The requestee node can call this from its ExecAsyncRequest 134 : * or ExecAsyncNotify callback. 135 : */ 136 : void 137 12336 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) 138 : { 139 12336 : areq->request_complete = true; 140 12336 : areq->result = result; 141 12336 : } 142 : 143 : /* 144 : * A requestee node should call this function to indicate that it is pending 145 : * for a callback. The requestee node can call this from its ExecAsyncRequest 146 : * or ExecAsyncNotify callback. 147 : */ 148 : void 149 322 : ExecAsyncRequestPending(AsyncRequest *areq) 150 : { 151 322 : areq->callback_pending = true; 152 322 : areq->request_complete = false; 153 322 : areq->result = NULL; 154 322 : }