Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitAppend - initialize the append node
17 : * ExecAppend - retrieve the next tuple from the node
18 : * ExecEndAppend - shut down the append node
19 : * ExecReScanAppend - rescan the append node
20 : *
21 : * NOTES
22 : * Each append node contains a list of one or more subplans which
23 : * must be iteratively processed (forwards or backwards).
24 : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : * until the subplan stops returning tuples, at which point that
26 : * plan is shut down and the next started up.
27 : *
28 : * Append nodes don't make use of their left and right
29 : * subtrees, rather they maintain a list of subplans so
30 : * a typical append node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * Append -------+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : *
39 : * Append nodes are currently used for unions, and to support
40 : * inheritance queries, where several relations need to be scanned.
41 : * For example, in our standard person/student/employee/student-emp
42 : * example, where student and employee inherit from person
43 : * and student-emp inherits from student and employee, the
44 : * query:
45 : *
46 : * select name from person
47 : *
48 : * generates the plan:
49 : *
50 : * |
51 : * Append -------+-------+--------+--------+
52 : * / \ | | | |
53 : * nil nil Scan Scan Scan Scan
54 : * | | | |
55 : * person employee student student-emp
56 : */
57 :
58 : #include "postgres.h"
59 :
60 : #include "executor/execAsync.h"
61 : #include "executor/execPartition.h"
62 : #include "executor/executor.h"
63 : #include "executor/nodeAppend.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "storage/latch.h"
67 : #include "storage/lwlock.h"
68 : #include "utils/wait_event.h"
69 :
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  For a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
84 :
85 : #define INVALID_SUBPLAN_INDEX -1
86 : #define EVENT_BUFFER_SIZE 16
87 :
88 : static TupleTableSlot *ExecAppend(PlanState *pstate);
89 : static bool choose_next_subplan_locally(AppendState *node);
90 : static bool choose_next_subplan_for_leader(AppendState *node);
91 : static bool choose_next_subplan_for_worker(AppendState *node);
92 : static void mark_invalid_subplans_as_finished(AppendState *node);
93 : static void ExecAppendAsyncBegin(AppendState *node);
94 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
95 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
96 : static void ExecAppendAsyncEventWait(AppendState *node);
97 : static void classify_matching_subplans(AppendState *node);
98 :
/* ----------------------------------------------------------------
 *		ExecInitAppend
 *
 *		Begin all of the subscans of the append node.
 *
 *	   (This is potentially wasteful, since the entire result of the
 *		append node may not be scanned, but this way all of the
 *		structures get allocated in the executor's top level memory
 *		block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	const TupleTableSlotOps *appendops;
	Bitmapset  *validsubplans;
	Bitmapset  *asyncplans;
	int			nplans;
	int			nasyncplans;
	int			firstvalid;
	int			i,
				j;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
	appendstate->as_syncdone = false;
	appendstate->as_begun = false;

	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_index >= 0)
	{
		PartitionPruneState *prunestate;

		/*
		 * Set up pruning data structure.  This also initializes the set of
		 * subplans to initialize (validsubplans) by taking into account the
		 * result of performing initial pruning if any.
		 */
		prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
												  list_length(node->appendplans),
												  node->part_prune_index,
												  node->apprelids,
												  &validsubplans);
		appendstate->as_prune_state = prunestate;
		nplans = bms_num_members(validsubplans);

		/*
		 * When no run-time pruning is required and there's at least one
		 * subplan, we can fill as_valid_subplans immediately, preventing
		 * later calls to ExecFindMatchingSubPlans.
		 */
		if (!prunestate->do_exec_prune && nplans > 0)
		{
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
			appendstate->as_valid_subplans_identified = true;
		}
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_valid_subplans_identified = true;
		appendstate->as_prune_state = NULL;
	}

	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 *
	 * Note that i indexes the original appendplans list while j indexes the
	 * compacted appendplanstates array of initialized (valid) subplans.
	 */
	j = 0;
	asyncplans = NULL;
	nasyncplans = 0;
	firstvalid = nplans;
	i = -1;
	while ((i = bms_next_member(validsubplans, i)) >= 0)
	{
		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);

		/*
		 * Record async subplans.  When executing EvalPlanQual, we treat them
		 * as sync ones; don't do this when initializing an EvalPlanQual plan
		 * tree.
		 */
		if (initNode->async_capable && estate->es_epq_active == NULL)
		{
			asyncplans = bms_add_member(asyncplans, j);
			nasyncplans++;
		}

		/*
		 * Record the lowest appendplans index which is a valid partial plan.
		 */
		if (i >= node->first_partial_plan && j < firstvalid)
			firstvalid = j;

		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;

	/*
	 * Initialize Append's result tuple type and slot.  If the child plans all
	 * produce the same fixed slot type, we can use that slot type; otherwise
	 * make a virtual slot.  (Note that the result slot itself is used only to
	 * return a null tuple at end of execution; real tuples are returned to
	 * the caller in the children's own result slots.  What we are doing here
	 * is allowing the parent plan node to optimize if the Append will return
	 * only one kind of slot.)
	 */
	appendops = ExecGetCommonSlotOps(appendplanstates, j);
	if (appendops != NULL)
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
	}
	else
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
		/* show that the output slot type is not fixed */
		appendstate->ps.resultopsset = true;
		appendstate->ps.resultopsfixed = false;
	}

	/* Initialize async state */
	appendstate->as_asyncplans = asyncplans;
	appendstate->as_nasyncplans = nasyncplans;
	appendstate->as_asyncrequests = NULL;
	appendstate->as_asyncresults = NULL;
	appendstate->as_nasyncresults = 0;
	appendstate->as_nasyncremain = 0;
	appendstate->as_needrequest = NULL;
	appendstate->as_eventset = NULL;
	appendstate->as_valid_asyncplans = NULL;

	if (nasyncplans > 0)
	{
		appendstate->as_asyncrequests = (AsyncRequest **)
			palloc0(nplans * sizeof(AsyncRequest *));

		i = -1;
		while ((i = bms_next_member(asyncplans, i)) >= 0)
		{
			AsyncRequest *areq;

			areq = palloc_object(AsyncRequest);
			areq->requestor = (PlanState *) appendstate;
			areq->requestee = appendplanstates[i];
			areq->request_index = i;
			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;

			appendstate->as_asyncrequests[i] = areq;
		}

		appendstate->as_asyncresults = (TupleTableSlot **)
			palloc0(nasyncplans * sizeof(TupleTableSlot *));

		/*
		 * If the valid subplans are already known, we can classify them into
		 * sync and async right away; otherwise that's deferred until
		 * run-time pruning has run (see ExecAppendAsyncBegin).
		 */
		if (appendstate->as_valid_subplans_identified)
			classify_matching_subplans(appendstate);
	}

	/*
	 * Miscellaneous initialization
	 */

	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}
