Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : * ExecInitAppend - initialize the append node
18 : * ExecAppend - retrieve the next tuple from the node
19 : * ExecEndAppend - shut down the append node
20 : * ExecReScanAppend - rescan the append node
21 : *
22 : * NOTES
23 : * Each append node contains a list of one or more subplans which
24 : * must be iteratively processed (forwards or backwards).
25 : * Tuples are retrieved by executing the 'whichplan'th subplan
26 : * until the subplan stops returning tuples, at which point that
27 : * plan is shut down and the next started up.
28 : *
29 : * Append nodes don't make use of their left and right
30 : * subtrees, rather they maintain a list of subplans so
31 : * a typical append node looks like this in the plan tree:
32 : *
33 : * ...
34 : * /
35 : * Append -------+------+------+--- nil
36 : * / \ | | |
37 : * nil nil ... ... ...
38 : * subplans
39 : *
40 : * Append nodes are currently used for unions, and to support
41 : * inheritance queries, where several relations need to be scanned.
42 : * For example, in our standard person/student/employee/student-emp
43 : * example, where student and employee inherit from person
44 : * and student-emp inherits from student and employee, the
45 : * query:
46 : *
47 : * select name from person
48 : *
49 : * generates the plan:
50 : *
51 : * |
52 : * Append -------+-------+--------+--------+
53 : * / \ | | | |
54 : * nil nil Scan Scan Scan Scan
55 : * | | | |
56 : * person employee student student-emp
57 : */
58 :
59 : #include "postgres.h"
60 :
61 : #include "executor/execAsync.h"
62 : #include "executor/execPartition.h"
63 : #include "executor/executor.h"
64 : #include "executor/nodeAppend.h"
65 : #include "miscadmin.h"
66 : #include "pgstat.h"
67 : #include "storage/latch.h"
68 : #include "storage/lwlock.h"
69 : #include "utils/wait_event.h"
70 :
/*
 * Shared state for parallel-aware Append.
 *
 * This lives in dynamic shared memory; one instance is shared by the leader
 * and all workers executing the same parallel Append node.  All fields are
 * protected by pa_lock.
 */
