Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitAppend - initialize the append node
17 : * ExecAppend - retrieve the next tuple from the node
18 : * ExecEndAppend - shut down the append node
19 : * ExecReScanAppend - rescan the append node
20 : *
21 : * NOTES
22 : * Each append node contains a list of one or more subplans which
23 : * must be iteratively processed (forwards or backwards).
24 : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : * until the subplan stops returning tuples, at which point that
26 : * plan is shut down and the next started up.
27 : *
28 : * Append nodes don't make use of their left and right
29 : * subtrees, rather they maintain a list of subplans so
30 : * a typical append node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * Append -------+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : *
39 : * Append nodes are currently used for unions, and to support
40 : * inheritance queries, where several relations need to be scanned.
41 : * For example, in our standard person/student/employee/student-emp
42 : * example, where student and employee inherit from person
43 : * and student-emp inherits from student and employee, the
44 : * query:
45 : *
46 : * select name from person
47 : *
48 : * generates the plan:
49 : *
50 : * |
51 : * Append -------+-------+--------+--------+
52 : * / \ | | | |
53 : * nil nil Scan Scan Scan Scan
54 : * | | | |
55 : * person employee student student-emp
56 : */
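/*
 * For a concrete illustration (hypothetical tables t1 and t2; the exact
 * plan shape depends on planner settings), a UNION ALL is typically
 * planned as an Append with one subplan per branch:
 *
 *		EXPLAIN SELECT x FROM t1 UNION ALL SELECT x FROM t2;
 *
 *			Append
 *			  ->  Seq Scan on t1
 *			  ->  Seq Scan on t2
 */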
57 :
58 : #include "postgres.h"
59 :
60 : #include "executor/execAsync.h"
61 : #include "executor/execPartition.h"
62 : #include "executor/executor.h"
63 : #include "executor/nodeAppend.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "storage/latch.h"
67 :
68 : /* Shared state for parallel-aware Append. */
69 : struct ParallelAppendState
70 : {
71 : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : int pa_next_plan; /* next plan to choose by any worker */
73 :
74 : /*
75 : * pa_finished[i] should be true if no more workers should select subplan
76 : * i. For a non-partial plan, this should be set to true as soon as a
77 : * worker selects the plan; for a partial plan, it remains false until
78 : * some worker executes the plan to completion.
79 : */
80 : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : };
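/*
 * A minimal sketch of the locking protocol implied above (illustration
 * only, assuming "pstate", "node" and "plan" as in the choose_next_subplan_*
 * functions below, which contain the real logic): a participant claims
 * subplan "plan" under pa_lock, and a non-partial plan is marked finished
 * as soon as it is claimed:
 *
 *		LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 *		if (!pstate->pa_finished[plan])
 *		{
 *			if (plan < node->as_first_partial_plan)
 *				pstate->pa_finished[plan] = true;
 *		}
 *		LWLockRelease(&pstate->pa_lock);
 */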
82 :
83 : #define INVALID_SUBPLAN_INDEX -1
84 : #define EVENT_BUFFER_SIZE 16
85 :
86 : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : static bool choose_next_subplan_locally(AppendState *node);
88 : static bool choose_next_subplan_for_leader(AppendState *node);
89 : static bool choose_next_subplan_for_worker(AppendState *node);
90 : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : static void ExecAppendAsyncBegin(AppendState *node);
92 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : static void ExecAppendAsyncEventWait(AppendState *node);
95 : static void classify_matching_subplans(AppendState *node);
96 :
97 : /* ----------------------------------------------------------------
98 : * ExecInitAppend
99 : *
100 : * Begin all of the subscans of the append node.
101 : *
102 : * (This is potentially wasteful, since the entire result of the
103 : * append node may not be scanned, but this way all of the
104 : * structures get allocated in the executor's top level memory
105 : * block instead of that of the call to ExecAppend.)
106 : * ----------------------------------------------------------------
107 : */
108 : AppendState *
109 14080 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : {
111 14080 : AppendState *appendstate = makeNode(AppendState);
112 : PlanState **appendplanstates;
113 : const TupleTableSlotOps *appendops;
114 : Bitmapset *validsubplans;
115 : Bitmapset *asyncplans;
116 : int nplans;
117 : int nasyncplans;
118 : int firstvalid;
119 : int i,
120 : j;
121 :
122 : /* check for unsupported flags */
123 : Assert(!(eflags & EXEC_FLAG_MARK));
124 :
125 : /*
126 : * create new AppendState for our append node
127 : */
128 14080 : appendstate->ps.plan = (Plan *) node;
129 14080 : appendstate->ps.state = estate;
130 14080 : appendstate->ps.ExecProcNode = ExecAppend;
131 :
132 : /* Let choose_next_subplan_* function handle setting the first subplan */
133 14080 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
134 14080 : appendstate->as_syncdone = false;
135 14080 : appendstate->as_begun = false;
136 :
137 : /* If run-time partition pruning is enabled, then set that up now */
138 14080 : if (node->part_prune_index >= 0)
139 : {
140 : PartitionPruneState *prunestate;
141 :
142 : /*
143 : * Set up pruning data structure. This also initializes the set of
144 : * subplans to initialize (validsubplans) by taking into account the
145 : * result of performing initial pruning if any.
146 : */
147 696 : prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
148 696 : list_length(node->appendplans),
149 : node->part_prune_index,
150 : node->apprelids,
151 : &validsubplans);
152 696 : appendstate->as_prune_state = prunestate;
153 696 : nplans = bms_num_members(validsubplans);
154 :
155 : /*
156 : * When no run-time pruning is required and there's at least one
157 : * subplan, we can fill as_valid_subplans immediately, preventing
158 : * later calls to ExecFindMatchingSubPlans.
159 : */
160 696 : if (!prunestate->do_exec_prune && nplans > 0)
161 : {
162 254 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
163 254 : appendstate->as_valid_subplans_identified = true;
164 : }
165 : }
166 : else
167 : {
168 13384 : nplans = list_length(node->appendplans);
169 :
170 : /*
171 : * When run-time partition pruning is not enabled we can just mark all
172 : * subplans as valid; they must also all be initialized.
173 : */
174 : Assert(nplans > 0);
175 13384 : appendstate->as_valid_subplans = validsubplans =
176 13384 : bms_add_range(NULL, 0, nplans - 1);
177 13384 : appendstate->as_valid_subplans_identified = true;
178 13384 : appendstate->as_prune_state = NULL;
179 : }
180 :
181 14080 : appendplanstates = (PlanState **) palloc(nplans *
182 : sizeof(PlanState *));
183 :
184 : /*
185 : * call ExecInitNode on each of the valid plans to be executed and save
186 : * the results into the appendplanstates array.
187 : *
188 : * While at it, find out the first valid partial plan.
189 : */
190 14080 : j = 0;
191 14080 : asyncplans = NULL;
192 14080 : nasyncplans = 0;
193 14080 : firstvalid = nplans;
194 14080 : i = -1;
195 55362 : while ((i = bms_next_member(validsubplans, i)) >= 0)
196 : {
197 41282 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
198 :
199 : /*
200 : * Record async subplans. When executing EvalPlanQual, we treat them
201 : * as sync ones; don't do this when initializing an EvalPlanQual plan
202 : * tree.
203 : */
204 41282 : if (initNode->async_capable && estate->es_epq_active == NULL)
205 : {
206 186 : asyncplans = bms_add_member(asyncplans, j);
207 186 : nasyncplans++;
208 : }
209 :
210 : /*
211 : * Record the lowest appendplans index which is a valid partial plan.
212 : */
213 41282 : if (i >= node->first_partial_plan && j < firstvalid)
214 450 : firstvalid = j;
215 :
216 41282 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
217 : }
218 :
219 14080 : appendstate->as_first_partial_plan = firstvalid;
220 14080 : appendstate->appendplans = appendplanstates;
221 14080 : appendstate->as_nplans = nplans;
222 :
223 : /*
224 : * Initialize Append's result tuple type and slot. If the child plans all
225 : * produce the same fixed slot type, we can use that slot type; otherwise
226 : * make a virtual slot. (Note that the result slot itself is used only to
227 : * return a null tuple at end of execution; real tuples are returned to
228 : * the caller in the children's own result slots. What we are doing here
229 : * is allowing the parent plan node to optimize if the Append will return
230 : * only one kind of slot.)
231 : */
232 14080 : appendops = ExecGetCommonSlotOps(appendplanstates, j);
233 14080 : if (appendops != NULL)
234 : {
235 13220 : ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
236 : }
237 : else
238 : {
239 860 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
240 : /* show that the output slot type is not fixed */
241 860 : appendstate->ps.resultopsset = true;
242 860 : appendstate->ps.resultopsfixed = false;
243 : }
244 :
245 : /* Initialize async state */
246 14080 : appendstate->as_asyncplans = asyncplans;
247 14080 : appendstate->as_nasyncplans = nasyncplans;
248 14080 : appendstate->as_asyncrequests = NULL;
249 14080 : appendstate->as_asyncresults = NULL;
250 14080 : appendstate->as_nasyncresults = 0;
251 14080 : appendstate->as_nasyncremain = 0;
252 14080 : appendstate->as_needrequest = NULL;
253 14080 : appendstate->as_eventset = NULL;
254 14080 : appendstate->as_valid_asyncplans = NULL;
255 :
256 14080 : if (nasyncplans > 0)
257 : {
258 94 : appendstate->as_asyncrequests = (AsyncRequest **)
259 94 : palloc0(nplans * sizeof(AsyncRequest *));
260 :
261 94 : i = -1;
262 280 : while ((i = bms_next_member(asyncplans, i)) >= 0)
263 : {
264 : AsyncRequest *areq;
265 :
266 186 : areq = palloc(sizeof(AsyncRequest));
267 186 : areq->requestor = (PlanState *) appendstate;
268 186 : areq->requestee = appendplanstates[i];
269 186 : areq->request_index = i;
270 186 : areq->callback_pending = false;
271 186 : areq->request_complete = false;
272 186 : areq->result = NULL;
273 :
274 186 : appendstate->as_asyncrequests[i] = areq;
275 : }
276 :
277 94 : appendstate->as_asyncresults = (TupleTableSlot **)
278 94 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
279 :
280 94 : if (appendstate->as_valid_subplans_identified)
281 88 : classify_matching_subplans(appendstate);
282 : }
283 :
284 : /*
285 : * Miscellaneous initialization
286 : */
287 :
288 14080 : appendstate->ps.ps_ProjInfo = NULL;
289 :
290 : /* For parallel query, this will be overridden later. */
291 14080 : appendstate->choose_next_subplan = choose_next_subplan_locally;
292 :
293 14080 : return appendstate;
294 : }
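/*
 * Illustration of the run-time pruning set-up in ExecInitAppend above
 * (hypothetical schema; details vary by version and plan caching): with a
 * parameter compared against the partition key, initial pruning can skip
 * initializing some subplans entirely, which EXPLAIN reports as "Subplans
 * Removed":
 *
 *		CREATE TABLE m (ts date, v int) PARTITION BY RANGE (ts);
 *		CREATE TABLE m1 PARTITION OF m
 *			FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
 *		CREATE TABLE m2 PARTITION OF m
 *			FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
 *		PREPARE q(date) AS SELECT * FROM m WHERE ts = $1;
 *		EXPLAIN EXECUTE q('2025-06-01');	-- may show "Subplans Removed: 1"
 */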
295 :
296 : /* ----------------------------------------------------------------
297 : * ExecAppend
298 : *
299 : * Handles iteration over multiple subplans.
300 : * ----------------------------------------------------------------
301 : */
302 : static TupleTableSlot *
303 2521470 : ExecAppend(PlanState *pstate)
304 : {
305 2521470 : AppendState *node = castNode(AppendState, pstate);
306 : TupleTableSlot *result;
307 :
308 : /*
309 : * If this is the first call after Init or ReScan, we need to do the
310 : * initialization work.
311 : */
312 2521470 : if (!node->as_begun)
313 : {
314 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
315 : Assert(!node->as_syncdone);
316 :
317 : /* Nothing to do if there are no subplans */
318 31918 : if (node->as_nplans == 0)
319 48 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
320 :
321 : /* If there are any async subplans, begin executing them. */
322 31870 : if (node->as_nasyncplans > 0)
323 74 : ExecAppendAsyncBegin(node);
324 :
325 : /*
326 : * If no sync subplan has been chosen, we must choose one before
327 : * proceeding.
328 : */
329 31870 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
330 3274 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
331 :
332 : Assert(node->as_syncdone ||
333 : (node->as_whichplan >= 0 &&
334 : node->as_whichplan < node->as_nplans));
335 :
336 : /* And we're initialized. */
337 28596 : node->as_begun = true;
338 : }
339 :
340 : for (;;)
341 36936 : {
342 : PlanState *subnode;
343 :
344 2555084 : CHECK_FOR_INTERRUPTS();
345 :
346 : /*
347 : * try to get a tuple from an async subplan if any
348 : */
349 2555084 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
350 : {
351 12076 : if (ExecAppendAsyncGetNext(node, &result))
352 12074 : return result;
353 : Assert(!node->as_syncdone);
354 : Assert(bms_is_empty(node->as_needrequest));
355 : }
356 :
357 : /*
358 : * figure out which sync subplan we are currently processing
359 : */
360 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
361 2543008 : subnode = node->appendplans[node->as_whichplan];
362 :
363 : /*
364 : * get a tuple from the subplan
365 : */
366 2543008 : result = ExecProcNode(subnode);
367 :
368 2542956 : if (!TupIsNull(result))
369 : {
370 : /*
371 : * If the subplan gave us something then return it as-is. We do
372 : * NOT make use of the result slot that was set up in
373 : * ExecInitAppend; there's no need for it.
374 : */
375 2478098 : return result;
376 : }
377 :
378 : /*
379 : * wait or poll for async events if any. We do this before checking
380 : * for the end of iteration, because it might drain the remaining
381 : * async subplans.
382 : */
383 64858 : if (node->as_nasyncremain > 0)
384 34 : ExecAppendAsyncEventWait(node);
385 :
386 : /* choose new sync subplan; if no sync/async subplans, we're done */
387 64858 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
388 27922 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
389 : }
390 : }
391 :
392 : /* ----------------------------------------------------------------
393 : * ExecEndAppend
394 : *
395 : * Shuts down the subscans of the append node.
396 : *
397 : * Returns nothing of interest.
398 : * ----------------------------------------------------------------
399 : */
400 : void
401 13784 : ExecEndAppend(AppendState *node)
402 : {
403 : PlanState **appendplans;
404 : int nplans;
405 : int i;
406 :
407 : /*
408 : * get information from the node
409 : */
410 13784 : appendplans = node->appendplans;
411 13784 : nplans = node->as_nplans;
412 :
413 : /*
414 : * shut down each of the subscans
415 : */
416 54354 : for (i = 0; i < nplans; i++)
417 40570 : ExecEndNode(appendplans[i]);
418 13784 : }
419 :
420 : void
421 22080 : ExecReScanAppend(AppendState *node)
422 : {
423 22080 : int nasyncplans = node->as_nasyncplans;
424 : int i;
425 :
426 : /*
427 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
428 : * we'd better unset the valid subplans so that they are reselected for
429 : * the new parameter values.
430 : */
431 25348 : if (node->as_prune_state &&
432 3268 : bms_overlap(node->ps.chgParam,
433 3268 : node->as_prune_state->execparamids))
434 : {
435 3268 : node->as_valid_subplans_identified = false;
436 3268 : bms_free(node->as_valid_subplans);
437 3268 : node->as_valid_subplans = NULL;
438 3268 : bms_free(node->as_valid_asyncplans);
439 3268 : node->as_valid_asyncplans = NULL;
440 : }
441 :
442 89110 : for (i = 0; i < node->as_nplans; i++)
443 : {
444 67030 : PlanState *subnode = node->appendplans[i];
445 :
446 : /*
447 : * ExecReScan doesn't know about my subplans, so I have to do
448 : * changed-parameter signaling myself.
449 : */
450 67030 : if (node->ps.chgParam != NULL)
451 54624 : UpdateChangedParamSet(subnode, node->ps.chgParam);
452 :
453 : /*
454 : * If chgParam of the subnode is not null, then the plan will be
455 : * re-scanned by the first ExecProcNode or the first ExecAsyncRequest.
456 : */
457 67030 : if (subnode->chgParam == NULL)
458 23474 : ExecReScan(subnode);
459 : }
460 :
461 : /* Reset async state */
462 22080 : if (nasyncplans > 0)
463 : {
464 34 : i = -1;
465 102 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
466 : {
467 68 : AsyncRequest *areq = node->as_asyncrequests[i];
468 :
469 68 : areq->callback_pending = false;
470 68 : areq->request_complete = false;
471 68 : areq->result = NULL;
472 : }
473 :
474 34 : node->as_nasyncresults = 0;
475 34 : node->as_nasyncremain = 0;
476 34 : bms_free(node->as_needrequest);
477 34 : node->as_needrequest = NULL;
478 : }
479 :
480 : /* Let choose_next_subplan_* function handle setting the first subplan */
481 22080 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
482 22080 : node->as_syncdone = false;
483 22080 : node->as_begun = false;
484 22080 : }
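/*
 * Sketch of when the re-pruning above fires (hypothetical tables): with the
 * Append on the inside of a nested loop, each outer row re-scans the Append
 * with a new PARAM_EXEC value, so the valid-subplan set must be recomputed
 * on the next ExecAppend call:
 *
 *		SELECT *
 *		FROM keys k,
 *			 LATERAL (SELECT * FROM m WHERE m.ts = k.ts) s;
 */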
485 :
486 : /* ----------------------------------------------------------------
487 : * Parallel Append Support
488 : * ----------------------------------------------------------------
489 : */
490 :
491 : /* ----------------------------------------------------------------
492 : * ExecAppendEstimate
493 : *
494 : * Compute the amount of space we'll need in the parallel
495 : * query DSM, and inform pcxt->estimator about our needs.
496 : * ----------------------------------------------------------------
497 : */
498 : void
499 138 : ExecAppendEstimate(AppendState *node,
500 : ParallelContext *pcxt)
501 : {
502 138 : node->pstate_len =
503 138 : add_size(offsetof(ParallelAppendState, pa_finished),
504 138 : sizeof(bool) * node->as_nplans);
505 :
506 138 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
507 138 : shm_toc_estimate_keys(&pcxt->estimator, 1);
508 138 : }
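/*
 * Worked example of the estimate above: with as_nplans = 4, the chunk
 * requested is offsetof(ParallelAppendState, pa_finished) + 4 * sizeof(bool),
 * i.e. the fixed header (pa_lock and pa_next_plan) plus one flag byte per
 * subplan.
 */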
509 :
510 :
511 : /* ----------------------------------------------------------------
512 : * ExecAppendInitializeDSM
513 : *
514 : * Set up shared state for Parallel Append.
515 : * ----------------------------------------------------------------
516 : */
517 : void
518 138 : ExecAppendInitializeDSM(AppendState *node,
519 : ParallelContext *pcxt)
520 : {
521 : ParallelAppendState *pstate;
522 :
523 138 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
524 138 : memset(pstate, 0, node->pstate_len);
525 138 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
526 138 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
527 :
528 138 : node->as_pstate = pstate;
529 138 : node->choose_next_subplan = choose_next_subplan_for_leader;
530 138 : }
531 :
532 : /* ----------------------------------------------------------------
533 : * ExecAppendReInitializeDSM
534 : *
535 : * Reset shared state before beginning a fresh scan.
536 : * ----------------------------------------------------------------
537 : */
538 : void
539 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
540 : {
541 0 : ParallelAppendState *pstate = node->as_pstate;
542 :
543 0 : pstate->pa_next_plan = 0;
544 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
545 0 : }
546 :
547 : /* ----------------------------------------------------------------
548 : * ExecAppendInitializeWorker
549 : *
550 : * Copy relevant information from TOC into planstate, and initialize
551 : * whatever is required to choose and execute the optimal subplan.
552 : * ----------------------------------------------------------------
553 : */
554 : void
555 318 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
556 : {
557 318 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
558 318 : node->choose_next_subplan = choose_next_subplan_for_worker;
559 318 : }
560 :
561 : /* ----------------------------------------------------------------
562 : * choose_next_subplan_locally
563 : *
564 : * Choose next sync subplan for a non-parallel-aware Append,
565 : * returning false if there are no more.
566 : * ----------------------------------------------------------------
567 : */
568 : static bool
569 95868 : choose_next_subplan_locally(AppendState *node)
570 : {
571 95868 : int whichplan = node->as_whichplan;
572 : int nextplan;
573 :
574 : /* We should never be called when there are no subplans */
575 : Assert(node->as_nplans > 0);
576 :
577 : /* Nothing to do if syncdone */
578 95868 : if (node->as_syncdone)
579 36 : return false;
580 :
581 : /*
582 : * On the first call, initialize whichplan to -1 so that the bms member
583 : * function chooses the first valid sync subplan. If there happen to be
584 : * no valid sync subplans, the bms member function handles that by
585 : * returning a negative number, which lets us exit below by returning
586 : * false.
587 : */
588 95832 : if (whichplan == INVALID_SUBPLAN_INDEX)
589 : {
590 31432 : if (node->as_nasyncplans > 0)
591 : {
592 : /* We'd have filled as_valid_subplans already */
593 : Assert(node->as_valid_subplans_identified);
594 : }
595 31394 : else if (!node->as_valid_subplans_identified)
596 : {
597 3382 : node->as_valid_subplans =
598 3382 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
599 3382 : node->as_valid_subplans_identified = true;
600 : }
601 :
602 31432 : whichplan = -1;
603 : }
604 :
605 : /* Ensure whichplan is within the expected range */
606 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
607 :
608 95832 : if (ScanDirectionIsForward(node->ps.state->es_direction))
609 95814 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
610 : else
611 18 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
612 :
613 95832 : if (nextplan < 0)
614 : {
615 : /* Set as_syncdone if in async mode */
616 30828 : if (node->as_nasyncplans > 0)
617 34 : node->as_syncdone = true;
618 30828 : return false;
619 : }
620 :
621 65004 : node->as_whichplan = nextplan;
622 :
623 65004 : return true;
624 : }
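/*
 * Both branches above use the Bitmapset iteration idiom seen throughout
 * this file: starting from -1, bms_next_member returns the smallest member,
 * and bms_prev_member the largest, so the same -1 initialization serves
 * both scan directions. Standalone, the forward idiom (with a hypothetical
 * set and process() callback) is:
 *
 *		i = -1;
 *		while ((i = bms_next_member(set, i)) >= 0)
 *			process(i);
 */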
625 :
626 : /* ----------------------------------------------------------------
627 : * choose_next_subplan_for_leader
628 : *
629 : * Try to pick a plan which doesn't commit us to doing much
630 : * work locally, so that as much work as possible is done in
631 : * the workers. Cheapest subplans are at the end.
632 : * ----------------------------------------------------------------
633 : */
634 : static bool
635 516 : choose_next_subplan_for_leader(AppendState *node)
636 : {
637 516 : ParallelAppendState *pstate = node->as_pstate;
638 :
639 : /* Backward scan is not supported by parallel-aware plans */
640 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
641 :
642 : /* We should never be called when there are no subplans */
643 : Assert(node->as_nplans > 0);
644 :
645 516 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
646 :
647 516 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
648 : {
649 : /* Mark just-completed subplan as finished. */
650 396 : node->as_pstate->pa_finished[node->as_whichplan] = true;
651 : }
652 : else
653 : {
654 : /* Start with last subplan. */
655 120 : node->as_whichplan = node->as_nplans - 1;
656 :
657 : /*
658 : * If we've yet to determine the valid subplans then do so now. If
659 : * run-time pruning is disabled then the valid subplans will always be
660 : * set to all subplans.
661 : */
662 120 : if (!node->as_valid_subplans_identified)
663 : {
664 24 : node->as_valid_subplans =
665 24 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
666 24 : node->as_valid_subplans_identified = true;
667 :
668 : /*
669 : * Mark each invalid plan as finished to allow the loop below to
670 : * select the first valid subplan.
671 : */
672 24 : mark_invalid_subplans_as_finished(node);
673 : }
674 : }
675 :
676 : /* Loop until we find a subplan to execute. */
677 816 : while (pstate->pa_finished[node->as_whichplan])
678 : {
679 420 : if (node->as_whichplan == 0)
680 : {
681 120 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
682 120 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
683 120 : LWLockRelease(&pstate->pa_lock);
684 120 : return false;
685 : }
686 :
687 : /*
688 : * We needn't pay attention to as_valid_subplans here as all invalid
689 : * plans have been marked as finished.
690 : */
691 300 : node->as_whichplan--;
692 : }
693 :
694 : /* If non-partial, immediately mark as finished. */
695 396 : if (node->as_whichplan < node->as_first_partial_plan)
696 138 : node->as_pstate->pa_finished[node->as_whichplan] = true;
697 :
698 396 : LWLockRelease(&pstate->pa_lock);
699 :
700 396 : return true;
701 : }
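/*
 * Worked example (hypothetical costs): with four subplans and nothing
 * finished yet, the leader starts at subplan 3 and walks backward on each
 * call. Because the planner places the cheapest subplans last, the leader,
 * which must also return tuples to the client, tends to claim the cheapest
 * remaining work and leave the expensive subplans to the workers.
 */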
702 :
703 : /* ----------------------------------------------------------------
704 : * choose_next_subplan_for_worker
705 : *
706 : * Choose next subplan for a parallel-aware Append, returning
707 : * false if there are no more.
708 : *
709 : * We start from the first plan and advance through the list;
710 : * when we get back to the end, we loop back to the first
711 : * partial plan. This assigns the non-partial plans first in
712 : * order of descending cost and then spreads out the workers
713 : * as evenly as possible across the remaining partial plans.
714 : * ----------------------------------------------------------------
715 : */
716 : static bool
717 344 : choose_next_subplan_for_worker(AppendState *node)
718 : {
719 344 : ParallelAppendState *pstate = node->as_pstate;
720 :
721 : /* Backward scan is not supported by parallel-aware plans */
722 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
723 :
724 : /* We should never be called when there are no subplans */
725 : Assert(node->as_nplans > 0);
726 :
727 344 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
728 :
729 : /* Mark just-completed subplan as finished. */
730 344 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
731 62 : node->as_pstate->pa_finished[node->as_whichplan] = true;
732 :
733 : /*
734 : * If we've yet to determine the valid subplans then do so now. If
735 : * run-time pruning is disabled then the valid subplans will always be set
736 : * to all subplans.
737 : */
738 282 : else if (!node->as_valid_subplans_identified)
739 : {
740 24 : node->as_valid_subplans =
741 24 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
742 24 : node->as_valid_subplans_identified = true;
743 :
744 24 : mark_invalid_subplans_as_finished(node);
745 : }
746 :
747 : /* If all the plans are already done, we have nothing to do */
748 344 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
749 : {
750 256 : LWLockRelease(&pstate->pa_lock);
751 256 : return false;
752 : }
753 :
754 : /* Save the plan from which we are starting the search. */
755 88 : node->as_whichplan = pstate->pa_next_plan;
756 :
757 : /* Loop until we find a valid subplan to execute. */
758 186 : while (pstate->pa_finished[pstate->pa_next_plan])
759 : {
760 : int nextplan;
761 :
762 124 : nextplan = bms_next_member(node->as_valid_subplans,
763 : pstate->pa_next_plan);
764 124 : if (nextplan >= 0)
765 : {
766 : /* Advance to the next valid plan. */
767 98 : pstate->pa_next_plan = nextplan;
768 : }
769 26 : else if (node->as_whichplan > node->as_first_partial_plan)
770 : {
771 : /*
772 : * Try looping back to the first valid partial plan, if there is
773 : * one. If there isn't, arrange to bail out below.
774 : */
775 4 : nextplan = bms_next_member(node->as_valid_subplans,
776 4 : node->as_first_partial_plan - 1);
777 4 : pstate->pa_next_plan =
778 4 : nextplan < 0 ? node->as_whichplan : nextplan;
779 : }
780 : else
781 : {
782 : /*
783 : * At last plan, and either there are no partial plans or we've
784 : * tried them all. Arrange to bail out.
785 : */
786 22 : pstate->pa_next_plan = node->as_whichplan;
787 : }
788 :
789 124 : if (pstate->pa_next_plan == node->as_whichplan)
790 : {
791 : /* We've tried everything! */
792 26 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
793 26 : LWLockRelease(&pstate->pa_lock);
794 26 : return false;
795 : }
796 : }
797 :
798 : /* Pick the plan we found, and advance pa_next_plan one more time. */
799 62 : node->as_whichplan = pstate->pa_next_plan;
800 62 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
801 : pstate->pa_next_plan);
802 :
803 : /*
804 : * If there are no more valid plans then try setting the next plan to the
805 : * first valid partial plan.
806 : */
807 62 : if (pstate->pa_next_plan < 0)
808 : {
809 18 : int nextplan = bms_next_member(node->as_valid_subplans,
810 18 : node->as_first_partial_plan - 1);
811 :
812 18 : if (nextplan >= 0)
813 18 : pstate->pa_next_plan = nextplan;
814 : else
815 : {
816 : /*
817 : * There are no valid partial plans, and we already chose the last
818 : * non-partial plan; so flag that there's nothing more for our
819 : * fellow workers to do.
820 : */
821 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
822 : }
823 : }
824 :
825 : /* If non-partial, immediately mark as finished. */
826 62 : if (node->as_whichplan < node->as_first_partial_plan)
827 0 : node->as_pstate->pa_finished[node->as_whichplan] = true;
828 :
829 62 : LWLockRelease(&pstate->pa_lock);
830 :
831 62 : return true;
832 : }
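/*
 * Worked example (hypothetical): with non-partial subplans {0,1} and
 * partial subplans {2,3}, the first two workers are dealt 0 and 1, each
 * marked finished on selection; later workers get 2, then 3; once
 * pa_next_plan runs off the end it wraps back to the first valid partial
 * plan, so any further workers spread across 2 and 3 until every plan's
 * pa_finished flag is set.
 */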
833 :
834 : /*
835 : * mark_invalid_subplans_as_finished
836 : * Marks the ParallelAppendState's pa_finished as true for each invalid
837 : * subplan.
838 : *
839 : * This function should only be called for parallel Append with run-time
840 : * pruning enabled.
841 : */
842 : static void
843 48 : mark_invalid_subplans_as_finished(AppendState *node)
844 : {
845 : int i;
846 :
847 : /* Only valid to call this while in parallel Append mode */
848 : Assert(node->as_pstate);
849 :
850 : /* Shouldn't have been called when run-time pruning is not enabled */
851 : Assert(node->as_prune_state);
852 :
853 : /* Nothing to do if all plans are valid */
854 48 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
855 0 : return;
856 :
857 : /* Mark all non-valid plans as finished */
858 162 : for (i = 0; i < node->as_nplans; i++)
859 : {
860 114 : if (!bms_is_member(i, node->as_valid_subplans))
861 48 : node->as_pstate->pa_finished[i] = true;
862 : }
863 : }
864 :
865 : /* ----------------------------------------------------------------
866 : * Asynchronous Append Support
867 : * ----------------------------------------------------------------
868 : */
869 :
870 : /* ----------------------------------------------------------------
871 : * ExecAppendAsyncBegin
872 : *
873 : * Begin executing designated async-capable subplans.
874 : * ----------------------------------------------------------------
875 : */
876 : static void
877 74 : ExecAppendAsyncBegin(AppendState *node)
878 : {
879 : int i;
880 :
881 : /* Backward scan is not supported by async-aware Appends. */
882 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
883 :
884 : /* We should never be called when there are no subplans */
885 : Assert(node->as_nplans > 0);
886 :
887 : /* We should never be called when there are no async subplans. */
888 : Assert(node->as_nasyncplans > 0);
889 :
890 : /* If we've yet to determine the valid subplans then do so now. */
891 74 : if (!node->as_valid_subplans_identified)
892 : {
893 4 : node->as_valid_subplans =
894 4 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
895 4 : node->as_valid_subplans_identified = true;
896 :
897 4 : classify_matching_subplans(node);
898 : }
899 :
900 : /* Initialize state variables. */
901 74 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
902 74 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
903 :
904 : /* Nothing to do if there are no valid async subplans. */
905 74 : if (node->as_nasyncremain == 0)
906 0 : return;
907 :
908 : /* Make a request for each of the valid async subplans. */
909 74 : i = -1;
910 218 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
911 : {
912 144 : AsyncRequest *areq = node->as_asyncrequests[i];
913 :
914 : Assert(areq->request_index == i);
915 : Assert(!areq->callback_pending);
916 :
917 : /* Do the actual work. */
918 144 : ExecAsyncRequest(areq);
919 : }
920 : }
921 :
922 : /* ----------------------------------------------------------------
923 : * ExecAppendAsyncGetNext
924 : *
925 : * Get the next tuple from any of the asynchronous subplans.
926 : * ----------------------------------------------------------------
927 : */
928 : static bool
929 12076 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
930 : {
931 12076 : *result = NULL;
932 :
933 : /* We should never be called when there are no valid async subplans. */
934 : Assert(node->as_nasyncremain > 0);
935 :
936 : /* Request a tuple asynchronously. */
937 12076 : if (ExecAppendAsyncRequest(node, result))
938 11880 : return true;
939 :
940 300 : while (node->as_nasyncremain > 0)
941 : {
942 238 : CHECK_FOR_INTERRUPTS();
943 :
944 : /* Wait or poll for async events. */
945 238 : ExecAppendAsyncEventWait(node);
946 :
947 : /* Request a tuple asynchronously. */
948 236 : if (ExecAppendAsyncRequest(node, result))
949 132 : return true;
950 :
951 : /* Break from loop if there's any sync subplan that isn't complete. */
952 104 : if (!node->as_syncdone)
953 0 : break;
954 : }
955 :
956 : /*
957 : * If all sync subplans are complete, we're totally done scanning the
958 : * given node. Otherwise, we're done with the asynchronous stuff but must
959 : * continue scanning the sync subplans.
960 : */
961 62 : if (node->as_syncdone)
962 : {
963 : Assert(node->as_nasyncremain == 0);
964 62 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
965 62 : return true;
966 : }
967 :
968 0 : return false;
969 : }
970 :
971 : /* ----------------------------------------------------------------
972 : * ExecAppendAsyncRequest
973 : *
974 : * Request a tuple asynchronously.
975 : * ----------------------------------------------------------------
976 : */
977 : static bool
978 12312 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
979 : {
980 : Bitmapset *needrequest;
981 : int i;
982 :
983 : /* Nothing to do if there are no async subplans needing a new request. */
984 12312 : if (bms_is_empty(node->as_needrequest))
985 : {
986 : Assert(node->as_nasyncresults == 0);
987 146 : return false;
988 : }
989 :
990 : /*
991 : * If there are any asynchronously-generated results that have not yet
992 : * been returned, no new request is needed; just return one of them.
993 : */
994 12166 : if (node->as_nasyncresults > 0)
995 : {
996 1784 : --node->as_nasyncresults;
997 1784 : *result = node->as_asyncresults[node->as_nasyncresults];
998 1784 : return true;
999 : }
1000 :
1001 : /* Make a new request for each of the async subplans that need it. */
1002 10382 : needrequest = node->as_needrequest;
1003 10382 : node->as_needrequest = NULL;
1004 10382 : i = -1;
1005 22388 : while ((i = bms_next_member(needrequest, i)) >= 0)
1006 : {
1007 12006 : AsyncRequest *areq = node->as_asyncrequests[i];
1008 :
1009 : /* Do the actual work. */
1010 12006 : ExecAsyncRequest(areq);
1011 : }
1012 10382 : bms_free(needrequest);
1013 :
1014 : /* Return one of the asynchronously-generated results if any. */
1015 10382 : if (node->as_nasyncresults > 0)
1016 : {
1017 10228 : --node->as_nasyncresults;
1018 10228 : *result = node->as_asyncresults[node->as_nasyncresults];
1019 10228 : return true;
1020 : }
1021 :
1022 154 : return false;
1023 : }
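/*
 * Note the flow above: as_needrequest marks async subplans whose previously
 * returned tuple has been consumed; firing ExecAsyncRequest for them may
 * synchronously push fresh tuples onto the as_asyncresults stack (via
 * ExecAsyncAppendResponse below), which this function then pops one at a
 * time.
 */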
1024 :
1025 : /* ----------------------------------------------------------------
1026 : * ExecAppendAsyncEventWait
1027 : *
1028 : * Wait or poll for file descriptor events and fire callbacks.
1029 : * ----------------------------------------------------------------
1030 : */
1031 : static void
1032 272 : ExecAppendAsyncEventWait(AppendState *node)
1033 : {
1034 272 : int nevents = node->as_nasyncplans + 1;
1035 272 : long timeout = node->as_syncdone ? -1 : 0;
1036 : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1037 : int noccurred;
1038 : int i;
1039 :
1040 : /* We should never be called when there are no valid async subplans. */
1041 : Assert(node->as_nasyncremain > 0);
1042 :
1043 : Assert(node->as_eventset == NULL);
1044 272 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1045 272 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1046 : NULL, NULL);
1047 :
1048 : /* Give each waiting subplan a chance to add an event. */
1049 272 : i = -1;
1050 826 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1051 : {
1052 556 : AsyncRequest *areq = node->as_asyncrequests[i];
1053 :
1054 556 : if (areq->callback_pending)
1055 458 : ExecAsyncConfigureWait(areq);
1056 : }
1057 :
1058 : /*
1059 : * No need for further processing if there are no configured events other
1060 : * than the postmaster death event.
1061 : */
1062 270 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1063 : {
1064 2 : FreeWaitEventSet(node->as_eventset);
1065 2 : node->as_eventset = NULL;
1066 4 : return;
1067 : }
1068 :
1069 : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1070 268 : if (nevents > EVENT_BUFFER_SIZE)
1071 0 : nevents = EVENT_BUFFER_SIZE;
1072 :
1073 : /*
1074 : * If the timeout is -1, wait until at least one event occurs. If the
1075 : * timeout is 0, poll for events, but do not wait at all.
1076 : */
1077 268 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1078 : nevents, WAIT_EVENT_APPEND_READY);
1079 268 : FreeWaitEventSet(node->as_eventset);
1080 268 : node->as_eventset = NULL;
1081 268 : if (noccurred == 0)
1082 2 : return;
1083 :
1084 : /* Deliver notifications. */
1085 560 : for (i = 0; i < noccurred; i++)
1086 : {
1087 294 : WaitEvent *w = &occurred_event[i];
1088 :
1089 : /*
1090 : * Each waiting subplan should have registered its wait event with
1091 : * user_data pointing back to its AsyncRequest.
1092 : */
1093 294 : if ((w->events & WL_SOCKET_READABLE) != 0)
1094 : {
1095 294 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1096 :
1097 294 : if (areq->callback_pending)
1098 : {
1099 : /*
1100 : * Mark it as no longer needing a callback. We must do this
1101 : * before dispatching the callback in case the callback resets
1102 : * the flag.
1103 : */
1104 294 : areq->callback_pending = false;
1105 :
1106 : /* Do the actual work. */
1107 294 : ExecAsyncNotify(areq);
1108 : }
1109 : }
1110 : }
1111 : }
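/*
 * A minimal sketch (assuming a socket-based async subplan, e.g. a foreign
 * scan with an open connection "sock") of the registration that the
 * dispatch loop above expects ExecAsyncConfigureWait to have performed:
 * a readable-socket event whose user_data points back at the AsyncRequest:
 *
 *		AddWaitEventToSet(node->as_eventset, WL_SOCKET_READABLE, sock,
 *						  NULL, areq);
 */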
1112 :
1113 : /* ----------------------------------------------------------------
1114 : * ExecAsyncAppendResponse
1115 : *
1116 : * Receive a response from an asynchronous request we made.
1117 : * ----------------------------------------------------------------
1118 : */
1119 : void
1120 12454 : ExecAsyncAppendResponse(AsyncRequest *areq)
1121 : {
1122 12454 : AppendState *node = (AppendState *) areq->requestor;
1123 12454 : TupleTableSlot *slot = areq->result;
1124 :
1125 : /* The result should be a TupleTableSlot or NULL. */
1126 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1127 :
1128 : /* Nothing to do if the request is pending. */
1129 12454 : if (!areq->request_complete)
1130 : {
1131 : /* The request would have been pending for a callback. */
1132 : Assert(areq->callback_pending);
1133 320 : return;
1134 : }
1135 :
1136 : /* If the result is NULL or an empty slot, there's nothing more to do. */
1137 12134 : if (TupIsNull(slot))
1138 : {
1139 : /* The ending subplan wouldn't have been pending for a callback. */
1140 : Assert(!areq->callback_pending);
1141 120 : --node->as_nasyncremain;
1142 120 : return;
1143 : }
1144 :
1145 : /* Save result so we can return it. */
1146 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1147 12014 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1148 :
1149 : /*
1150 : * Mark the subplan that returned a result as ready for a new request. We
1151 : * don't launch another one here immediately because it might complete.
1152 : */
1153 12014 : node->as_needrequest = bms_add_member(node->as_needrequest,
1154 : areq->request_index);
1155 : }
1156 :
1157 : /* ----------------------------------------------------------------
1158 : * classify_matching_subplans
1159 : *
1160 : * Classify the node's as_valid_subplans into sync ones and
1161 : * async ones, adjust it to contain sync ones only, and save
1162 : * async ones in the node's as_valid_asyncplans.
1163 : * ----------------------------------------------------------------
1164 : */
1165 : static void
1166 92 : classify_matching_subplans(AppendState *node)
1167 : {
1168 : Bitmapset *valid_asyncplans;
1169 :
1170 : Assert(node->as_valid_subplans_identified);
1171 : Assert(node->as_valid_asyncplans == NULL);
1172 :
1173 : /* Nothing to do if there are no valid subplans. */
1174 92 : if (bms_is_empty(node->as_valid_subplans))
1175 : {
1176 0 : node->as_syncdone = true;
1177 0 : node->as_nasyncremain = 0;
1178 0 : return;
1179 : }
1180 :
1181 : /* Nothing to do if there are no valid async subplans. */
1182 92 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1183 : {
1184 0 : node->as_nasyncremain = 0;
1185 0 : return;
1186 : }
1187 :
1188 : /* Get valid async subplans. */
1189 92 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1190 92 : node->as_valid_subplans);
1191 :
1192 : /* Adjust the valid subplans to contain sync subplans only. */
1193 92 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1194 : valid_asyncplans);
1195 :
1196 : /* Save valid async subplans. */
1197 92 : node->as_valid_asyncplans = valid_asyncplans;
1198 : }
|