297 :
/* ----------------------------------------------------------------
 *	   ExecAppend
 *
 *		Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *result;

	/*
	 * If this is the first call after Init or ReScan, we need to do the
	 * initialization work.
	 */
	if (!node->as_begun)
	{
		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
		Assert(!node->as_syncdone);

		/* Nothing to do if there are no subplans */
		if (node->as_nplans == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		/* If there are any async subplans, begin executing them. */
		if (node->as_nasyncplans > 0)
			ExecAppendAsyncBegin(node);

		/*
		 * If no sync subplan has been chosen, we must choose one before
		 * proceeding.
		 */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		Assert(node->as_syncdone ||
			   (node->as_whichplan >= 0 &&
				node->as_whichplan < node->as_nplans));

		/* And we're initialized. */
		node->as_begun = true;
	}

	for (;;)
	{
		PlanState  *subnode;

		CHECK_FOR_INTERRUPTS();

		/*
		 * try to get a tuple from an async subplan if any
		 */
		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
		{
			if (ExecAppendAsyncGetNext(node, &result))
				return result;
			/* If we fall through, there must still be sync work to do. */
			Assert(!node->as_syncdone);
			Assert(bms_is_empty(node->as_needrequest));
		}

		/*
		 * figure out which sync subplan we are currently processing
		 */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
		subnode = node->appendplans[node->as_whichplan];

		/*
		 * get a tuple from the subplan
		 */
		result = ExecProcNode(subnode);

		if (!TupIsNull(result))
		{
			/*
			 * If the subplan gave us something then return it as-is. We do
			 * NOT make use of the result slot that was set up in
			 * ExecInitAppend; there's no need for it.
			 */
			return result;
		}

		/*
		 * wait or poll for async events if any. We do this before checking
		 * for the end of iteration, because it might drain the remaining
		 * async subplans.
		 */
		if (node->as_nasyncremain > 0)
			ExecAppendAsyncEventWait(node);

		/* choose new sync subplan; if no sync/async subplans, we're done */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}
}
393 :
394 : /* ----------------------------------------------------------------
395 : * ExecEndAppend
396 : *
397 : * Shuts down the subscans of the append node.
398 : *
399 : * Returns nothing of interest.
400 : * ----------------------------------------------------------------
401 : */
402 : void
403 9317 : ExecEndAppend(AppendState *node)
404 : {
405 : PlanState **appendplans;
406 : int nplans;
407 : int i;
408 :
409 : /*
410 : * get information from the node
411 : */
412 9317 : appendplans = node->appendplans;
413 9317 : nplans = node->as_nplans;
414 :
415 : /*
416 : * shut down each of the subscans
417 : */
418 37969 : for (i = 0; i < nplans; i++)
419 28652 : ExecEndNode(appendplans[i]);
420 9317 : }
421 :
/*
 * ExecReScanAppend
 *		Prepare the Append node and its subplans for a fresh scan.
 */
