Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * execAsync.c 4 : * Support routines for asynchronous execution 5 : * 6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group 7 : * Portions Copyright (c) 1994, Regents of the University of California 8 : * 9 : * IDENTIFICATION 10 : * src/backend/executor/execAsync.c 11 : * 12 : *------------------------------------------------------------------------- 13 : */ 14 : 15 : #include "postgres.h" 16 : 17 : #include "executor/execAsync.h" 18 : #include "executor/executor.h" 19 : #include "executor/nodeAppend.h" 20 : #include "executor/nodeForeignscan.h" 21 : 22 : /* 23 : * Asynchronously request a tuple from a designed async-capable node. 24 : */ 25 : void 26 11544 : ExecAsyncRequest(AsyncRequest *areq) 27 : { 28 11544 : if (areq->requestee->chgParam != NULL) /* something changed? */ 29 4 : ExecReScan(areq->requestee); /* let ReScan handle this */ 30 : 31 : /* must provide our own instrumentation support */ 32 11544 : if (areq->requestee->instrument) 33 1620 : InstrStartNode(areq->requestee->instrument); 34 : 35 11544 : switch (nodeTag(areq->requestee)) 36 : { 37 11544 : case T_ForeignScanState: 38 11544 : ExecAsyncForeignScanRequest(areq); 39 11544 : break; 40 0 : default: 41 : /* If the node doesn't support async, caller messed up. */ 42 0 : elog(ERROR, "unrecognized node type: %d", 43 : (int) nodeTag(areq->requestee)); 44 : } 45 : 46 11544 : ExecAsyncResponse(areq); 47 : 48 : /* must provide our own instrumentation support */ 49 11544 : if (areq->requestee->instrument) 50 1620 : InstrStopNode(areq->requestee->instrument, 51 1620 : TupIsNull(areq->result) ? 0.0 : 1.0); 52 11544 : } 53 : 54 : /* 55 : * Give the asynchronous node a chance to configure the file descriptor event 56 : * for which it wishes to wait. We expect the node-type specific callback to 57 : * make a single call of the following form: 58 : * 59 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); 60 : */ 61 : void 62 314 : ExecAsyncConfigureWait(AsyncRequest *areq) 63 : { 64 : /* must provide our own instrumentation support */ 65 314 : if (areq->requestee->instrument) 66 36 : InstrStartNode(areq->requestee->instrument); 67 : 68 314 : switch (nodeTag(areq->requestee)) 69 : { 70 314 : case T_ForeignScanState: 71 314 : ExecAsyncForeignScanConfigureWait(areq); 72 314 : break; 73 0 : default: 74 : /* If the node doesn't support async, caller messed up. */ 75 0 : elog(ERROR, "unrecognized node type: %d", 76 : (int) nodeTag(areq->requestee)); 77 : } 78 : 79 : /* must provide our own instrumentation support */ 80 314 : if (areq->requestee->instrument) 81 36 : InstrStopNode(areq->requestee->instrument, 0.0); 82 314 : } 83 : 84 : /* 85 : * Call the asynchronous node back when a relevant event has occurred. 86 : */ 87 : void 88 290 : ExecAsyncNotify(AsyncRequest *areq) 89 : { 90 : /* must provide our own instrumentation support */ 91 290 : if (areq->requestee->instrument) 92 30 : InstrStartNode(areq->requestee->instrument); 93 : 94 290 : switch (nodeTag(areq->requestee)) 95 : { 96 290 : case T_ForeignScanState: 97 290 : ExecAsyncForeignScanNotify(areq); 98 290 : break; 99 0 : default: 100 : /* If the node doesn't support async, caller messed up. */ 101 0 : elog(ERROR, "unrecognized node type: %d", 102 : (int) nodeTag(areq->requestee)); 103 : } 104 : 105 290 : ExecAsyncResponse(areq); 106 : 107 : /* must provide our own instrumentation support */ 108 290 : if (areq->requestee->instrument) 109 30 : InstrStopNode(areq->requestee->instrument, 110 30 : TupIsNull(areq->result) ? 0.0 : 1.0); 111 290 : } 112 : 113 : /* 114 : * Call the requestor back when an asynchronous node has produced a result. 115 : */ 116 : void 117 11840 : ExecAsyncResponse(AsyncRequest *areq) 118 : { 119 11840 : switch (nodeTag(areq->requestor)) 120 : { 121 11840 : case T_AppendState: 122 11840 : ExecAsyncAppendResponse(areq); 123 11840 : break; 124 0 : default: 125 : /* If the node doesn't support async, caller messed up. */ 126 0 : elog(ERROR, "unrecognized node type: %d", 127 : (int) nodeTag(areq->requestor)); 128 : } 129 11840 : } 130 : 131 : /* 132 : * A requestee node should call this function to deliver the tuple to its 133 : * requestor node. The requestee node can call this from its ExecAsyncRequest 134 : * or ExecAsyncNotify callback. 135 : */ 136 : void 137 11532 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) 138 : { 139 11532 : areq->request_complete = true; 140 11532 : areq->result = result; 141 11532 : } 142 : 143 : /* 144 : * A requestee node should call this function to indicate that it is pending 145 : * for a callback. The requestee node can call this from its ExecAsyncRequest 146 : * or ExecAsyncNotify callback. 147 : */ 148 : void 149 308 : ExecAsyncRequestPending(AsyncRequest *areq) 150 : { 151 308 : areq->callback_pending = true; 152 308 : areq->request_complete = false; 153 308 : areq->result = NULL; 154 308 : }