Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeAppend.c
4 : * routines to handle append nodes.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitAppend - initialize the append node
17 : * ExecAppend - retrieve the next tuple from the node
18 : * ExecEndAppend - shut down the append node
19 : * ExecReScanAppend - rescan the append node
20 : *
21 : * NOTES
22 : * Each append node contains a list of one or more subplans which
23 : * must be iteratively processed (forwards or backwards).
24 : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : * until the subplan stops returning tuples, at which point that
26 : * plan is shut down and the next started up.
27 : *
28 : * Append nodes don't make use of their left and right
29 : * subtrees; rather, they maintain a list of subplans, so
30 : * a typical append node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * Append -------+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : *
39 : * Append nodes are currently used for unions, and to support
40 : * inheritance queries, where several relations need to be scanned.
41 : * For example, in our standard person/student/employee/student-emp
42 : * example, where student and employee inherit from person
43 : * and student-emp inherits from student and employee, the
44 : * query:
45 : *
46 : * select name from person
47 : *
48 : * generates the plan:
49 : *
50 : * |
51 : * Append -------+-------+--------+--------+
52 : * / \ | | | |
53 : * nil nil Scan Scan Scan Scan
54 : * | | | |
55 : * person employee student student-emp
56 : */
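
/*
 * Illustrative sketch (simplified; not the actual code): stripped of
 * run-time pruning, parallelism, and async support, the iteration
 * contract described above amounts to
 *
 *		for (;;)
 *		{
 *			slot = ExecProcNode(subplans[whichplan]);
 *			if (!TupIsNull(slot))
 *				return slot;
 *			if (++whichplan >= nplans)
 *				return NULL;
 *		}
 *
 * where "subplans", "whichplan", and "nplans" stand in for the AppendState
 * fields appendplans, as_whichplan, and as_nplans. The real ExecAppend
 * below layers the extra machinery on top of this loop.
 */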
57 :
58 : #include "postgres.h"
59 :
60 : #include "executor/execAsync.h"
61 : #include "executor/execPartition.h"
62 : #include "executor/executor.h"
63 : #include "executor/nodeAppend.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "storage/latch.h"
67 :
68 : /* Shared state for parallel-aware Append. */
69 : struct ParallelAppendState
70 : {
71 : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : int pa_next_plan; /* next plan to choose by any worker */
73 :
74 : /*
75 : * pa_finished[i] should be true if no more workers should select subplan
76 : * i.  For a non-partial plan, this should be set to true as soon as a
77 : * worker selects the plan; for a partial plan, it remains false until
78 : * some worker executes the plan to completion.
79 : */
80 : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : };
82 :
83 : #define INVALID_SUBPLAN_INDEX -1
84 : #define EVENT_BUFFER_SIZE 16
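
/*
 * Illustrative sketch (simplified; not the actual code): a participant
 * that claims subplan "chosen" applies the pa_finished rule above like
 * so, with all access to the shared state serialized by pa_lock:
 *
 *		LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 *		if (chosen < node->as_first_partial_plan)
 *			pstate->pa_finished[chosen] = true;
 *		LWLockRelease(&pstate->pa_lock);
 *
 * That is, a non-partial plan is marked finished the moment one worker
 * claims it, so no second worker ever runs it; a partial plan is marked
 * finished only by whichever worker runs it to completion.
 */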
85 :
86 : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : static bool choose_next_subplan_locally(AppendState *node);
88 : static bool choose_next_subplan_for_leader(AppendState *node);
89 : static bool choose_next_subplan_for_worker(AppendState *node);
90 : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : static void ExecAppendAsyncBegin(AppendState *node);
92 : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : static void ExecAppendAsyncEventWait(AppendState *node);
95 : static void classify_matching_subplans(AppendState *node);
96 :
97 : /* ----------------------------------------------------------------
98 : * ExecInitAppend
99 : *
100 : * Begin all of the subscans of the append node.
101 : *
102 : * (This is potentially wasteful, since the entire result of the
103 : * append node may not be scanned, but this way all of the
104 : * structures get allocated in the executor's top level memory
105 : * block instead of that of the call to ExecAppend.)
106 : * ----------------------------------------------------------------
107 : */
108 : AppendState *
109 14256 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : {
111 14256 : AppendState *appendstate = makeNode(AppendState);
112 : PlanState **appendplanstates;
113 : Bitmapset *validsubplans;
114 : Bitmapset *asyncplans;
115 : int nplans;
116 : int nasyncplans;
117 : int firstvalid;
118 : int i,
119 : j;
120 :
121 : /* check for unsupported flags */
122 : Assert(!(eflags & EXEC_FLAG_MARK));
123 :
124 : /*
125 : * create new AppendState for our append node
126 : */
127 14256 : appendstate->ps.plan = (Plan *) node;
128 14256 : appendstate->ps.state = estate;
129 14256 : appendstate->ps.ExecProcNode = ExecAppend;
130 :
131 : /* Let the choose_next_subplan_* function handle setting the first subplan */
132 14256 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133 14256 : appendstate->as_syncdone = false;
134 14256 : appendstate->as_begun = false;
135 :
136 : /* If run-time partition pruning is enabled, then set that up now */
137 14256 : if (node->part_prune_info != NULL)
138 : {
139 : PartitionPruneState *prunestate;
140 :
141 : /*
142 : * Set up the pruning data structure. This also computes the set of
143 : * subplans to initialize (validsubplans), taking into account the
144 : * result of initial pruning, if any.
145 : */
146 630 : prunestate = ExecInitPartitionPruning(&appendstate->ps,
147 630 : list_length(node->appendplans),
148 630 : node->part_prune_info,
149 : &validsubplans);
150 630 : appendstate->as_prune_state = prunestate;
151 630 : nplans = bms_num_members(validsubplans);
152 :
153 : /*
154 : * When no run-time pruning is required and there's at least one
155 : * subplan, we can fill as_valid_subplans immediately, preventing
156 : * later calls to ExecFindMatchingSubPlans.
157 : */
158 630 : if (!prunestate->do_exec_prune && nplans > 0)
159 : {
160 218 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
161 218 : appendstate->as_valid_subplans_identified = true;
162 : }
163 : }
164 : else
165 : {
166 13626 : nplans = list_length(node->appendplans);
167 :
168 : /*
169 : * When run-time partition pruning is not enabled we can just mark all
170 : * subplans as valid; they must also all be initialized.
171 : */
172 : Assert(nplans > 0);
173 13626 : appendstate->as_valid_subplans = validsubplans =
174 13626 : bms_add_range(NULL, 0, nplans - 1);
175 13626 : appendstate->as_valid_subplans_identified = true;
176 13626 : appendstate->as_prune_state = NULL;
177 : }
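
	/*
	 * Note (illustrative): in the pruning branch above, validsubplans
	 * holds indexes into the original node->appendplans list, while
	 * as_valid_subplans is numbered over the compacted array built below
	 * (0 .. nplans - 1). In the no-pruning branch the two index spaces
	 * coincide, which is why one bitmapset can serve as both.
	 */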
178 :
179 : /*
180 : * Initialize result tuple type and slot.
181 : */
182 14256 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
183 :
184 : /* node returns slots from each of its subnodes, therefore not fixed */
185 14256 : appendstate->ps.resultopsset = true;
186 14256 : appendstate->ps.resultopsfixed = false;
187 :
188 14256 : appendplanstates = (PlanState **) palloc(nplans *
189 : sizeof(PlanState *));
190 :
191 : /*
192 : * call ExecInitNode on each of the valid plans to be executed and save
193 : * the results into the appendplanstates array.
194 : *
195 : * While at it, find out the first valid partial plan.
196 : */
197 14256 : j = 0;
198 14256 : asyncplans = NULL;
199 14256 : nasyncplans = 0;
200 14256 : firstvalid = nplans;
201 14256 : i = -1;
202 55630 : while ((i = bms_next_member(validsubplans, i)) >= 0)
203 : {
204 41374 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
205 :
206 : /*
207 : * Record async subplans. When executing EvalPlanQual, we treat them
208 : * as sync ones; don't do this when initializing an EvalPlanQual plan
209 : * tree.
210 : */
211 41374 : if (initNode->async_capable && estate->es_epq_active == NULL)
212 : {
213 186 : asyncplans = bms_add_member(asyncplans, j);
214 186 : nasyncplans++;
215 : }
216 :
217 : /*
218 : * Record the lowest appendplans index which is a valid partial plan.
219 : */
220 41374 : if (i >= node->first_partial_plan && j < firstvalid)
221 450 : firstvalid = j;
222 :
223 41374 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
224 : }
225 :
226 14256 : appendstate->as_first_partial_plan = firstvalid;
227 14256 : appendstate->appendplans = appendplanstates;
228 14256 : appendstate->as_nplans = nplans;
229 :
230 : /* Initialize async state */
231 14256 : appendstate->as_asyncplans = asyncplans;
232 14256 : appendstate->as_nasyncplans = nasyncplans;
233 14256 : appendstate->as_asyncrequests = NULL;
234 14256 : appendstate->as_asyncresults = NULL;
235 14256 : appendstate->as_nasyncresults = 0;
236 14256 : appendstate->as_nasyncremain = 0;
237 14256 : appendstate->as_needrequest = NULL;
238 14256 : appendstate->as_eventset = NULL;
239 14256 : appendstate->as_valid_asyncplans = NULL;
240 :
241 14256 : if (nasyncplans > 0)
242 : {
243 94 : appendstate->as_asyncrequests = (AsyncRequest **)
244 94 : palloc0(nplans * sizeof(AsyncRequest *));
245 :
246 94 : i = -1;
247 280 : while ((i = bms_next_member(asyncplans, i)) >= 0)
248 : {
249 : AsyncRequest *areq;
250 :
251 186 : areq = palloc(sizeof(AsyncRequest));
252 186 : areq->requestor = (PlanState *) appendstate;
253 186 : areq->requestee = appendplanstates[i];
254 186 : areq->request_index = i;
255 186 : areq->callback_pending = false;
256 186 : areq->request_complete = false;
257 186 : areq->result = NULL;
258 :
259 186 : appendstate->as_asyncrequests[i] = areq;
260 : }
261 :
262 94 : appendstate->as_asyncresults = (TupleTableSlot **)
263 94 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
264 :
265 94 : if (appendstate->as_valid_subplans_identified)
266 88 : classify_matching_subplans(appendstate);
267 : }
268 :
269 : /*
270 : * Miscellaneous initialization
271 : */
272 :
273 14256 : appendstate->ps.ps_ProjInfo = NULL;
274 :
275 : /* For parallel query, this will be overridden later. */
276 14256 : appendstate->choose_next_subplan = choose_next_subplan_locally;
277 :
278 14256 : return appendstate;
279 : }
280 :
281 : /* ----------------------------------------------------------------
282 : * ExecAppend
283 : *
284 : * Handles iteration over multiple subplans.
285 : * ----------------------------------------------------------------
286 : */
287 : static TupleTableSlot *
288 3490464 : ExecAppend(PlanState *pstate)
289 : {
290 3490464 : AppendState *node = castNode(AppendState, pstate);
291 : TupleTableSlot *result;
292 :
293 : /*
294 : * If this is the first call after Init or ReScan, we need to do the
295 : * initialization work.
296 : */
297 3490464 : if (!node->as_begun)
298 : {
299 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
300 : Assert(!node->as_syncdone);
301 :
302 : /* Nothing to do if there are no subplans */
303 32698 : if (node->as_nplans == 0)
304 36 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
305 :
306 : /* If there are any async subplans, begin executing them. */
307 32662 : if (node->as_nasyncplans > 0)
308 74 : ExecAppendAsyncBegin(node);
309 :
310 : /*
311 : * If no sync subplan has been chosen, we must choose one before
312 : * proceeding.
313 : */
314 32662 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
315 3268 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
316 :
317 : Assert(node->as_syncdone ||
318 : (node->as_whichplan >= 0 &&
319 : node->as_whichplan < node->as_nplans));
320 :
321 : /* And we're initialized. */
322 29394 : node->as_begun = true;
323 : }
324 :
325 : for (;;)
326 37390 : {
327 : PlanState *subnode;
328 :
329 3524550 : CHECK_FOR_INTERRUPTS();
330 :
331 : /*
332 : * try to get a tuple from an async subplan if any
333 : */
334 3524550 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
335 : {
336 12276 : if (ExecAppendAsyncGetNext(node, &result))
337 12274 : return result;
338 : Assert(!node->as_syncdone);
339 : Assert(bms_is_empty(node->as_needrequest));
340 : }
341 :
342 : /*
343 : * figure out which sync subplan we are currently processing
344 : */
345 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
346 3512274 : subnode = node->appendplans[node->as_whichplan];
347 :
348 : /*
349 : * get a tuple from the subplan
350 : */
351 3512274 : result = ExecProcNode(subnode);
352 :
353 3512222 : if (!TupIsNull(result))
354 : {
355 : /*
356 : * If the subplan gave us something then return it as-is. We do
357 : * NOT make use of the result slot that was set up in
358 : * ExecInitAppend; there's no need for it.
359 : */
360 3446106 : return result;
361 : }
362 :
363 : /*
364 : * wait or poll for async events if any. We do this before checking
365 : * for the end of iteration, because it might drain the remaining
366 : * async subplans.
367 : */
368 66116 : if (node->as_nasyncremain > 0)
369 34 : ExecAppendAsyncEventWait(node);
370 :
371 : /* choose new sync subplan; if no sync/async subplans, we're done */
372 66116 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
373 28726 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
374 : }
375 : }
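
/*
 * Illustrative summary (not the actual code) of the loop above:
 *
 *		1. If async results are due (sync side done, or as_needrequest is
 *		   nonempty), try ExecAppendAsyncGetNext() first.
 *		2. Otherwise pull a tuple from the current sync subplan.
 *		3. When a sync subplan is exhausted, poll for async events, then
 *		   pick the next sync subplan via choose_next_subplan().
 *		4. Stop when no sync subplan remains and as_nasyncremain is zero.
 */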
376 :
377 : /* ----------------------------------------------------------------
378 : * ExecEndAppend
379 : *
380 : * Shuts down the subscans of the append node.
381 : *
382 : * Returns nothing of interest.
383 : * ----------------------------------------------------------------
384 : */
385 : void
386 13966 : ExecEndAppend(AppendState *node)
387 : {
388 : PlanState **appendplans;
389 : int nplans;
390 : int i;
391 :
392 : /*
393 : * get information from the node
394 : */
395 13966 : appendplans = node->appendplans;
396 13966 : nplans = node->as_nplans;
397 :
398 : /*
399 : * shut down each of the subscans
400 : */
401 54634 : for (i = 0; i < nplans; i++)
402 40668 : ExecEndNode(appendplans[i]);
403 13966 : }
404 :
405 : void
406 22692 : ExecReScanAppend(AppendState *node)
407 : {
408 22692 : int nasyncplans = node->as_nasyncplans;
409 : int i;
410 :
411 : /*
412 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
413 : * we'd better unset the valid subplans so that they are reselected for
414 : * the new parameter values.
415 : */
416 25960 : if (node->as_prune_state &&
417 3268 : bms_overlap(node->ps.chgParam,
418 3268 : node->as_prune_state->execparamids))
419 : {
420 3268 : node->as_valid_subplans_identified = false;
421 3268 : bms_free(node->as_valid_subplans);
422 3268 : node->as_valid_subplans = NULL;
423 3268 : bms_free(node->as_valid_asyncplans);
424 3268 : node->as_valid_asyncplans = NULL;
425 : }
426 :
427 90906 : for (i = 0; i < node->as_nplans; i++)
428 : {
429 68214 : PlanState *subnode = node->appendplans[i];
430 :
431 : /*
432 : * ExecReScan doesn't know about my subplans, so I have to do
433 : * changed-parameter signaling myself.
434 : */
435 68214 : if (node->ps.chgParam != NULL)
436 55708 : UpdateChangedParamSet(subnode, node->ps.chgParam);
437 :
438 : /*
439 : * If chgParam of subnode is not null then the plan will be re-scanned
440 : * by the first ExecProcNode or the first ExecAsyncRequest.
441 : */
442 68214 : if (subnode->chgParam == NULL)
443 23574 : ExecReScan(subnode);
444 : }
445 :
446 : /* Reset async state */
447 22692 : if (nasyncplans > 0)
448 : {
449 34 : i = -1;
450 102 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
451 : {
452 68 : AsyncRequest *areq = node->as_asyncrequests[i];
453 :
454 68 : areq->callback_pending = false;
455 68 : areq->request_complete = false;
456 68 : areq->result = NULL;
457 : }
458 :
459 34 : node->as_nasyncresults = 0;
460 34 : node->as_nasyncremain = 0;
461 34 : bms_free(node->as_needrequest);
462 34 : node->as_needrequest = NULL;
463 : }
464 :
465 : /* Let the choose_next_subplan_* function handle setting the first subplan */
466 22692 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
467 22692 : node->as_syncdone = false;
468 22692 : node->as_begun = false;
469 22692 : }
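
/*
 * Note (illustrative): the effect of the loop above is lazy rescanning. A
 * subplan whose chgParam set is still non-empty is left alone here and is
 * re-scanned on first use, by its first ExecProcNode() call (or its first
 * ExecAsyncRequest() if async-capable); only subplans unaffected by the
 * parameter change are re-scanned eagerly.
 */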
470 :
471 : /* ----------------------------------------------------------------
472 : * Parallel Append Support
473 : * ----------------------------------------------------------------
474 : */
475 :
476 : /* ----------------------------------------------------------------
477 : * ExecAppendEstimate
478 : *
479 : * Compute the amount of space we'll need in the parallel
480 : * query DSM, and inform pcxt->estimator about our needs.
481 : * ----------------------------------------------------------------
482 : */
483 : void
484 138 : ExecAppendEstimate(AppendState *node,
485 : ParallelContext *pcxt)
486 : {
487 138 : node->pstate_len =
488 138 : add_size(offsetof(ParallelAppendState, pa_finished),
489 138 : sizeof(bool) * node->as_nplans);
490 :
491 138 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
492 138 : shm_toc_estimate_keys(&pcxt->estimator, 1);
493 138 : }
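
/*
 * Illustrative note (not the actual code): because pa_finished is a
 * flexible array member, the estimate above is the standard sizing idiom
 *
 *		add_size(offsetof(ParallelAppendState, pa_finished),
 *				 sizeof(bool) * node->as_nplans)
 *
 * i.e. the fixed-size header plus one flag per subplan, where add_size()
 * is the overflow-checked addition used for shared-memory arithmetic.
 */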
494 :
495 :
496 : /* ----------------------------------------------------------------
497 : * ExecAppendInitializeDSM
498 : *
499 : * Set up shared state for Parallel Append.
500 : * ----------------------------------------------------------------
501 : */
502 : void
503 138 : ExecAppendInitializeDSM(AppendState *node,
504 : ParallelContext *pcxt)
505 : {
506 : ParallelAppendState *pstate;
507 :
508 138 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
509 138 : memset(pstate, 0, node->pstate_len);
510 138 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
511 138 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
512 :
513 138 : node->as_pstate = pstate;
514 138 : node->choose_next_subplan = choose_next_subplan_for_leader;
515 138 : }
516 :
517 : /* ----------------------------------------------------------------
518 : * ExecAppendReInitializeDSM
519 : *
520 : * Reset shared state before beginning a fresh scan.
521 : * ----------------------------------------------------------------
522 : */
523 : void
524 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
525 : {
526 0 : ParallelAppendState *pstate = node->as_pstate;
527 :
528 0 : pstate->pa_next_plan = 0;
529 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
530 0 : }
531 :
532 : /* ----------------------------------------------------------------
533 : * ExecAppendInitializeWorker
534 : *
535 : * Copy relevant information from TOC into planstate, and initialize
536 : * whatever is required to choose and execute the optimal subplan.
537 : * ----------------------------------------------------------------
538 : */
539 : void
540 318 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
541 : {
542 318 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
543 318 : node->choose_next_subplan = choose_next_subplan_for_worker;
544 318 : }
545 :
546 : /* ----------------------------------------------------------------
547 : * choose_next_subplan_locally
548 : *
549 : * Choose next sync subplan for a non-parallel-aware Append,
550 : * returning false if there are no more.
551 : * ----------------------------------------------------------------
552 : */
553 : static bool
554 97914 : choose_next_subplan_locally(AppendState *node)
555 : {
556 97914 : int whichplan = node->as_whichplan;
557 : int nextplan;
558 :
559 : /* We should never be called when there are no subplans */
560 : Assert(node->as_nplans > 0);
561 :
562 : /* Nothing to do if syncdone */
563 97914 : if (node->as_syncdone)
564 36 : return false;
565 :
566 : /*
567 : * If this is the first call, initialize whichplan to -1 so that the
568 : * bms member function below chooses the first valid sync subplan. If
569 : * there are no valid sync subplans, that function returns a negative
570 : * number, so the nextplan < 0 check below catches it and we return
571 : * false.
572 : */
573 97878 : if (whichplan == INVALID_SUBPLAN_INDEX)
574 : {
575 32224 : if (node->as_nasyncplans > 0)
576 : {
577 : /* We'd have filled as_valid_subplans already */
578 : Assert(node->as_valid_subplans_identified);
579 : }
580 32186 : else if (!node->as_valid_subplans_identified)
581 : {
582 3382 : node->as_valid_subplans =
583 3382 : ExecFindMatchingSubPlans(node->as_prune_state, false);
584 3382 : node->as_valid_subplans_identified = true;
585 : }
586 :
587 32224 : whichplan = -1;
588 : }
589 :
590 : /* Ensure whichplan is within the expected range */
591 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
592 :
593 97878 : if (ScanDirectionIsForward(node->ps.state->es_direction))
594 97860 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
595 : else
596 18 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
597 :
598 97878 : if (nextplan < 0)
599 : {
600 : /* Set as_syncdone if in async mode */
601 31626 : if (node->as_nasyncplans > 0)
602 34 : node->as_syncdone = true;
603 31626 : return false;
604 : }
605 :
606 66252 : node->as_whichplan = nextplan;
607 :
608 66252 : return true;
609 : }
610 :
611 : /* ----------------------------------------------------------------
612 : * choose_next_subplan_for_leader
613 : *
614 : * Try to pick a plan which doesn't commit us to doing much
615 : * work locally, so that as much work as possible is done in
616 : * the workers. Cheapest subplans are at the end.
617 : * ----------------------------------------------------------------
618 : */
619 : static bool
620 514 : choose_next_subplan_for_leader(AppendState *node)
621 : {
622 514 : ParallelAppendState *pstate = node->as_pstate;
623 :
624 : /* Backward scan is not supported by parallel-aware plans */
625 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
626 :
627 : /* We should never be called when there are no subplans */
628 : Assert(node->as_nplans > 0);
629 :
630 514 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
631 :
632 514 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
633 : {
634 : /* Mark just-completed subplan as finished. */
635 394 : node->as_pstate->pa_finished[node->as_whichplan] = true;
636 : }
637 : else
638 : {
639 : /* Start with last subplan. */
640 120 : node->as_whichplan = node->as_nplans - 1;
641 :
642 : /*
643 : * If we've yet to determine the valid subplans then do so now. If
644 : * run-time pruning is disabled then the valid subplans will always be
645 : * set to all subplans.
646 : */
647 120 : if (!node->as_valid_subplans_identified)
648 : {
649 24 : node->as_valid_subplans =
650 24 : ExecFindMatchingSubPlans(node->as_prune_state, false);
651 24 : node->as_valid_subplans_identified = true;
652 :
653 : /*
654 : * Mark each invalid plan as finished to allow the loop below to
655 : * select the first valid subplan.
656 : */
657 24 : mark_invalid_subplans_as_finished(node);
658 : }
659 : }
660 :
661 : /* Loop until we find a subplan to execute. */
662 814 : while (pstate->pa_finished[node->as_whichplan])
663 : {
664 420 : if (node->as_whichplan == 0)
665 : {
666 120 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
667 120 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
668 120 : LWLockRelease(&pstate->pa_lock);
669 120 : return false;
670 : }
671 :
672 : /*
673 : * We needn't pay attention to as_valid_subplans here as all invalid
674 : * plans have been marked as finished.
675 : */
676 300 : node->as_whichplan--;
677 : }
678 :
679 : /* If non-partial, immediately mark as finished. */
680 394 : if (node->as_whichplan < node->as_first_partial_plan)
681 136 : node->as_pstate->pa_finished[node->as_whichplan] = true;
682 :
683 394 : LWLockRelease(&pstate->pa_lock);
684 :
685 394 : return true;
686 : }
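
/*
 * Worked example (illustrative): given three non-partial subplans in
 * descending cost order, {0, 1, 2}, and no partial plans, the leader
 * starts at index 2, the cheapest, and marks it pa_finished at once;
 * workers meanwhile claim 0 and 1 from the front. When the leader next
 * looks and finds everything finished, it walks down to index 0 and
 * returns false, having kept the expensive plans out of its own hands.
 */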
687 :
688 : /* ----------------------------------------------------------------
689 : * choose_next_subplan_for_worker
690 : *
691 : * Choose next subplan for a parallel-aware Append, returning
692 : * false if there are no more.
693 : *
694 : * We start from the first plan and advance through the list;
695 : * when we get back to the end, we loop back to the first
696 : * partial plan. This assigns the non-partial plans first in
697 : * order of descending cost and then spreads out the workers
698 : * as evenly as possible across the remaining partial plans.
699 : * ----------------------------------------------------------------
700 : */
701 : static bool
702 350 : choose_next_subplan_for_worker(AppendState *node)
703 : {
704 350 : ParallelAppendState *pstate = node->as_pstate;
705 :
706 : /* Backward scan is not supported by parallel-aware plans */
707 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
708 :
709 : /* We should never be called when there are no subplans */
710 : Assert(node->as_nplans > 0);
711 :
712 350 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
713 :
714 : /* Mark just-completed subplan as finished. */
715 350 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
716 68 : node->as_pstate->pa_finished[node->as_whichplan] = true;
717 :
718 : /*
719 : * If we've yet to determine the valid subplans then do so now. If
720 : * run-time pruning is disabled then the valid subplans will always be set
721 : * to all subplans.
722 : */
723 282 : else if (!node->as_valid_subplans_identified)
724 : {
725 24 : node->as_valid_subplans =
726 24 : ExecFindMatchingSubPlans(node->as_prune_state, false);
727 24 : node->as_valid_subplans_identified = true;
728 :
729 24 : mark_invalid_subplans_as_finished(node);
730 : }
731 :
732 : /* If all the plans are already done, we have nothing to do */
733 350 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
734 : {
735 254 : LWLockRelease(&pstate->pa_lock);
736 254 : return false;
737 : }
738 :
739 : /* Save the plan from which we are starting the search. */
740 96 : node->as_whichplan = pstate->pa_next_plan;
741 :
742 : /* Loop until we find a valid subplan to execute. */
743 204 : while (pstate->pa_finished[pstate->pa_next_plan])
744 : {
745 : int nextplan;
746 :
747 136 : nextplan = bms_next_member(node->as_valid_subplans,
748 : pstate->pa_next_plan);
749 136 : if (nextplan >= 0)
750 : {
751 : /* Advance to the next valid plan. */
752 108 : pstate->pa_next_plan = nextplan;
753 : }
754 28 : else if (node->as_whichplan > node->as_first_partial_plan)
755 : {
756 : /*
757 : * Try looping back to the first valid partial plan, if there is
758 : * one. If there isn't, arrange to bail out below.
759 : */
760 6 : nextplan = bms_next_member(node->as_valid_subplans,
761 6 : node->as_first_partial_plan - 1);
762 6 : pstate->pa_next_plan =
763 6 : nextplan < 0 ? node->as_whichplan : nextplan;
764 : }
765 : else
766 : {
767 : /*
768 : * At last plan, and either there are no partial plans or we've
769 : * tried them all. Arrange to bail out.
770 : */
771 22 : pstate->pa_next_plan = node->as_whichplan;
772 : }
773 :
774 136 : if (pstate->pa_next_plan == node->as_whichplan)
775 : {
776 : /* We've tried everything! */
777 28 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
778 28 : LWLockRelease(&pstate->pa_lock);
779 28 : return false;
780 : }
781 : }
782 :
783 : /* Pick the plan we found, and advance pa_next_plan one more time. */
784 68 : node->as_whichplan = pstate->pa_next_plan;
785 68 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
786 : pstate->pa_next_plan);
787 :
788 : /*
789 : * If there are no more valid plans then try setting the next plan to the
790 : * first valid partial plan.
791 : */
792 68 : if (pstate->pa_next_plan < 0)
793 : {
794 20 : int nextplan = bms_next_member(node->as_valid_subplans,
795 20 : node->as_first_partial_plan - 1);
796 :
797 20 : if (nextplan >= 0)
798 20 : pstate->pa_next_plan = nextplan;
799 : else
800 : {
801 : /*
802 : * There are no valid partial plans, and we already chose the last
803 : * non-partial plan; so flag that there's nothing more for our
804 : * fellow workers to do.
805 : */
806 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
807 : }
808 : }
809 :
810 : /* If non-partial, immediately mark as finished. */
811 68 : if (node->as_whichplan < node->as_first_partial_plan)
812 2 : node->as_pstate->pa_finished[node->as_whichplan] = true;
813 :
814 68 : LWLockRelease(&pstate->pa_lock);
815 :
816 68 : return true;
817 : }
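
/*
 * Worked example (illustrative): with non-partial subplans {0, 1} in
 * descending cost order and partial subplans {2, 3}, pa_next_plan hands
 * out 0, 1, 2, 3 to the first four requests and then wraps back to 2,
 * the first partial plan. Indexes 0 and 1 are marked pa_finished as soon
 * as they are claimed, while 2 and 3 keep being handed out so workers
 * spread across them until each is executed to completion.
 */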
818 :
819 : /*
820 : * mark_invalid_subplans_as_finished
821 : * Marks the ParallelAppendState's pa_finished as true for each invalid
822 : * subplan.
823 : *
824 : * This function should only be called for parallel Append with run-time
825 : * pruning enabled.
826 : */
827 : static void
828 48 : mark_invalid_subplans_as_finished(AppendState *node)
829 : {
830 : int i;
831 :
832 : /* Only valid to call this while in parallel Append mode */
833 : Assert(node->as_pstate);
834 :
835 : /* Shouldn't have been called when run-time pruning is not enabled */
836 : Assert(node->as_prune_state);
837 :
838 : /* Nothing to do if all plans are valid */
839 48 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
840 0 : return;
841 :
842 : /* Mark all non-valid plans as finished */
843 162 : for (i = 0; i < node->as_nplans; i++)
844 : {
845 114 : if (!bms_is_member(i, node->as_valid_subplans))
846 48 : node->as_pstate->pa_finished[i] = true;
847 : }
848 : }
849 :
850 : /* ----------------------------------------------------------------
851 : * Asynchronous Append Support
852 : * ----------------------------------------------------------------
853 : */
854 :
855 : /* ----------------------------------------------------------------
856 : * ExecAppendAsyncBegin
857 : *
858 : * Begin executing designated async-capable subplans.
859 : * ----------------------------------------------------------------
860 : */
861 : static void
862 74 : ExecAppendAsyncBegin(AppendState *node)
863 : {
864 : int i;
865 :
866 : /* Backward scan is not supported by async-aware Appends. */
867 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
868 :
869 : /* We should never be called when there are no subplans */
870 : Assert(node->as_nplans > 0);
871 :
872 : /* We should never be called when there are no async subplans. */
873 : Assert(node->as_nasyncplans > 0);
874 :
875 : /* If we've yet to determine the valid subplans then do so now. */
876 74 : if (!node->as_valid_subplans_identified)
877 : {
878 4 : node->as_valid_subplans =
879 4 : ExecFindMatchingSubPlans(node->as_prune_state, false);
880 4 : node->as_valid_subplans_identified = true;
881 :
882 4 : classify_matching_subplans(node);
883 : }
884 :
885 : /* Initialize state variables. */
886 74 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
887 74 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
888 :
889 : /* Nothing to do if there are no valid async subplans. */
890 74 : if (node->as_nasyncremain == 0)
891 0 : return;
892 :
893 : /* Make a request for each of the valid async subplans. */
894 74 : i = -1;
895 218 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
896 : {
897 144 : AsyncRequest *areq = node->as_asyncrequests[i];
898 :
899 : Assert(areq->request_index == i);
900 : Assert(!areq->callback_pending);
901 :
902 : /* Do the actual work. */
903 144 : ExecAsyncRequest(areq);
904 : }
905 : }
906 :
907 : /* ----------------------------------------------------------------
908 : * ExecAppendAsyncGetNext
909 : *
910 : * Get the next tuple from any of the asynchronous subplans.
911 : * ----------------------------------------------------------------
912 : */
913 : static bool
914 12276 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
915 : {
916 12276 : *result = NULL;
917 :
918 : /* We should never be called when there are no valid async subplans. */
919 : Assert(node->as_nasyncremain > 0);
920 :
921 : /* Request a tuple asynchronously. */
922 12276 : if (ExecAppendAsyncRequest(node, result))
923 12116 : return true;
924 :
925 234 : while (node->as_nasyncremain > 0)
926 : {
927 172 : CHECK_FOR_INTERRUPTS();
928 :
929 : /* Wait or poll for async events. */
930 172 : ExecAppendAsyncEventWait(node);
931 :
932 : /* Request a tuple asynchronously. */
933 170 : if (ExecAppendAsyncRequest(node, result))
934 96 : return true;
935 :
936 : /* Break from loop if there's any sync subplan that isn't complete. */
937 74 : if (!node->as_syncdone)
938 0 : break;
939 : }
940 :
941 : /*
942 : * If all sync subplans are complete, we're totally done scanning the
943 : * given node. Otherwise, we're done with the asynchronous stuff but must
944 : * continue scanning the sync subplans.
945 : */
946 62 : if (node->as_syncdone)
947 : {
948 : Assert(node->as_nasyncremain == 0);
949 62 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
950 62 : return true;
951 : }
952 :
953 0 : return false;
954 : }
955 :
956 : /* ----------------------------------------------------------------
957 : * ExecAppendAsyncRequest
958 : *
959 : * Request a tuple asynchronously.
960 : * ----------------------------------------------------------------
961 : */
962 : static bool
963 12446 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
964 : {
965 : Bitmapset *needrequest;
966 : int i;
967 :
968 : /* Nothing to do if there are no async subplans needing a new request. */
969 12446 : if (bms_is_empty(node->as_needrequest))
970 : {
971 : Assert(node->as_nasyncresults == 0);
972 114 : return false;
973 : }
974 :
975 : /*
976 : * If there are any asynchronously-generated results that have not yet
977 : * been returned, we have nothing to do; just return one of them.
978 : */
979 12332 : if (node->as_nasyncresults > 0)
980 : {
981 5170 : --node->as_nasyncresults;
982 5170 : *result = node->as_asyncresults[node->as_nasyncresults];
983 5170 : return true;
984 : }
985 :
986 : /* Make a new request for each of the async subplans that need it. */
987 7162 : needrequest = node->as_needrequest;
988 7162 : node->as_needrequest = NULL;
989 7162 : i = -1;
990 19368 : while ((i = bms_next_member(needrequest, i)) >= 0)
991 : {
992 12206 : AsyncRequest *areq = node->as_asyncrequests[i];
993 :
994 : /* Do the actual work. */
995 12206 : ExecAsyncRequest(areq);
996 : }
997 7162 : bms_free(needrequest);
998 :
999 : /* Return one of the asynchronously-generated results if any. */
1000 7162 : if (node->as_nasyncresults > 0)
1001 : {
1002 7042 : --node->as_nasyncresults;
1003 7042 : *result = node->as_asyncresults[node->as_nasyncresults];
1004 7042 : return true;
1005 : }
1006 :
1007 120 : return false;
1008 : }
1009 :
1010 : /* ----------------------------------------------------------------
1011 : * ExecAppendAsyncEventWait
1012 : *
1013 : * Wait or poll for file descriptor events and fire callbacks.
1014 : * ----------------------------------------------------------------
1015 : */
1016 : static void
1017 206 : ExecAppendAsyncEventWait(AppendState *node)
1018 : {
1019 206 : int nevents = node->as_nasyncplans + 1;
1020 206 : long timeout = node->as_syncdone ? -1 : 0;
1021 : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1022 : int noccurred;
1023 : int i;
1024 :
1025 : /* We should never be called when there are no valid async subplans. */
1026 : Assert(node->as_nasyncremain > 0);
1027 :
1028 : Assert(node->as_eventset == NULL);
1029 206 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1030 206 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1031 : NULL, NULL);
1032 :
1033 : /* Give each waiting subplan a chance to add an event. */
1034 206 : i = -1;
1035 628 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1036 : {
1037 424 : AsyncRequest *areq = node->as_asyncrequests[i];
1038 :
1039 424 : if (areq->callback_pending)
1040 368 : ExecAsyncConfigureWait(areq);
1041 : }
1042 :
1043 : /*
1044 : * No need for further processing if there are no configured events other
1045 : * than the postmaster death event.
1046 : */
1047 204 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1048 : {
1049 2 : FreeWaitEventSet(node->as_eventset);
1050 2 : node->as_eventset = NULL;
1051 4 : return;
1052 : }
1053 :
1054 : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1055 202 : if (nevents > EVENT_BUFFER_SIZE)
1056 0 : nevents = EVENT_BUFFER_SIZE;
1057 :
1058 : /*
1059 : * If the timeout is -1, wait until at least one event occurs. If the
1060 : * timeout is 0, poll for events, but do not wait at all.
1061 : */
1062 202 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1063 : nevents, WAIT_EVENT_APPEND_READY);
1064 202 : FreeWaitEventSet(node->as_eventset);
1065 202 : node->as_eventset = NULL;
1066 202 : if (noccurred == 0)
1067 2 : return;
1068 :
1069 : /* Deliver notifications. */
1070 502 : for (i = 0; i < noccurred; i++)
1071 : {
1072 302 : WaitEvent *w = &occurred_event[i];
1073 :
1074 : /*
1075 : * Each waiting subplan should have registered its wait event with
1076 : * user_data pointing back to its AsyncRequest.
1077 : */
1078 302 : if ((w->events & WL_SOCKET_READABLE) != 0)
1079 : {
1080 302 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1081 :
1082 302 : if (areq->callback_pending)
1083 : {
1084 : /*
1085 : * Mark it as no longer needing a callback. We must do this
1086 : * before dispatching the callback in case the callback resets
1087 : * the flag.
1088 : */
1089 302 : areq->callback_pending = false;
1090 :
1091 : /* Do the actual work. */
1092 302 : ExecAsyncNotify(areq);
1093 : }
1094 : }
1095 : }
1096 : }
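
/*
 * Illustrative sketch (simplified; not the actual code): the function
 * above follows the usual WaitEventSet lifecycle,
 *
 *		set = CreateWaitEventSet(CurrentResourceOwner, nevents);
 *		AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
 *						  NULL, NULL);
 *		(each waiting subplan then adds its socket event, with user_data
 *		pointing back at its AsyncRequest)
 *		noccurred = WaitEventSetWait(set, timeout, occurred_event,
 *									 nevents, WAIT_EVENT_APPEND_READY);
 *		FreeWaitEventSet(set);
 *
 * where timeout = -1 means "block until at least one event occurs" and
 * timeout = 0 means "poll once and return immediately", as selected from
 * as_syncdone above.
 */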
1097 :
1098 : /* ----------------------------------------------------------------
1099 : * ExecAsyncAppendResponse
1100 : *
1101 : * Receive a response from an asynchronous request we made.
1102 : * ----------------------------------------------------------------
1103 : */
1104 : void
1105 12658 : ExecAsyncAppendResponse(AsyncRequest *areq)
1106 : {
1107 12658 : AppendState *node = (AppendState *) areq->requestor;
1108 12658 : TupleTableSlot *slot = areq->result;
1109 :
1110 : /* The result should be a TupleTableSlot or NULL. */
1111 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1112 :
1113 : /* Nothing to do if the request is pending. */
1114 12658 : if (!areq->request_complete)
1115 : {
1116 : /* The request would have been pending for a callback. */
1117 : Assert(areq->callback_pending);
1118 322 : return;
1119 : }
1120 :
1121 : /* If the result is NULL or an empty slot, there's nothing more to do. */
1122 12336 : if (TupIsNull(slot))
1123 : {
1124 : /* The ending subplan wouldn't have been pending for a callback. */
1125 : Assert(!areq->callback_pending);
1126 122 : --node->as_nasyncremain;
1127 122 : return;
1128 : }
1129 :
1130 : /* Save result so we can return it. */
1131 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1132 12214 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1133 :
1134 : /*
1135 : * Mark the subplan that returned a result as ready for a new request. We
1136 : * don't launch another one here immediately because it might complete.
1137 : */
1138 12214 : node->as_needrequest = bms_add_member(node->as_needrequest,
1139 : areq->request_index);
1140 : }
1141 :
1142 : /* ----------------------------------------------------------------
1143 : * classify_matching_subplans
1144 : *
1145 : * Classify the node's as_valid_subplans into sync ones and
1146 : * async ones, adjust it to contain sync ones only, and save
1147 : * async ones in the node's as_valid_asyncplans.
1148 : * ----------------------------------------------------------------
1149 : */
1150 : static void
1151 92 : classify_matching_subplans(AppendState *node)
1152 : {
1153 : Bitmapset *valid_asyncplans;
1154 :
1155 : Assert(node->as_valid_subplans_identified);
1156 : Assert(node->as_valid_asyncplans == NULL);
1157 :
1158 : /* Nothing to do if there are no valid subplans. */
1159 92 : if (bms_is_empty(node->as_valid_subplans))
1160 : {
1161 0 : node->as_syncdone = true;
1162 0 : node->as_nasyncremain = 0;
1163 0 : return;
1164 : }
1165 :
1166 : /* Nothing to do if there are no valid async subplans. */
1167 92 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1168 : {
1169 0 : node->as_nasyncremain = 0;
1170 0 : return;
1171 : }
1172 :
1173 : /* Get valid async subplans. */
1174 92 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1175 92 : node->as_valid_subplans);
1176 :
1177 : /* Adjust the valid subplans to contain sync subplans only. */
1178 92 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1179 : valid_asyncplans);
1180 :
1181 : /* Save valid async subplans. */
1182 92 : node->as_valid_asyncplans = valid_asyncplans;
1183 : }
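
/*
 * Worked example (illustrative): if as_valid_subplans is {0, 1, 2, 3} and
 * as_asyncplans is {1, 3}, bms_intersect() yields {1, 3} for
 * as_valid_asyncplans and bms_del_members() leaves the sync-only set
 * {0, 2} in as_valid_subplans.
 */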