void
ExecReScanAppend(AppendState *node)
{
	int			nasyncplans = node->as_nasyncplans;
	int			i;

	/*
	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
	 * we'd better unset the valid subplans so that they are reselected for
	 * the new parameter values.
	 */
	if (node->as_prune_state &&
		bms_overlap(node->ps.chgParam,
					node->as_prune_state->execparamids))
	{
		node->as_valid_subplans_identified = false;
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
		bms_free(node->as_valid_asyncplans);
		node->as_valid_asyncplans = NULL;
	}

	for (i = 0; i < node->as_nplans; i++)
	{
		PlanState  *subnode = node->appendplans[i];

		/*
		 * ExecReScan doesn't know about my subplans, so I have to do
		 * changed-parameter signaling myself.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(subnode, node->ps.chgParam);

		/*
		 * If chgParam of subnode is not null then plan will be re-scanned by
		 * first ExecProcNode or by first ExecAsyncRequest.
		 */
		if (subnode->chgParam == NULL)
			ExecReScan(subnode);
	}

	/* Reset async state */
	if (nasyncplans > 0)
	{
		/* Clear any in-flight request state for each async subplan. */
		i = -1;
		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
		{
			AsyncRequest *areq = node->as_asyncrequests[i];

			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;
		}

		node->as_nasyncresults = 0;
		node->as_nasyncremain = 0;
		bms_free(node->as_needrequest);
		node->as_needrequest = NULL;
	}

	/* Let choose_next_subplan_* function handle setting the first subplan */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
	node->as_syncdone = false;
	node->as_begun = false;
}
487 :
488 : /* ----------------------------------------------------------------
489 : * Parallel Append Support
490 : * ----------------------------------------------------------------
491 : */
492 :
493 : /* ----------------------------------------------------------------
494 : * ExecAppendEstimate
495 : *
496 : * Compute the amount of space we'll need in the parallel
497 : * query DSM, and inform pcxt->estimator about our needs.
498 : * ----------------------------------------------------------------
499 : */
500 : void
501 69 : ExecAppendEstimate(AppendState *node,
502 : ParallelContext *pcxt)
503 : {
504 69 : node->pstate_len =
505 69 : add_size(offsetof(ParallelAppendState, pa_finished),
506 69 : sizeof(bool) * node->as_nplans);
507 :
508 69 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
509 69 : shm_toc_estimate_keys(&pcxt->estimator, 1);
510 69 : }
511 :
512 :
513 : /* ----------------------------------------------------------------
514 : * ExecAppendInitializeDSM
515 : *
516 : * Set up shared state for Parallel Append.
517 : * ----------------------------------------------------------------
518 : */
519 : void
520 69 : ExecAppendInitializeDSM(AppendState *node,
521 : ParallelContext *pcxt)
522 : {
523 : ParallelAppendState *pstate;
524 :
525 69 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
526 69 : memset(pstate, 0, node->pstate_len);
527 69 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
528 69 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
529 :
530 69 : node->as_pstate = pstate;
531 69 : node->choose_next_subplan = choose_next_subplan_for_leader;
532 69 : }
533 :
534 : /* ----------------------------------------------------------------
535 : * ExecAppendReInitializeDSM
536 : *
537 : * Reset shared state before beginning a fresh scan.
538 : * ----------------------------------------------------------------
539 : */
540 : void
541 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
542 : {
543 0 : ParallelAppendState *pstate = node->as_pstate;
544 :
545 0 : pstate->pa_next_plan = 0;
546 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
547 0 : }
548 :
549 : /* ----------------------------------------------------------------
550 : * ExecAppendInitializeWorker
551 : *
552 : * Copy relevant information from TOC into planstate, and initialize
553 : * whatever is required to choose and execute the optimal subplan.
554 : * ----------------------------------------------------------------
555 : */
556 : void
557 158 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
558 : {
559 158 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
560 158 : node->choose_next_subplan = choose_next_subplan_for_worker;
561 158 : }
562 :
/* ----------------------------------------------------------------
 *		choose_next_subplan_locally
 *
 *		Choose next sync subplan for a non-parallel-aware Append,
 *		returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
	int			whichplan = node->as_whichplan;
	int			nextplan;

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Nothing to do if syncdone */
	if (node->as_syncdone)
		return false;

	/*
	 * If first call then have the bms member function choose the first valid
	 * sync subplan by initializing whichplan to -1.  If there happen to be no
	 * valid sync subplans then the bms member function will handle that by
	 * returning a negative number which will allow us to exit returning a
	 * false value.
	 */
	if (whichplan == INVALID_SUBPLAN_INDEX)
	{
		if (node->as_nasyncplans > 0)
		{
			/* We'd have filled as_valid_subplans already */
			Assert(node->as_valid_subplans_identified);
		}
		else if (!node->as_valid_subplans_identified)
		{
			/* Run-time pruning: compute the valid subplan set on demand. */
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
			node->as_valid_subplans_identified = true;
		}

		whichplan = -1;
	}

	/* Ensure whichplan is within the expected range */
	Assert(whichplan >= -1 && whichplan <= node->as_nplans);

	/* Advance forwards or backwards depending on the scan direction. */
	if (ScanDirectionIsForward(node->ps.state->es_direction))
		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
	else
		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

	if (nextplan < 0)
	{
		/* Set as_syncdone if in async mode */
		if (node->as_nasyncplans > 0)
			node->as_syncdone = true;
		return false;
	}

	node->as_whichplan = nextplan;

	return true;
}
627 :
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Shared state is consulted and updated under the lock. */
	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
	{
		/* Mark just-completed subplan as finished. */
		node->as_pstate->pa_finished[node->as_whichplan] = true;
	}
	else
	{
		/* Start with last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * If we've yet to determine the valid subplans then do so now.  If
		 * run-time pruning is disabled then the valid subplans will always be
		 * set to all subplans.
		 */
		if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
			node->as_valid_subplans_identified = true;

			/*
			 * Mark each invalid plan as finished to allow the loop below to
			 * select the first valid subplan.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}

	/* Loop until we find a subplan to execute. */
	while (pstate->pa_finished[node->as_whichplan])
	{
		if (node->as_whichplan == 0)
		{
			/* All subplans are finished; tell everyone we're done. */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}

		/*
		 * We needn't pay attention to as_valid_subplans here as all invalid
		 * plans have been marked as finished.
		 */
		node->as_whichplan--;
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
704 :
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_worker
 *
 *		Choose next subplan for a parallel-aware Append, returning
 *		false if there are no more.
 *
 *		We start from the first plan and advance through the list;
 *		when we get back to the end, we loop back to the first
 *		partial plan.  This assigns the non-partial plans first in
 *		order of descending cost and then spreads out the workers
 *		as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Shared state is consulted and updated under the lock. */
	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 */
	else if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
		node->as_valid_subplans_identified = true;

		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/* Save the plan from which we are starting the search. */
	node->as_whichplan = pstate->pa_next_plan;

	/* Loop until we find a valid subplan to execute. */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
			pstate->pa_next_plan = nextplan;
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
835 :
836 : /*
837 : * mark_invalid_subplans_as_finished
838 : * Marks the ParallelAppendState's pa_finished as true for each invalid
839 : * subplan.
840 : *
841 : * This function should only be called for parallel Append with run-time
842 : * pruning enabled.
843 : */
844 : static void
845 24 : mark_invalid_subplans_as_finished(AppendState *node)
846 : {
847 : int i;
848 :
849 : /* Only valid to call this while in parallel Append mode */
850 : Assert(node->as_pstate);
851 :
852 : /* Shouldn't have been called when run-time pruning is not enabled */
853 : Assert(node->as_prune_state);
854 :
855 : /* Nothing to do if all plans are valid */
856 24 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
857 0 : return;
858 :
859 : /* Mark all non-valid plans as finished */
860 81 : for (i = 0; i < node->as_nplans; i++)
861 : {
862 57 : if (!bms_is_member(i, node->as_valid_subplans))
863 24 : node->as_pstate->pa_finished[i] = true;
864 : }
865 : }
866 :
867 : /* ----------------------------------------------------------------
868 : * Asynchronous Append Support
869 : * ----------------------------------------------------------------
870 : */
871 :
872 : /* ----------------------------------------------------------------
873 : * ExecAppendAsyncBegin
874 : *
875 : * Begin executing designed async-capable subplans.
876 : * ----------------------------------------------------------------
877 : */
878 : static void
879 37 : ExecAppendAsyncBegin(AppendState *node)
880 : {
881 : int i;
882 :
883 : /* Backward scan is not supported by async-aware Appends. */
884 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
885 :
886 : /* We should never be called when there are no subplans */
887 : Assert(node->as_nplans > 0);
888 :
889 : /* We should never be called when there are no async subplans. */
890 : Assert(node->as_nasyncplans > 0);
891 :
892 : /* If we've yet to determine the valid subplans then do so now. */
893 37 : if (!node->as_valid_subplans_identified)
894 : {
895 2 : node->as_valid_subplans =
896 2 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
897 2 : node->as_valid_subplans_identified = true;
898 :
899 2 : classify_matching_subplans(node);
900 : }
901 :
902 : /* Initialize state variables. */
903 37 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
904 37 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
905 :
906 : /* Nothing to do if there are no valid async subplans. */
907 37 : if (node->as_nasyncremain == 0)
908 0 : return;
909 :
910 : /* Make a request for each of the valid async subplans. */
911 37 : i = -1;
912 109 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
913 : {
914 72 : AsyncRequest *areq = node->as_asyncrequests[i];
915 :
916 : Assert(areq->request_index == i);
917 : Assert(!areq->callback_pending);
918 :
919 : /* Do the actual work. */
920 72 : ExecAsyncRequest(areq);
921 : }
922 : }
923 :
924 : /* ----------------------------------------------------------------
925 : * ExecAppendAsyncGetNext
926 : *
927 : * Get the next tuple from any of the asynchronous subplans.
928 : * ----------------------------------------------------------------
929 : */
930 : static bool
931 6138 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
932 : {
933 6138 : *result = NULL;
934 :
935 : /* We should never be called when there are no valid async subplans. */
936 : Assert(node->as_nasyncremain > 0);
937 :
938 : /* Request a tuple asynchronously. */
939 6138 : if (ExecAppendAsyncRequest(node, result))
940 6039 : return true;
941 :
942 144 : while (node->as_nasyncremain > 0)
943 : {
944 113 : CHECK_FOR_INTERRUPTS();
945 :
946 : /* Wait or poll for async events. */
947 113 : ExecAppendAsyncEventWait(node);
948 :
949 : /* Request a tuple asynchronously. */
950 112 : if (ExecAppendAsyncRequest(node, result))
951 67 : return true;
952 :
953 : /* Break from loop if there's any sync subplan that isn't complete. */
954 45 : if (!node->as_syncdone)
955 0 : break;
956 : }
957 :
958 : /*
959 : * If all sync subplans are complete, we're totally done scanning the
960 : * given node. Otherwise, we're done with the asynchronous stuff but must
961 : * continue scanning the sync subplans.
962 : */
963 31 : if (node->as_syncdone)
964 : {
965 : Assert(node->as_nasyncremain == 0);
966 31 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
967 31 : return true;
968 : }
969 :
970 0 : return false;
971 : }
972 :
973 : /* ----------------------------------------------------------------
974 : * ExecAppendAsyncRequest
975 : *
976 : * Request a tuple asynchronously.
977 : * ----------------------------------------------------------------
978 : */
979 : static bool
980 6250 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
981 : {
982 : Bitmapset *needrequest;
983 : int i;
984 :
985 : /* Nothing to do if there are no async subplans needing a new request. */
986 6250 : if (bms_is_empty(node->as_needrequest))
987 : {
988 : Assert(node->as_nasyncresults == 0);
989 67 : return false;
990 : }
991 :
992 : /*
993 : * If there are any asynchronously-generated results that have not yet
994 : * been returned, we have nothing to do; just return one of them.
995 : */
996 6183 : if (node->as_nasyncresults > 0)
997 : {
998 1280 : --node->as_nasyncresults;
999 1280 : *result = node->as_asyncresults[node->as_nasyncresults];
1000 1280 : return true;
1001 : }
1002 :
1003 : /* Make a new request for each of the async subplans that need it. */
1004 4903 : needrequest = node->as_needrequest;
1005 4903 : node->as_needrequest = NULL;
1006 4903 : i = -1;
1007 11006 : while ((i = bms_next_member(needrequest, i)) >= 0)
1008 : {
1009 6103 : AsyncRequest *areq = node->as_asyncrequests[i];
1010 :
1011 : /* Do the actual work. */
1012 6103 : ExecAsyncRequest(areq);
1013 : }
1014 4903 : bms_free(needrequest);
1015 :
1016 : /* Return one of the asynchronously-generated results if any. */
1017 4903 : if (node->as_nasyncresults > 0)
1018 : {
1019 4826 : --node->as_nasyncresults;
1020 4826 : *result = node->as_asyncresults[node->as_nasyncresults];
1021 4826 : return true;
1022 : }
1023 :
1024 77 : return false;
1025 : }
1026 :
1027 : /* ----------------------------------------------------------------
1028 : * ExecAppendAsyncEventWait
1029 : *
1030 : * Wait or poll for file descriptor events and fire callbacks.
1031 : * ----------------------------------------------------------------
1032 : */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	/* +2 reserves room for the postmaster-death and latch events below. */
	int			nevents = node->as_nasyncplans + 2;
	/* Block indefinitely only once the sync subplans are exhausted. */
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	Assert(node->as_eventset == NULL);

	/*
	 * Create the set under CurrentResourceOwner so that it gets released
	 * during error cleanup should anything below throw.
	 */
	node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		/* Only subplans still awaiting a callback have anything to wait on. */
		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * No need for further processing if none of the subplans configured any
	 * events.  (Only the WL_EXIT_ON_PM_DEATH event added above is present.)
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/*
	 * Add the process latch to the set, so that we wake up to process the
	 * standard interrupts with CHECK_FOR_INTERRUPTS().
	 *
	 * NOTE: For historical reasons, it's important that this is added to the
	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
	 * any other events are in the set.  That's a poor design, it's
	 * questionable for postgres_fdw to be doing that in the first place, but
	 * we cannot change it now.  The pattern has possibly been copied to other
	 * extensions too.
	 */
	AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
					  MyLatch, NULL);

	/* Return at most EVENT_BUFFER_SIZE events in one call. */
	if (nevents > EVENT_BUFFER_SIZE)
		nevents = EVENT_BUFFER_SIZE;

	/*
	 * If the timeout is -1, wait until at least one event occurs.  If the
	 * timeout is 0, poll for events, but do not wait at all.
	 */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 nevents, WAIT_EVENT_APPEND_READY);
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	/* Nothing to deliver — presumably only when polling with timeout == 0. */
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			if (areq->callback_pending)
			{
				/*
				 * Mark it as no longer needing a callback.  We must do this
				 * before dispatching the callback in case the callback resets
				 * the flag.
				 */
				areq->callback_pending = false;

				/* Do the actual work. */
				ExecAsyncNotify(areq);
			}
		}

		/* Handle standard interrupts */
		if ((w->events & WL_LATCH_SET) != 0)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}
