Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execAsync.c
4 : * Support routines for asynchronous execution
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execAsync.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "executor/execAsync.h"
18 : #include "executor/executor.h"
19 : #include "executor/instrument.h"
20 : #include "executor/nodeAppend.h"
21 : #include "executor/nodeForeignscan.h"
22 :
23 : /*
24 : * Asynchronously request a tuple from a designed async-capable node.
25 : */
26 : void
27 5975 : ExecAsyncRequest(AsyncRequest *areq)
28 : {
29 5975 : if (areq->requestee->chgParam != NULL) /* something changed? */
30 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
31 :
32 : /* must provide our own instrumentation support */
33 5975 : if (areq->requestee->instrument)
34 810 : InstrStartNode(areq->requestee->instrument);
35 :
36 5975 : switch (nodeTag(areq->requestee))
37 : {
38 5975 : case T_ForeignScanState:
39 5975 : ExecAsyncForeignScanRequest(areq);
40 5975 : break;
41 0 : default:
42 : /* If the node doesn't support async, caller messed up. */
43 0 : elog(ERROR, "unrecognized node type: %d",
44 : (int) nodeTag(areq->requestee));
45 : }
46 :
47 5975 : ExecAsyncResponse(areq);
48 :
49 : /* must provide our own instrumentation support */
50 5975 : if (areq->requestee->instrument)
51 810 : InstrStopNode(areq->requestee->instrument,
52 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
53 5975 : }
54 :
55 : /*
56 : * Give the asynchronous node a chance to configure the file descriptor event
57 : * for which it wishes to wait. We expect the node-type specific callback to
58 : * make a single call of the following form:
59 : *
60 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
61 : */
62 : void
63 222 : ExecAsyncConfigureWait(AsyncRequest *areq)
64 : {
65 : /* must provide our own instrumentation support */
66 222 : if (areq->requestee->instrument)
67 22 : InstrStartNode(areq->requestee->instrument);
68 :
69 222 : switch (nodeTag(areq->requestee))
70 : {
71 222 : case T_ForeignScanState:
72 222 : ExecAsyncForeignScanConfigureWait(areq);
73 221 : break;
74 0 : default:
75 : /* If the node doesn't support async, caller messed up. */
76 0 : elog(ERROR, "unrecognized node type: %d",
77 : (int) nodeTag(areq->requestee));
78 : }
79 :
80 : /* must provide our own instrumentation support */
81 221 : if (areq->requestee->instrument)
82 22 : InstrStopNode(areq->requestee->instrument, 0.0);
83 221 : }
84 :
85 : /*
86 : * Call the asynchronous node back when a relevant event has occurred.
87 : */
88 : void
89 146 : ExecAsyncNotify(AsyncRequest *areq)
90 : {
91 : /* must provide our own instrumentation support */
92 146 : if (areq->requestee->instrument)
93 14 : InstrStartNode(areq->requestee->instrument);
94 :
95 146 : switch (nodeTag(areq->requestee))
96 : {
97 146 : case T_ForeignScanState:
98 146 : ExecAsyncForeignScanNotify(areq);
99 146 : break;
100 0 : default:
101 : /* If the node doesn't support async, caller messed up. */
102 0 : elog(ERROR, "unrecognized node type: %d",
103 : (int) nodeTag(areq->requestee));
104 : }
105 :
106 146 : ExecAsyncResponse(areq);
107 :
108 : /* must provide our own instrumentation support */
109 146 : if (areq->requestee->instrument)
110 14 : InstrStopNode(areq->requestee->instrument,
111 14 : TupIsNull(areq->result) ? 0.0 : 1.0);
112 146 : }
113 :
114 : /*
115 : * Call the requestor back when an asynchronous node has produced a result.
116 : */
117 : void
118 6126 : ExecAsyncResponse(AsyncRequest *areq)
119 : {
120 6126 : switch (nodeTag(areq->requestor))
121 : {
122 6126 : case T_AppendState:
123 6126 : ExecAsyncAppendResponse(areq);
124 6126 : break;
125 0 : default:
126 : /* If the node doesn't support async, caller messed up. */
127 0 : elog(ERROR, "unrecognized node type: %d",
128 : (int) nodeTag(areq->requestor));
129 : }
130 6126 : }
131 :
132 : /*
133 : * A requestee node should call this function to deliver the tuple to its
134 : * requestor node. The requestee node can call this from its ExecAsyncRequest
135 : * or ExecAsyncNotify callback.
136 : */
137 : void
138 5967 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
139 : {
140 5967 : areq->request_complete = true;
141 5967 : areq->result = result;
142 5967 : }
143 :
144 : /*
145 : * A requestee node should call this function to indicate that it is pending
146 : * for a callback. The requestee node can call this from its ExecAsyncRequest
147 : * or ExecAsyncNotify callback.
148 : */
149 : void
150 159 : ExecAsyncRequestPending(AsyncRequest *areq)
151 : {
152 159 : areq->callback_pending = true;
153 159 : areq->request_complete = false;
154 159 : areq->result = NULL;
155 159 : }
|