/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *    routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *      ExecInitAppend   - initialize the append node
 *      ExecAppend       - retrieve the next tuple from the node
 *      ExecEndAppend    - shut down the append node
 *      ExecReScanAppend - rescan the append node
 *
 *   NOTES
 *      Each append node contains a list of one or more subplans which
 *      must be iteratively processed (forwards or backwards).
 *      Tuples are retrieved by executing the 'whichplan'th subplan
 *      until the subplan stops returning tuples, at which point that
 *      plan is shut down and the next started up.
 *
 *      Append nodes don't make use of their left and right
 *      subtrees, rather they maintain a list of subplans so
 *      a typical append node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              Append -------+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
 *
 *      Append nodes are currently used for unions, and to support
 *      inheritance queries, where several relations need to be scanned.
 *      For example, in our standard person/student/employee/student-emp
 *      example, where student and employee inherit from person
 *      and student-emp inherits from student and employee, the
 *      query:
 *
 *              select name from person
 *
 *      generates the plan:
 *
 *                |
 *              Append -------+-------+--------+--------+
 *              /   \         |       |        |        |
 *            nil   nil      Scan    Scan     Scan     Scan
 *                            |       |        |        |
 *                          person employee student student-emp
 */
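
/*
 * A more modern illustration (hypothetical schema, not from the original
 * sources): for a range-partitioned table such as
 *
 *      CREATE TABLE t (a int) PARTITION BY RANGE (a);
 *
 * a query like "SELECT * FROM t WHERE a > $1" produces an Append whose
 * subplans scan the individual partitions, with non-matching partitions
 * removed by the run-time pruning machinery set up in ExecInitAppend below.
 */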

#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"

/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
    LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
    int         pa_next_plan;   /* next plan to choose by any worker */

    /*
     * pa_finished[i] should be true if no more workers should select subplan
     * i.  For a non-partial plan, this should be set to true as soon as a
     * worker selects the plan; for a partial plan, it remains false until
     * some worker executes the plan to completion.
     */
    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
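
/*
 * For example (hypothetical plan mix): with subplans {0,1} non-partial and
 * {2} partial, pa_finished[0] becomes true the moment any worker claims
 * subplan 0, so no second worker ever scans it; pa_finished[2] stays false
 * while several workers jointly execute subplan 2, and flips to true only
 * when one of them drives it to completion.
 */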

#define INVALID_SUBPLAN_INDEX       -1
#define EVENT_BUFFER_SIZE           16

static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);

/* ----------------------------------------------------------------
 *      ExecInitAppend
 *
 *      Begin all of the subscans of the append node.
 *
 *     (This is potentially wasteful, since the entire result of the
 *      append node may not be scanned, but this way all of the
 *      structures get allocated in the executor's top level memory
 *      block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    const TupleTableSlotOps *appendops;
    Bitmapset  *validsubplans;
    Bitmapset  *asyncplans;
    int         nplans;
    int         nasyncplans;
    int         firstvalid;
    int         i,
                j;

    /* check for unsupported flags */
    Assert(!(eflags & EXEC_FLAG_MARK));

    /*
     * create new AppendState for our append node
     */
    appendstate->ps.plan = (Plan *) node;
    appendstate->ps.state = estate;
    appendstate->ps.ExecProcNode = ExecAppend;

    /* Let choose_next_subplan_* function handle setting the first subplan */
    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
    appendstate->as_syncdone = false;
    appendstate->as_begun = false;

    /* If run-time partition pruning is enabled, then set that up now */
    if (node->part_prune_info != NULL)
    {
        PartitionPruneState *prunestate;

        /*
         * Set up pruning data structure.  This also initializes the set of
         * subplans to initialize (validsubplans) by taking into account the
         * result of performing initial pruning if any.
         */
        prunestate = ExecInitPartitionPruning(&appendstate->ps,
                                              list_length(node->appendplans),
                                              node->part_prune_info,
                                              &validsubplans);
        appendstate->as_prune_state = prunestate;
        nplans = bms_num_members(validsubplans);

        /*
         * When no run-time pruning is required and there's at least one
         * subplan, we can fill as_valid_subplans immediately, preventing
         * later calls to ExecFindMatchingSubPlans.
         */
        if (!prunestate->do_exec_prune && nplans > 0)
        {
            appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
            appendstate->as_valid_subplans_identified = true;
        }
    }
    else
    {
        nplans = list_length(node->appendplans);

        /*
         * When run-time partition pruning is not enabled we can just mark all
         * subplans as valid; they must also all be initialized.
         */
        Assert(nplans > 0);
        appendstate->as_valid_subplans = validsubplans =
            bms_add_range(NULL, 0, nplans - 1);
        appendstate->as_valid_subplans_identified = true;
        appendstate->as_prune_state = NULL;
    }

    appendplanstates = (PlanState **) palloc(nplans *
                                             sizeof(PlanState *));

    /*
     * call ExecInitNode on each of the valid plans to be executed and save
     * the results into the appendplanstates array.
     *
     * While at it, find out the first valid partial plan.
     */
    j = 0;
    asyncplans = NULL;
    nasyncplans = 0;
    firstvalid = nplans;
    i = -1;
    while ((i = bms_next_member(validsubplans, i)) >= 0)
    {
        Plan       *initNode = (Plan *) list_nth(node->appendplans, i);

        /*
         * Record async subplans.  When executing EvalPlanQual, we treat them
         * as sync ones; don't do this when initializing an EvalPlanQual plan
         * tree.
         */
        if (initNode->async_capable && estate->es_epq_active == NULL)
        {
            asyncplans = bms_add_member(asyncplans, j);
            nasyncplans++;
        }

        /*
         * Record the lowest appendplans index which is a valid partial plan.
         */
        if (i >= node->first_partial_plan && j < firstvalid)
            firstvalid = j;

        appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
    }

    appendstate->as_first_partial_plan = firstvalid;
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;

    /*
     * Initialize Append's result tuple type and slot.  If the child plans all
     * produce the same fixed slot type, we can use that slot type; otherwise
     * make a virtual slot.  (Note that the result slot itself is used only to
     * return a null tuple at end of execution; real tuples are returned to
     * the caller in the children's own result slots.  What we are doing here
     * is allowing the parent plan node to optimize if the Append will return
     * only one kind of slot.)
     */
    appendops = ExecGetCommonSlotOps(appendplanstates, j);
    if (appendops != NULL)
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
    }
    else
    {
        ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
        /* show that the output slot type is not fixed */
        appendstate->ps.resultopsset = true;
        appendstate->ps.resultopsfixed = false;
    }

    /* Initialize async state */
    appendstate->as_asyncplans = asyncplans;
    appendstate->as_nasyncplans = nasyncplans;
    appendstate->as_asyncrequests = NULL;
    appendstate->as_asyncresults = NULL;
    appendstate->as_nasyncresults = 0;
    appendstate->as_nasyncremain = 0;
    appendstate->as_needrequest = NULL;
    appendstate->as_eventset = NULL;
    appendstate->as_valid_asyncplans = NULL;

    if (nasyncplans > 0)
    {
        appendstate->as_asyncrequests = (AsyncRequest **)
            palloc0(nplans * sizeof(AsyncRequest *));

        i = -1;
        while ((i = bms_next_member(asyncplans, i)) >= 0)
        {
            AsyncRequest *areq;

            areq = palloc(sizeof(AsyncRequest));
            areq->requestor = (PlanState *) appendstate;
            areq->requestee = appendplanstates[i];
            areq->request_index = i;
            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;

            appendstate->as_asyncrequests[i] = areq;
        }

        appendstate->as_asyncresults = (TupleTableSlot **)
            palloc0(nasyncplans * sizeof(TupleTableSlot *));

        if (appendstate->as_valid_subplans_identified)
            classify_matching_subplans(appendstate);
    }

    /*
     * Miscellaneous initialization
     */

    appendstate->ps.ps_ProjInfo = NULL;

    /* For parallel query, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

    return appendstate;
}

/* ----------------------------------------------------------------
 *      ExecAppend
 *
 *      Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
    AppendState *node = castNode(AppendState, pstate);
    TupleTableSlot *result;

    /*
     * If this is the first call after Init or ReScan, we need to do the
     * initialization work.
     */
    if (!node->as_begun)
    {
        Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
        Assert(!node->as_syncdone);

        /* Nothing to do if there are no subplans */
        if (node->as_nplans == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        /* If there are any async subplans, begin executing them. */
        if (node->as_nasyncplans > 0)
            ExecAppendAsyncBegin(node);

        /*
         * If no sync subplan has been chosen, we must choose one before
         * proceeding.
         */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);

        Assert(node->as_syncdone ||
               (node->as_whichplan >= 0 &&
                node->as_whichplan < node->as_nplans));

        /* And we're initialized. */
        node->as_begun = true;
    }

    for (;;)
    {
        PlanState  *subnode;

        CHECK_FOR_INTERRUPTS();

        /*
         * try to get a tuple from an async subplan if any
         */
        if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
        {
            if (ExecAppendAsyncGetNext(node, &result))
                return result;
            Assert(!node->as_syncdone);
            Assert(bms_is_empty(node->as_needrequest));
        }

        /*
         * figure out which sync subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];

        /*
         * get a tuple from the subplan
         */
        result = ExecProcNode(subnode);

        if (!TupIsNull(result))
        {
            /*
             * If the subplan gave us something then return it as-is.  We do
             * NOT make use of the result slot that was set up in
             * ExecInitAppend; there's no need for it.
             */
            return result;
        }

        /*
         * wait or poll for async events if any.  We do this before checking
         * for the end of iteration, because it might drain the remaining
         * async subplans.
         */
        if (node->as_nasyncremain > 0)
            ExecAppendAsyncEventWait(node);

        /* choose new sync subplan; if no sync/async subplans, we're done */
        if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
}

/* ----------------------------------------------------------------
 *      ExecEndAppend
 *
 *      Shuts down the subscans of the append node.
 *
 *      Returns nothing of interest.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
    PlanState **appendplans;
    int         nplans;
    int         i;

    /*
     * get information from the node
     */
    appendplans = node->appendplans;
    nplans = node->as_nplans;

    /*
     * shut down each of the subscans
     */
    for (i = 0; i < nplans; i++)
        ExecEndNode(appendplans[i]);
}

void
ExecReScanAppend(AppendState *node)
{
    int         nasyncplans = node->as_nasyncplans;
    int         i;

    /*
     * If any PARAM_EXEC Params used in pruning expressions have changed, then
     * we'd better unset the valid subplans so that they are reselected for
     * the new parameter values.
     */
    if (node->as_prune_state &&
        bms_overlap(node->ps.chgParam,
                    node->as_prune_state->execparamids))
    {
        node->as_valid_subplans_identified = false;
        bms_free(node->as_valid_subplans);
        node->as_valid_subplans = NULL;
        bms_free(node->as_valid_asyncplans);
        node->as_valid_asyncplans = NULL;
    }

    for (i = 0; i < node->as_nplans; i++)
    {
        PlanState  *subnode = node->appendplans[i];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL)
            UpdateChangedParamSet(subnode, node->ps.chgParam);

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode or by first ExecAsyncRequest.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
    }

    /* Reset async state */
    if (nasyncplans > 0)
    {
        i = -1;
        while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
        {
            AsyncRequest *areq = node->as_asyncrequests[i];

            areq->callback_pending = false;
            areq->request_complete = false;
            areq->result = NULL;
        }

        node->as_nasyncresults = 0;
        node->as_nasyncremain = 0;
        bms_free(node->as_needrequest);
        node->as_needrequest = NULL;
    }

    /* Let choose_next_subplan_* function handle setting the first subplan */
    node->as_whichplan = INVALID_SUBPLAN_INDEX;
    node->as_syncdone = false;
    node->as_begun = false;
}

/* ----------------------------------------------------------------
 *                      Parallel Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *      ExecAppendEstimate
 *
 *      Compute the amount of space we'll need in the parallel
 *      query DSM, and inform pcxt->estimator about our needs.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
                   ParallelContext *pcxt)
{
    node->pstate_len =
        add_size(offsetof(ParallelAppendState, pa_finished),
                 sizeof(bool) * node->as_nplans);

    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
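
/*
 * For example (hypothetical numbers): with as_nplans = 4, the estimate is
 * offsetof(ParallelAppendState, pa_finished) plus 4 bytes of pa_finished
 * flags.  add_size() is used instead of plain "+" so that an overflow raises
 * an error rather than silently wrapping around.
 */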

/* ----------------------------------------------------------------
 *      ExecAppendInitializeDSM
 *
 *      Set up shared state for Parallel Append.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
                        ParallelContext *pcxt)
{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    memset(pstate, 0, node->pstate_len);
    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

    node->as_pstate = pstate;
    node->choose_next_subplan = choose_next_subplan_for_leader;
}

/* ----------------------------------------------------------------
 *      ExecAppendReInitializeDSM
 *
 *      Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
    ParallelAppendState *pstate = node->as_pstate;

    pstate->pa_next_plan = 0;
    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}

/* ----------------------------------------------------------------
 *      ExecAppendInitializeWorker
 *
 *      Copy relevant information from TOC into planstate, and initialize
 *      whatever is required to choose and execute the optimal subplan.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->choose_next_subplan = choose_next_subplan_for_worker;
}

/* ----------------------------------------------------------------
 *      choose_next_subplan_locally
 *
 *      Choose next sync subplan for a non-parallel-aware Append,
 *      returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
    int         whichplan = node->as_whichplan;
    int         nextplan;

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* Nothing to do if syncdone */
    if (node->as_syncdone)
        return false;

    /*
     * If first call then have the bms member function choose the first valid
     * sync subplan by initializing whichplan to -1.  If there happen to be no
     * valid sync subplans then the bms member function will handle that by
     * returning a negative number which will allow us to exit returning a
     * false value.
     */
    if (whichplan == INVALID_SUBPLAN_INDEX)
    {
        if (node->as_nasyncplans > 0)
        {
            /* We'd have filled as_valid_subplans already */
            Assert(node->as_valid_subplans_identified);
        }
        else if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;
        }

        whichplan = -1;
    }

    /* Ensure whichplan is within the expected range */
    Assert(whichplan >= -1 && whichplan <= node->as_nplans);

    if (ScanDirectionIsForward(node->ps.state->es_direction))
        nextplan = bms_next_member(node->as_valid_subplans, whichplan);
    else
        nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

    if (nextplan < 0)
    {
        /* Set as_syncdone if in async mode */
        if (node->as_nasyncplans > 0)
            node->as_syncdone = true;
        return false;
    }

    node->as_whichplan = nextplan;

    return true;
}
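
/*
 * For example (hypothetical pruning outcome): if as_valid_subplans is
 * {0, 2, 5}, a forward scan visits subplans 0, 2 and 5 in that order via
 * bms_next_member(), while a backward scan visits 5, 2 and 0 via
 * bms_prev_member(); pruned subplans such as 1, 3 and 4 are never chosen.
 */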

/* ----------------------------------------------------------------
 *      choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
    {
        /* Mark just-completed subplan as finished. */
        node->as_pstate->pa_finished[node->as_whichplan] = true;
    }
    else
    {
        /* Start with last subplan. */
        node->as_whichplan = node->as_nplans - 1;

        /*
         * If we've yet to determine the valid subplans then do so now.  If
         * run-time pruning is disabled then the valid subplans will always be
         * set to all subplans.
         */
        if (!node->as_valid_subplans_identified)
        {
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state, false);
            node->as_valid_subplans_identified = true;

            /*
             * Mark each invalid plan as finished to allow the loop below to
             * select the first valid subplan.
             */
            mark_invalid_subplans_as_finished(node);
        }
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[node->as_whichplan])
    {
        if (node->as_whichplan == 0)
        {
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            node->as_whichplan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }

        /*
         * We needn't pay attention to as_valid_subplans here as all invalid
         * plans have been marked as finished.
         */
        node->as_whichplan--;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}

/* ----------------------------------------------------------------
 *      choose_next_subplan_for_worker
 *
 *      Choose next subplan for a parallel-aware Append, returning
 *      false if there are no more.
 *
 *      We start from the first plan and advance through the list;
 *      when we get back to the end, we loop back to the first
 *      partial plan.  This assigns the non-partial plans first in
 *      order of descending cost and then spreads out the workers
 *      as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    /* Mark just-completed subplan as finished. */
    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    /*
     * If we've yet to determine the valid subplans then do so now.  If
     * run-time pruning is disabled then the valid subplans will always be set
     * to all subplans.
     */
    else if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        mark_invalid_subplans_as_finished(node);
    }

    /* If all the plans are already done, we have nothing to do */
    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
    {
        LWLockRelease(&pstate->pa_lock);
        return false;
    }

    /* Save the plan from which we are starting the search. */
    node->as_whichplan = pstate->pa_next_plan;

    /* Loop until we find a valid subplan to execute. */
    while (pstate->pa_finished[pstate->pa_next_plan])
    {
        int         nextplan;

        nextplan = bms_next_member(node->as_valid_subplans,
                                   pstate->pa_next_plan);
        if (nextplan >= 0)
        {
            /* Advance to the next valid plan. */
            pstate->pa_next_plan = nextplan;
        }
        else if (node->as_whichplan > node->as_first_partial_plan)
        {
            /*
             * Try looping back to the first valid partial plan, if there is
             * one.  If there isn't, arrange to bail out below.
             */
            nextplan = bms_next_member(node->as_valid_subplans,
                                       node->as_first_partial_plan - 1);
            pstate->pa_next_plan =
                nextplan < 0 ? node->as_whichplan : nextplan;
        }
        else
        {
            /*
             * At last plan, and either there are no partial plans or we've
             * tried them all.  Arrange to bail out.
             */
            pstate->pa_next_plan = node->as_whichplan;
        }

        if (pstate->pa_next_plan == node->as_whichplan)
        {
            /* We've tried everything! */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
    }

    /* Pick the plan we found, and advance pa_next_plan one more time. */
    node->as_whichplan = pstate->pa_next_plan;
    pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                           pstate->pa_next_plan);

    /*
     * If there are no more valid plans then try setting the next plan to the
     * first valid partial plan.
     */
    if (pstate->pa_next_plan < 0)
    {
        int         nextplan = bms_next_member(node->as_valid_subplans,
                                               node->as_first_partial_plan - 1);

        if (nextplan >= 0)
            pstate->pa_next_plan = nextplan;
        else
        {
            /*
             * There are no valid partial plans, and we already chose the last
             * non-partial plan; so flag that there's nothing more for our
             * fellow workers to do.
             */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
        }
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < node->as_first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}
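
/*
 * A worked example (hypothetical plan mix): suppose subplans 0 and 1 are
 * non-partial and subplans 2 and 3 are partial, with pa_next_plan starting
 * at 0.  The first two workers claim subplans 0 and 1, which are immediately
 * marked finished; subsequent workers are handed subplans 2 and 3, and once
 * pa_next_plan runs off the end of the list it wraps back to the first valid
 * partial plan, so any further workers pile onto 2 and 3 until those are
 * executed to completion.
 */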

/*
 * mark_invalid_subplans_as_finished
 *      Marks the ParallelAppendState's pa_finished as true for each invalid
 *      subplan.
 *
 * This function should only be called for parallel Append with run-time
 * pruning enabled.
 */
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
    int         i;

    /* Only valid to call this while in parallel Append mode */
    Assert(node->as_pstate);

    /* Shouldn't have been called when run-time pruning is not enabled */
    Assert(node->as_prune_state);

    /* Nothing to do if all plans are valid */
    if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
        return;

    /* Mark all non-valid plans as finished */
    for (i = 0; i < node->as_nplans; i++)
    {
        if (!bms_is_member(i, node->as_valid_subplans))
            node->as_pstate->pa_finished[i] = true;
    }
}

/* ----------------------------------------------------------------
 *                      Asynchronous Append Support
 * ----------------------------------------------------------------
 */
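
/*
 * Overview of the async machinery below: ExecAppendAsyncBegin() fires an
 * initial request at every valid async subplan; ExecAppendAsyncGetNext()
 * returns buffered results and issues new requests via
 * ExecAppendAsyncRequest(); ExecAppendAsyncEventWait() blocks (or polls) on
 * a WaitEventSet until some pending request can make progress; and
 * ExecAsyncAppendResponse() is the callback through which a subplan delivers
 * its result back to this node.
 */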

/* ----------------------------------------------------------------
 *      ExecAppendAsyncBegin
 *
 *      Begin executing designated async-capable subplans.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
    int         i;

    /* Backward scan is not supported by async-aware Appends. */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);

    /* We should never be called when there are no async subplans. */
    Assert(node->as_nasyncplans > 0);

    /* If we've yet to determine the valid subplans then do so now. */
    if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;

        classify_matching_subplans(node);
    }

    /* Initialize state variables. */
    node->as_syncdone = bms_is_empty(node->as_valid_subplans);
    node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

    /* Nothing to do if there are no valid async subplans. */
    if (node->as_nasyncremain == 0)
        return;

    /* Make a request for each of the valid async subplans. */
    i = -1;
    while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        Assert(areq->request_index == i);
        Assert(!areq->callback_pending);

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncGetNext
 *
 *      Get the next tuple from any of the asynchronous subplans.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
    *result = NULL;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    /* Request a tuple asynchronously. */
    if (ExecAppendAsyncRequest(node, result))
        return true;

    while (node->as_nasyncremain > 0)
    {
        CHECK_FOR_INTERRUPTS();

        /* Wait or poll for async events. */
        ExecAppendAsyncEventWait(node);

        /* Request a tuple asynchronously. */
        if (ExecAppendAsyncRequest(node, result))
            return true;

        /* Break from loop if there's any sync subplan that isn't complete. */
        if (!node->as_syncdone)
            break;
    }

    /*
     * If all sync subplans are complete, we're totally done scanning the
     * given node.  Otherwise, we're done with the asynchronous stuff but must
     * continue scanning the sync subplans.
     */
    if (node->as_syncdone)
    {
        Assert(node->as_nasyncremain == 0);
        *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
        return true;
    }

    return false;
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncRequest
 *
 *      Request a tuple asynchronously.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
    Bitmapset  *needrequest;
    int         i;

    /* Nothing to do if there are no async subplans needing a new request. */
    if (bms_is_empty(node->as_needrequest))
    {
        Assert(node->as_nasyncresults == 0);
        return false;
    }

    /*
     * If there are any asynchronously-generated results that have not yet
     * been returned, we have nothing to do; just return one of them.
     */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    /* Make a new request for each of the async subplans that need it. */
    needrequest = node->as_needrequest;
    node->as_needrequest = NULL;
    i = -1;
    while ((i = bms_next_member(needrequest, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        /* Do the actual work. */
        ExecAsyncRequest(areq);
    }
    bms_free(needrequest);

    /* Return one of the asynchronously-generated results if any. */
    if (node->as_nasyncresults > 0)
    {
        --node->as_nasyncresults;
        *result = node->as_asyncresults[node->as_nasyncresults];
        return true;
    }

    return false;
}

/* ----------------------------------------------------------------
 *      ExecAppendAsyncEventWait
 *
 *      Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
    int         nevents = node->as_nasyncplans + 1;
    long        timeout = node->as_syncdone ? -1 : 0;
    WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
    int         noccurred;
    int         i;

    /* We should never be called when there are no valid async subplans. */
    Assert(node->as_nasyncremain > 0);

    Assert(node->as_eventset == NULL);
    node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
                      NULL, NULL);

    /* Give each waiting subplan a chance to add an event. */
    i = -1;
    while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    {
        AsyncRequest *areq = node->as_asyncrequests[i];

        if (areq->callback_pending)
            ExecAsyncConfigureWait(areq);
    }

    /*
     * No need for further processing if there are no configured events other
     * than the postmaster death event.
     */
    if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    {
        FreeWaitEventSet(node->as_eventset);
        node->as_eventset = NULL;
        return;
    }

    /* Return at most EVENT_BUFFER_SIZE events in one call. */
    if (nevents > EVENT_BUFFER_SIZE)
        nevents = EVENT_BUFFER_SIZE;

    /*
     * If the timeout is -1, wait until at least one event occurs.  If the
     * timeout is 0, poll for events, but do not wait at all.
     */
    noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
                                 nevents, WAIT_EVENT_APPEND_READY);
    FreeWaitEventSet(node->as_eventset);
    node->as_eventset = NULL;
    if (noccurred == 0)
        return;

    /* Deliver notifications. */
    for (i = 0; i < noccurred; i++)
    {
        WaitEvent  *w = &occurred_event[i];

        /*
         * Each waiting subplan should have registered its wait event with
         * user_data pointing back to its AsyncRequest.
         */
        if ((w->events & WL_SOCKET_READABLE) != 0)
        {
            AsyncRequest *areq = (AsyncRequest *) w->user_data;

            if (areq->callback_pending)
            {
                /*
                 * Mark it as no longer needing a callback.  We must do this
                 * before dispatching the callback in case the callback resets
                 * the flag.
                 */
                areq->callback_pending = false;

                /* Do the actual work. */
                ExecAsyncNotify(areq);
            }
        }
    }
}

/* ----------------------------------------------------------------
 *      ExecAsyncAppendResponse
 *
 *      Receive a response from an asynchronous request we made.
 * ----------------------------------------------------------------
 */
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
    AppendState *node = (AppendState *) areq->requestor;
    TupleTableSlot *slot = areq->result;

    /* The result should be a TupleTableSlot or NULL. */
    Assert(slot == NULL || IsA(slot, TupleTableSlot));

    /* Nothing to do if the request is pending. */
    if (!areq->request_complete)
    {
        /* The request would have been pending for a callback. */
        Assert(areq->callback_pending);
        return;
    }

    /* If the result is NULL or an empty slot, there's nothing more to do. */
    if (TupIsNull(slot))
    {
        /* The ending subplan wouldn't have been pending for a callback. */
        Assert(!areq->callback_pending);
        --node->as_nasyncremain;
        return;
    }

    /* Save result so we can return it. */
    Assert(node->as_nasyncresults < node->as_nasyncplans);
    node->as_asyncresults[node->as_nasyncresults++] = slot;

    /*
     * Mark the subplan that returned a result as ready for a new request.  We
     * don't launch another one here immediately because it might complete.
     */
    node->as_needrequest = bms_add_member(node->as_needrequest,
                                          areq->request_index);
}

/* ----------------------------------------------------------------
 *      classify_matching_subplans
 *
 *      Classify the node's as_valid_subplans into sync ones and
 *      async ones, adjust it to contain sync ones only, and save
 *      async ones in the node's as_valid_asyncplans.
 * ----------------------------------------------------------------
 */
static void
classify_matching_subplans(AppendState *node)
{
    Bitmapset  *valid_asyncplans;

    Assert(node->as_valid_subplans_identified);
    Assert(node->as_valid_asyncplans == NULL);

    /* Nothing to do if there are no valid subplans. */
    if (bms_is_empty(node->as_valid_subplans))
    {
        node->as_syncdone = true;
        node->as_nasyncremain = 0;
        return;
    }

    /* Nothing to do if there are no valid async subplans. */
    if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    {
        node->as_nasyncremain = 0;
        return;
    }

    /* Get valid async subplans. */
    valid_asyncplans = bms_intersect(node->as_asyncplans,
                                     node->as_valid_subplans);

    /* Adjust the valid subplans to contain sync subplans only. */
    node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
                                              valid_asyncplans);

    /* Save valid async subplans. */
    node->as_valid_asyncplans = valid_asyncplans;
}