Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execAsync.c
4 : * Support routines for asynchronous execution
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execAsync.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "executor/execAsync.h"
18 : #include "executor/executor.h"
19 : #include "executor/instrument.h"
20 : #include "executor/nodeAppend.h"
21 : #include "executor/nodeForeignscan.h"
22 :
23 : /*
24 : * Asynchronously request a tuple from a designed async-capable node.
25 : */
26 : void
27 6175 : ExecAsyncRequest(AsyncRequest *areq)
28 : {
29 6175 : if (areq->requestee->chgParam != NULL) /* something changed? */
30 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
31 :
32 : /* must provide our own instrumentation support */
33 6175 : if (areq->requestee->instrument)
34 810 : InstrStartNode(areq->requestee->instrument);
35 :
36 6175 : switch (nodeTag(areq->requestee))
37 : {
38 6175 : case T_ForeignScanState:
39 6175 : ExecAsyncForeignScanRequest(areq);
40 6175 : break;
41 0 : default:
42 : /* If the node doesn't support async, caller messed up. */
43 0 : elog(ERROR, "unrecognized node type: %d",
44 : (int) nodeTag(areq->requestee));
45 : }
46 :
47 6175 : ExecAsyncResponse(areq);
48 :
49 : /* must provide our own instrumentation support */
50 6175 : if (areq->requestee->instrument)
51 810 : InstrStopNode(areq->requestee->instrument,
52 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
53 6175 : }
54 :
55 : /*
56 : * Give the asynchronous node a chance to configure the file descriptor event
57 : * for which it wishes to wait. We expect the node-type specific callback to
58 : * make a single call of the following form:
59 : *
60 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
61 : */
62 : void
63 210 : ExecAsyncConfigureWait(AsyncRequest *areq)
64 : {
65 : /* must provide our own instrumentation support */
66 210 : if (areq->requestee->instrument)
67 20 : InstrStartNode(areq->requestee->instrument);
68 :
69 210 : switch (nodeTag(areq->requestee))
70 : {
71 210 : case T_ForeignScanState:
72 210 : ExecAsyncForeignScanConfigureWait(areq);
73 209 : break;
74 0 : default:
75 : /* If the node doesn't support async, caller messed up. */
76 0 : elog(ERROR, "unrecognized node type: %d",
77 : (int) nodeTag(areq->requestee));
78 : }
79 :
80 : /* must provide our own instrumentation support */
81 209 : if (areq->requestee->instrument)
82 20 : InstrStopNode(areq->requestee->instrument, 0.0);
83 209 : }
84 :
85 : /*
86 : * Call the asynchronous node back when a relevant event has occurred.
87 : */
88 : void
89 149 : ExecAsyncNotify(AsyncRequest *areq)
90 : {
91 : /* must provide our own instrumentation support */
92 149 : if (areq->requestee->instrument)
93 15 : InstrStartNode(areq->requestee->instrument);
94 :
95 149 : switch (nodeTag(areq->requestee))
96 : {
97 149 : case T_ForeignScanState:
98 149 : ExecAsyncForeignScanNotify(areq);
99 149 : break;
100 0 : default:
101 : /* If the node doesn't support async, caller messed up. */
102 0 : elog(ERROR, "unrecognized node type: %d",
103 : (int) nodeTag(areq->requestee));
104 : }
105 :
106 149 : ExecAsyncResponse(areq);
107 :
108 : /* must provide our own instrumentation support */
109 149 : if (areq->requestee->instrument)
110 15 : InstrStopNode(areq->requestee->instrument,
111 15 : TupIsNull(areq->result) ? 0.0 : 1.0);
112 149 : }
113 :
114 : /*
115 : * Call the requestor back when an asynchronous node has produced a result.
116 : */
117 : void
118 6328 : ExecAsyncResponse(AsyncRequest *areq)
119 : {
120 6328 : switch (nodeTag(areq->requestor))
121 : {
122 6328 : case T_AppendState:
123 6328 : ExecAsyncAppendResponse(areq);
124 6328 : break;
125 0 : default:
126 : /* If the node doesn't support async, caller messed up. */
127 0 : elog(ERROR, "unrecognized node type: %d",
128 : (int) nodeTag(areq->requestor));
129 : }
130 6328 : }
131 :
132 : /*
133 : * A requestee node should call this function to deliver the tuple to its
134 : * requestor node. The requestee node can call this from its ExecAsyncRequest
135 : * or ExecAsyncNotify callback.
136 : */
137 : void
138 6167 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
139 : {
140 6167 : areq->request_complete = true;
141 6167 : areq->result = result;
142 6167 : }
143 :
144 : /*
145 : * A requestee node should call this function to indicate that it is pending
146 : * for a callback. The requestee node can call this from its ExecAsyncRequest
147 : * or ExecAsyncNotify callback.
148 : */
149 : void
150 161 : ExecAsyncRequestPending(AsyncRequest *areq)
151 : {
152 161 : areq->callback_pending = true;
153 161 : areq->request_complete = false;
154 161 : areq->result = NULL;
155 161 : }
|