1136 :
1137 : /* ----------------------------------------------------------------
1138 : * ExecAsyncAppendResponse
1139 : *
1140 : * Receive a response from an asynchronous request we made.
1141 : * ----------------------------------------------------------------
1142 : */
1143 : void
1144 6329 : ExecAsyncAppendResponse(AsyncRequest *areq)
1145 : {
1146 6329 : AppendState *node = (AppendState *) areq->requestor;
1147 6329 : TupleTableSlot *slot = areq->result;
1148 :
1149 : /* The result should be a TupleTableSlot or NULL. */
1150 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1151 :
1152 : /* Nothing to do if the request is pending. */
1153 6329 : if (!areq->request_complete)
1154 : {
1155 : /* The request would have been pending for a callback. */
1156 : Assert(areq->callback_pending);
1157 161 : return;
1158 : }
1159 :
1160 : /* If the result is NULL or an empty slot, there's nothing more to do. */
1161 6168 : if (TupIsNull(slot))
1162 : {
1163 : /* The ending subplan wouldn't have been pending for a callback. */
1164 : Assert(!areq->callback_pending);
1165 61 : --node->as_nasyncremain;
1166 61 : return;
1167 : }
1168 :
1169 : /* Save result so we can return it. */
1170 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1171 6107 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1172 :
1173 : /*
1174 : * Mark the subplan that returned a result as ready for a new request. We
1175 : * don't launch another one here immediately because it might complete.
1176 : */
1177 6107 : node->as_needrequest = bms_add_member(node->as_needrequest,
1178 : areq->request_index);
1179 : }
1180 :
1181 : /* ----------------------------------------------------------------
1182 : * classify_matching_subplans
1183 : *
1184 : * Classify the node's as_valid_subplans into sync ones and
1185 : * async ones, adjust it to contain sync ones only, and save
1186 : * async ones in the node's as_valid_asyncplans.
1187 : * ----------------------------------------------------------------
1188 : */
1189 : static void
1190 46 : classify_matching_subplans(AppendState *node)
1191 : {
1192 : Bitmapset *valid_asyncplans;
1193 :
1194 : Assert(node->as_valid_subplans_identified);
1195 : Assert(node->as_valid_asyncplans == NULL);
1196 :
1197 : /* Nothing to do if there are no valid subplans. */
1198 46 : if (bms_is_empty(node->as_valid_subplans))
1199 : {
1200 0 : node->as_syncdone = true;
1201 0 : node->as_nasyncremain = 0;
1202 0 : return;
1203 : }
1204 :
1205 : /* Nothing to do if there are no valid async subplans. */
1206 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1207 : {
1208 0 : node->as_nasyncremain = 0;
1209 0 : return;
1210 : }
1211 :
1212 : /* Get valid async subplans. */
1213 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1214 46 : node->as_valid_subplans);
1215 :
1216 : /* Adjust the valid subplans to contain sync subplans only. */
1217 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1218 : valid_asyncplans);
1219 :
1220 : /* Save valid async subplans. */
1221 46 : node->as_valid_asyncplans = valid_asyncplans;
1222 : }
|