Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execAsync.c
4 : * Support routines for asynchronous execution
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execAsync.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "executor/execAsync.h"
18 : #include "executor/executor.h"
19 : #include "executor/nodeAppend.h"
20 : #include "executor/nodeForeignscan.h"
21 :
22 : /*
23 : * Asynchronously request a tuple from a designed async-capable node.
24 : */
25 : void
26 6075 : ExecAsyncRequest(AsyncRequest *areq)
27 : {
28 6075 : if (areq->requestee->chgParam != NULL) /* something changed? */
29 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
30 :
31 : /* must provide our own instrumentation support */
32 6075 : if (areq->requestee->instrument)
33 810 : InstrStartNode(areq->requestee->instrument);
34 :
35 6075 : switch (nodeTag(areq->requestee))
36 : {
37 6075 : case T_ForeignScanState:
38 6075 : ExecAsyncForeignScanRequest(areq);
39 6075 : break;
40 0 : default:
41 : /* If the node doesn't support async, caller messed up. */
42 0 : elog(ERROR, "unrecognized node type: %d",
43 : (int) nodeTag(areq->requestee));
44 : }
45 :
46 6075 : ExecAsyncResponse(areq);
47 :
48 : /* must provide our own instrumentation support */
49 6075 : if (areq->requestee->instrument)
50 810 : InstrStopNode(areq->requestee->instrument,
51 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
52 6075 : }
53 :
54 : /*
55 : * Give the asynchronous node a chance to configure the file descriptor event
56 : * for which it wishes to wait. We expect the node-type specific callback to
57 : * make a single call of the following form:
58 : *
59 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60 : */
61 : void
62 246 : ExecAsyncConfigureWait(AsyncRequest *areq)
63 : {
64 : /* must provide our own instrumentation support */
65 246 : if (areq->requestee->instrument)
66 19 : InstrStartNode(areq->requestee->instrument);
67 :
68 246 : switch (nodeTag(areq->requestee))
69 : {
70 246 : case T_ForeignScanState:
71 246 : ExecAsyncForeignScanConfigureWait(areq);
72 245 : break;
73 0 : default:
74 : /* If the node doesn't support async, caller messed up. */
75 0 : elog(ERROR, "unrecognized node type: %d",
76 : (int) nodeTag(areq->requestee));
77 : }
78 :
79 : /* must provide our own instrumentation support */
80 245 : if (areq->requestee->instrument)
81 19 : InstrStopNode(areq->requestee->instrument, 0.0);
82 245 : }
83 :
84 : /*
85 : * Call the asynchronous node back when a relevant event has occurred.
86 : */
87 : void
88 148 : ExecAsyncNotify(AsyncRequest *areq)
89 : {
90 : /* must provide our own instrumentation support */
91 148 : if (areq->requestee->instrument)
92 15 : InstrStartNode(areq->requestee->instrument);
93 :
94 148 : switch (nodeTag(areq->requestee))
95 : {
96 148 : case T_ForeignScanState:
97 148 : ExecAsyncForeignScanNotify(areq);
98 148 : break;
99 0 : default:
100 : /* If the node doesn't support async, caller messed up. */
101 0 : elog(ERROR, "unrecognized node type: %d",
102 : (int) nodeTag(areq->requestee));
103 : }
104 :
105 148 : ExecAsyncResponse(areq);
106 :
107 : /* must provide our own instrumentation support */
108 148 : if (areq->requestee->instrument)
109 15 : InstrStopNode(areq->requestee->instrument,
110 15 : TupIsNull(areq->result) ? 0.0 : 1.0);
111 148 : }
112 :
113 : /*
114 : * Call the requestor back when an asynchronous node has produced a result.
115 : */
116 : void
117 6227 : ExecAsyncResponse(AsyncRequest *areq)
118 : {
119 6227 : switch (nodeTag(areq->requestor))
120 : {
121 6227 : case T_AppendState:
122 6227 : ExecAsyncAppendResponse(areq);
123 6227 : break;
124 0 : default:
125 : /* If the node doesn't support async, caller messed up. */
126 0 : elog(ERROR, "unrecognized node type: %d",
127 : (int) nodeTag(areq->requestor));
128 : }
129 6227 : }
130 :
131 : /*
132 : * A requestee node should call this function to deliver the tuple to its
133 : * requestor node. The requestee node can call this from its ExecAsyncRequest
134 : * or ExecAsyncNotify callback.
135 : */
136 : void
137 6067 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 : {
139 6067 : areq->request_complete = true;
140 6067 : areq->result = result;
141 6067 : }
142 :
143 : /*
144 : * A requestee node should call this function to indicate that it is pending
145 : * for a callback. The requestee node can call this from its ExecAsyncRequest
146 : * or ExecAsyncNotify callback.
147 : */
148 : void
149 160 : ExecAsyncRequestPending(AsyncRequest *areq)
150 : {
151 160 : areq->callback_pending = true;
152 160 : areq->request_complete = false;
153 160 : areq->result = NULL;
154 160 : }
|