struct ParallelAppendState
{
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  For a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
85 :
86 : #define INVALID_SUBPLAN_INDEX -1
87 : #define EVENT_BUFFER_SIZE 16
88 :
89 : static TupleTableSlot *ExecAppend(PlanState *pstate);
90 : static bool choose_next_subplan_locally(AppendState *node);
91 : static bool choose_next_subplan_for_leader(AppendState *node);
92 : static bool choose_next_subplan_for_worker(AppendState *node);
93 : static void mark_invalid_subplans_as_finished(AppendState *node);
94 : static void ExecAppendAsyncBegin(AppendState *node);
95 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
96 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
97 : static void ExecAppendAsyncEventWait(AppendState *node);
98 : static void classify_matching_subplans(AppendState *node);
99 :
100 : /* ----------------------------------------------------------------
101 : * ExecInitAppend
102 : *
103 : * Begin all of the subscans of the append node.
104 : *
105 : * (This is potentially wasteful, since the entire result of the
106 : * append node may not be scanned, but this way all of the
107 : * structures get allocated in the executor's top level memory
108 : * block instead of that of the call to ExecAppend.)
109 : * ----------------------------------------------------------------
110 : */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	const TupleTableSlotOps *appendops;
	Bitmapset  *validsubplans;
	Bitmapset  *asyncplans;
	int			nplans;
	int			nasyncplans;
	int			firstvalid;
	int			i,
				j;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
	appendstate->as_syncdone = false;
	appendstate->as_begun = false;

	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_index >= 0)
	{
		PartitionPruneState *prunestate;

		/*
		 * Set up pruning data structure.  This also initializes the set of
		 * subplans to initialize (validsubplans) by taking into account the
		 * result of performing initial pruning if any.
		 */
		prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
												  list_length(node->appendplans),
												  node->part_prune_index,
												  node->apprelids,
												  &validsubplans);
		appendstate->as_prune_state = prunestate;
		nplans = bms_num_members(validsubplans);

		/*
		 * When no run-time pruning is required and there's at least one
		 * subplan, we can fill as_valid_subplans immediately, preventing
		 * later calls to ExecFindMatchingSubPlans.
		 */
		if (!prunestate->do_exec_prune && nplans > 0)
		{
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
			appendstate->as_valid_subplans_identified = true;
		}
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_valid_subplans_identified = true;
		appendstate->as_prune_state = NULL;
	}

	/* Only the surviving (valid) subplans get planstate entries. */
	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 *
	 * Note that i indexes node->appendplans while j indexes the compacted
	 * appendplanstates array; they differ when pruning removed subplans.
	 */
	j = 0;
	asyncplans = NULL;
	nasyncplans = 0;
	firstvalid = nplans;
	i = -1;
	while ((i = bms_next_member(validsubplans, i)) >= 0)
	{
		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);

		/*
		 * Record async subplans.  When executing EvalPlanQual, we treat them
		 * as sync ones; don't do this when initializing an EvalPlanQual plan
		 * tree.
		 */
		if (initNode->async_capable && estate->es_epq_active == NULL)
		{
			asyncplans = bms_add_member(asyncplans, j);
			nasyncplans++;
		}

		/*
		 * Record the lowest appendplans index which is a valid partial plan.
		 */
		if (i >= node->first_partial_plan && j < firstvalid)
			firstvalid = j;

		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;

	/*
	 * Initialize Append's result tuple type and slot.  If the child plans all
	 * produce the same fixed slot type, we can use that slot type; otherwise
	 * make a virtual slot.  (Note that the result slot itself is used only to
	 * return a null tuple at end of execution; real tuples are returned to
	 * the caller in the children's own result slots.  What we are doing here
	 * is allowing the parent plan node to optimize if the Append will return
	 * only one kind of slot.)
	 */
	appendops = ExecGetCommonSlotOps(appendplanstates, j);
	if (appendops != NULL)
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
	}
	else
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
		/* show that the output slot type is not fixed */
		appendstate->ps.resultopsset = true;
		appendstate->ps.resultopsfixed = false;
	}

	/* Initialize async state */
	appendstate->as_asyncplans = asyncplans;
	appendstate->as_nasyncplans = nasyncplans;
	appendstate->as_asyncrequests = NULL;
	appendstate->as_asyncresults = NULL;
	appendstate->as_nasyncresults = 0;
	appendstate->as_nasyncremain = 0;
	appendstate->as_needrequest = NULL;
	appendstate->as_eventset = NULL;
	appendstate->as_valid_asyncplans = NULL;

	if (nasyncplans > 0)
	{
		/* One AsyncRequest per async-capable subplan, indexed like
		 * appendplanstates; sync subplans leave NULL holes. */
		appendstate->as_asyncrequests = (AsyncRequest **)
			palloc0(nplans * sizeof(AsyncRequest *));

		i = -1;
		while ((i = bms_next_member(asyncplans, i)) >= 0)
		{
			AsyncRequest *areq;

			areq = palloc_object(AsyncRequest);
			areq->requestor = (PlanState *) appendstate;
			areq->requestee = appendplanstates[i];
			areq->request_index = i;
			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;

			appendstate->as_asyncrequests[i] = areq;
		}

		appendstate->as_asyncresults = (TupleTableSlot **)
			palloc0(nasyncplans * sizeof(TupleTableSlot *));

		/*
		 * If the valid-subplan set is already known, split it into sync and
		 * async parts now; otherwise ExecAppendAsyncBegin will do it later.
		 */
		if (appendstate->as_valid_subplans_identified)
			classify_matching_subplans(appendstate);
	}

	/*
	 * Miscellaneous initialization
	 */

	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}
