Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeForeignscan.c
4 : * Routines to support scans of foreign tables
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeForeignscan.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : *
18 : * ExecForeignScan scans a foreign table.
19 : * ExecInitForeignScan creates and initializes state info.
20 : * ExecReScanForeignScan rescans the foreign relation.
21 : * ExecEndForeignScan releases any resources allocated.
22 : */
23 : #include "postgres.h"
24 :
25 : #include "executor/executor.h"
26 : #include "executor/nodeForeignscan.h"
27 : #include "foreign/fdwapi.h"
28 : #include "utils/memutils.h"
29 : #include "utils/rel.h"
30 :
31 : static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 : static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 :
34 :
35 : /* ----------------------------------------------------------------
36 : * ForeignNext
37 : *
38 : * This is a workhorse for ExecForeignScan
39 : * ----------------------------------------------------------------
40 : */
41 : static TupleTableSlot *
42 140882 : ForeignNext(ForeignScanState *node)
43 : {
44 : TupleTableSlot *slot;
45 140882 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 140882 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 : MemoryContext oldcontext;
48 :
49 : /* Call the Iterate function in short-lived context */
50 140882 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 140882 : if (plan->operation != CMD_SELECT)
52 : {
53 : /*
54 : * direct modifications cannot be re-evaluated, so shouldn't get here
55 : * during EvalPlanQual processing
56 : */
57 : Assert(node->ss.ps.state->es_epq_active == NULL);
58 :
59 836 : slot = node->fdwroutine->IterateDirectModify(node);
60 : }
61 : else
62 140046 : slot = node->fdwroutine->IterateForeignScan(node);
63 140856 : MemoryContextSwitchTo(oldcontext);
64 :
65 : /*
66 : * Insert valid value into tableoid, the only actually-useful system
67 : * column.
68 : */
69 140856 : if (plan->fsSystemCol && !TupIsNull(slot))
70 5038 : slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
71 :
72 140856 : return slot;
73 : }
74 :
75 : /*
76 : * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
77 : */
78 : static bool
79 0 : ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
80 : {
81 0 : FdwRoutine *fdwroutine = node->fdwroutine;
82 : ExprContext *econtext;
83 :
84 : /*
85 : * extract necessary information from foreign scan node
86 : */
87 0 : econtext = node->ss.ps.ps_ExprContext;
88 :
89 : /* Does the tuple meet the remote qual condition? */
90 0 : econtext->ecxt_scantuple = slot;
91 :
92 0 : ResetExprContext(econtext);
93 :
94 : /*
95 : * If an outer join is pushed down, RecheckForeignScan may need to store a
96 : * different tuple in the slot, because a different set of columns may go
97 : * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
98 : * contents, just return true or false to indicate whether the quals still
99 : * pass. For simple cases, setting fdw_recheck_quals may be easier than
100 : * providing this callback.
101 : */
102 0 : if (fdwroutine->RecheckForeignScan &&
103 0 : !fdwroutine->RecheckForeignScan(node, slot))
104 0 : return false;
105 :
106 0 : return ExecQual(node->fdw_recheck_quals, econtext);
107 : }
108 :
109 : /* ----------------------------------------------------------------
110 : * ExecForeignScan(node)
111 : *
112 : * Fetches the next tuple from the FDW, checks local quals, and
113 : * returns it.
114 : * We call the ExecScan() routine and pass it the appropriate
115 : * access method functions.
116 : * ----------------------------------------------------------------
117 : */
118 : static TupleTableSlot *
119 122710 : ExecForeignScan(PlanState *pstate)
120 : {
121 122710 : ForeignScanState *node = castNode(ForeignScanState, pstate);
122 122710 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
123 122710 : EState *estate = node->ss.ps.state;
124 :
125 : /*
126 : * Ignore direct modifications when EvalPlanQual is active --- they are
127 : * irrelevant for EvalPlanQual rechecking
128 : */
129 122710 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
130 0 : return NULL;
131 :
132 122710 : return ExecScan(&node->ss,
133 : (ExecScanAccessMtd) ForeignNext,
134 : (ExecScanRecheckMtd) ForeignRecheck);
135 : }
136 :
137 :
138 : /* ----------------------------------------------------------------
139 : * ExecInitForeignScan
140 : * ----------------------------------------------------------------
141 : */
142 : ForeignScanState *
143 1916 : ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
144 : {
145 : ForeignScanState *scanstate;
146 1916 : Relation currentRelation = NULL;
147 1916 : Index scanrelid = node->scan.scanrelid;
148 : int tlistvarno;
149 : FdwRoutine *fdwroutine;
150 :
151 : /* check for unsupported flags */
152 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
153 :
154 : /*
155 : * create state structure
156 : */
157 1916 : scanstate = makeNode(ForeignScanState);
158 1916 : scanstate->ss.ps.plan = (Plan *) node;
159 1916 : scanstate->ss.ps.state = estate;
160 1916 : scanstate->ss.ps.ExecProcNode = ExecForeignScan;
161 :
162 : /*
163 : * Miscellaneous initialization
164 : *
165 : * create expression context for node
166 : */
167 1916 : ExecAssignExprContext(estate, &scanstate->ss.ps);
168 :
169 : /*
170 : * open the scan relation, if any; also acquire function pointers from the
171 : * FDW's handler
172 : */
173 1916 : if (scanrelid > 0)
174 : {
175 1410 : currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
176 1410 : scanstate->ss.ss_currentRelation = currentRelation;
177 1410 : fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
178 : }
179 : else
180 : {
181 : /* We can't use the relcache, so get fdwroutine the hard way */
182 506 : fdwroutine = GetFdwRoutineByServerId(node->fs_server);
183 : }
184 :
185 : /*
186 : * Determine the scan tuple type. If the FDW provided a targetlist
187 : * describing the scan tuples, use that; else use base relation's rowtype.
188 : */
189 1916 : if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
190 506 : {
191 : TupleDesc scan_tupdesc;
192 :
193 506 : scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
194 506 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
195 : &TTSOpsHeapTuple);
196 : /* Node's targetlist will contain Vars with varno = INDEX_VAR */
197 506 : tlistvarno = INDEX_VAR;
198 : }
199 : else
200 : {
201 : TupleDesc scan_tupdesc;
202 :
203 : /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
204 1410 : scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
205 1410 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
206 : &TTSOpsHeapTuple);
207 : /* Node's targetlist will contain Vars with varno = scanrelid */
208 1410 : tlistvarno = scanrelid;
209 : }
210 :
211 : /* Don't know what an FDW might return */
212 1916 : scanstate->ss.ps.scanopsfixed = false;
213 1916 : scanstate->ss.ps.scanopsset = true;
214 :
215 : /*
216 : * Initialize result slot, type and projection.
217 : */
218 1916 : ExecInitResultTypeTL(&scanstate->ss.ps);
219 1916 : ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
220 :
221 : /*
222 : * initialize child expressions
223 : */
224 1916 : scanstate->ss.ps.qual =
225 1916 : ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
226 1916 : scanstate->fdw_recheck_quals =
227 1916 : ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
228 :
229 : /*
230 : * Determine whether to scan the foreign relation asynchronously or not;
231 : * this has to be kept in sync with the code in ExecInitAppend().
232 : */
233 2102 : scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
234 186 : estate->es_epq_active == NULL);
235 :
236 : /*
237 : * Initialize FDW-related state.
238 : */
239 1916 : scanstate->fdwroutine = fdwroutine;
240 1916 : scanstate->fdw_state = NULL;
241 :
242 : /*
243 : * For the FDW's convenience, look up the modification target relation's
244 : * ResultRelInfo. The ModifyTable node should have initialized it for us,
245 : * see ExecInitModifyTable.
246 : *
247 : * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
248 : * though. Direct modifications cannot be re-evaluated as part of
249 : * EvalPlanQual. The lookup wouldn't work anyway because during
250 : * EvalPlanQual processing, EvalPlanQual only initializes the subtree
251 : * under the ModifyTable, and doesn't run ExecInitModifyTable.
252 : */
253 1916 : if (node->resultRelation > 0 && estate->es_epq_active == NULL)
254 : {
255 208 : if (estate->es_result_relations == NULL ||
256 208 : estate->es_result_relations[node->resultRelation - 1] == NULL)
257 : {
258 0 : elog(ERROR, "result relation not initialized");
259 : }
260 208 : scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
261 : }
262 :
263 : /* Initialize any outer plan. */
264 1916 : if (outerPlan(node))
265 24 : outerPlanState(scanstate) =
266 24 : ExecInitNode(outerPlan(node), estate, eflags);
267 :
268 : /*
269 : * Tell the FDW to initialize the scan.
270 : */
271 1916 : if (node->operation != CMD_SELECT)
272 : {
273 : /*
274 : * Direct modifications cannot be re-evaluated by EvalPlanQual, so
275 : * don't bother preparing the FDW.
276 : *
277 : * In case of an inherited UPDATE/DELETE with foreign targets there
278 : * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree,
279 : * so we need to ignore such ForeignScan nodes during EvalPlanQual
280 : * processing. See also ExecForeignScan/ExecReScanForeignScan.
281 : */
282 208 : if (estate->es_epq_active == NULL)
283 208 : fdwroutine->BeginDirectModify(scanstate, eflags);
284 : }
285 : else
286 1708 : fdwroutine->BeginForeignScan(scanstate, eflags);
287 :
288 1900 : return scanstate;
289 : }
290 :
291 : /* ----------------------------------------------------------------
292 : * ExecEndForeignScan
293 : *
294 : * frees any storage allocated through C routines.
295 : * ----------------------------------------------------------------
296 : */
297 : void
298 1848 : ExecEndForeignScan(ForeignScanState *node)
299 : {
300 1848 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
301 1848 : EState *estate = node->ss.ps.state;
302 :
303 : /* Let the FDW shut down */
304 1848 : if (plan->operation != CMD_SELECT)
305 : {
306 192 : if (estate->es_epq_active == NULL)
307 192 : node->fdwroutine->EndDirectModify(node);
308 : }
309 : else
310 1656 : node->fdwroutine->EndForeignScan(node);
311 :
312 : /* Shut down any outer plan. */
313 1848 : if (outerPlanState(node))
314 24 : ExecEndNode(outerPlanState(node));
315 1848 : }
316 :
317 : /* ----------------------------------------------------------------
318 : * ExecReScanForeignScan
319 : *
320 : * Rescans the relation.
321 : * ----------------------------------------------------------------
322 : */
323 : void
324 804 : ExecReScanForeignScan(ForeignScanState *node)
325 : {
326 804 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
327 804 : EState *estate = node->ss.ps.state;
328 804 : PlanState *outerPlan = outerPlanState(node);
329 :
330 : /*
331 : * Ignore direct modifications when EvalPlanQual is active --- they are
332 : * irrelevant for EvalPlanQual rechecking
333 : */
334 804 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
335 0 : return;
336 :
337 804 : node->fdwroutine->ReScanForeignScan(node);
338 :
339 : /*
340 : * If chgParam of subnode is not null then plan will be re-scanned by
341 : * first ExecProcNode. outerPlan may also be NULL, in which case there is
342 : * nothing to rescan at all.
343 : */
344 804 : if (outerPlan != NULL && outerPlan->chgParam == NULL)
345 20 : ExecReScan(outerPlan);
346 :
347 804 : ExecScanReScan(&node->ss);
348 : }
349 :
350 : /* ----------------------------------------------------------------
351 : * ExecForeignScanEstimate
352 : *
353 : * Informs size of the parallel coordination information, if any
354 : * ----------------------------------------------------------------
355 : */
356 : void
357 0 : ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
358 : {
359 0 : FdwRoutine *fdwroutine = node->fdwroutine;
360 :
361 0 : if (fdwroutine->EstimateDSMForeignScan)
362 : {
363 0 : node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
364 0 : shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
365 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
366 : }
367 0 : }
368 :
369 : /* ----------------------------------------------------------------
370 : * ExecForeignScanInitializeDSM
371 : *
372 : * Initialize the parallel coordination information
373 : * ----------------------------------------------------------------
374 : */
375 : void
376 0 : ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
377 : {
378 0 : FdwRoutine *fdwroutine = node->fdwroutine;
379 :
380 0 : if (fdwroutine->InitializeDSMForeignScan)
381 : {
382 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
383 : void *coordinate;
384 :
385 0 : coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
386 0 : fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
387 0 : shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
388 : }
389 0 : }
390 :
391 : /* ----------------------------------------------------------------
392 : * ExecForeignScanReInitializeDSM
393 : *
394 : * Reset shared state before beginning a fresh scan.
395 : * ----------------------------------------------------------------
396 : */
397 : void
398 0 : ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
399 : {
400 0 : FdwRoutine *fdwroutine = node->fdwroutine;
401 :
402 0 : if (fdwroutine->ReInitializeDSMForeignScan)
403 : {
404 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
405 : void *coordinate;
406 :
407 0 : coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
408 0 : fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
409 : }
410 0 : }
411 :
412 : /* ----------------------------------------------------------------
413 : * ExecForeignScanInitializeWorker
414 : *
415 : * Initialization according to the parallel coordination information
416 : * ----------------------------------------------------------------
417 : */
418 : void
419 0 : ExecForeignScanInitializeWorker(ForeignScanState *node,
420 : ParallelWorkerContext *pwcxt)
421 : {
422 0 : FdwRoutine *fdwroutine = node->fdwroutine;
423 :
424 0 : if (fdwroutine->InitializeWorkerForeignScan)
425 : {
426 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
427 : void *coordinate;
428 :
429 0 : coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
430 0 : fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
431 : }
432 0 : }
433 :
434 : /* ----------------------------------------------------------------
435 : * ExecShutdownForeignScan
436 : *
437 : * Gives FDW chance to stop asynchronous resource consumption
438 : * and release any resources still held.
439 : * ----------------------------------------------------------------
440 : */
441 : void
442 1084 : ExecShutdownForeignScan(ForeignScanState *node)
443 : {
444 1084 : FdwRoutine *fdwroutine = node->fdwroutine;
445 :
446 1084 : if (fdwroutine->ShutdownForeignScan)
447 0 : fdwroutine->ShutdownForeignScan(node);
448 1084 : }
449 :
450 : /* ----------------------------------------------------------------
451 : * ExecAsyncForeignScanRequest
452 : *
453 : * Asynchronously request a tuple from a designed async-capable node
454 : * ----------------------------------------------------------------
455 : */
456 : void
457 12150 : ExecAsyncForeignScanRequest(AsyncRequest *areq)
458 : {
459 12150 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
460 12150 : FdwRoutine *fdwroutine = node->fdwroutine;
461 :
462 : Assert(fdwroutine->ForeignAsyncRequest != NULL);
463 12150 : fdwroutine->ForeignAsyncRequest(areq);
464 12150 : }
465 :
466 : /* ----------------------------------------------------------------
467 : * ExecAsyncForeignScanConfigureWait
468 : *
469 : * In async mode, configure for a wait
470 : * ----------------------------------------------------------------
471 : */
472 : void
473 430 : ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
474 : {
475 430 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
476 430 : FdwRoutine *fdwroutine = node->fdwroutine;
477 :
478 : Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
479 430 : fdwroutine->ForeignAsyncConfigureWait(areq);
480 428 : }
481 :
482 : /* ----------------------------------------------------------------
483 : * ExecAsyncForeignScanNotify
484 : *
485 : * Callback invoked when a relevant event has occurred
486 : * ----------------------------------------------------------------
487 : */
488 : void
489 294 : ExecAsyncForeignScanNotify(AsyncRequest *areq)
490 : {
491 294 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
492 294 : FdwRoutine *fdwroutine = node->fdwroutine;
493 :
494 : Assert(fdwroutine->ForeignAsyncNotify != NULL);
495 294 : fdwroutine->ForeignAsyncNotify(areq);
496 294 : }
|