Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeIncrementalSort.c
4 : * Routines to handle incremental sorting of relations.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeIncrementalSort.c
11 : *
12 : * DESCRIPTION
13 : *
14 : * Incremental sort is an optimized variant of multikey sort for cases
15 : * when the input is already sorted by a prefix of the sort keys. For
16 : * example, when a sort by (key1, key2, ... keyN) is requested and the
17 : * input is already sorted by (key1, key2, ... keyM), M < N, we can
18 : * divide the input into groups where keys (key1, ... keyM) are equal,
19 : * and only sort on the remaining columns.
20 : *
21 : * Consider the following example. We have input tuples consisting of
22 : * two integers (X, Y) already presorted by X, while it's required to
23 : * sort them by both X and Y. Let the input tuples be the following:
24 : *
25 : * (1, 5)
26 : * (1, 2)
27 : * (2, 9)
28 : * (2, 1)
29 : * (2, 5)
30 : * (3, 3)
31 : * (3, 7)
32 : *
33 : * An incremental sort algorithm would split the input into the following
34 : * groups, which have equal X, and then sort them by Y individually:
35 : *
36 : * (1, 5) (1, 2)
37 : * (2, 9) (2, 1) (2, 5)
38 : * (3, 3) (3, 7)
39 : *
40 : * After sorting these groups and putting them all together, we get
41 : * the following result, which is sorted by X and Y, as requested:
42 : *
43 : * (1, 2)
44 : * (1, 5)
45 : * (2, 1)
46 : * (2, 5)
47 : * (2, 9)
48 : * (3, 3)
49 : * (3, 7)
50 : *
51 : * Incremental sort may be more efficient than plain sort, particularly
52 : * on large datasets, as it reduces the amount of data to sort at once,
53 : * making it more likely it fits into work_mem (eliminating the need to
54 : * spill to disk). But the main advantage of incremental sort is that
55 : * it can start producing rows early, before sorting the whole dataset,
56 : * which is a significant benefit especially for queries with LIMIT.
57 : *
58 : * The algorithm we've implemented here is modified from the theoretical
59 : * base described above by operating in two different modes:
60 : * - Fetching a minimum number of tuples without checking prefix key
61 : * group membership and sorting on all columns when safe.
62 : * - Fetching all tuples for a single prefix key group and sorting on
63 : * solely the unsorted columns.
64 : * We always begin in the first mode, and employ a heuristic to switch
65 : * into the second mode if we believe it's beneficial.
66 : *
67 : * Sorting incrementally can potentially use less memory, avoid fetching
68 : * and sorting all tuples in the dataset, and begin returning tuples before
69 : * the entire result set is available.
70 : *
71 : * The hybrid mode approach allows us to optimize for both very small
72 : * groups (where the overhead of a new tuplesort is high) and very large
73 : * groups (where we can lower cost by not having to sort on already sorted
74 : * columns), albeit at some extra cost while switching between modes.
75 : *
76 : *-------------------------------------------------------------------------
77 : */
78 :
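[Editor's note] The grouped-sort idea in the header comment is easy to demonstrate outside the executor. Below is a minimal standalone C sketch (not PostgreSQL code; the Tuple struct and splitting loop are illustrative assumptions) that takes the header's example input, already sorted by X, splits it into equal-X runs, and sorts each run by Y with qsort:

#include <stdio.h>
#include <stdlib.h>

typedef struct { int x; int y; } Tuple;   /* illustrative stand-in for a heap tuple */

static int
cmp_y(const void *a, const void *b)
{
    return ((const Tuple *) a)->y - ((const Tuple *) b)->y;
}

int
main(void)
{
    /* Input presorted by x, as in the header comment's example. */
    Tuple in[] = {{1, 5}, {1, 2}, {2, 9}, {2, 1}, {2, 5}, {3, 3}, {3, 7}};
    int n = sizeof(in) / sizeof(in[0]);

    for (int start = 0; start < n;)
    {
        int end = start;

        /* Find the end of the current equal-x group. */
        while (end < n && in[end].x == in[start].x)
            end++;

        /* Sort only this group, and only on the remaining key y. */
        qsort(&in[start], end - start, sizeof(Tuple), cmp_y);

        /* Emit the group; the concatenated output is sorted by (x, y). */
        for (int i = start; i < end; i++)
            printf("(%d, %d)\n", in[i].x, in[i].y);

        start = end;
    }
    return 0;
}

Compiled and run, this prints the seven tuples in full (X, Y) order, matching the result listed in the comment above.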
79 : #include "postgres.h"
80 :
81 : #include "executor/execdebug.h"
82 : #include "executor/nodeIncrementalSort.h"
83 : #include "miscadmin.h"
84 : #include "utils/lsyscache.h"
85 : #include "utils/tuplesort.h"
86 :
87 : /*
88 : * We need to store the instrumentation information in either the local node's
89 : * sort info or, for a parallel worker process, in the shared info (this avoids
90 : * having to additionally memcpy the info from local memory to shared memory
91 : * at each instrumentation call). This macro expands to choose the proper sort
92 : * state and group info.
93 : *
94 : * Arguments:
95 : * - node: type IncrementalSortState *
96 : * - groupName: the token fullsort or prefixsort
97 : */
98 : #define INSTRUMENT_SORT_GROUP(node, groupName) \
99 : do { \
100 : if ((node)->ss.ps.instrument != NULL) \
101 : { \
102 : if ((node)->shared_info && (node)->am_worker) \
103 : { \
104 : Assert(IsParallelWorker()); \
105 : Assert(ParallelWorkerNumber < (node)->shared_info->num_workers); \
106 : instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
107 : (node)->groupName##_state); \
108 : } \
109 : else \
110 : { \
111 : instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
112 : (node)->groupName##_state); \
113 : } \
114 : } \
115 : } while (0)
116 :
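[Editor's note] The ## token pasting is what lets one INSTRUMENT_SORT_GROUP body serve both sort states: passing the bare token fullsort selects fullsortGroupInfo and fullsort_state, while prefixsort selects the other pair. A tiny standalone illustration of the same preprocessor technique (struct and field names invented for the example):

#include <stdio.h>

struct stats { int fullsortCount; int prefixsortCount; };  /* invented example fields */

/* Paste the bare token onto the field name, as INSTRUMENT_SORT_GROUP does. */
#define BUMP(s, which) ((s)->which##Count++)

int
main(void)
{
    struct stats st = {0, 0};

    BUMP(&st, fullsort);     /* expands to (&st)->fullsortCount++ */
    BUMP(&st, prefixsort);
    BUMP(&st, prefixsort);
    printf("%d %d\n", st.fullsortCount, st.prefixsortCount);   /* prints "1 2" */
    return 0;
}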
117 :
118 : /* ----------------------------------------------------------------
119 : * instrumentSortedGroup
120 : *
121 : * Because incremental sort processes (potentially many) sort batches, we need
122 : * to capture tuplesort stats each time we finalize a sort state. This summary
123 : * data is later used for EXPLAIN ANALYZE output.
124 : * ----------------------------------------------------------------
125 : */
126 : static void
127 144 : instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
128 : Tuplesortstate *sortState)
129 : {
130 : TuplesortInstrumentation sort_instr;
131 :
132 144 : groupInfo->groupCount++;
133 :
134 144 : tuplesort_get_stats(sortState, &sort_instr);
135 :
136 : /* Calculate total and maximum memory and disk space used. */
137 144 : switch (sort_instr.spaceType)
138 : {
139 0 : case SORT_SPACE_TYPE_DISK:
140 0 : groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
141 0 : if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
142 0 : groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
143 :
144 0 : break;
145 144 : case SORT_SPACE_TYPE_MEMORY:
146 144 : groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
147 144 : if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
148 54 : groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
149 :
150 144 : break;
151 : }
152 :
153 : /* Track each sort method we've used. */
154 144 : groupInfo->sortMethods |= sort_instr.sortMethod;
155 144 : }
156 :
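[Editor's note] instrumentSortedGroup() keeps both a running total and a running maximum per space type so that EXPLAIN ANALYZE can later report aggregate and peak usage across potentially many small sorts. The same accumulate-then-summarize pattern in a self-contained sketch (the GroupStats struct is an invented analogue of IncrementalSortGroupInfo):

#include <stdio.h>

typedef struct
{
    long groupCount;
    long totalSpaceUsed;    /* sum across all finished sort batches */
    long maxSpaceUsed;      /* peak for any single batch */
} GroupStats;

static void
record_group(GroupStats *g, long spaceUsed)
{
    g->groupCount++;
    g->totalSpaceUsed += spaceUsed;
    if (spaceUsed > g->maxSpaceUsed)
        g->maxSpaceUsed = spaceUsed;
}

int
main(void)
{
    GroupStats g = {0, 0, 0};
    long batches[] = {28, 30, 54, 32};

    for (int i = 0; i < 4; i++)
        record_group(&g, batches[i]);
    printf("groups=%ld total=%ld max=%ld avg=%ld\n",
           g.groupCount, g.totalSpaceUsed, g.maxSpaceUsed,
           g.totalSpaceUsed / g.groupCount);
    return 0;
}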
157 : /* ----------------------------------------------------------------
158 : * preparePresortedCols
159 : *
160 : * Prepare information for presorted_keys comparisons.
161 : * ----------------------------------------------------------------
162 : */
163 : static void
164 410 : preparePresortedCols(IncrementalSortState *node)
165 : {
166 410 : IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
167 :
168 410 : node->presorted_keys =
169 410 : (PresortedKeyData *) palloc(plannode->nPresortedCols *
170 : sizeof(PresortedKeyData));
171 :
172 : /* Pre-cache comparison functions for each pre-sorted key. */
173 828 : for (int i = 0; i < plannode->nPresortedCols; i++)
174 : {
175 : Oid equalityOp,
176 : equalityFunc;
177 : PresortedKeyData *key;
178 :
179 418 : key = &node->presorted_keys[i];
180 418 : key->attno = plannode->sort.sortColIdx[i];
181 :
182 418 : equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
183 : NULL);
184 418 : if (!OidIsValid(equalityOp))
185 0 : elog(ERROR, "missing equality operator for ordering operator %u",
186 : plannode->sort.sortOperators[i]);
187 :
188 418 : equalityFunc = get_opcode(equalityOp);
189 418 : if (!OidIsValid(equalityFunc))
190 0 : elog(ERROR, "missing function for operator %u", equalityOp);
191 :
192 : /* Lookup the comparison function */
193 418 : fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
194 :
195 : /* We can initialize the callinfo just once and re-use it */
196 418 : key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
197 418 : InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
198 : plannode->sort.collations[i], NULL, NULL);
199 418 : key->fcinfo->args[0].isnull = false;
200 418 : key->fcinfo->args[1].isnull = false;
201 : }
202 410 : }
203 :
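[Editor's note] The point of preparePresortedCols() is to pay the operator-to-function resolution and FunctionCallInfo allocation once per query rather than once per row comparison; the per-row path later just stores two datums and invokes. A rough standalone analogue of that cache-the-resolved-comparator setup (plain function pointers stand in for the fmgr machinery; all names here are invented):

#include <stdbool.h>
#include <stdio.h>

typedef bool (*EqFunc) (int a, int b);  /* stands in for the resolved fmgr function */

typedef struct
{
    int     attno;   /* which column this key compares */
    EqFunc  eq;      /* resolved once, reused for every row */
} KeyData;

static bool int_eq(int a, int b) { return a == b; }

/* One-time setup, analogous to preparePresortedCols(). */
static void
prepare_keys(KeyData *keys, int nkeys)
{
    for (int i = 0; i < nkeys; i++)
    {
        keys[i].attno = i + 1;
        keys[i].eq = int_eq;   /* the executor resolves this via get_opcode + fmgr_info */
    }
}

int
main(void)
{
    KeyData keys[2];

    prepare_keys(keys, 2);
    /* The per-row path only invokes the cached function. */
    printf("%d\n", keys[0].eq(7, 7));   /* prints 1 */
    return 0;
}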
204 : /* ----------------------------------------------------------------
205 : * isCurrentGroup
206 : *
207 : * Check whether a given tuple belongs to the current sort group by comparing
208 : * the presorted column values to the pivot tuple of the current group.
209 : * ----------------------------------------------------------------
210 : */
211 : static bool
212 76170 : isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
213 : {
214 : int nPresortedCols;
215 :
216 76170 : nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
217 :
218 : /*
219 : * That the input is sorted by keys (0, ... n) implies that the tail
220 : * keys are more likely to change. Therefore we do our comparison starting
221 : * from the last pre-sorted column to optimize for early detection of
222 : * inequality and minimize the number of function calls.
223 : */
224 149508 : for (int i = nPresortedCols - 1; i >= 0; i--)
225 : {
226 : Datum datumA,
227 : datumB,
228 : result;
229 : bool isnullA,
230 : isnullB;
231 76170 : AttrNumber attno = node->presorted_keys[i].attno;
232 : PresortedKeyData *key;
233 :
234 76170 : datumA = slot_getattr(pivot, attno, &isnullA);
235 76170 : datumB = slot_getattr(tuple, attno, &isnullB);
236 :
237 : /* Special case for NULL-vs-NULL, else use standard comparison */
238 76170 : if (isnullA || isnullB)
239 : {
240 0 : if (isnullA == isnullB)
241 0 : continue;
242 : else
243 2832 : return false;
244 : }
245 :
246 76170 : key = &node->presorted_keys[i];
247 :
248 76170 : key->fcinfo->args[0].value = datumA;
249 76170 : key->fcinfo->args[1].value = datumB;
250 :
251 : /* just for paranoia's sake, we reset isnull each time */
252 76170 : key->fcinfo->isnull = false;
253 :
254 76170 : result = FunctionCallInvoke(key->fcinfo);
255 :
256 : /* Check for null result, since caller is clearly not expecting one */
257 76170 : if (key->fcinfo->isnull)
258 0 : elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
259 :
260 76170 : if (!DatumGetBool(result))
261 2832 : return false;
262 : }
263 73338 : return true;
264 : }
265 :
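[Editor's note] The backwards iteration in isCurrentGroup() deserves emphasis: on input sorted by keys (0, ... n), leading keys change rarely and trailing keys change often, so comparing the last presorted key first usually detects a group boundary with a single call. A standalone sketch of the same trick on plain integer keys:

#include <stdbool.h>
#include <stdio.h>

static bool
same_group(const int *a, const int *b, int nkeys)
{
    /* Last key first: on presorted input it is the most likely to differ. */
    for (int i = nkeys - 1; i >= 0; i--)
        if (a[i] != b[i])
            return false;
    return true;
}

int
main(void)
{
    int pivot[] = {2024, 7, 1};
    int row[]   = {2024, 7, 2};   /* differs only in the trailing key */

    /* The difference is found on the first comparison rather than the third. */
    printf("%s\n", same_group(pivot, row, 3) ? "same" : "new group");
    return 0;
}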
266 : /* ----------------------------------------------------------------
267 : * switchToPresortedPrefixMode
268 : *
269 : * When we determine that we've likely encountered a large batch of tuples all
270 : * having the same presorted prefix values, we want to optimize tuplesort by
271 : * only sorting on unsorted suffix keys.
272 : *
273 : * The problem is that we've already accumulated several tuples in another
274 : * tuplesort configured to sort by all columns (assuming that there may be
275 : * more than one prefix key group). So to switch to presorted prefix mode we
276 : * have to go back and look at all the tuples we've already accumulated to
277 : * verify they're all part of the same prefix key group before sorting them
278 : * solely by unsorted suffix keys.
279 : *
280 : * While it's likely that all the tuples already fetched are part of a single
281 : * prefix group, we also have to handle the possibility that there is at least
282 : * one different prefix key group before the large prefix key group.
283 : * ----------------------------------------------------------------
284 : */
285 : static void
286 286 : switchToPresortedPrefixMode(PlanState *pstate)
287 : {
288 286 : IncrementalSortState *node = castNode(IncrementalSortState, pstate);
289 : ScanDirection dir;
290 : int64 nTuples;
291 : TupleDesc tupDesc;
292 : PlanState *outerNode;
293 286 : IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
294 :
295 286 : dir = node->ss.ps.state->es_direction;
296 286 : outerNode = outerPlanState(node);
297 286 : tupDesc = ExecGetResultType(outerNode);
298 :
299 : /* Configure the prefix sort state the first time around. */
300 286 : if (node->prefixsort_state == NULL)
301 : {
302 : Tuplesortstate *prefixsort_state;
303 78 : int nPresortedCols = plannode->nPresortedCols;
304 :
305 : /*
306 : * Optimize the sort by assuming the prefix columns are all equal and
307 : * thus we only need to sort by any remaining columns.
308 : */
309 78 : prefixsort_state = tuplesort_begin_heap(tupDesc,
310 78 : plannode->sort.numCols - nPresortedCols,
311 78 : &(plannode->sort.sortColIdx[nPresortedCols]),
312 78 : &(plannode->sort.sortOperators[nPresortedCols]),
313 78 : &(plannode->sort.collations[nPresortedCols]),
314 78 : &(plannode->sort.nullsFirst[nPresortedCols]),
315 : work_mem,
316 : NULL,
317 78 : node->bounded ? TUPLESORT_ALLOWBOUNDED : TUPLESORT_NONE);
318 78 : node->prefixsort_state = prefixsort_state;
319 : }
320 : else
321 : {
322 : /* Next group of presorted data */
323 208 : tuplesort_reset(node->prefixsort_state);
324 : }
325 :
326 : /*
327 : * If the current node has a bound, then it's reasonably likely that a
328 : * large prefix key group will benefit from bounded sort, so configure the
329 : * tuplesort to allow for that optimization.
330 : */
331 286 : if (node->bounded)
332 : {
333 : SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
334 : node->bound - node->bound_Done);
335 182 : tuplesort_set_bound(node->prefixsort_state,
336 182 : node->bound - node->bound_Done);
337 : }
338 :
339 : /*
340 : * Copy as many tuples as we can (i.e., in the same prefix key group) from
341 : * the full sort state to the prefix sort state.
342 : */
343 6872 : for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
344 : {
345 : /*
346 : * When we encounter multiple prefix key groups inside the full sort
347 : * tuplesort we have to carry over the last read tuple into the next
348 : * batch.
349 : */
350 6762 : if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
351 : {
352 176 : tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
353 : /* The carried over tuple is our new group pivot tuple. */
354 176 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
355 : }
356 : else
357 : {
358 6586 : tuplesort_gettupleslot(node->fullsort_state,
359 : ScanDirectionIsForward(dir),
360 : false, node->transfer_tuple, NULL);
361 :
362 : /*
363 : * If this is our first time through the loop, then we need to
364 : * save the first tuple we get as our new group pivot.
365 : */
366 6586 : if (TupIsNull(node->group_pivot))
367 110 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
368 :
369 6586 : if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
370 : {
371 6410 : tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
372 : }
373 : else
374 : {
375 : /*
376 : * The tuple isn't part of the current batch so we need to
377 : * carry it over into the next batch of tuples we transfer out
378 : * of the full sort tuplesort into the presorted prefix
379 : * tuplesort. We don't actually have to do anything special to
380 : * save the tuple since we've already loaded it into the
381 : * node->transfer_tuple slot, and, even though that slot
382 : * points to memory inside the full sort tuplesort, we can't
383 : * reset that tuplesort anyway until we've fully transferred
384 : * out its tuples, so this reference is safe. We do need to
385 : * reset the group pivot tuple though since we've finished the
386 : * current prefix key group.
387 : */
388 176 : ExecClearTuple(node->group_pivot);
389 :
390 : /* Break out of for-loop early */
391 176 : break;
392 : }
393 : }
394 : }
395 :
396 : /*
397 : * Track how many tuples remain in the full sort batch so that we know if
398 : * we need to sort multiple prefix key groups before processing tuples
399 : * remaining in the large single prefix key group we think we've
400 : * encountered.
401 : */
402 : SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
403 286 : node->n_fullsort_remaining -= nTuples;
404 : SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);
405 :
406 286 : if (node->n_fullsort_remaining == 0)
407 : {
408 : /*
409 : * We've found that all tuples remaining in the full sort batch are in
410 : * the same prefix key group and moved all of those tuples into the
411 : * presorted prefix tuplesort. We don't know that we've yet found the
412 : * last tuple in the current prefix key group, so save our pivot
413 : * comparison tuple and continue fetching tuples from the outer
414 : * execution node to load into the presorted prefix tuplesort.
415 : */
416 110 : ExecCopySlot(node->group_pivot, node->transfer_tuple);
417 : SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
418 110 : node->execution_status = INCSORT_LOADPREFIXSORT;
419 :
420 : /*
421 : * Make sure we clear the transfer tuple slot so that next time we
422 : * encounter a large prefix key group we don't incorrectly assume we
423 : * have a tuple carried over from the previous group.
424 : */
425 110 : ExecClearTuple(node->transfer_tuple);
426 : }
427 : else
428 : {
429 : /*
430 : * We finished a group but didn't consume all of the tuples from the
431 : * full sort state, so we'll sort this batch, let the outer node read
432 : * out all of those tuples, and then come back around to find another
433 : * batch.
434 : */
435 : SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
436 176 : tuplesort_performsort(node->prefixsort_state);
437 :
438 176 : INSTRUMENT_SORT_GROUP(node, prefixsort);
439 :
440 176 : if (node->bounded)
441 : {
442 : /*
443 : * If the current node has a bound and we've already sorted n
444 : * tuples, then the functional bound remaining is (original bound
445 : * - n), so store the current number of processed tuples for use
446 : * in configuring sorting bound.
447 : */
448 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
449 : node->bound_Done, Min(node->bound, node->bound_Done + nTuples));
450 120 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
451 : }
452 :
453 : SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
454 176 : node->execution_status = INCSORT_READPREFIXSORT;
455 : }
456 286 : }
457 :
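[Editor's note] One subtlety in the transfer loop above is that the first tuple of the next group is unavoidably read while searching for the end of the current one, so it has to be carried over into the next batch rather than re-fetched. A standalone sketch of that drain-until-boundary-and-carry pattern (plain int arrays stand in for the tuplesorts, and a single int for the prefix keys):

#include <stdio.h>

/*
 * Copy values from src into dst while they share src[0]'s prefix key.
 * Returns the number copied; the first non-matching value is left in
 * src as the carried-over boundary element that seeds the next group.
 */
static int
drain_group(const int *src, int nsrc, int *dst)
{
    int n = 0;

    while (n < nsrc && src[n] == src[0])
    {
        dst[n] = src[n];
        n++;
    }
    return n;
}

int
main(void)
{
    int buffered[] = {4, 4, 4, 9, 9};    /* two prefix key groups */
    int group[5];
    int moved = drain_group(buffered, 5, group);

    /* moved == 3; buffered[3] carries over as the next group's pivot. */
    printf("moved %d, next pivot %d\n", moved, buffered[moved]);
    return 0;
}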
458 : /*
459 : * Sorting many small groups with tuplesort is inefficient. In order to
460 : * cope with this problem we don't start a new group until the current one
461 : * contains at least DEFAULT_MIN_GROUP_SIZE tuples (unfortunately this also
462 : * means we can't assume small groups of tuples all have the same prefix keys).
463 : * When we have a bound that's less than DEFAULT_MIN_GROUP_SIZE we start looking
464 : * for the new group as soon as we've met our bound to avoid fetching more
465 : * tuples than we absolutely have to fetch.
466 : */
467 : #define DEFAULT_MIN_GROUP_SIZE 32
468 :
469 : /*
470 : * While we've optimized for small prefix key groups by not starting our prefix
471 : * key comparisons until we've reached a minimum number of tuples, we don't want
472 : * that optimization to cause us to lose out on the benefits of being able to
473 : * assume a large group of tuples is fully presorted by its prefix keys.
474 : * Therefore we use the DEFAULT_MAX_FULL_SORT_GROUP_SIZE cutoff as a heuristic
475 : * for determining when we believe we've encountered a large group, and, if we
476 : * get to that point without finding a new prefix key group, we transition to
477 : * presorted prefix key mode.
478 : */
479 : #define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
480 :
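[Editor's note] To see how a LIMIT interacts with DEFAULT_MIN_GROUP_SIZE: with a bound of 10 and nothing returned yet, the executor uses a minimum group size of Min(32, 10) = 10, so group accumulation stops as soon as the bound is met. A small sketch of that arithmetic (the constant name is copied from above; the helper function is invented):

#include <stdio.h>

#define DEFAULT_MIN_GROUP_SIZE 32

/* Remaining bound after bound_Done tuples have already been returned. */
static long
min_group_size(int bounded, long bound, long bound_Done)
{
    long currentBound;

    if (!bounded)
        return DEFAULT_MIN_GROUP_SIZE;
    currentBound = bound - bound_Done;
    return currentBound < DEFAULT_MIN_GROUP_SIZE
        ? currentBound
        : DEFAULT_MIN_GROUP_SIZE;
}

int
main(void)
{
    printf("%ld\n", min_group_size(1, 10, 0));    /* 10: capped by the LIMIT */
    printf("%ld\n", min_group_size(1, 100, 80));  /* 20: only 20 still needed */
    printf("%ld\n", min_group_size(0, 0, 0));     /* 32: the unbounded default */
    return 0;
}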
481 : /* ----------------------------------------------------------------
482 : * ExecIncrementalSort
483 : *
484 : * Assuming that the outer subtree returns tuples presorted by some
485 : * prefix of the target sort columns, perform an incremental sort.
486 : *
487 : * Conditions:
488 : * -- none.
489 : *
490 : * Initial States:
491 : * -- the outer child is prepared to return the first tuple.
492 : * ----------------------------------------------------------------
493 : */
494 : static TupleTableSlot *
495 122040 : ExecIncrementalSort(PlanState *pstate)
496 : {
497 122040 : IncrementalSortState *node = castNode(IncrementalSortState, pstate);
498 : EState *estate;
499 : ScanDirection dir;
500 : Tuplesortstate *read_sortstate;
501 : Tuplesortstate *fullsort_state;
502 : TupleTableSlot *slot;
503 122040 : IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
504 : PlanState *outerNode;
505 : TupleDesc tupDesc;
506 122040 : int64 nTuples = 0;
507 : int64 minGroupSize;
508 :
509 122040 : CHECK_FOR_INTERRUPTS();
510 :
511 122040 : estate = node->ss.ps.state;
512 122040 : dir = estate->es_direction;
513 122040 : fullsort_state = node->fullsort_state;
514 :
515 : /*
516 : * If a previous iteration has sorted a batch, then we need to check to
517 : * see if there are any remaining tuples in that batch that we can return
518 : * before moving on to other execution states.
519 : */
520 122040 : if (node->execution_status == INCSORT_READFULLSORT
521 27062 : || node->execution_status == INCSORT_READPREFIXSORT)
522 : {
523 : /*
524 : * Return next tuple from the current sorted group set if available.
525 : */
526 243248 : read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
527 121624 : fullsort_state : node->prefixsort_state;
528 121624 : slot = node->ss.ps.ps_ResultTupleSlot;
529 :
530 : /*
531 : * We have to populate the slot from the tuplesort before checking
532 : * outerNodeDone because the gettupleslot call will set the slot to NULL
533 : * if no more tuples remain. If the tuplesort is empty and we don't have
534 : * any more tuples available for sort from the outer node, then
535 : * outerNodeDone will have been set, so we'll return that now-empty
536 : * slot to the caller.
537 : */
538 121624 : if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
539 2972 : false, slot, NULL) || node->outerNodeDone)
540 :
541 : /*
542 : * Note: there isn't a good test case for the node->outerNodeDone
543 : * check directly, but we need it for any plan where the outer
544 : * node will fail when trying to fetch too many tuples.
545 : */
546 118908 : return slot;
547 2716 : else if (node->n_fullsort_remaining > 0)
548 : {
549 : /*
550 : * When we transition to presorted prefix mode, we might have
551 : * accumulated at least one additional prefix key group in the
552 : * full sort tuplesort. The first call to
553 : * switchToPresortedPrefixMode() will have pulled the first one of
554 : * those groups out, and we've returned those tuples to the parent
555 : * node, but if at this point we still have tuples remaining in
556 : * the full sort state (i.e., n_fullsort_remaining > 0), then we
557 : * need to re-execute the prefix mode transition function to pull
558 : * out the next prefix key group.
559 : */
560 : SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
561 : node->n_fullsort_remaining);
562 176 : switchToPresortedPrefixMode(pstate);
563 : }
564 : else
565 : {
566 : /*
567 : * If we don't have any sorted tuples to read and we're not
568 : * currently transitioning into presorted prefix sort mode, then
569 : * it's time to start the process all over again by building a new
570 : * group in the full sort state.
571 : */
572 : SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining == 0)\n");
573 2540 : node->execution_status = INCSORT_LOADFULLSORT;
574 : }
575 : }
576 :
577 : /*
578 : * Scan the subplan in the forward direction while creating the sorted
579 : * data.
580 : */
581 3132 : estate->es_direction = ForwardScanDirection;
582 :
583 3132 : outerNode = outerPlanState(node);
584 3132 : tupDesc = ExecGetResultType(outerNode);
585 :
586 : /* Load tuples into the full sort state. */
587 3132 : if (node->execution_status == INCSORT_LOADFULLSORT)
588 : {
589 : /*
590 : * Initialize sorting structures.
591 : */
592 2956 : if (fullsort_state == NULL)
593 : {
594 : /*
595 : * Initialize presorted column support structures for
596 : * isCurrentGroup(). It's correct to do this along with the
597 : * initial initialization for the full sort state (and not for the
598 : * prefix sort state) since we always load the full sort state
599 : * first.
600 : */
601 410 : preparePresortedCols(node);
602 :
603 : /*
604 : * Since we optimize small prefix key groups by accumulating a
605 : * minimum number of tuples before sorting, we can't assume that a
606 : * group of tuples all have the same prefix key values. Hence we
607 : * set up the full sort tuplesort to sort by all requested sort
608 : * keys.
609 : */
610 410 : fullsort_state = tuplesort_begin_heap(tupDesc,
611 : plannode->sort.numCols,
612 : plannode->sort.sortColIdx,
613 : plannode->sort.sortOperators,
614 : plannode->sort.collations,
615 : plannode->sort.nullsFirst,
616 : work_mem,
617 : NULL,
618 410 : node->bounded ?
619 : TUPLESORT_ALLOWBOUNDED :
620 : TUPLESORT_NONE);
621 410 : node->fullsort_state = fullsort_state;
622 : }
623 : else
624 : {
625 : /* Reset sort for the next batch. */
626 2546 : tuplesort_reset(fullsort_state);
627 : }
628 :
629 : /*
630 : * Calculate the remaining tuples left if bounded and configure both
631 : * bounded sort and the minimum group size accordingly.
632 : */
633 2956 : if (node->bounded)
634 : {
635 212 : int64 currentBound = node->bound - node->bound_Done;
636 :
637 : /*
638 : * Bounded sort isn't likely to be a useful optimization for full
639 : * sort mode since we limit full sort mode to a relatively small
640 : * number of tuples and tuplesort doesn't switch over to top-n
641 : * heap sort anyway unless it hits (2 * bound) tuples.
642 : */
643 212 : if (currentBound < DEFAULT_MIN_GROUP_SIZE)
644 78 : tuplesort_set_bound(fullsort_state, currentBound);
645 :
646 212 : minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
647 : }
648 : else
649 2744 : minGroupSize = DEFAULT_MIN_GROUP_SIZE;
650 :
651 : /*
652 : * Because we have to read the next tuple to find out that we've
653 : * encountered a new prefix key group, on subsequent groups we have to
654 : * carry over that extra tuple and add it to the new group's sort here
655 : * before we read any new tuples from the outer node.
656 : */
657 2956 : if (!TupIsNull(node->group_pivot))
658 : {
659 2540 : tuplesort_puttupleslot(fullsort_state, node->group_pivot);
660 2540 : nTuples++;
661 :
662 : /*
663 : * We're in full sort mode accumulating a minimum number of tuples
664 : * and not checking for prefix key equality yet, so we can't
665 : * assume the group pivot tuple will remain the same -- unless
666 : * we're using a minimum group size of 1, in which case the pivot
667 : * is obviously still the pivot.
668 : */
669 2540 : if (nTuples != minGroupSize)
670 2528 : ExecClearTuple(node->group_pivot);
671 : }
672 :
673 :
674 : /*
675 : * Pull as many tuples from the outer node as possible given our
676 : * current operating mode.
677 : */
678 : for (;;)
679 : {
680 103766 : slot = ExecProcNode(outerNode);
681 :
682 : /*
683 : * If the outer node can't provide us any more tuples, then we can
684 : * sort the current group and return those tuples.
685 : */
686 103766 : if (TupIsNull(slot))
687 : {
688 : /*
689 : * We need to know later if the outer node has completed to be
690 : * able to distinguish between being done with a batch and
691 : * being done with the whole node.
692 : */
693 256 : node->outerNodeDone = true;
694 :
695 : SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
696 256 : tuplesort_performsort(fullsort_state);
697 :
698 256 : INSTRUMENT_SORT_GROUP(node, fullsort);
699 :
700 : SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
701 256 : node->execution_status = INCSORT_READFULLSORT;
702 256 : break;
703 : }
704 :
705 : /* Accumulate the next group of presorted tuples. */
706 103510 : if (nTuples < minGroupSize)
707 : {
708 : /*
709 : * If we haven't yet hit our target minimum group size, then
710 : * we don't need to bother checking for inclusion in the
711 : * current prefix group since at this point we'll assume that
712 : * we'll full sort this batch to avoid a large number of very
713 : * tiny (and thus inefficient) sorts.
714 : */
715 85258 : tuplesort_puttupleslot(fullsort_state, slot);
716 85258 : nTuples++;
717 :
718 : /*
719 : * If we've reached our minimum group size, then we need to
720 : * store the most recent tuple as a pivot.
721 : */
722 85258 : if (nTuples == minGroupSize)
723 2688 : ExecCopySlot(node->group_pivot, slot);
724 : }
725 : else
726 : {
727 : /*
728 : * If we've already accumulated enough tuples to reach our
729 : * minimum group size, then we need to compare any additional
730 : * tuples to our pivot tuple to see if we reach the end of
731 : * that prefix key group. Only after we find changed prefix
732 : * keys can we guarantee sort stability of the tuples we've
733 : * already accumulated.
734 : */
735 18252 : if (isCurrentGroup(node, node->group_pivot, slot))
736 : {
737 : /*
738 : * As long as the prefix keys match the pivot tuple then
739 : * load the tuple into the tuplesort.
740 : */
741 15662 : tuplesort_puttupleslot(fullsort_state, slot);
742 15662 : nTuples++;
743 : }
744 : else
745 : {
746 : /*
747 : * Since the tuple we fetched isn't part of the current
748 : * prefix key group we don't want to sort it as part of
749 : * the current batch. Instead we use the group_pivot slot
750 : * to carry it over to the next batch (even though we
751 : * won't actually treat it as a group pivot).
752 : */
753 2590 : ExecCopySlot(node->group_pivot, slot);
754 :
755 2590 : if (node->bounded)
756 : {
757 : /*
758 : * If the current node has a bound, and we've already
759 : * sorted n tuples, then the functional bound
760 : * remaining is (original bound - n), so store the
761 : * current number of processed tuples for later use
762 : * configuring the sort state's bound.
763 : */
764 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
765 : node->bound_Done,
766 : Min(node->bound, node->bound_Done + nTuples));
767 150 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
768 : }
769 :
770 : /*
771 : * Once we find changed prefix keys we can complete the
772 : * sort and transition modes to reading out the sorted
773 : * tuples.
774 : */
775 : SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
776 : nTuples);
777 2590 : tuplesort_performsort(fullsort_state);
778 :
779 2590 : INSTRUMENT_SORT_GROUP(node, fullsort);
780 :
781 : SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
782 2590 : node->execution_status = INCSORT_READFULLSORT;
783 2590 : break;
784 : }
785 : }
786 :
787 : /*
788 : * If we haven't already transitioned to reading from the
789 : * full sort state, we assume that having read at least
790 : * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
791 : * processing a large group of tuples all having equal prefix keys
792 : * (but haven't yet found the final tuple in that prefix key
793 : * group), so we need to transition into presorted prefix mode.
794 : */
795 100920 : if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
796 110 : node->execution_status != INCSORT_READFULLSORT)
797 : {
798 : /*
799 : * The group pivot we have stored has already been put into
800 : * the tuplesort; we don't want to carry it over. Since we
801 : * haven't yet found the end of the prefix key group, it might
802 : * seem like we should keep this, but we don't actually know
803 : * how many prefix key groups might be represented in the full
804 : * sort state, so we'll let the mode transition function
805 : * manage this state for us.
806 : */
807 110 : ExecClearTuple(node->group_pivot);
808 :
809 : /*
810 : * Unfortunately the tuplesort API doesn't include a way to
811 : * retrieve tuples unless a sort has been performed, so we
812 : * perform the sort even though we could just as easily rely
813 : * on FIFO retrieval semantics when transferring them to the
814 : * presorted prefix tuplesort.
815 : */
816 : SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
817 110 : tuplesort_performsort(fullsort_state);
818 :
819 110 : INSTRUMENT_SORT_GROUP(node, fullsort);
820 :
821 : /*
822 : * If the full sort tuplesort happened to switch into top-n
823 : * heapsort mode then we will only be able to retrieve
824 : * currentBound tuples (since the tuplesort will have only
825 : * retained the top-n tuples). This is safe even though we
826 : * haven't yet completed fetching the current prefix key group
827 : * because the tuples we've "lost" already sort "below" the
828 : * retained ones, and we're already contractually guaranteed
829 : * to not need any more than the currentBound tuples.
830 : */
831 110 : if (tuplesort_used_bound(node->fullsort_state))
832 : {
833 12 : int64 currentBound = node->bound - node->bound_Done;
834 :
835 : SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
836 : nTuples, Min(currentBound, nTuples));
837 12 : nTuples = Min(currentBound, nTuples);
838 : }
839 :
840 : SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
841 : nTuples);
842 :
843 : /*
844 : * We might have multiple prefix key groups in the full sort
845 : * state, so the mode transition function needs to know that
846 : * it needs to move from the fullsort to presorted prefix
847 : * sort.
848 : */
849 110 : node->n_fullsort_remaining = nTuples;
850 :
851 : /* Transition the tuples to the presorted prefix tuplesort. */
852 110 : switchToPresortedPrefixMode(pstate);
853 :
854 : /*
855 : * Since we know we had tuples to move to the presorted prefix
856 : * tuplesort, we know that unless that transition has verified
857 : * that all tuples belonged to the same prefix key group (in
858 : * which case we can go straight to continuing to load tuples
859 : * into that tuplesort), we should have a tuple to return
860 : * here.
861 : *
862 : * Either way, the appropriate execution status should have
863 : * been set by switchToPresortedPrefixMode(), so we can drop
864 : * out of the loop here and let the appropriate path kick in.
865 : */
866 110 : break;
867 : }
868 : }
869 : }
870 :
871 3132 : if (node->execution_status == INCSORT_LOADPREFIXSORT)
872 : {
873 : /*
874 : * We only enter this state after the mode transition function has
875 : * confirmed all remaining tuples from the full sort state have the
876 : * same prefix and moved those tuples to the prefix sort state. That
877 : * function has also set a group pivot tuple (which doesn't need to be
878 : * carried over; it's already been put into the prefix sort state).
879 : */
880 : Assert(!TupIsNull(node->group_pivot));
881 :
882 : /*
883 : * Read tuples from the outer node and load them into the prefix sort
884 : * state until we encounter a tuple whose prefix keys don't match the
885 : * current group_pivot tuple, since we can't guarantee sort stability
886 : * until we have all tuples matching those prefix keys.
887 : */
888 : for (;;)
889 : {
890 51376 : slot = ExecProcNode(outerNode);
891 :
892 : /*
893 : * If we've exhausted tuples from the outer node we're done
894 : * loading the prefix sort state.
895 : */
896 51376 : if (TupIsNull(slot))
897 : {
898 : /*
899 : * We need to know later if the outer node has completed to be
900 : * able to distinguish between being done with a batch and
901 : * being done with the whole node.
902 : */
903 44 : node->outerNodeDone = true;
904 44 : break;
905 : }
906 :
907 : /*
908 : * If the tuple's prefix keys match our pivot tuple, we're not
909 : * done yet and can load it into the prefix sort state. If not, we
910 : * don't want to sort it as part of the current batch. Instead we
911 : * use the group_pivot slot to carry it over to the next batch
912 : * (even though we won't actually treat it as a group pivot).
913 : */
914 51332 : if (isCurrentGroup(node, node->group_pivot, slot))
915 : {
916 51266 : tuplesort_puttupleslot(node->prefixsort_state, slot);
917 51266 : nTuples++;
918 : }
919 : else
920 : {
921 66 : ExecCopySlot(node->group_pivot, slot);
922 66 : break;
923 : }
924 : }
925 :
926 : /*
927 : * Perform the sort and begin returning the tuples to the parent plan
928 : * node.
929 : */
930 : SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
931 110 : tuplesort_performsort(node->prefixsort_state);
932 :
933 110 : INSTRUMENT_SORT_GROUP(node, prefixsort);
934 :
935 : SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
936 110 : node->execution_status = INCSORT_READPREFIXSORT;
937 :
938 110 : if (node->bounded)
939 : {
940 : /*
941 : * If the current node has a bound, and we've already sorted n
942 : * tuples, then the functional bound remaining is (original bound
943 : * - n), so store the current number of processed tuples for use
944 : * in configuring sorting bound.
945 : */
946 : SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
947 : node->bound_Done,
948 : Min(node->bound, node->bound_Done + nTuples));
949 62 : node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
950 : }
951 : }
952 :
953 : /* Restore to user specified direction. */
954 3132 : estate->es_direction = dir;
955 :
956 : /*
957 : * Get the first or next tuple from tuplesort. Returns NULL if no more
958 : * tuples.
959 : */
960 6264 : read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
961 3132 : fullsort_state : node->prefixsort_state;
962 3132 : slot = node->ss.ps.ps_ResultTupleSlot;
963 3132 : (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
964 : false, slot, NULL);
965 3132 : return slot;
966 : }
967 :
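[Editor's note] One easy-to-miss detail in ExecIncrementalSort() is the scan-direction dance: estate->es_direction is forced to ForwardScanDirection while loading tuples from the subplan and restored to the caller's direction before reading sorted output back. The save/force/restore pattern in miniature (the ToyEState struct and enum are stand-ins, not executor types):

#include <stdio.h>

typedef enum { BackwardScan = -1, NoMovementScan = 0, ForwardScan = 1 } ToyScanDirection;
typedef struct { ToyScanDirection es_direction; } ToyEState;

static void
load_batch(ToyEState *estate)
{
    ToyScanDirection dir = estate->es_direction;  /* save the caller's direction */

    estate->es_direction = ForwardScan;   /* the subplan is always scanned forward */
    /* ... pull tuples from the outer node and feed the tuplesort here ... */
    estate->es_direction = dir;           /* restore before reading sorted output */
}

int
main(void)
{
    ToyEState estate = {BackwardScan};

    load_batch(&estate);
    printf("%d\n", estate.es_direction);  /* -1: the caller's direction survives */
    return 0;
}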
968 : /* ----------------------------------------------------------------
969 : * ExecInitIncrementalSort
970 : *
971 : * Creates the run-time state information for the sort node
972 : * produced by the planner and initializes its outer subtree.
973 : * ----------------------------------------------------------------
974 : */
975 : IncrementalSortState *
976 700 : ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
977 : {
978 : IncrementalSortState *incrsortstate;
979 :
980 : SO_printf("ExecInitIncrementalSort: initializing sort node\n");
981 :
982 : /*
983 : * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
984 : * EXEC_FLAG_MARK, because the current sort state contains only one sort
985 : * batch rather than the full result set.
986 : */
987 : Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
988 :
989 : /* Initialize state structure. */
990 700 : incrsortstate = makeNode(IncrementalSortState);
991 700 : incrsortstate->ss.ps.plan = (Plan *) node;
992 700 : incrsortstate->ss.ps.state = estate;
993 700 : incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
994 :
995 700 : incrsortstate->execution_status = INCSORT_LOADFULLSORT;
996 700 : incrsortstate->bounded = false;
997 700 : incrsortstate->outerNodeDone = false;
998 700 : incrsortstate->bound_Done = 0;
999 700 : incrsortstate->fullsort_state = NULL;
1000 700 : incrsortstate->prefixsort_state = NULL;
1001 700 : incrsortstate->group_pivot = NULL;
1002 700 : incrsortstate->transfer_tuple = NULL;
1003 700 : incrsortstate->n_fullsort_remaining = 0;
1004 700 : incrsortstate->presorted_keys = NULL;
1005 :
1006 700 : if (incrsortstate->ss.ps.instrument != NULL)
1007 : {
1008 0 : IncrementalSortGroupInfo *fullsortGroupInfo =
1009 : &incrsortstate->incsort_info.fullsortGroupInfo;
1010 0 : IncrementalSortGroupInfo *prefixsortGroupInfo =
1011 : &incrsortstate->incsort_info.prefixsortGroupInfo;
1012 :
1013 0 : fullsortGroupInfo->groupCount = 0;
1014 0 : fullsortGroupInfo->maxDiskSpaceUsed = 0;
1015 0 : fullsortGroupInfo->totalDiskSpaceUsed = 0;
1016 0 : fullsortGroupInfo->maxMemorySpaceUsed = 0;
1017 0 : fullsortGroupInfo->totalMemorySpaceUsed = 0;
1018 0 : fullsortGroupInfo->sortMethods = 0;
1019 0 : prefixsortGroupInfo->groupCount = 0;
1020 0 : prefixsortGroupInfo->maxDiskSpaceUsed = 0;
1021 0 : prefixsortGroupInfo->totalDiskSpaceUsed = 0;
1022 0 : prefixsortGroupInfo->maxMemorySpaceUsed = 0;
1023 0 : prefixsortGroupInfo->totalMemorySpaceUsed = 0;
1024 0 : prefixsortGroupInfo->sortMethods = 0;
1025 : }
1026 :
1027 : /*
1028 : * Miscellaneous initialization
1029 : *
1030 : * Sort nodes don't initialize their ExprContexts because they never call
1031 : * ExecQual or ExecProject.
1032 : */
1033 :
1034 : /*
1035 : * Initialize child nodes.
1036 : *
1037 : * Incremental sort does not support backwards scans and mark/restore, so
1038 : * we don't bother removing the flags from eflags here. We allow passing a
1039 : * REWIND flag, because although incremental sort can't use it, the child
1040 : * nodes may be able to do something more useful.
1041 : */
1042 700 : outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
1043 :
1044 : /*
1045 : * Initialize scan slot and type.
1046 : */
1047 700 : ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);
1048 :
1049 : /*
1050 : * Initialize return slot and type. No need to initialize projection info
1051 : * because we don't do any projections.
1052 : */
1053 700 : ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
1054 700 : incrsortstate->ss.ps.ps_ProjInfo = NULL;
1055 :
1056 : /*
1057 : * Initialize standalone slots to store a tuple for pivot prefix keys and
1058 : * for carrying over a tuple from one batch to the next.
1059 : */
1060 700 : incrsortstate->group_pivot =
1061 700 : MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1062 : &TTSOpsMinimalTuple);
1063 700 : incrsortstate->transfer_tuple =
1064 700 : MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1065 : &TTSOpsMinimalTuple);
1066 :
1067 : SO_printf("ExecInitIncrementalSort: sort node initialized\n");
1068 :
1069 700 : return incrsortstate;
1070 : }
1071 :
1072 : /* ----------------------------------------------------------------
1073 : * ExecEndIncrementalSort(node)
1074 : * ----------------------------------------------------------------
1075 : */
1076 : void
1077 700 : ExecEndIncrementalSort(IncrementalSortState *node)
1078 : {
1079 : SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
1080 :
1081 700 : ExecDropSingleTupleTableSlot(node->group_pivot);
1082 700 : ExecDropSingleTupleTableSlot(node->transfer_tuple);
1083 :
1084 : /*
1085 : * Release tuplesort resources.
1086 : */
1087 700 : if (node->fullsort_state != NULL)
1088 : {
1089 410 : tuplesort_end(node->fullsort_state);
1090 410 : node->fullsort_state = NULL;
1091 : }
1092 700 : if (node->prefixsort_state != NULL)
1093 : {
1094 78 : tuplesort_end(node->prefixsort_state);
1095 78 : node->prefixsort_state = NULL;
1096 : }
1097 :
1098 : /*
1099 : * Shut down the subplan.
1100 : */
1101 700 : ExecEndNode(outerPlanState(node));
1102 :
1103 : SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
1104 700 : }
1105 :
1106 : void
1107 12 : ExecReScanIncrementalSort(IncrementalSortState *node)
1108 : {
1109 12 : PlanState *outerPlan = outerPlanState(node);
1110 :
1111 : /*
1112 : * Incremental sort doesn't support efficient rescan even when parameters
1113 : * haven't changed (e.g., rewind) because unlike regular sort we don't
1114 : * store all tuples at once for the full sort.
1115 : *
1116 : * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
1117 : * re-execute the sort along with the child node. Incremental sort itself
1118 : * can't do anything smarter, but maybe the child nodes can.
1119 : *
1120 : * In theory if we've only filled the full sort with one batch (and
1121 : * haven't reset it for a new batch yet) then we could efficiently rewind,
1122 : * but that seems a narrow enough case that it's not worth handling
1123 : * specially at this time.
1124 : */
1125 :
1126 : /* must drop pointer to sort result tuple */
1127 12 : ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1128 :
1129 12 : if (node->group_pivot != NULL)
1130 12 : ExecClearTuple(node->group_pivot);
1131 12 : if (node->transfer_tuple != NULL)
1132 12 : ExecClearTuple(node->transfer_tuple);
1133 :
1134 12 : node->outerNodeDone = false;
1135 12 : node->n_fullsort_remaining = 0;
1136 12 : node->bound_Done = 0;
1137 :
1138 12 : node->execution_status = INCSORT_LOADFULLSORT;
1139 :
1140 : /*
1141 : * If we've already set up either of the sort states, we need to reset them.
1142 : * We could end them and null out the pointers, but there's no reason to
1143 : * repay the setup cost, and because ExecIncrementalSort guards presorted
1144 : * column functions by checking to see if the full sort state has been
1145 : * initialized yet, setting the sort states to null here might actually
1146 : * cause a leak.
1147 : */
1148 12 : if (node->fullsort_state != NULL)
1149 6 : tuplesort_reset(node->fullsort_state);
1150 12 : if (node->prefixsort_state != NULL)
1151 6 : tuplesort_reset(node->prefixsort_state);
1152 :
1153 : /*
1154 : * If chgParam of subnode is not null, then the plan will be re-scanned by
1155 : * the first ExecProcNode.
1156 : */
1157 12 : if (outerPlan->chgParam == NULL)
1158 12 : ExecReScan(outerPlan);
1159 12 : }
1160 :
1161 : /* ----------------------------------------------------------------
1162 : * Parallel Query Support
1163 : * ----------------------------------------------------------------
1164 : */
1165 :
1166 : /* ----------------------------------------------------------------
1167 : * ExecIncrementalSortEstimate
1168 : *
1169 : * Estimate space required to propagate sort statistics.
1170 : * ----------------------------------------------------------------
1171 : */
1172 : void
1173 0 : ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
1174 : {
1175 : Size size;
1176 :
1177 : /* don't need this if not instrumenting or no workers */
1178 0 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1179 0 : return;
1180 :
1181 0 : size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
1182 0 : size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
1183 0 : shm_toc_estimate_chunk(&pcxt->estimator, size);
1184 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1185 : }
1186 :
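[Editor's note] The estimate above and the allocation in ExecIncrementalSortInitializeDSM() must agree, which is why both compute offsetof(SharedIncrementalSortInfo, sinfo) plus a per-worker multiple rather than sizeof of the whole struct. The flexible-array-member sizing idiom in a standalone sketch (struct names invented):

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct { long groupCount; } WorkerInfo;   /* invented per-worker payload */

typedef struct
{
    int        num_workers;
    WorkerInfo sinfo[];          /* flexible array member, one entry per worker */
} SharedInfo;

int
main(void)
{
    int nworkers = 4;
    /* offsetof-based sizing: header bytes + exactly nworkers payloads. */
    size_t size = offsetof(SharedInfo, sinfo) + nworkers * sizeof(WorkerInfo);
    SharedInfo *si = malloc(size);

    memset(si, 0, size);         /* unfilled slots read as zeroes, as in the DSM case */
    si->num_workers = nworkers;
    printf("%zu bytes for %d workers\n", size, si->num_workers);
    free(si);
    return 0;
}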
1187 : /* ----------------------------------------------------------------
1188 : * ExecIncrementalSortInitializeDSM
1189 : *
1190 : * Initialize DSM space for sort statistics.
1191 : * ----------------------------------------------------------------
1192 : */
1193 : void
1194 0 : ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
1195 : {
1196 : Size size;
1197 :
1198 : /* don't need this if not instrumenting or no workers */
1199 0 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1200 0 : return;
1201 :
1202 0 : size = offsetof(SharedIncrementalSortInfo, sinfo)
1203 0 : + pcxt->nworkers * sizeof(IncrementalSortInfo);
1204 0 : node->shared_info = shm_toc_allocate(pcxt->toc, size);
1205 : /* ensure any unfilled slots will contain zeroes */
1206 0 : memset(node->shared_info, 0, size);
1207 0 : node->shared_info->num_workers = pcxt->nworkers;
1208 0 : shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1209 0 : node->shared_info);
1210 : }
1211 :
1212 : /* ----------------------------------------------------------------
1213 : * ExecIncrementalSortInitializeWorker
1214 : *
1215 : * Attach worker to DSM space for sort statistics.
1216 : * ----------------------------------------------------------------
1217 : */
1218 : void
1219 0 : ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
1220 : {
1221 0 : node->shared_info =
1222 0 : shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1223 0 : node->am_worker = true;
1224 0 : }
1225 :
1226 : /* ----------------------------------------------------------------
1227 : * ExecIncrementalSortRetrieveInstrumentation
1228 : *
1229 : * Transfer sort statistics from DSM to private memory.
1230 : * ----------------------------------------------------------------
1231 : */
1232 : void
1233 0 : ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
1234 : {
1235 : Size size;
1236 : SharedIncrementalSortInfo *si;
1237 :
1238 0 : if (node->shared_info == NULL)
1239 0 : return;
1240 :
1241 0 : size = offsetof(SharedIncrementalSortInfo, sinfo)
1242 0 : + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
1243 0 : si = palloc(size);
1244 0 : memcpy(si, node->shared_info, size);
1245 0 : node->shared_info = si;
1246 : }