Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeForeignscan.c
4 : * Routines to support scans of foreign tables
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeForeignscan.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : *
18 : * ExecForeignScan scans a foreign table.
19 : * ExecInitForeignScan creates and initializes state info.
20 : * ExecReScanForeignScan rescans the foreign relation.
21 : * ExecEndForeignScan releases any resources allocated.
22 : */
23 : #include "postgres.h"
24 :
25 : #include "executor/executor.h"
26 : #include "executor/nodeForeignscan.h"
27 : #include "foreign/fdwapi.h"
28 : #include "utils/rel.h"
29 :
30 : static TupleTableSlot *ForeignNext(ForeignScanState *node);
31 : static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
32 :
33 :
34 : /* ----------------------------------------------------------------
35 : * ForeignNext
36 : *
37 : * This is a workhorse for ExecForeignScan
38 : * ----------------------------------------------------------------
39 : */
40 : static TupleTableSlot *
41 142492 : ForeignNext(ForeignScanState *node)
42 : {
43 : TupleTableSlot *slot;
44 142492 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
45 142492 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
46 : MemoryContext oldcontext;
47 :
48 : /* Call the Iterate function in short-lived context */
49 142492 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
50 142492 : if (plan->operation != CMD_SELECT)
51 : {
52 : /*
53 : * direct modifications cannot be re-evaluated, so shouldn't get here
54 : * during EvalPlanQual processing
55 : */
56 : Assert(node->ss.ps.state->es_epq_active == NULL);
57 :
58 836 : slot = node->fdwroutine->IterateDirectModify(node);
59 : }
60 : else
61 141656 : slot = node->fdwroutine->IterateForeignScan(node);
62 142464 : MemoryContextSwitchTo(oldcontext);
63 :
64 : /*
65 : * Insert valid value into tableoid, the only actually-useful system
66 : * column.
67 : */
68 142464 : if (plan->fsSystemCol && !TupIsNull(slot))
69 9046 : slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
70 :
71 142464 : return slot;
72 : }
73 :
74 : /*
75 : * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
76 : */
77 : static bool
78 0 : ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
79 : {
80 0 : FdwRoutine *fdwroutine = node->fdwroutine;
81 : ExprContext *econtext;
82 :
83 : /*
84 : * extract necessary information from foreign scan node
85 : */
86 0 : econtext = node->ss.ps.ps_ExprContext;
87 :
88 : /* Does the tuple meet the remote qual condition? */
89 0 : econtext->ecxt_scantuple = slot;
90 :
91 0 : ResetExprContext(econtext);
92 :
93 : /*
94 : * If an outer join is pushed down, RecheckForeignScan may need to store a
95 : * different tuple in the slot, because a different set of columns may go
96 : * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
97 : * contents, just return true or false to indicate whether the quals still
98 : * pass. For simple cases, setting fdw_recheck_quals may be easier than
99 : * providing this callback.
100 : */
101 0 : if (fdwroutine->RecheckForeignScan &&
102 0 : !fdwroutine->RecheckForeignScan(node, slot))
103 0 : return false;
104 :
105 0 : return ExecQual(node->fdw_recheck_quals, econtext);
106 : }
107 :
108 : /* ----------------------------------------------------------------
109 : * ExecForeignScan(node)
110 : *
111 : * Fetches the next tuple from the FDW, checks local quals, and
112 : * returns it.
113 : * We call the ExecScan() routine and pass it the appropriate
114 : * access method functions.
115 : * ----------------------------------------------------------------
116 : */
117 : static TupleTableSlot *
118 124320 : ExecForeignScan(PlanState *pstate)
119 : {
120 124320 : ForeignScanState *node = castNode(ForeignScanState, pstate);
121 124320 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
122 124320 : EState *estate = node->ss.ps.state;
123 :
124 : /*
125 : * Ignore direct modifications when EvalPlanQual is active --- they are
126 : * irrelevant for EvalPlanQual rechecking
127 : */
128 124320 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
129 0 : return NULL;
130 :
131 124320 : return ExecScan(&node->ss,
132 : (ExecScanAccessMtd) ForeignNext,
133 : (ExecScanRecheckMtd) ForeignRecheck);
134 : }
135 :
136 :
137 : /* ----------------------------------------------------------------
138 : * ExecInitForeignScan
139 : * ----------------------------------------------------------------
140 : */
141 : ForeignScanState *
142 1960 : ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
143 : {
144 : ForeignScanState *scanstate;
145 1960 : Relation currentRelation = NULL;
146 1960 : Index scanrelid = node->scan.scanrelid;
147 : int tlistvarno;
148 : FdwRoutine *fdwroutine;
149 :
150 : /* check for unsupported flags */
151 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
152 :
153 : /*
154 : * create state structure
155 : */
156 1960 : scanstate = makeNode(ForeignScanState);
157 1960 : scanstate->ss.ps.plan = (Plan *) node;
158 1960 : scanstate->ss.ps.state = estate;
159 1960 : scanstate->ss.ps.ExecProcNode = ExecForeignScan;
160 :
161 : /*
162 : * Miscellaneous initialization
163 : *
164 : * create expression context for node
165 : */
166 1960 : ExecAssignExprContext(estate, &scanstate->ss.ps);
167 :
168 : /*
169 : * open the scan relation, if any; also acquire function pointers from the
170 : * FDW's handler
171 : */
172 1960 : if (scanrelid > 0)
173 : {
174 1404 : currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
175 1404 : scanstate->ss.ss_currentRelation = currentRelation;
176 1404 : fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
177 : }
178 : else
179 : {
180 : /* We can't use the relcache, so get fdwroutine the hard way */
181 556 : fdwroutine = GetFdwRoutineByServerId(node->fs_server);
182 : }
183 :
184 : /*
185 : * Determine the scan tuple type. If the FDW provided a targetlist
186 : * describing the scan tuples, use that; else use base relation's rowtype.
187 : */
188 1960 : if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
189 556 : {
190 : TupleDesc scan_tupdesc;
191 :
192 556 : scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
193 556 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
194 : &TTSOpsHeapTuple);
195 : /* Node's targetlist will contain Vars with varno = INDEX_VAR */
196 556 : tlistvarno = INDEX_VAR;
197 : }
198 : else
199 : {
200 : TupleDesc scan_tupdesc;
201 :
202 : /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
203 1404 : scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
204 1404 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
205 : &TTSOpsHeapTuple);
206 : /* Node's targetlist will contain Vars with varno = scanrelid */
207 1404 : tlistvarno = scanrelid;
208 : }
209 :
210 : /* Don't know what an FDW might return */
211 1960 : scanstate->ss.ps.scanopsfixed = false;
212 1960 : scanstate->ss.ps.scanopsset = true;
213 :
214 : /*
215 : * Initialize result slot, type and projection.
216 : */
217 1960 : ExecInitResultTypeTL(&scanstate->ss.ps);
218 1960 : ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
219 :
220 : /*
221 : * initialize child expressions
222 : */
223 1960 : scanstate->ss.ps.qual =
224 1960 : ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
225 1960 : scanstate->fdw_recheck_quals =
226 1960 : ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
227 :
228 : /*
229 : * Determine whether to scan the foreign relation asynchronously or not;
230 : * this has to be kept in sync with the code in ExecInitAppend().
231 : */
232 2146 : scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
233 186 : estate->es_epq_active == NULL);
234 :
235 : /*
236 : * Initialize FDW-related state.
237 : */
238 1960 : scanstate->fdwroutine = fdwroutine;
239 1960 : scanstate->fdw_state = NULL;
240 :
241 : /*
242 : * For the FDW's convenience, look up the modification target relation's
243 : * ResultRelInfo. The ModifyTable node should have initialized it for us,
244 : * see ExecInitModifyTable.
245 : *
246 : * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
247 : * though. Direct modifications cannot be re-evaluated as part of
248 : * EvalPlanQual. The lookup wouldn't work anyway because during
249 : * EvalPlanQual processing, EvalPlanQual only initializes the subtree
250 : * under the ModifyTable, and doesn't run ExecInitModifyTable.
251 : */
252 1960 : if (node->resultRelation > 0 && estate->es_epq_active == NULL)
253 : {
254 208 : if (estate->es_result_relations == NULL ||
255 208 : estate->es_result_relations[node->resultRelation - 1] == NULL)
256 : {
257 0 : elog(ERROR, "result relation not initialized");
258 : }
259 208 : scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
260 : }
261 :
262 : /* Initialize any outer plan. */
263 1960 : if (outerPlan(node))
264 24 : outerPlanState(scanstate) =
265 24 : ExecInitNode(outerPlan(node), estate, eflags);
266 :
267 : /*
268 : * Tell the FDW to initialize the scan.
269 : */
270 1960 : if (node->operation != CMD_SELECT)
271 : {
272 : /*
273 : * Direct modifications cannot be re-evaluated by EvalPlanQual, so
274 : * don't bother preparing the FDW.
275 : *
276 : * In case of an inherited UPDATE/DELETE with foreign targets there
277 : * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree,
278 : * so we need to ignore such ForeignScan nodes during EvalPlanQual
279 : * processing. See also ExecForeignScan/ExecReScanForeignScan.
280 : */
281 208 : if (estate->es_epq_active == NULL)
282 208 : fdwroutine->BeginDirectModify(scanstate, eflags);
283 : }
284 : else
285 1752 : fdwroutine->BeginForeignScan(scanstate, eflags);
286 :
287 1944 : return scanstate;
288 : }
289 :
290 : /* ----------------------------------------------------------------
291 : * ExecEndForeignScan
292 : *
293 : * frees any storage allocated through C routines.
294 : * ----------------------------------------------------------------
295 : */
296 : void
297 1890 : ExecEndForeignScan(ForeignScanState *node)
298 : {
299 1890 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
300 1890 : EState *estate = node->ss.ps.state;
301 :
302 : /* Let the FDW shut down */
303 1890 : if (plan->operation != CMD_SELECT)
304 : {
305 192 : if (estate->es_epq_active == NULL)
306 192 : node->fdwroutine->EndDirectModify(node);
307 : }
308 : else
309 1698 : node->fdwroutine->EndForeignScan(node);
310 :
311 : /* Shut down any outer plan. */
312 1890 : if (outerPlanState(node))
313 24 : ExecEndNode(outerPlanState(node));
314 1890 : }
315 :
316 : /* ----------------------------------------------------------------
317 : * ExecReScanForeignScan
318 : *
319 : * Rescans the relation.
320 : * ----------------------------------------------------------------
321 : */
322 : void
323 808 : ExecReScanForeignScan(ForeignScanState *node)
324 : {
325 808 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
326 808 : EState *estate = node->ss.ps.state;
327 808 : PlanState *outerPlan = outerPlanState(node);
328 :
329 : /*
330 : * Ignore direct modifications when EvalPlanQual is active --- they are
331 : * irrelevant for EvalPlanQual rechecking
332 : */
333 808 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
334 0 : return;
335 :
336 808 : node->fdwroutine->ReScanForeignScan(node);
337 :
338 : /*
339 : * If chgParam of subnode is not null then plan will be re-scanned by
340 : * first ExecProcNode. outerPlan may also be NULL, in which case there is
341 : * nothing to rescan at all.
342 : */
343 808 : if (outerPlan != NULL && outerPlan->chgParam == NULL)
344 20 : ExecReScan(outerPlan);
345 :
346 808 : ExecScanReScan(&node->ss);
347 : }
348 :
349 : /* ----------------------------------------------------------------
350 : * ExecForeignScanEstimate
351 : *
352 : * Informs size of the parallel coordination information, if any
353 : * ----------------------------------------------------------------
354 : */
355 : void
356 0 : ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
357 : {
358 0 : FdwRoutine *fdwroutine = node->fdwroutine;
359 :
360 0 : if (fdwroutine->EstimateDSMForeignScan)
361 : {
362 0 : node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
363 0 : shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
364 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
365 : }
366 0 : }
367 :
368 : /* ----------------------------------------------------------------
369 : * ExecForeignScanInitializeDSM
370 : *
371 : * Initialize the parallel coordination information
372 : * ----------------------------------------------------------------
373 : */
374 : void
375 0 : ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
376 : {
377 0 : FdwRoutine *fdwroutine = node->fdwroutine;
378 :
379 0 : if (fdwroutine->InitializeDSMForeignScan)
380 : {
381 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
382 : void *coordinate;
383 :
384 0 : coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
385 0 : fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
386 0 : shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
387 : }
388 0 : }
389 :
390 : /* ----------------------------------------------------------------
391 : * ExecForeignScanReInitializeDSM
392 : *
393 : * Reset shared state before beginning a fresh scan.
394 : * ----------------------------------------------------------------
395 : */
396 : void
397 0 : ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
398 : {
399 0 : FdwRoutine *fdwroutine = node->fdwroutine;
400 :
401 0 : if (fdwroutine->ReInitializeDSMForeignScan)
402 : {
403 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
404 : void *coordinate;
405 :
406 0 : coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
407 0 : fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
408 : }
409 0 : }
410 :
411 : /* ----------------------------------------------------------------
412 : * ExecForeignScanInitializeWorker
413 : *
414 : * Initialization according to the parallel coordination information
415 : * ----------------------------------------------------------------
416 : */
417 : void
418 0 : ExecForeignScanInitializeWorker(ForeignScanState *node,
419 : ParallelWorkerContext *pwcxt)
420 : {
421 0 : FdwRoutine *fdwroutine = node->fdwroutine;
422 :
423 0 : if (fdwroutine->InitializeWorkerForeignScan)
424 : {
425 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
426 : void *coordinate;
427 :
428 0 : coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
429 0 : fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
430 : }
431 0 : }
432 :
433 : /* ----------------------------------------------------------------
434 : * ExecShutdownForeignScan
435 : *
436 : * Gives FDW chance to stop asynchronous resource consumption
437 : * and release any resources still held.
438 : * ----------------------------------------------------------------
439 : */
440 : void
441 1098 : ExecShutdownForeignScan(ForeignScanState *node)
442 : {
443 1098 : FdwRoutine *fdwroutine = node->fdwroutine;
444 :
445 1098 : if (fdwroutine->ShutdownForeignScan)
446 0 : fdwroutine->ShutdownForeignScan(node);
447 1098 : }
448 :
449 : /* ----------------------------------------------------------------
450 : * ExecAsyncForeignScanRequest
451 : *
452 : * Asynchronously request a tuple from a designed async-capable node
453 : * ----------------------------------------------------------------
454 : */
455 : void
456 12150 : ExecAsyncForeignScanRequest(AsyncRequest *areq)
457 : {
458 12150 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
459 12150 : FdwRoutine *fdwroutine = node->fdwroutine;
460 :
461 : Assert(fdwroutine->ForeignAsyncRequest != NULL);
462 12150 : fdwroutine->ForeignAsyncRequest(areq);
463 12150 : }
464 :
465 : /* ----------------------------------------------------------------
466 : * ExecAsyncForeignScanConfigureWait
467 : *
468 : * In async mode, configure for a wait
469 : * ----------------------------------------------------------------
470 : */
471 : void
472 368 : ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
473 : {
474 368 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
475 368 : FdwRoutine *fdwroutine = node->fdwroutine;
476 :
477 : Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
478 368 : fdwroutine->ForeignAsyncConfigureWait(areq);
479 366 : }
480 :
481 : /* ----------------------------------------------------------------
482 : * ExecAsyncForeignScanNotify
483 : *
484 : * Callback invoked when a relevant event has occurred
485 : * ----------------------------------------------------------------
486 : */
487 : void
488 294 : ExecAsyncForeignScanNotify(AsyncRequest *areq)
489 : {
490 294 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
491 294 : FdwRoutine *fdwroutine = node->fdwroutine;
492 :
493 : Assert(fdwroutine->ForeignAsyncNotify != NULL);
494 294 : fdwroutine->ForeignAsyncNotify(areq);
495 294 : }
|