298 :
299 : /* ----------------------------------------------------------------
300 : * ExecAppend
301 : *
302 : * Handles iteration over multiple subplans.
303 : * ----------------------------------------------------------------
304 : */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *result;

	/*
	 * If this is the first call after Init or ReScan, we need to do the
	 * initialization work.
	 */
	if (!node->as_begun)
	{
		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
		Assert(!node->as_syncdone);

		/* Nothing to do if there are no subplans */
		if (node->as_nplans == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		/* If there are any async subplans, begin executing them. */
		if (node->as_nasyncplans > 0)
			ExecAppendAsyncBegin(node);

		/*
		 * If no sync subplan has been chosen, we must choose one before
		 * proceeding.
		 */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		Assert(node->as_syncdone ||
			   (node->as_whichplan >= 0 &&
				node->as_whichplan < node->as_nplans));

		/* And we're initialized. */
		node->as_begun = true;
	}

	/*
	 * Main loop: alternate between collecting async results and pulling
	 * tuples from the current sync subplan until one of them yields a tuple
	 * or everything is exhausted.
	 */
	for (;;)
	{
		PlanState  *subnode;

		CHECK_FOR_INTERRUPTS();

		/*
		 * try to get a tuple from an async subplan if any
		 */
		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
		{
			if (ExecAppendAsyncGetNext(node, &result))
				return result;
			Assert(!node->as_syncdone);
			Assert(bms_is_empty(node->as_needrequest));
		}

		/*
		 * figure out which sync subplan we are currently processing
		 */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
		subnode = node->appendplans[node->as_whichplan];

		/*
		 * get a tuple from the subplan
		 */
		result = ExecProcNode(subnode);

		if (!TupIsNull(result))
		{
			/*
			 * If the subplan gave us something then return it as-is.  We do
			 * NOT make use of the result slot that was set up in
			 * ExecInitAppend; there's no need for it.
			 */
			return result;
		}

		/*
		 * wait or poll for async events if any.  We do this before checking
		 * for the end of iteration, because it might drain the remaining
		 * async subplans.
		 */
		if (node->as_nasyncremain > 0)
			ExecAppendAsyncEventWait(node);

		/* choose new sync subplan; if no sync/async subplans, we're done */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}
}
394 :
395 : /* ----------------------------------------------------------------
396 : * ExecEndAppend
397 : *
398 : * Shuts down the subscans of the append node.
399 : *
400 : * Returns nothing of interest.
401 : * ----------------------------------------------------------------
402 : */
403 : void
404 12360 : ExecEndAppend(AppendState *node)
405 : {
406 : PlanState **appendplans;
407 : int nplans;
408 : int i;
409 :
410 : /*
411 : * get information from the node
412 : */
413 12360 : appendplans = node->appendplans;
414 12360 : nplans = node->as_nplans;
415 :
416 : /*
417 : * shut down each of the subscans
418 : */
419 51528 : for (i = 0; i < nplans; i++)
420 39168 : ExecEndNode(appendplans[i]);
421 12360 : }
422 :
void
ExecReScanAppend(AppendState *node)
{
	int			nasyncplans = node->as_nasyncplans;
	int			i;

	/*
	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
	 * we'd better unset the valid subplans so that they are reselected for
	 * the new parameter values.
	 */
	if (node->as_prune_state &&
		bms_overlap(node->ps.chgParam,
					node->as_prune_state->execparamids))
	{
		node->as_valid_subplans_identified = false;
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
		bms_free(node->as_valid_asyncplans);
		node->as_valid_asyncplans = NULL;
	}

	for (i = 0; i < node->as_nplans; i++)
	{
		PlanState  *subnode = node->appendplans[i];

		/*
		 * ExecReScan doesn't know about my subplans, so I have to do
		 * changed-parameter signaling myself.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(subnode, node->ps.chgParam);

		/*
		 * If chgParam of subnode is not null then plan will be re-scanned by
		 * first ExecProcNode or by first ExecAsyncRequest.
		 */
		if (subnode->chgParam == NULL)
			ExecReScan(subnode);
	}

	/* Reset async state so all async subplans start afresh. */
	if (nasyncplans > 0)
	{
		i = -1;
		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
		{
			AsyncRequest *areq = node->as_asyncrequests[i];

			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;
		}

		node->as_nasyncresults = 0;
		node->as_nasyncremain = 0;
		bms_free(node->as_needrequest);
		node->as_needrequest = NULL;
	}

	/* Let choose_next_subplan_* function handle setting the first subplan */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
	node->as_syncdone = false;
	node->as_begun = false;
}
488 :
489 : /* ----------------------------------------------------------------
490 : * Parallel Append Support
491 : * ----------------------------------------------------------------
492 : */
493 :
494 : /* ----------------------------------------------------------------
495 : * ExecAppendEstimate
496 : *
497 : * Compute the amount of space we'll need in the parallel
498 : * query DSM, and inform pcxt->estimator about our needs.
499 : * ----------------------------------------------------------------
500 : */
501 : void
502 108 : ExecAppendEstimate(AppendState *node,
503 : ParallelContext *pcxt)
504 : {
505 108 : node->pstate_len =
506 108 : add_size(offsetof(ParallelAppendState, pa_finished),
507 108 : sizeof(bool) * node->as_nplans);
508 :
509 108 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
510 108 : shm_toc_estimate_keys(&pcxt->estimator, 1);
511 108 : }
512 :
513 :
514 : /* ----------------------------------------------------------------
515 : * ExecAppendInitializeDSM
516 : *
517 : * Set up shared state for Parallel Append.
518 : * ----------------------------------------------------------------
519 : */
520 : void
521 108 : ExecAppendInitializeDSM(AppendState *node,
522 : ParallelContext *pcxt)
523 : {
524 : ParallelAppendState *pstate;
525 :
526 108 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
527 108 : memset(pstate, 0, node->pstate_len);
528 108 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
529 108 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
530 :
531 108 : node->as_pstate = pstate;
532 108 : node->choose_next_subplan = choose_next_subplan_for_leader;
533 108 : }
534 :
535 : /* ----------------------------------------------------------------
536 : * ExecAppendReInitializeDSM
537 : *
538 : * Reset shared state before beginning a fresh scan.
539 : * ----------------------------------------------------------------
540 : */
541 : void
542 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
543 : {
544 0 : ParallelAppendState *pstate = node->as_pstate;
545 :
546 0 : pstate->pa_next_plan = 0;
547 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
548 0 : }
549 :
550 : /* ----------------------------------------------------------------
551 : * ExecAppendInitializeWorker
552 : *
553 : * Copy relevant information from TOC into planstate, and initialize
554 : * whatever is required to choose and execute the optimal subplan.
555 : * ----------------------------------------------------------------
556 : */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
	/* Locate the shared state the leader stored under our plan node id. */
	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
	/* Workers pick subplans front-to-back; see choose_next_subplan_for_worker. */
	node->choose_next_subplan = choose_next_subplan_for_worker;
}
563 :
564 : /* ----------------------------------------------------------------
565 : * choose_next_subplan_locally
566 : *
567 : * Choose next sync subplan for a non-parallel-aware Append,
568 : * returning false if there are no more.
569 : * ----------------------------------------------------------------
570 : */
571 : static bool
572 126381 : choose_next_subplan_locally(AppendState *node)
573 : {
574 126381 : int whichplan = node->as_whichplan;
575 : int nextplan;
576 :
577 : /* We should never be called when there are no subplans */
578 : Assert(node->as_nplans > 0);
579 :
580 : /* Nothing to do if syncdone */
581 126381 : if (node->as_syncdone)
582 18 : return false;
583 :
584 : /*
585 : * If first call then have the bms member function choose the first valid
586 : * sync subplan by initializing whichplan to -1. If there happen to be no
587 : * valid sync subplans then the bms member function will handle that by
588 : * returning a negative number which will allow us to exit returning a
589 : * false value.
590 : */
591 126363 : if (whichplan == INVALID_SUBPLAN_INDEX)
592 : {
593 40422 : if (node->as_nasyncplans > 0)
594 : {
595 : /* We'd have filled as_valid_subplans already */
596 : Assert(node->as_valid_subplans_identified);
597 : }
598 40403 : else if (!node->as_valid_subplans_identified)
599 : {
600 2255 : node->as_valid_subplans =
601 2255 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
602 2255 : node->as_valid_subplans_identified = true;
603 : }
604 :
605 40422 : whichplan = -1;
606 : }
607 :
608 : /* Ensure whichplan is within the expected range */
609 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
610 :
611 126363 : if (ScanDirectionIsForward(node->ps.state->es_direction))
612 126351 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
613 : else
614 12 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
615 :
616 126363 : if (nextplan < 0)
617 : {
618 : /* Set as_syncdone if in async mode */
619 40012 : if (node->as_nasyncplans > 0)
620 17 : node->as_syncdone = true;
621 40012 : return false;
622 : }
623 :
624 86351 : node->as_whichplan = nextplan;
625 :
626 86351 : return true;
627 : }
628 :
629 : /* ----------------------------------------------------------------
630 : * choose_next_subplan_for_leader
631 : *
632 : * Try to pick a plan which doesn't commit us to doing much
633 : * work locally, so that as much work as possible is done in
634 : * the workers. Cheapest subplans are at the end.
635 : * ----------------------------------------------------------------
636 : */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* All reads/writes of shared pa_* state happen under this lock. */
	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
	{
		/* Mark just-completed subplan as finished. */
		node->as_pstate->pa_finished[node->as_whichplan] = true;
	}
	else
	{
		/* Start with last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * If we've yet to determine the valid subplans then do so now.  If
		 * run-time pruning is disabled then the valid subplans will always be
		 * set to all subplans.
		 */
		if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
			node->as_valid_subplans_identified = true;

			/*
			 * Mark each invalid plan as finished to allow the loop below to
			 * select the first valid subplan.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}

	/*
	 * Loop until we find a subplan to execute.  The leader walks backward
	 * from the end so it takes the cheapest plans, leaving expensive ones
	 * for the workers.
	 */
	while (pstate->pa_finished[node->as_whichplan])
	{
		if (node->as_whichplan == 0)
		{
			/* Reached the front with nothing left: tell everyone we're done. */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}

		/*
		 * We needn't pay attention to as_valid_subplans here as all invalid
		 * plans have been marked as finished.
		 */
		node->as_whichplan--;
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
705 :
706 : /* ----------------------------------------------------------------
707 : * choose_next_subplan_for_worker
708 : *
709 : * Choose next subplan for a parallel-aware Append, returning
710 : * false if there are no more.
711 : *
712 : * We start from the first plan and advance through the list;
713 : * when we get back to the end, we loop back to the first
714 : * partial plan. This assigns the non-partial plans first in
715 : * order of descending cost and then spreads out the workers
716 : * as evenly as possible across the remaining partial plans.
717 : * ----------------------------------------------------------------
718 : */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* All reads/writes of shared pa_* state happen under this lock. */
	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 */
	else if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
		node->as_valid_subplans_identified = true;

		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/* Save the plan from which we are starting the search. */
	node->as_whichplan = pstate->pa_next_plan;

	/*
	 * Loop until we find a valid subplan to execute.  The search advances
	 * forward and, on reaching the end, wraps back to the first partial
	 * plan; coming back around to the starting point means every subplan
	 * has been tried.
	 */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
			pstate->pa_next_plan = nextplan;
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
836 :
837 : /*
838 : * mark_invalid_subplans_as_finished
839 : * Marks the ParallelAppendState's pa_finished as true for each invalid
840 : * subplan.
841 : *
842 : * This function should only be called for parallel Append with run-time
843 : * pruning enabled.
844 : */
845 : static void
846 32 : mark_invalid_subplans_as_finished(AppendState *node)
847 : {
848 : int i;
849 :
850 : /* Only valid to call this while in parallel Append mode */
851 : Assert(node->as_pstate);
852 :
853 : /* Shouldn't have been called when run-time pruning is not enabled */
854 : Assert(node->as_prune_state);
855 :
856 : /* Nothing to do if all plans are valid */
857 32 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
858 0 : return;
859 :
860 : /* Mark all non-valid plans as finished */
861 108 : for (i = 0; i < node->as_nplans; i++)
862 : {
863 76 : if (!bms_is_member(i, node->as_valid_subplans))
864 32 : node->as_pstate->pa_finished[i] = true;
865 : }
866 : }
867 :
868 : /* ----------------------------------------------------------------
869 : * Asynchronous Append Support
870 : * ----------------------------------------------------------------
871 : */
872 :
873 : /* ----------------------------------------------------------------
874 : * ExecAppendAsyncBegin
875 : *
 *		Begin executing designated async-capable subplans.
877 : * ----------------------------------------------------------------
878 : */
static void
ExecAppendAsyncBegin(AppendState *node)
{
	int			i;

	/* Backward scan is not supported by async-aware Appends. */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* We should never be called when there are no async subplans. */
	Assert(node->as_nasyncplans > 0);

	/* If we've yet to determine the valid subplans then do so now. */
	if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
		node->as_valid_subplans_identified = true;

		/* Split the valid set into its sync and async parts. */
		classify_matching_subplans(node);
	}

	/* Initialize state variables. */
	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

	/* Nothing to do if there are no valid async subplans. */
	if (node->as_nasyncremain == 0)
		return;

	/* Make a request for each of the valid async subplans. */
	i = -1;
	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		Assert(areq->request_index == i);
		Assert(!areq->callback_pending);

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
}
924 :
925 : /* ----------------------------------------------------------------
926 : * ExecAppendAsyncGetNext
927 : *
928 : * Get the next tuple from any of the asynchronous subplans.
929 : * ----------------------------------------------------------------
930 : */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	*result = NULL;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* Request a tuple asynchronously. */
	if (ExecAppendAsyncRequest(node, result))
		return true;

	/* No result ready; wait for async events until one appears or all
	 * async subplans are drained. */
	while (node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll for async events. */
		ExecAppendAsyncEventWait(node);

		/* Request a tuple asynchronously. */
		if (ExecAppendAsyncRequest(node, result))
			return true;

		/* Break from loop if there's any sync subplan that isn't complete. */
		if (!node->as_syncdone)
			break;
	}

	/*
	 * If all sync subplans are complete, we're totally done scanning the
	 * given node.  Otherwise, we're done with the asynchronous stuff but must
	 * continue scanning the sync subplans.
	 */
	if (node->as_syncdone)
	{
		Assert(node->as_nasyncremain == 0);
		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
		return true;
	}

	return false;
}
973 :
974 : /* ----------------------------------------------------------------
975 : * ExecAppendAsyncRequest
976 : *
977 : * Request a tuple asynchronously.
978 : * ----------------------------------------------------------------
979 : */
980 : static bool
981 6252 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
982 : {
983 : Bitmapset *needrequest;
984 : int i;
985 :
986 : /* Nothing to do if there are no async subplans needing a new request. */
987 6252 : if (bms_is_empty(node->as_needrequest))
988 : {
989 : Assert(node->as_nasyncresults == 0);
990 72 : return false;
991 : }
992 :
993 : /*
994 : * If there are any asynchronously-generated results that have not yet
995 : * been returned, we have nothing to do; just return one of them.
996 : */
997 6180 : if (node->as_nasyncresults > 0)
998 : {
999 1199 : --node->as_nasyncresults;
1000 1199 : *result = node->as_asyncresults[node->as_nasyncresults];
1001 1199 : return true;
1002 : }
1003 :
1004 : /* Make a new request for each of the async subplans that need it. */
1005 4981 : needrequest = node->as_needrequest;
1006 4981 : node->as_needrequest = NULL;
1007 4981 : i = -1;
1008 11084 : while ((i = bms_next_member(needrequest, i)) >= 0)
1009 : {
1010 6103 : AsyncRequest *areq = node->as_asyncrequests[i];
1011 :
1012 : /* Do the actual work. */
1013 6103 : ExecAsyncRequest(areq);
1014 : }
1015 4981 : bms_free(needrequest);
1016 :
1017 : /* Return one of the asynchronously-generated results if any. */
1018 4981 : if (node->as_nasyncresults > 0)
1019 : {
1020 4907 : --node->as_nasyncresults;
1021 4907 : *result = node->as_asyncresults[node->as_nasyncresults];
1022 4907 : return true;
1023 : }
1024 :
1025 74 : return false;
1026 : }
1027 :
1028 : /* ----------------------------------------------------------------
1029 : * ExecAppendAsyncEventWait
1030 : *
1031 : * Wait or poll for file descriptor events and fire callbacks.
1032 : * ----------------------------------------------------------------
1033 : */
1034 : static void
1035 132 : ExecAppendAsyncEventWait(AppendState *node)
1036 : {
1037 132 : int nevents = node->as_nasyncplans + 2;
1038 132 : long timeout = node->as_syncdone ? -1 : 0;
1039 : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1040 : int noccurred;
1041 : int i;
1042 :
1043 : /* We should never be called when there are no valid async subplans. */
1044 : Assert(node->as_nasyncremain > 0);
1045 :
1046 : Assert(node->as_eventset == NULL);
1047 132 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1048 132 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1049 : NULL, NULL);
1050 :
1051 : /* Give each waiting subplan a chance to add an event. */
1052 132 : i = -1;
1053 402 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1054 : {
1055 271 : AsyncRequest *areq = node->as_asyncrequests[i];
1056 :
1057 271 : if (areq->callback_pending)
1058 230 : ExecAsyncConfigureWait(areq);
1059 : }
1060 :
1061 : /*
1062 : * No need for further processing if none of the subplans configured any
1063 : * events.
1064 : */
1065 131 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1066 : {
1067 1 : FreeWaitEventSet(node->as_eventset);
1068 1 : node->as_eventset = NULL;
1069 11 : return;
1070 : }
1071 :
1072 : /*
1073 : * Add the process latch to the set, so that we wake up to process the
1074 : * standard interrupts with CHECK_FOR_INTERRUPTS().
1075 : *
1076 : * NOTE: For historical reasons, it's important that this is added to the
1077 : * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1078 : * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1079 : * any other events are in the set. That's a poor design, it's
1080 : * questionable for postgres_fdw to be doing that in the first place, but
1081 : * we cannot change it now. The pattern has possibly been copied to other
1082 : * extensions too.
1083 : */
1084 130 : AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
1085 : MyLatch, NULL);
1086 :
1087 : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1088 130 : if (nevents > EVENT_BUFFER_SIZE)
1089 0 : nevents = EVENT_BUFFER_SIZE;
1090 :
1091 : /*
1092 : * If the timeout is -1, wait until at least one event occurs. If the
1093 : * timeout is 0, poll for events, but do not wait at all.
1094 : */
1095 130 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1096 : nevents, WAIT_EVENT_APPEND_READY);
1097 130 : FreeWaitEventSet(node->as_eventset);
1098 130 : node->as_eventset = NULL;
1099 130 : if (noccurred == 0)
1100 10 : return;
1101 :
1102 : /* Deliver notifications. */
1103 268 : for (i = 0; i < noccurred; i++)
1104 : {
1105 148 : WaitEvent *w = &occurred_event[i];
1106 :
1107 : /*
1108 : * Each waiting subplan should have registered its wait event with
1109 : * user_data pointing back to its AsyncRequest.
1110 : */
1111 148 : if ((w->events & WL_SOCKET_READABLE) != 0)
1112 : {
1113 148 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1114 :
1115 148 : if (areq->callback_pending)
1116 : {
1117 : /*
1118 : * Mark it as no longer needing a callback. We must do this
1119 : * before dispatching the callback in case the callback resets
1120 : * the flag.
1121 : */
1122 148 : areq->callback_pending = false;
1123 :
1124 : /* Do the actual work. */
1125 148 : ExecAsyncNotify(areq);
1126 : }
1127 : }
1128 :
1129 : /* Handle standard interrupts */
1130 148 : if ((w->events & WL_LATCH_SET) != 0)
1131 : {
1132 0 : ResetLatch(MyLatch);
1133 0 : CHECK_FOR_INTERRUPTS();
1134 : }
1135 : }
1136 : }
1137 :
1138 : /* ----------------------------------------------------------------
1139 : * ExecAsyncAppendResponse
1140 : *
1141 : * Receive a response from an asynchronous request we made.
1142 : * ----------------------------------------------------------------
1143 : */
1144 : void
1145 6328 : ExecAsyncAppendResponse(AsyncRequest *areq)
1146 : {
1147 6328 : AppendState *node = (AppendState *) areq->requestor;
1148 6328 : TupleTableSlot *slot = areq->result;
1149 :
1150 : /* The result should be a TupleTableSlot or NULL. */
1151 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1152 :
1153 : /* Nothing to do if the request is pending. */
1154 6328 : if (!areq->request_complete)
1155 : {
1156 : /* The request would have been pending for a callback. */
1157 : Assert(areq->callback_pending);
1158 161 : return;
1159 : }
1160 :
1161 : /* If the result is NULL or an empty slot, there's nothing more to do. */
1162 6167 : if (TupIsNull(slot))
1163 : {
1164 : /* The ending subplan wouldn't have been pending for a callback. */
1165 : Assert(!areq->callback_pending);
1166 60 : --node->as_nasyncremain;
1167 60 : return;
1168 : }
1169 :
1170 : /* Save result so we can return it. */
1171 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1172 6107 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1173 :
1174 : /*
1175 : * Mark the subplan that returned a result as ready for a new request. We
1176 : * don't launch another one here immediately because it might complete.
1177 : */
1178 6107 : node->as_needrequest = bms_add_member(node->as_needrequest,
1179 : areq->request_index);
1180 : }
1181 :
1182 : /* ----------------------------------------------------------------
1183 : * classify_matching_subplans
1184 : *
1185 : * Classify the node's as_valid_subplans into sync ones and
1186 : * async ones, adjust it to contain sync ones only, and save
1187 : * async ones in the node's as_valid_asyncplans.
1188 : * ----------------------------------------------------------------
1189 : */
1190 : static void
1191 46 : classify_matching_subplans(AppendState *node)
1192 : {
1193 : Bitmapset *valid_asyncplans;
1194 :
1195 : Assert(node->as_valid_subplans_identified);
1196 : Assert(node->as_valid_asyncplans == NULL);
1197 :
1198 : /* Nothing to do if there are no valid subplans. */
1199 46 : if (bms_is_empty(node->as_valid_subplans))
1200 : {
1201 0 : node->as_syncdone = true;
1202 0 : node->as_nasyncremain = 0;
1203 0 : return;
1204 : }
1205 :
1206 : /* Nothing to do if there are no valid async subplans. */
1207 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1208 : {
1209 0 : node->as_nasyncremain = 0;
1210 0 : return;
1211 : }
1212 :
1213 : /* Get valid async subplans. */
1214 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1215 46 : node->as_valid_subplans);
1216 :
1217 : /* Adjust the valid subplans to contain sync subplans only. */
1218 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1219 : valid_asyncplans);
1220 :
1221 : /* Save valid async subplans. */
1222 46 : node->as_valid_asyncplans = valid_asyncplans;
1223 : }
|