Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplesort.c
4 : * Generalized tuple sorting routines.
5 : *
6 : * This module provides a generalized facility for tuple sorting, which can be
7 : * applied to different kinds of sortable objects. Implementation of
8 : * the particular sorting variants is given in tuplesortvariants.c.
9 : * This module works efficiently for both small and large amounts
10 : * of data. Small amounts are sorted in-memory using qsort(). Large
11 : * amounts are sorted using temporary files and a standard external sort
12 : * algorithm.
13 : *
14 : * See Knuth, volume 3, for more than you want to know about external
15 : * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 : * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 : * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 : * merge is better. Knuth is assuming that tape drives are expensive
19 : * beasts, and in particular that there will always be many more runs than
20 : * tape drives. The polyphase merge algorithm was good at keeping all the
21 : * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 : * much more than a few kB of memory buffers, so we can afford to have
23 : * lots of them. In particular, if we can have as many tape drives as
24 : * sorted runs, we can eliminate any repeated I/O at all.
25 : *
26 : * Historically, we divided the input into sorted runs using replacement
27 : * selection, in the form of a priority tree implemented as a heap
28 : * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 : * for run generation.
30 : *
31 : * The approximate amount of memory allowed for any one sort operation
32 : * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 : * we absorb tuples and simply store them in an unsorted array as long as
34 : * we haven't exceeded workMem. If we reach the end of the input without
35 : * exceeding workMem, we sort the array using qsort() and subsequently return
36 : * tuples just by scanning the tuple array sequentially. If we do exceed
37 : * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 : * When tuples are dumped in batch after quicksorting, we begin a new run
39 : * with a new output tape. If we reach the max number of tapes, we write
40 : * subsequent runs on the existing tapes in a round-robin fashion. We will
41 : * need multiple merge passes to finish the merge in that case. After the
42 : * end of the input is reached, we dump out remaining tuples in memory into
43 : * a final run, then merge the runs.
44 : *
45 : * When merging runs, we use a heap containing just the frontmost tuple from
46 : * each source run; we repeatedly output the smallest tuple and replace it
47 : * with the next tuple from its source tape (if any). When the heap empties,
48 : * the merge is complete. The basic merge algorithm thus needs very little
49 : * memory --- only M tuples for an M-way merge, and M is constrained to a
50 : * small number. However, we can still make good use of our full workMem
51 : * allocation by pre-reading additional blocks from each source tape. Without
52 : * prereading, our access pattern to the temporary file would be very erratic;
53 : * on average we'd read one block from each of M source tapes during the same
54 : * time that we're writing M blocks to the output tape, so there is no
55 : * sequentiality of access at all, defeating the read-ahead methods used by
56 : * most Unix kernels. Worse, the output tape gets written into a very random
57 : * sequence of blocks of the temp file, ensuring that things will be even
58 : * worse when it comes time to read that tape. A straightforward merge pass
59 : * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 : * by prereading from each source tape sequentially, loading about workMem/M
61 : * bytes from each tape in turn, and making the sequential blocks immediately
62 : * available for reuse. This approach helps to localize both read and write
63 : * accesses. The pre-reading is handled by logtape.c; we just tell it how
64 : * much memory to use for the buffers.
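 *
 * For example (illustrative numbers only, not taken from the code): with
 * workMem = 64MB and a 64-way merge, each input tape gets roughly 1MB of
 * read buffer, i.e. about 128 consecutive 8kB blocks per read request
 * rather than a single block, which keeps the access pattern largely
 * sequential.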
65 : *
66 : * In the current code we determine the number of input tapes M on the basis
67 : * of workMem: we want workMem/M to be large enough that we read a fair
68 : * amount of data each time we read from a tape, so as to maintain the
69 : * locality of access described above. Nonetheless, with large workMem we
70 : * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 : * which avoids space wastage by recycling disk space as soon as each block
72 : * is read from its "tape".
73 : *
74 : * When the caller requests random access to the sort result, we form
75 : * the final sorted run on a logical tape which is then "frozen", so
76 : * that we can access it randomly. When the caller does not need random
77 : * access, we return from tuplesort_performsort() as soon as we are down
78 : * to one run per logical tape. The final merge is then performed
79 : * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 : * saves one cycle of writing all the data out to disk and reading it in.
81 : *
82 : * This module supports parallel sorting. Parallel sorts involve coordination
83 : * among one or more worker processes, and a leader process, each with its own
84 : * tuplesort state. The leader process (or, more accurately, the
85 : * Tuplesortstate associated with a leader process) creates a full tapeset
86 : * consisting of worker tapes with one run to merge; a run for every
87 : * worker process. This is then merged. Worker processes are guaranteed to
88 : * produce exactly one output run from their partial input.
89 : *
90 : *
91 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
92 : * Portions Copyright (c) 1994, Regents of the University of California
93 : *
94 : * IDENTIFICATION
95 : * src/backend/utils/sort/tuplesort.c
96 : *
97 : *-------------------------------------------------------------------------
98 : */
99 :
100 : #include "postgres.h"
101 :
102 : #include <limits.h>
103 :
104 : #include "commands/tablespace.h"
105 : #include "miscadmin.h"
106 : #include "pg_trace.h"
107 : #include "storage/shmem.h"
108 : #include "utils/guc.h"
109 : #include "utils/memutils.h"
110 : #include "utils/pg_rusage.h"
111 : #include "utils/tuplesort.h"
112 :
113 : /*
114 : * Initial size of memtuples array. This must be more than
115 : * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples(). Use at
116 : * least 1024 elements to avoid excessive reallocs.
117 : */
118 : #define INITIAL_MEMTUPSIZE Max(1024, \
119 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
120 :
121 : /* GUC variables */
122 : bool trace_sort = false;
123 :
124 : #ifdef DEBUG_BOUNDED_SORT
125 : bool optimize_bounded_sort = true;
126 : #endif
127 :
128 :
129 : /*
130 : * During merge, we use a pre-allocated set of fixed-size slots to hold
131 : * tuples, to avoid palloc/pfree overhead.
132 : *
133 : * Merge doesn't require a lot of memory, so we can afford to waste some,
134 : * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
135 : * palloc() overhead is not significant anymore.
136 : *
137 : * 'nextfree' is valid when this chunk is in the free list. When in use, the
138 : * slot holds a tuple.
139 : */
140 : #define SLAB_SLOT_SIZE 1024
141 :
142 : typedef union SlabSlot
143 : {
144 : union SlabSlot *nextfree;
145 : char buffer[SLAB_SLOT_SIZE];
146 : } SlabSlot;
147 :
148 : /*
149 : * Possible states of a Tuplesort object. These denote the states that
150 : * persist between calls of Tuplesort routines.
151 : */
152 : typedef enum
153 : {
154 : TSS_INITIAL, /* Loading tuples; still within memory limit */
155 : TSS_BOUNDED, /* Loading tuples into bounded-size heap */
156 : TSS_BUILDRUNS, /* Loading tuples; writing to tape */
157 : TSS_SORTEDINMEM, /* Sort completed entirely in memory */
158 : TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
159 : TSS_FINALMERGE, /* Performing final merge on-the-fly */
160 : } TupSortStatus;
161 :
162 : /*
163 : * Parameters for calculation of number of tapes to use --- see inittapes()
164 : * and tuplesort_merge_order().
165 : *
166 : * In this calculation we assume that each tape will cost us about one block's
167 : * worth of buffer space. This ignores the overhead of all the other data
168 : * structures needed for each tape, but it's probably close enough.
169 : *
170 : * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
171 : * input tape, for pre-reading (see discussion at top of file). This is *in
172 : * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
173 : */
174 : #define MINORDER 6 /* minimum merge order */
175 : #define MAXORDER 500 /* maximum merge order */
176 : #define TAPE_BUFFER_OVERHEAD BLCKSZ
177 : #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
178 :
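/*
 * Illustrative sketch only: one way a merge order could be derived from the
 * memory budget using the constants above.  The real calculation is done by
 * tuplesort_merge_order(); the formula here is an assumption for exposition,
 * not the actual code.
 */
static inline int
sketch_merge_order(int64 allowedMem)
{
	/*
	 * Budget one read buffer plus tape overhead per input tape, plus tape
	 * overhead for the output side, then clamp to the supported range.
	 */
	int			mOrder = (int) (allowedMem /
								(MERGE_BUFFER_SIZE + 2 * TAPE_BUFFER_OVERHEAD));

	return Min(Max(mOrder, MINORDER), MAXORDER);
}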
179 :
180 : /*
181 : * Private state of a Tuplesort operation.
182 : */
183 : struct Tuplesortstate
184 : {
185 : TuplesortPublic base;
186 : TupSortStatus status; /* enumerated value as shown above */
187 : bool bounded; /* did caller specify a maximum number of
188 : * tuples to return? */
189 : bool boundUsed; /* true if we made use of a bounded heap */
190 : int bound; /* if bounded, the maximum number of tuples */
191 : int64 tupleMem; /* memory consumed by individual tuples.
192 : * storing this separately from what we track
193 : * in availMem allows us to subtract the
194 : * memory consumed by all tuples when dumping
195 : * tuples to tape */
196 : int64 availMem; /* remaining memory available, in bytes */
197 : int64 allowedMem; /* total memory allowed, in bytes */
198 : int maxTapes; /* max number of input tapes to merge in each
199 : * pass */
200 : int64 maxSpace; /* maximum amount of space occupied among sort
201 : * of groups, either in-memory or on-disk */
202 : bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
203 : * space, false when it's the value for in-memory
204 : * space */
205 : TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
206 : LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
207 :
208 : /*
209 : * This array holds the tuples now in sort memory. If we are in state
210 : * INITIAL, the tuples are in no particular order; if we are in state
211 : * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
212 : * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
213 : * H. In state SORTEDONTAPE, the array is not used.
214 : */
215 : SortTuple *memtuples; /* array of SortTuple structs */
216 : int memtupcount; /* number of tuples currently present */
217 : int memtupsize; /* allocated length of memtuples array */
218 : bool growmemtuples; /* memtuples' growth still underway? */
219 :
220 : /*
221 : * Memory for tuples is sometimes allocated using a simple slab allocator,
222 : * rather than with palloc(). Currently, we switch to slab allocation
223 : * when we start merging. Merging only needs to keep a small, fixed
224 : * number of tuples in memory at any time, so we can avoid the
225 : * palloc/pfree overhead by recycling a fixed number of fixed-size slots
226 : * to hold the tuples.
227 : *
228 : * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
229 : * slots. The allocation is sized to have one slot per tape, plus one
230 : * additional slot. We need that many slots to hold all the tuples kept
231 : * in the heap during merge, plus the one most recently returned from the
232 : * sort by tuplesort_gettuple.
233 : *
234 : * Initially, all the slots are kept in a linked list of free slots. When
235 : * a tuple is read from a tape, it is put into the next available slot, if
236 : * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
237 : * instead.
238 : *
239 : * When we're done processing a tuple, we return the slot back to the free
240 : * list, or pfree() it if it was palloc'd. We know that a tuple was
241 : * allocated from the slab if its pointer value is between
242 : * slabMemoryBegin and slabMemoryEnd.
243 : *
244 : * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
245 : * tracking memory usage is not used.
246 : */
247 : bool slabAllocatorUsed;
248 :
249 : char *slabMemoryBegin; /* beginning of slab memory arena */
250 : char *slabMemoryEnd; /* end of slab memory arena */
251 : SlabSlot *slabFreeHead; /* head of free list */
252 :
253 : /* Memory used for input and output tape buffers. */
254 : size_t tape_buffer_mem;
255 :
256 : /*
257 : * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
258 : * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
259 : * modes), we remember the tuple in 'lastReturnedTuple', so that we can
260 : * recycle the memory on next gettuple call.
261 : */
262 : void *lastReturnedTuple;
263 :
264 : /*
265 : * While building initial runs, this is the current output run number.
266 : * Afterwards, it is the number of initial runs we made.
267 : */
268 : int currentRun;
269 :
270 : /*
271 : * Logical tapes, for merging.
272 : *
273 : * The initial runs are written in the output tapes. In each merge pass,
274 : * the output tapes of the previous pass become the input tapes, and new
275 : * output tapes are created as needed. When nInputTapes equals
276 : * nInputRuns, there is only one merge pass left.
277 : */
278 : LogicalTape **inputTapes;
279 : int nInputTapes;
280 : int nInputRuns;
281 :
282 : LogicalTape **outputTapes;
283 : int nOutputTapes;
284 : int nOutputRuns;
285 :
286 : LogicalTape *destTape; /* current output tape */
287 :
288 : /*
289 : * These variables are used after completion of sorting to keep track of
290 : * the next tuple to return. (In the tape case, the tape's current read
291 : * position is also critical state.)
292 : */
293 : LogicalTape *result_tape; /* actual tape of finished output */
294 : int current; /* array index (only used if SORTEDINMEM) */
295 : bool eof_reached; /* reached EOF (needed for cursors) */
296 :
297 : /* markpos_xxx holds marked position for mark and restore */
298 : int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
299 : int markpos_offset; /* saved "current", or offset in tape block */
300 : bool markpos_eof; /* saved "eof_reached" */
301 :
302 : /*
303 : * These variables are used during parallel sorting.
304 : *
305 : * worker is our worker identifier. Follows the general convention that
306 : * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
307 : * tuplesorts. (-1 can also be a serial tuplesort.)
308 : *
309 : * shared is mutable shared memory state, which is used to coordinate
310 : * parallel sorts.
311 : *
312 : * nParticipants is the number of worker Tuplesortstates known by the
313 : * leader to have actually been launched, which implies that they must
314 : * finish a run that the leader needs to merge. Typically includes a
315 : * worker state held by the leader process itself. Set in the leader
316 : * Tuplesortstate only.
317 : */
318 : int worker;
319 : Sharedsort *shared;
320 : int nParticipants;
321 :
322 : /*
323 : * Additional state for managing "abbreviated key" sortsupport routines
324 : * (which currently may be used by all cases except the hash index case).
325 : * Tracks the intervals at which the optimization's effectiveness is
326 : * tested.
327 : */
328 : int64 abbrevNext; /* Tuple # at which to next check
329 : * applicability */
330 :
331 : /*
332 : * Resource snapshot for time of sort start.
333 : */
334 : PGRUsage ru_start;
335 : };
336 :
337 : /*
338 : * Private mutable state for a parallel tuplesort operation. This is allocated
339 : * in shared memory.
340 : */
341 : struct Sharedsort
342 : {
343 : /* mutex protects all fields prior to tapes */
344 : slock_t mutex;
345 :
346 : /*
347 : * currentWorker generates ordinal identifier numbers for parallel sort
348 : * workers. These start from 0, and are always gapless.
349 : *
350 : * Workers increment workersFinished to indicate having finished. If this
351 : * is equal to state.nParticipants within the leader, the leader is ready to
352 : * merge worker runs.
353 : */
354 : int currentWorker;
355 : int workersFinished;
356 :
357 : /* Temporary file space */
358 : SharedFileSet fileset;
359 :
360 : /* Size of tapes flexible array */
361 : int nTapes;
362 :
363 : /*
364 : * Tapes array used by workers to report back information needed by the
365 : * leader to concatenate all worker tapes into one for merging
366 : */
367 : TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
368 : };
369 :
370 : /*
371 : * Is the given tuple allocated from the slab memory arena?
372 : */
373 : #define IS_SLAB_SLOT(state, tuple) \
374 : ((char *) (tuple) >= (state)->slabMemoryBegin && \
375 : (char *) (tuple) < (state)->slabMemoryEnd)
376 :
377 : /*
378 : * Return the given tuple to the slab memory free list, or free it
379 : * if it was palloc'd.
380 : */
381 : #define RELEASE_SLAB_SLOT(state, tuple) \
382 : do { \
383 : SlabSlot *buf = (SlabSlot *) tuple; \
384 : \
385 : if (IS_SLAB_SLOT((state), buf)) \
386 : { \
387 : buf->nextfree = (state)->slabFreeHead; \
388 : (state)->slabFreeHead = buf; \
389 : } else \
390 : pfree(buf); \
391 : } while(0)
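/*
 * Illustrative counterpart to RELEASE_SLAB_SLOT above (a hypothetical helper,
 * not part of this file): popping a slot off the free list when a tuple is
 * read from a tape during merge.  A NULL result means the free list is
 * empty; the real readtup paths also fall back to palloc() for tuples larger
 * than SLAB_SLOT_SIZE.
 */
static inline SlabSlot *
sketch_slab_slot_pop(Tuplesortstate *state)
{
	SlabSlot   *slot = state->slabFreeHead;

	if (slot != NULL)
		state->slabFreeHead = slot->nextfree;	/* unlink from free list */
	return slot;
}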
392 :
393 : #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
394 : #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
395 : #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
396 : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
397 : #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
398 : #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
399 : #define USEMEM(state,amt) ((state)->availMem -= (amt))
400 : #define FREEMEM(state,amt) ((state)->availMem += (amt))
401 : #define SERIAL(state) ((state)->shared == NULL)
402 : #define WORKER(state) ((state)->shared && (state)->worker != -1)
403 : #define LEADER(state) ((state)->shared && (state)->worker == -1)
404 :
405 : /*
406 : * NOTES about on-tape representation of tuples:
407 : *
408 : * We require the first "unsigned int" of a stored tuple to be the total size
409 : * on-tape of the tuple, including itself (so it is never zero; an all-zero
410 : * unsigned int is used to delimit runs). The remainder of the stored tuple
411 : * may or may not match the in-memory representation of the tuple ---
412 : * any conversion needed is the job of the writetup and readtup routines.
413 : *
414 : * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
415 : * representation of the tuple must be followed by another "unsigned int" that
416 : * is a copy of the length --- so the total tape space used is actually
417 : * sizeof(unsigned int) more than the stored length value. This allows
418 : * read-backwards. When the random access flag was not specified, the
419 : * write/read routines may omit the extra length word.
420 : *
421 : * writetup is expected to write both length words as well as the tuple
422 : * data. When readtup is called, the tape is positioned just after the
423 : * front length word; readtup must read the tuple data and advance past
424 : * the back length word (if present).
425 : *
426 : * The write/read routines can make use of the tuple description data
427 : * stored in the Tuplesortstate record, if needed. They are also expected
428 : * to adjust state->availMem by the amount of memory space (not tape space!)
429 : * released or consumed. There is no error return from either writetup
430 : * or readtup; they should ereport() on failure.
431 : *
432 : *
433 : * NOTES about memory consumption calculations:
434 : *
435 : * We count space allocated for tuples against the workMem limit, plus
436 : * the space used by the variable-size memtuples array. Fixed-size space
437 : * is not counted; it's small enough to not be interesting.
438 : *
439 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
440 : * rather than the originally-requested size. This is important since
441 : * palloc can add substantial overhead. It's not a complete answer since
442 : * we won't count any wasted space in palloc allocation blocks, but it's
443 : * a lot better than what we were doing before 7.3. As of 9.6, a
444 : * separate memory context is used for caller passed tuples. Resetting
445 : * it at certain key increments significantly ameliorates fragmentation.
446 : * readtup routines use the slab allocator (they cannot use
447 : * the reset context because it gets deleted at the point that merging
448 : * begins).
449 : */
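/*
 * Illustrative sketch of the on-tape layout described in the NOTES above (a
 * hypothetical helper, not one of the actual writetup routines): a leading
 * length word that counts itself, the tuple body, and a trailing copy of the
 * length word only when random access was requested.
 */
static void
sketch_writetup(Tuplesortstate *state, LogicalTape *tape,
				void *data, unsigned int datalen)
{
	unsigned int tuplen = datalen + sizeof(unsigned int);

	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));	/* front length word */
	LogicalTapeWrite(tape, data, datalen);	/* tuple body */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));	/* back length word */
}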
450 :
451 :
452 : static void tuplesort_begin_batch(Tuplesortstate *state);
453 : static bool consider_abort_common(Tuplesortstate *state);
454 : static void inittapes(Tuplesortstate *state, bool mergeruns);
455 : static void inittapestate(Tuplesortstate *state, int maxTapes);
456 : static void selectnewtape(Tuplesortstate *state);
457 : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
458 : static void mergeruns(Tuplesortstate *state);
459 : static void mergeonerun(Tuplesortstate *state);
460 : static void beginmerge(Tuplesortstate *state);
461 : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
462 : static void dumptuples(Tuplesortstate *state, bool alltuples);
463 : static void make_bounded_heap(Tuplesortstate *state);
464 : static void sort_bounded_heap(Tuplesortstate *state);
465 : static void tuplesort_sort_memtuples(Tuplesortstate *state);
466 : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
467 : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
468 : static void tuplesort_heap_delete_top(Tuplesortstate *state);
469 : static void reversedirection(Tuplesortstate *state);
470 : static unsigned int getlen(LogicalTape *tape, bool eofOK);
471 : static void markrunend(LogicalTape *tape);
472 : static int worker_get_identifier(Tuplesortstate *state);
473 : static void worker_freeze_result_tape(Tuplesortstate *state);
474 : static void worker_nomergeruns(Tuplesortstate *state);
475 : static void leader_takeover_tapes(Tuplesortstate *state);
476 : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
477 : static void tuplesort_free(Tuplesortstate *state);
478 : static void tuplesort_updatemax(Tuplesortstate *state);
479 :
480 : /*
481 : * Specialized comparators that we can inline into specialized sorts. The goal
482 : * is to try to sort two tuples without having to follow the pointers to the
483 : * comparator or the tuple.
484 : *
485 : * XXX: For now, there is no specialization for cases where datum1 is
486 : * authoritative and we don't even need to fall back to a callback at all (that
487 : * would be true for types like int4/int8/timestamp/date, but not true for
488 : * abbreviations of text or multi-key sorts). There could be! Is it worth it?
489 : */
490 :
491 : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
492 : static pg_attribute_always_inline int
493 46531200 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
494 : {
495 : int compare;
496 :
497 46531200 : compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
498 46531200 : b->datum1, b->isnull1,
499 : &state->base.sortKeys[0]);
500 46531200 : if (compare != 0)
501 41966136 : return compare;
502 :
503 : /*
504 : * No need to waste effort calling the tiebreak function when there are no
505 : * other keys to sort on.
506 : */
507 4565064 : if (state->base.onlyKey != NULL)
508 0 : return 0;
509 :
510 4565064 : return state->base.comparetup_tiebreak(a, b, state);
511 : }
512 :
513 : /* Used if first key's comparator is ssup_datum_signed_cmp */
514 : static pg_attribute_always_inline int
515 9309352 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
516 : {
517 : int compare;
518 :
519 9309352 : compare = ApplySignedSortComparator(a->datum1, a->isnull1,
520 9309352 : b->datum1, b->isnull1,
521 : &state->base.sortKeys[0]);
522 :
523 9309352 : if (compare != 0)
524 9236350 : return compare;
525 :
526 : /*
527 : * No need to waste effort calling the tiebreak function when there are no
528 : * other keys to sort on.
529 : */
530 73002 : if (state->base.onlyKey != NULL)
531 59078 : return 0;
532 :
533 13924 : return state->base.comparetup_tiebreak(a, b, state);
534 : }
535 :
536 : /* Used if first key's comparator is ssup_datum_int32_cmp */
537 : static pg_attribute_always_inline int
538 55928016 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
539 : {
540 : int compare;
541 :
542 55928016 : compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
543 55928016 : b->datum1, b->isnull1,
544 : &state->base.sortKeys[0]);
545 :
546 55928016 : if (compare != 0)
547 39909578 : return compare;
548 :
549 : /*
550 : * No need to waste effort calling the tiebreak function when there are no
551 : * other keys to sort on.
552 : */
553 16018438 : if (state->base.onlyKey != NULL)
554 2275436 : return 0;
555 :
556 13743002 : return state->base.comparetup_tiebreak(a, b, state);
557 : }
558 :
559 : /*
560 : * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
561 : * any variant of SortTuples, using the appropriate comparetup function.
562 : * qsort_ssup() is specialized for the case where the comparetup function
563 : * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
564 : * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
565 : * common comparison functions on pass-by-value leading datums.
566 : */
567 :
568 : #define ST_SORT qsort_tuple_unsigned
569 : #define ST_ELEMENT_TYPE SortTuple
570 : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
571 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
572 : #define ST_CHECK_FOR_INTERRUPTS
573 : #define ST_SCOPE static
574 : #define ST_DEFINE
575 : #include "lib/sort_template.h"
576 :
577 : #define ST_SORT qsort_tuple_signed
578 : #define ST_ELEMENT_TYPE SortTuple
579 : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
580 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
581 : #define ST_CHECK_FOR_INTERRUPTS
582 : #define ST_SCOPE static
583 : #define ST_DEFINE
584 : #include "lib/sort_template.h"
585 :
586 : #define ST_SORT qsort_tuple_int32
587 : #define ST_ELEMENT_TYPE SortTuple
588 : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
589 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
590 : #define ST_CHECK_FOR_INTERRUPTS
591 : #define ST_SCOPE static
592 : #define ST_DEFINE
593 : #include "lib/sort_template.h"
594 :
595 : #define ST_SORT qsort_tuple
596 : #define ST_ELEMENT_TYPE SortTuple
597 : #define ST_COMPARE_RUNTIME_POINTER
598 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
599 : #define ST_CHECK_FOR_INTERRUPTS
600 : #define ST_SCOPE static
601 : #define ST_DECLARE
602 : #define ST_DEFINE
603 : #include "lib/sort_template.h"
604 :
605 : #define ST_SORT qsort_ssup
606 : #define ST_ELEMENT_TYPE SortTuple
607 : #define ST_COMPARE(a, b, ssup) \
608 : ApplySortComparator((a)->datum1, (a)->isnull1, \
609 : (b)->datum1, (b)->isnull1, (ssup))
610 : #define ST_COMPARE_ARG_TYPE SortSupportData
611 : #define ST_CHECK_FOR_INTERRUPTS
612 : #define ST_SCOPE static
613 : #define ST_DEFINE
614 : #include "lib/sort_template.h"
615 :
616 : /*
617 : * tuplesort_begin_xxx
618 : *
619 : * Initialize for a tuple sort operation.
620 : *
621 : * After calling tuplesort_begin, the caller should call tuplesort_putXXX
622 : * zero or more times, then call tuplesort_performsort when all the tuples
623 : * have been supplied. After performsort, retrieve the tuples in sorted
624 : * order by calling tuplesort_getXXX until it returns false/NULL. (If random
625 : * access was requested, rescan, markpos, and restorepos can also be called.)
626 : * Call tuplesort_end to terminate the operation and release memory/disk space.
627 : *
628 : * Each variant of tuplesort_begin has a workMem parameter specifying the
629 : * maximum number of kilobytes of RAM to use before spilling data to disk.
630 : * (The normal value of this parameter is work_mem, but some callers use
631 : * other values.) Each variant also has a sortopt which is a bitmask of
632 : * sort options. See TUPLESORT_* definitions in tuplesort.h
633 : */
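/*
 * Minimal caller-side sketch of the lifecycle described above, using the
 * heap-tuple variant (tuplesort_begin_heap and friends are declared in
 * tuplesort.h and implemented in tuplesortvariants.c).  The input and output
 * helpers are hypothetical, and error handling is omitted.
 */
extern bool fetch_next_into_slot(TupleTableSlot *slot);	/* hypothetical */
extern void process_sorted_slot(TupleTableSlot *slot); /* hypothetical */

static void
sketch_sort_slots(TupleDesc tupDesc, TupleTableSlot *slot,
				  int nkeys, AttrNumber *attNums, Oid *sortOperators,
				  Oid *collations, bool *nullsFirst)
{
	Tuplesortstate *sortstate;

	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
									 sortOperators, collations, nullsFirst,
									 work_mem, NULL, TUPLESORT_NONE);

	/* feed tuples; the sort spills to tape on its own if workMem fills up */
	while (fetch_next_into_slot(slot))
		tuplesort_puttupleslot(sortstate, slot);

	tuplesort_performsort(sortstate);	/* finish the sort */

	/* read back in sorted order */
	while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
		process_sorted_slot(slot);

	tuplesort_end(sortstate);	/* release memory and temp files */
}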
634 :
635 : Tuplesortstate *
636 275056 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
637 : {
638 : Tuplesortstate *state;
639 : MemoryContext maincontext;
640 : MemoryContext sortcontext;
641 : MemoryContext oldcontext;
642 :
643 : /* See leader_takeover_tapes() remarks on random access support */
644 275056 : if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
645 0 : elog(ERROR, "random access disallowed under parallel sort");
646 :
647 : /*
648 : * Memory context surviving tuplesort_reset. This memory context holds
649 : * data which is useful to keep while sorting multiple similar batches.
650 : */
651 275056 : maincontext = AllocSetContextCreate(CurrentMemoryContext,
652 : "TupleSort main",
653 : ALLOCSET_DEFAULT_SIZES);
654 :
655 : /*
656 : * Create a working memory context for one sort operation. The content of
657 : * this context is deleted by tuplesort_reset.
658 : */
659 275056 : sortcontext = AllocSetContextCreate(maincontext,
660 : "TupleSort sort",
661 : ALLOCSET_DEFAULT_SIZES);
662 :
663 : /*
664 : * Additionally a working memory context for tuples is setup in
665 : * tuplesort_begin_batch.
666 : */
667 :
668 : /*
669 : * Make the Tuplesortstate within the per-sortstate context. This way, we
670 : * don't need a separate pfree() operation for it at shutdown.
671 : */
672 275056 : oldcontext = MemoryContextSwitchTo(maincontext);
673 :
674 275056 : state = palloc0_object(Tuplesortstate);
675 :
676 275056 : if (trace_sort)
677 0 : pg_rusage_init(&state->ru_start);
678 :
679 275056 : state->base.sortopt = sortopt;
680 275056 : state->base.tuples = true;
681 275056 : state->abbrevNext = 10;
682 :
683 : /*
684 : * workMem is forced to be at least 64KB, the current minimum valid value
685 : * for the work_mem GUC. This is a defense against parallel sort callers
686 : * that divide out memory among many workers in a way that leaves each
687 : * with very little memory.
688 : */
689 275056 : state->allowedMem = Max(workMem, 64) * (int64) 1024;
690 275056 : state->base.sortcontext = sortcontext;
691 275056 : state->base.maincontext = maincontext;
692 :
693 275056 : state->memtupsize = INITIAL_MEMTUPSIZE;
694 275056 : state->memtuples = NULL;
695 :
696 : /*
697 : * After all of the other non-parallel-related state, we set up all of the
698 : * state needed for each batch.
699 : */
700 275056 : tuplesort_begin_batch(state);
701 :
702 : /*
703 : * Initialize parallel-related state based on coordination information
704 : * from caller
705 : */
706 275056 : if (!coordinate)
707 : {
708 : /* Serial sort */
709 274356 : state->shared = NULL;
710 274356 : state->worker = -1;
711 274356 : state->nParticipants = -1;
712 : }
713 700 : else if (coordinate->isWorker)
714 : {
715 : /* Parallel worker produces exactly one final run from all input */
716 468 : state->shared = coordinate->sharedsort;
717 468 : state->worker = worker_get_identifier(state);
718 468 : state->nParticipants = -1;
719 : }
720 : else
721 : {
722 : /* Parallel leader state only used for final merge */
723 232 : state->shared = coordinate->sharedsort;
724 232 : state->worker = -1;
725 232 : state->nParticipants = coordinate->nParticipants;
726 : Assert(state->nParticipants >= 1);
727 : }
728 :
729 275056 : MemoryContextSwitchTo(oldcontext);
730 :
731 275056 : return state;
732 : }
733 :
734 : /*
735 : * tuplesort_begin_batch
736 : *
737 : * Set up, or reset, all state needed for processing a new set of tuples with this
738 : * sort state. Called both from tuplesort_begin_common (the first time sorting
739 : * with this sort state) and tuplesort_reset (for subsequent usages).
740 : */
741 : static void
742 278386 : tuplesort_begin_batch(Tuplesortstate *state)
743 : {
744 : MemoryContext oldcontext;
745 :
746 278386 : oldcontext = MemoryContextSwitchTo(state->base.maincontext);
747 :
748 : /*
749 : * Caller tuple (e.g. IndexTuple) memory context.
750 : *
751 : * A dedicated child context used exclusively for caller passed tuples
752 : * eases memory management. Resetting at key points reduces
753 : * fragmentation. Note that the memtuples array of SortTuples is allocated
754 : * in the parent context, not this context, because there is no need to
755 : * free memtuples early. For bounded sorts, tuples may be pfreed in any
756 : * order, so we use a regular aset.c context so that it can make use of
757 : * free'd memory. When the sort is not bounded, we make use of a bump.c
758 : * context as this keeps allocations more compact with less wastage.
759 : * Allocations are also slightly more CPU efficient.
760 : */
761 278386 : if (TupleSortUseBumpTupleCxt(state->base.sortopt))
762 277020 : state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
763 : "Caller tuples",
764 : ALLOCSET_DEFAULT_SIZES);
765 : else
766 1366 : state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
767 : "Caller tuples",
768 : ALLOCSET_DEFAULT_SIZES);
769 :
770 :
771 278386 : state->status = TSS_INITIAL;
772 278386 : state->bounded = false;
773 278386 : state->boundUsed = false;
774 :
775 278386 : state->availMem = state->allowedMem;
776 :
777 278386 : state->tapeset = NULL;
778 :
779 278386 : state->memtupcount = 0;
780 :
781 278386 : state->growmemtuples = true;
782 278386 : state->slabAllocatorUsed = false;
783 278386 : if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
784 : {
785 72 : pfree(state->memtuples);
786 72 : state->memtuples = NULL;
787 72 : state->memtupsize = INITIAL_MEMTUPSIZE;
788 : }
789 278386 : if (state->memtuples == NULL)
790 : {
791 275128 : state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
792 275128 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
793 : }
794 :
795 : /* workMem must be large enough for the minimal memtuples array */
796 278386 : if (LACKMEM(state))
797 0 : elog(ERROR, "insufficient memory allowed for sort");
798 :
799 278386 : state->currentRun = 0;
800 :
801 : /*
802 : * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
803 : * inittapes(), if needed.
804 : */
805 :
806 278386 : state->result_tape = NULL; /* flag that result tape has not been formed */
807 :
808 278386 : MemoryContextSwitchTo(oldcontext);
809 278386 : }
810 :
811 : /*
812 : * tuplesort_set_bound
813 : *
814 : * Advise tuplesort that at most the first N result tuples are required.
815 : *
816 : * Must be called before inserting any tuples. (Actually, we could allow it
817 : * as long as the sort hasn't spilled to disk, but there seems no need for
818 : * delayed calls at the moment.)
819 : *
820 : * This is a hint only. The tuplesort may still return more tuples than
821 : * requested. Parallel leader tuplesorts will always ignore the hint.
822 : */
823 : void
824 1232 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
825 : {
826 : /* Assert we're called before loading any tuples */
827 : Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
828 : /* Assert we allow bounded sorts */
829 : Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
830 : /* Can't set the bound twice, either */
831 : Assert(!state->bounded);
832 : /* Also, this shouldn't be called in a parallel worker */
833 : Assert(!WORKER(state));
834 :
835 : /* Parallel leader allows but ignores hint */
836 1232 : if (LEADER(state))
837 0 : return;
838 :
839 : #ifdef DEBUG_BOUNDED_SORT
840 : /* Honor GUC setting that disables the feature (for easy testing) */
841 : if (!optimize_bounded_sort)
842 : return;
843 : #endif
844 :
845 : /* We want to be able to compute bound * 2, so limit the setting */
846 1232 : if (bound > (int64) (INT_MAX / 2))
847 0 : return;
848 :
849 1232 : state->bounded = true;
850 1232 : state->bound = (int) bound;
851 :
852 : /*
853 : * Bounded sorts are not an effective target for abbreviated key
854 : * optimization. Disable by setting state to be consistent with no
855 : * abbreviation support.
856 : */
857 1232 : state->base.sortKeys->abbrev_converter = NULL;
858 1232 : if (state->base.sortKeys->abbrev_full_comparator)
859 16 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
860 :
861 : /* Not strictly necessary, but be tidy */
862 1232 : state->base.sortKeys->abbrev_abort = NULL;
863 1232 : state->base.sortKeys->abbrev_full_comparator = NULL;
864 : }
865 :
866 : /*
867 : * tuplesort_used_bound
868 : *
869 : * Allow callers to find out if the sort state was able to use a bound.
870 : */
871 : bool
872 380 : tuplesort_used_bound(Tuplesortstate *state)
873 : {
874 380 : return state->boundUsed;
875 : }
876 :
877 : /*
878 : * tuplesort_free
879 : *
880 : * Internal routine for freeing resources of tuplesort.
881 : */
882 : static void
883 278124 : tuplesort_free(Tuplesortstate *state)
884 : {
885 : /* context swap probably not needed, but let's be safe */
886 278124 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
887 : int64 spaceUsed;
888 :
889 278124 : if (state->tapeset)
890 770 : spaceUsed = LogicalTapeSetBlocks(state->tapeset);
891 : else
892 277354 : spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
893 :
894 : /*
895 : * Delete temporary "tape" files, if any.
896 : *
897 : * We don't bother to destroy the individual tapes here. They will go away
898 : * with the sortcontext. (In TSS_FINALMERGE state, we have closed
899 : * finished tapes already.)
900 : */
901 278124 : if (state->tapeset)
902 770 : LogicalTapeSetClose(state->tapeset);
903 :
904 278124 : if (trace_sort)
905 : {
906 0 : if (state->tapeset)
907 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
908 : SERIAL(state) ? "external sort" : "parallel external sort",
909 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
910 : else
911 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
912 : SERIAL(state) ? "internal sort" : "unperformed parallel sort",
913 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
914 : }
915 :
916 : TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
917 :
918 278124 : FREESTATE(state);
919 278124 : MemoryContextSwitchTo(oldcontext);
920 :
921 : /*
922 : * Free the per-sort memory context, thereby releasing all working memory.
923 : */
924 278124 : MemoryContextReset(state->base.sortcontext);
925 278124 : }
926 :
927 : /*
928 : * tuplesort_end
929 : *
930 : * Release resources and clean up.
931 : *
932 : * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
933 : * pointing to garbage. Be careful not to attempt to use or free such
934 : * pointers afterwards!
935 : */
936 : void
937 274794 : tuplesort_end(Tuplesortstate *state)
938 : {
939 274794 : tuplesort_free(state);
940 :
941 : /*
942 : * Free the main memory context, including the Tuplesortstate struct
943 : * itself.
944 : */
945 274794 : MemoryContextDelete(state->base.maincontext);
946 274794 : }
947 :
948 : /*
949 : * tuplesort_updatemax
950 : *
951 : * Update maximum resource usage statistics.
952 : */
953 : static void
954 3726 : tuplesort_updatemax(Tuplesortstate *state)
955 : {
956 : int64 spaceUsed;
957 : bool isSpaceDisk;
958 :
959 : /*
960 : * Note: it might seem we should provide both memory and disk usage for a
961 : * disk-based sort. However, the current code doesn't track memory space
962 : * accurately once we have begun to return tuples to the caller (since we
963 : * don't account for pfree's the caller is expected to do), so we cannot
964 : * rely on availMem in a disk sort. This does not seem worth the overhead
965 : * to fix. Is it worth creating an API for the memory context code to
966 : * tell us how much is actually used in sortcontext?
967 : */
968 3726 : if (state->tapeset)
969 : {
970 6 : isSpaceDisk = true;
971 6 : spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
972 : }
973 : else
974 : {
975 3720 : isSpaceDisk = false;
976 3720 : spaceUsed = state->allowedMem - state->availMem;
977 : }
978 :
979 : /*
980 : * Sort evicts data to the disk when it wasn't able to fit that data into
981 : * main memory. This is why we assume space used on the disk to be more
982 : * important for tracking resource usage than space used in memory. Note
983 : * that the amount of space occupied by some tupleset on the disk might be
984 : * less than the amount of space occupied by the same tupleset in memory due
985 : * to more compact representation.
986 : */
987 3726 : if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
988 3720 : (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
989 : {
990 506 : state->maxSpace = spaceUsed;
991 506 : state->isMaxSpaceDisk = isSpaceDisk;
992 506 : state->maxSpaceStatus = state->status;
993 : }
994 3726 : }
995 :
996 : /*
997 : * tuplesort_reset
998 : *
999 : * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1000 : * meta-information in. After tuplesort_reset, tuplesort is ready to start
1001 : * a new sort. This allows us to avoid recreating tuple sort states (and
1002 : * thus save resources) when sorting multiple small batches.
1003 : */
1004 : void
1005 3330 : tuplesort_reset(Tuplesortstate *state)
1006 : {
1007 3330 : tuplesort_updatemax(state);
1008 3330 : tuplesort_free(state);
1009 :
1010 : /*
1011 : * After we've freed up per-batch memory, re-setup all of the state common
1012 : * to both the first batch and any subsequent batch.
1013 : */
1014 3330 : tuplesort_begin_batch(state);
1015 :
1016 3330 : state->lastReturnedTuple = NULL;
1017 3330 : state->slabMemoryBegin = NULL;
1018 3330 : state->slabMemoryEnd = NULL;
1019 3330 : state->slabFreeHead = NULL;
1020 3330 : }
1021 :
1022 : /*
1023 : * Grow the memtuples[] array, if possible within our memory constraint. We
1024 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
1025 : * limit. Return true if we were able to enlarge the array, false if not.
1026 : *
1027 : * Normally, at each increment we double the size of the array. When doing
1028 : * that would exceed a limit, we attempt one last, smaller increase (and then
1029 : * clear the growmemtuples flag so we don't try any more). That allows us to
1030 : * use memory as fully as permitted; sticking to the pure doubling rule could
1031 : * result in almost half going unused. Because availMem moves around with
1032 : * tuple addition/removal, we need some rule to prevent making repeated small
1033 : * increases in memtupsize, which would just be useless thrashing. The
1034 : * growmemtuples flag accomplishes that and also prevents useless
1035 : * recalculations in this function.
1036 : */
1037 : static bool
1038 8470 : grow_memtuples(Tuplesortstate *state)
1039 : {
1040 : int newmemtupsize;
1041 8470 : int memtupsize = state->memtupsize;
1042 8470 : int64 memNowUsed = state->allowedMem - state->availMem;
1043 :
1044 : /* Forget it if we've already maxed out memtuples, per comment above */
1045 8470 : if (!state->growmemtuples)
1046 140 : return false;
1047 :
1048 : /* Select new value of memtupsize */
1049 8330 : if (memNowUsed <= state->availMem)
1050 : {
1051 : /*
1052 : * We've used no more than half of allowedMem; double our usage,
1053 : * clamping at INT_MAX tuples.
1054 : */
1055 8184 : if (memtupsize < INT_MAX / 2)
1056 8184 : newmemtupsize = memtupsize * 2;
1057 : else
1058 : {
1059 0 : newmemtupsize = INT_MAX;
1060 0 : state->growmemtuples = false;
1061 : }
1062 : }
1063 : else
1064 : {
1065 : /*
1066 : * This will be the last increment of memtupsize. Abandon doubling
1067 : * strategy and instead increase as much as we safely can.
1068 : *
1069 : * To stay within allowedMem, we can't increase memtupsize by more
1070 : * than availMem / sizeof(SortTuple) elements. In practice, we want
1071 : * to increase it by considerably less, because we need to leave some
1072 : * space for the tuples to which the new array slots will refer. We
1073 : * assume the new tuples will be about the same size as the tuples
1074 : * we've already seen, and thus we can extrapolate from the space
1075 : * consumption so far to estimate an appropriate new size for the
1076 : * memtuples array. The optimal value might be higher or lower than
1077 : * this estimate, but it's hard to know that in advance. We again
1078 : * clamp at INT_MAX tuples.
1079 : *
1080 : * This calculation is safe against enlarging the array so much that
1081 : * LACKMEM becomes true, because the memory currently used includes
1082 : * the present array; thus, there would be enough allowedMem for the
1083 : * new array elements even if no other memory were currently used.
1084 : *
1085 : * We do the arithmetic in float8, because otherwise the product of
1086 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
1087 : * result should be insignificant; but even if we computed a
1088 : * completely insane result, the checks below will prevent anything
1089 : * really bad from happening.
1090 : */
1091 : double grow_ratio;
1092 :
1093 146 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1094 146 : if (memtupsize * grow_ratio < INT_MAX)
1095 146 : newmemtupsize = (int) (memtupsize * grow_ratio);
1096 : else
1097 0 : newmemtupsize = INT_MAX;
1098 :
1099 : /* We won't make any further enlargement attempts */
1100 146 : state->growmemtuples = false;
1101 : }
1102 :
1103 : /* Must enlarge array by at least one element, else report failure */
1104 8330 : if (newmemtupsize <= memtupsize)
1105 0 : goto noalloc;
1106 :
1107 : /*
1108 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1109 : * to ensure our request won't be rejected. Note that we can easily
1110 : * exhaust address space before facing this outcome. (This is presently
1111 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1112 : * don't rely on that at this distance.)
1113 : */
1114 8330 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1115 : {
1116 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1117 0 : state->growmemtuples = false; /* can't grow any more */
1118 : }
1119 :
1120 : /*
1121 : * We need to be sure that we do not cause LACKMEM to become true, else
1122 : * the space management algorithm will go nuts. The code above should
1123 : * never generate a dangerous request, but to be safe, check explicitly
1124 : * that the array growth fits within availMem. (We could still cause
1125 : * LACKMEM if the memory chunk overhead associated with the memtuples
1126 : * array were to increase. That shouldn't happen because we chose the
1127 : * initial array size large enough to ensure that palloc will be treating
1128 : * both old and new arrays as separate chunks. But we'll check LACKMEM
1129 : * explicitly below just in case.)
1130 : */
1131 8330 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1132 0 : goto noalloc;
1133 :
1134 : /* OK, do it */
1135 8330 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1136 8330 : state->memtupsize = newmemtupsize;
1137 8330 : state->memtuples = (SortTuple *)
1138 8330 : repalloc_huge(state->memtuples,
1139 8330 : state->memtupsize * sizeof(SortTuple));
1140 8330 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1141 8330 : if (LACKMEM(state))
1142 0 : elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1143 8330 : return true;
1144 :
1145 0 : noalloc:
1146 : /* If for any reason we didn't realloc, shut off future attempts */
1147 0 : state->growmemtuples = false;
1148 0 : return false;
1149 : }
1150 :
1151 : /*
1152 : * Shared code for tuple and datum cases.
1153 : */
1154 : void
1155 31111270 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1156 : bool useAbbrev, Size tuplen)
1157 : {
1158 31111270 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1159 :
1160 : Assert(!LEADER(state));
1161 :
1162 : /* account for the memory used for this tuple */
1163 31111270 : USEMEM(state, tuplen);
1164 31111270 : state->tupleMem += tuplen;
1165 :
1166 31111270 : if (!useAbbrev)
1167 : {
1168 : /*
1169 : * Leave ordinary Datum representation, or NULL value. If there is a
1170 : * converter it won't expect NULL values, and cost model is not
1171 : * required to account for NULL, so in that case we avoid calling
1172 : * converter and just set datum1 to zeroed representation (to be
1173 : * consistent, and to support cheap inequality tests for NULL
1174 : * abbreviated keys).
1175 : */
1176 : }
1177 4441506 : else if (!consider_abort_common(state))
1178 : {
1179 : /* Store abbreviated key representation */
1180 4441410 : tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1181 : state->base.sortKeys);
1182 : }
1183 : else
1184 : {
1185 : /*
1186 : * Set state to be consistent with never trying abbreviation.
1187 : *
1188 : * Alter datum1 representation in already-copied tuples, so as to
1189 : * ensure a consistent representation (current tuple was just
1190 : * handled). It does not matter if some dumped tuples are already
1191 : * sorted on tape, since serialized tuples lack abbreviated keys
1192 : * (TSS_BUILDRUNS state prevents control reaching here in any case).
1193 : */
1194 96 : REMOVEABBREV(state, state->memtuples, state->memtupcount);
1195 : }
1196 :
1197 31111270 : switch (state->status)
1198 : {
1199 26248778 : case TSS_INITIAL:
1200 :
1201 : /*
1202 : * Save the tuple into the unsorted array. First, grow the array
1203 : * as needed. Note that we try to grow the array when there is
1204 : * still one free slot remaining --- if we fail, there'll still be
1205 : * room to store the incoming tuple, and then we'll switch to
1206 : * tape-based operation.
1207 : */
1208 26248778 : if (state->memtupcount >= state->memtupsize - 1)
1209 : {
1210 8470 : (void) grow_memtuples(state);
1211 : Assert(state->memtupcount < state->memtupsize);
1212 : }
1213 26248778 : state->memtuples[state->memtupcount++] = *tuple;
1214 :
1215 : /*
1216 : * Check if it's time to switch over to a bounded heapsort. We do
1217 : * so if the input tuple count exceeds twice the desired tuple
1218 : * count (this is a heuristic for where heapsort becomes cheaper
1219 : * than a quicksort), or if we've just filled workMem and have
1220 : * enough tuples to meet the bound.
1221 : *
1222 : * Note that once we enter TSS_BOUNDED state we will always try to
1223 : * complete the sort that way. In the worst case, if later input
1224 : * tuples are larger than earlier ones, this might cause us to
1225 : * exceed workMem significantly.
1226 : */
1227 26248778 : if (state->bounded &&
1228 47316 : (state->memtupcount > state->bound * 2 ||
1229 46908 : (state->memtupcount > state->bound && LACKMEM(state))))
1230 : {
1231 408 : if (trace_sort)
1232 0 : elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1233 : state->memtupcount,
1234 : pg_rusage_show(&state->ru_start));
1235 408 : make_bounded_heap(state);
1236 408 : MemoryContextSwitchTo(oldcontext);
1237 408 : return;
1238 : }
1239 :
1240 : /*
1241 : * Done if we still fit in available memory and have array slots.
1242 : */
1243 26248370 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1244 : {
1245 26248230 : MemoryContextSwitchTo(oldcontext);
1246 26248230 : return;
1247 : }
1248 :
1249 : /*
1250 : * Nope; time to switch to tape-based operation.
1251 : */
1252 140 : inittapes(state, true);
1253 :
1254 : /*
1255 : * Dump all tuples.
1256 : */
1257 140 : dumptuples(state, false);
1258 140 : break;
1259 :
1260 3750206 : case TSS_BOUNDED:
1261 :
1262 : /*
1263 : * We don't want to grow the array here, so check whether the new
1264 : * tuple can be discarded before putting it in. This should be a
1265 : * good speed optimization, too, since when there are many more
1266 : * input tuples than the bound, most input tuples can be discarded
1267 : * with just this one comparison. Note that because we currently
1268 : * have the sort direction reversed, we must check for <= not >=.
1269 : */
1270 3750206 : if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1271 : {
1272 : /* new tuple <= top of the heap, so we can discard it */
1273 3247240 : free_sort_tuple(state, tuple);
1274 3247240 : CHECK_FOR_INTERRUPTS();
1275 : }
1276 : else
1277 : {
1278 : /* discard top of heap, replacing it with the new tuple */
1279 502966 : free_sort_tuple(state, &state->memtuples[0]);
1280 502966 : tuplesort_heap_replace_top(state, tuple);
1281 : }
1282 3750206 : break;
1283 :
1284 1112286 : case TSS_BUILDRUNS:
1285 :
1286 : /*
1287 : * Save the tuple into the unsorted array (there must be space)
1288 : */
1289 1112286 : state->memtuples[state->memtupcount++] = *tuple;
1290 :
1291 : /*
1292 : * If we are over the memory limit, dump all tuples.
1293 : */
1294 1112286 : dumptuples(state, false);
1295 1112286 : break;
1296 :
1297 0 : default:
1298 0 : elog(ERROR, "invalid tuplesort state");
1299 : break;
1300 : }
1301 4862632 : MemoryContextSwitchTo(oldcontext);
1302 : }
1303 :
1304 : static bool
1305 4441506 : consider_abort_common(Tuplesortstate *state)
1306 : {
1307 : Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1308 : Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1309 : Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1310 :
1311 : /*
1312 : * Check effectiveness of abbreviation optimization. Consider aborting
1313 : * when still within memory limit.
1314 : */
1315 4441506 : if (state->status == TSS_INITIAL &&
1316 3969076 : state->memtupcount >= state->abbrevNext)
1317 : {
1318 5062 : state->abbrevNext *= 2;
1319 :
1320 : /*
1321 : * Check opclass-supplied abbreviation abort routine. It may indicate
1322 : * that abbreviation should not proceed.
1323 : */
1324 5062 : if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1325 : state->base.sortKeys))
1326 4966 : return false;
1327 :
1328 : /*
1329 : * Finally, restore authoritative comparator, and indicate that
1330 : * abbreviation is not in play by setting abbrev_converter to NULL
1331 : */
1332 96 : state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1333 96 : state->base.sortKeys[0].abbrev_converter = NULL;
1334 : /* Not strictly necessary, but be tidy */
1335 96 : state->base.sortKeys[0].abbrev_abort = NULL;
1336 96 : state->base.sortKeys[0].abbrev_full_comparator = NULL;
1337 :
1338 : /* Give up - expect original pass-by-value representation */
1339 96 : return true;
1340 : }
1341 :
1342 4436444 : return false;
1343 : }
1344 :
1345 : /*
1346 : * All tuples have been provided; finish the sort.
1347 : */
1348 : void
1349 236044 : tuplesort_performsort(Tuplesortstate *state)
1350 : {
1351 236044 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1352 :
1353 236044 : if (trace_sort)
1354 0 : elog(LOG, "performsort of worker %d starting: %s",
1355 : state->worker, pg_rusage_show(&state->ru_start));
1356 :
1357 236044 : switch (state->status)
1358 : {
1359 235496 : case TSS_INITIAL:
1360 :
1361 : /*
1362 : * We were able to accumulate all the tuples within the allowed
1363 : * amount of memory, or leader to take over worker tapes
1364 : */
1365 235496 : if (SERIAL(state))
1366 : {
1367 : /* Just qsort 'em and we're done */
1368 234866 : tuplesort_sort_memtuples(state);
1369 234776 : state->status = TSS_SORTEDINMEM;
1370 : }
1371 630 : else if (WORKER(state))
1372 : {
1373 : /*
1374 : * Parallel workers must still dump out tuples to tape. No
1375 : * merge is required to produce single output run, though.
1376 : */
1377 468 : inittapes(state, false);
1378 468 : dumptuples(state, true);
1379 468 : worker_nomergeruns(state);
1380 468 : state->status = TSS_SORTEDONTAPE;
1381 : }
1382 : else
1383 : {
1384 : /*
1385 : * Leader will take over worker tapes and merge worker runs.
1386 : * Note that mergeruns sets the correct state->status.
1387 : */
1388 162 : leader_takeover_tapes(state);
1389 162 : mergeruns(state);
1390 : }
1391 235406 : state->current = 0;
1392 235406 : state->eof_reached = false;
1393 235406 : state->markpos_block = 0L;
1394 235406 : state->markpos_offset = 0;
1395 235406 : state->markpos_eof = false;
1396 235406 : break;
1397 :
1398 408 : case TSS_BOUNDED:
1399 :
1400 : /*
1401 : * We were able to accumulate all the tuples required for output
1402 : * in memory, using a heap to eliminate excess tuples. Now we
1403 : * have to transform the heap to a properly-sorted array. Note
1404 : * that sort_bounded_heap sets the correct state->status.
1405 : */
1406 408 : sort_bounded_heap(state);
1407 408 : state->current = 0;
1408 408 : state->eof_reached = false;
1409 408 : state->markpos_offset = 0;
1410 408 : state->markpos_eof = false;
1411 408 : break;
1412 :
1413 140 : case TSS_BUILDRUNS:
1414 :
1415 : /*
1416 : * Finish tape-based sort. First, flush all tuples remaining in
1417 : * memory out to tape; then merge until we have a single remaining
1418 : * run (or, if !randomAccess and !WORKER(), one run per tape).
1419 : * Note that mergeruns sets the correct state->status.
1420 : */
1421 140 : dumptuples(state, true);
1422 140 : mergeruns(state);
1423 140 : state->eof_reached = false;
1424 140 : state->markpos_block = 0L;
1425 140 : state->markpos_offset = 0;
1426 140 : state->markpos_eof = false;
1427 140 : break;
1428 :
1429 0 : default:
1430 0 : elog(ERROR, "invalid tuplesort state");
1431 : break;
1432 : }
1433 :
1434 235954 : if (trace_sort)
1435 : {
1436 0 : if (state->status == TSS_FINALMERGE)
1437 0 : elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1438 : state->worker, state->nInputTapes,
1439 : pg_rusage_show(&state->ru_start));
1440 : else
1441 0 : elog(LOG, "performsort of worker %d done: %s",
1442 : state->worker, pg_rusage_show(&state->ru_start));
1443 : }
1444 :
1445 235954 : MemoryContextSwitchTo(oldcontext);
1446 235954 : }
1447 :
1448 : /*
1449 : * Internal routine to fetch the next tuple in either forward or back
1450 : * direction into *stup. Returns false if no more tuples.
1451 : * Returned tuple belongs to tuplesort memory context, and must not be freed
1452 : * by caller. Note that fetched tuple is stored in memory that may be
1453 : * recycled by any future fetch.
1454 : */
1455 : bool
1456 28605084 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1457 : SortTuple *stup)
1458 : {
1459 : unsigned int tuplen;
1460 : size_t nmoved;
1461 :
1462 : Assert(!WORKER(state));
1463 :
1464 28605084 : switch (state->status)
1465 : {
1466 23657470 : case TSS_SORTEDINMEM:
1467 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1468 : Assert(!state->slabAllocatorUsed);
1469 23657470 : if (forward)
1470 : {
1471 23657404 : if (state->current < state->memtupcount)
1472 : {
1473 23423576 : *stup = state->memtuples[state->current++];
1474 23423576 : return true;
1475 : }
1476 233828 : state->eof_reached = true;
1477 :
1478 : /*
1479 : * Complain if caller tries to retrieve more tuples than
1480 : * originally asked for in a bounded sort. This is because
1481 : * returning EOF here might be the wrong thing.
1482 : */
1483 233828 : if (state->bounded && state->current >= state->bound)
1484 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1485 :
1486 233828 : return false;
1487 : }
1488 : else
1489 : {
1490 66 : if (state->current <= 0)
1491 0 : return false;
1492 :
1493 : /*
1494 : * If all tuples have been fetched already, return the last
1495 : * tuple; otherwise return the tuple before the last one returned.
1496 : */
1497 66 : if (state->eof_reached)
1498 12 : state->eof_reached = false;
1499 : else
1500 : {
1501 54 : state->current--; /* last returned tuple */
1502 54 : if (state->current <= 0)
1503 6 : return false;
1504 : }
1505 60 : *stup = state->memtuples[state->current - 1];
1506 60 : return true;
1507 : }
1508 : break;
1509 :
1510 303000 : case TSS_SORTEDONTAPE:
1511 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1512 : Assert(state->slabAllocatorUsed);
1513 :
1514 : /*
1515 : * The slot that held the tuple that we returned in previous
1516 : * gettuple call can now be reused.
1517 : */
1518 303000 : if (state->lastReturnedTuple)
1519 : {
1520 152850 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1521 152850 : state->lastReturnedTuple = NULL;
1522 : }
1523 :
1524 303000 : if (forward)
1525 : {
1526 302970 : if (state->eof_reached)
1527 0 : return false;
1528 :
1529 302970 : if ((tuplen = getlen(state->result_tape, true)) != 0)
1530 : {
1531 302940 : READTUP(state, stup, state->result_tape, tuplen);
1532 :
1533 : /*
1534 : * Remember the tuple we return, so that we can recycle
1535 : * its memory on next call. (This can be NULL, in the
1536 : * !state->tuples case).
1537 : */
1538 302940 : state->lastReturnedTuple = stup->tuple;
1539 :
1540 302940 : return true;
1541 : }
1542 : else
1543 : {
1544 30 : state->eof_reached = true;
1545 30 : return false;
1546 : }
1547 : }
1548 :
1549 : /*
1550 : * Backward.
1551 : *
1552 : * If all tuples have been fetched already, return the last tuple;
1553 : * otherwise return the tuple before the last one returned.
1554 : */
1555 30 : if (state->eof_reached)
1556 : {
1557 : /*
1558 : * Seek position is pointing just past the zero tuplen at the
1559 : * end of file; back up to fetch last tuple's ending length
1560 : * word. If seek fails we must have a completely empty file.
1561 : */
1562 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1563 : 2 * sizeof(unsigned int));
1564 12 : if (nmoved == 0)
1565 0 : return false;
1566 12 : else if (nmoved != 2 * sizeof(unsigned int))
1567 0 : elog(ERROR, "unexpected tape position");
1568 12 : state->eof_reached = false;
1569 : }
1570 : else
1571 : {
1572 : /*
1573 : * Back up and fetch previously-returned tuple's ending length
1574 : * word. If seek fails, assume we are at start of file.
1575 : */
1576 18 : nmoved = LogicalTapeBackspace(state->result_tape,
1577 : sizeof(unsigned int));
1578 18 : if (nmoved == 0)
1579 0 : return false;
1580 18 : else if (nmoved != sizeof(unsigned int))
1581 0 : elog(ERROR, "unexpected tape position");
1582 18 : tuplen = getlen(state->result_tape, false);
1583 :
1584 : /*
1585 : * Back up to get ending length word of tuple before it.
1586 : */
1587 18 : nmoved = LogicalTapeBackspace(state->result_tape,
1588 : tuplen + 2 * sizeof(unsigned int));
1589 18 : if (nmoved == tuplen + sizeof(unsigned int))
1590 : {
1591 : /*
1592 : * We backed up over the previous tuple, but there was no
1593 : * ending length word before it. That means that the previous
1594 : * tuple is the first tuple in the file. It is now the next
1595 : * one to read in the forward direction (not obviously right,
1596 : * but that is what the in-memory case does).
1597 : */
1598 6 : return false;
1599 : }
1600 12 : else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1601 0 : elog(ERROR, "bogus tuple length in backward scan");
1602 : }
1603 :
1604 24 : tuplen = getlen(state->result_tape, false);
1605 :
1606 : /*
1607 : * Now we have the length of the prior tuple, back up and read it.
1608 : * Note: READTUP expects we are positioned after the initial
1609 : * length word of the tuple, so back up to that point.
1610 : */
1611 24 : nmoved = LogicalTapeBackspace(state->result_tape,
1612 : tuplen);
1613 24 : if (nmoved != tuplen)
1614 0 : elog(ERROR, "bogus tuple length in backward scan");
1615 24 : READTUP(state, stup, state->result_tape, tuplen);
1616 :
1617 : /*
1618 : * Remember the tuple we return, so that we can recycle its memory
1619 : * on next call. (This can be NULL, in the Datum case).
1620 : */
1621 24 : state->lastReturnedTuple = stup->tuple;
1622 :
1623 24 : return true;
1624 :
1625 4644614 : case TSS_FINALMERGE:
1626 : Assert(forward);
1627 : /* We are managing memory ourselves, with the slab allocator. */
1628 : Assert(state->slabAllocatorUsed);
1629 :
1630 : /*
1631 : * The slab slot holding the tuple that we returned in previous
1632 : * gettuple call can now be reused.
1633 : */
1634 4644614 : if (state->lastReturnedTuple)
1635 : {
1636 4524304 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1637 4524304 : state->lastReturnedTuple = NULL;
1638 : }
1639 :
1640 : /*
1641 : * This code should match the inner loop of mergeonerun().
1642 : */
1643 4644614 : if (state->memtupcount > 0)
1644 : {
1645 4644352 : int srcTapeIndex = state->memtuples[0].srctape;
1646 4644352 : LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1647 : SortTuple newtup;
1648 :
1649 4644352 : *stup = state->memtuples[0];
1650 :
1651 : /*
1652 : * Remember the tuple we return, so that we can recycle its
1653 : * memory on next call. (This can be NULL, in the Datum case).
1654 : */
1655 4644352 : state->lastReturnedTuple = stup->tuple;
1656 :
1657 : /*
1658 : * Pull next tuple from tape, and replace the returned tuple
1659 : * at top of the heap with it.
1660 : */
1661 4644352 : if (!mergereadnext(state, srcTape, &newtup))
1662 : {
1663 : /*
1664 : * If no more data, we've reached end of run on this tape.
1665 : * Remove the top node from the heap.
1666 : */
1667 388 : tuplesort_heap_delete_top(state);
1668 388 : state->nInputRuns--;
1669 :
1670 : /*
1671 : * Close the tape. It'd go away at the end of the sort
1672 : * anyway, but better to release the memory early.
1673 : */
1674 388 : LogicalTapeClose(srcTape);
1675 388 : return true;
1676 : }
1677 4643964 : newtup.srctape = srcTapeIndex;
1678 4643964 : tuplesort_heap_replace_top(state, &newtup);
1679 4643964 : return true;
1680 : }
1681 262 : return false;
1682 :
1683 0 : default:
1684 0 : elog(ERROR, "invalid tuplesort state");
1685 : return false; /* keep compiler quiet */
1686 : }
1687 : }
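/*
 * Illustrative sketch (not part of tuplesort.c): the record framing that the
 * backward-scan arithmetic in tuplesort_gettuple_common relies on for a
 * random-access result tape.  Each tuple is assumed to be stored as
 * [length word][body][length word], so the trailing copy lets a reader step
 * backward without scanning from the start.  Buffer and helper names here
 * are hypothetical; a real tape is read through logtape.c, not memcpy().
 */
#include <stdio.h>
#include <string.h>

static size_t
append_framed(unsigned char *buf, size_t pos, const char *body)
{
    unsigned int len = (unsigned int) strlen(body);

    memcpy(buf + pos, &len, sizeof(len));
    pos += sizeof(len);
    memcpy(buf + pos, body, len);
    pos += len;
    memcpy(buf + pos, &len, sizeof(len));   /* trailing copy of the length */
    pos += sizeof(len);
    return pos;
}

int
main(void)
{
    unsigned char tape[128];
    size_t       end = 0;
    unsigned int len;

    end = append_framed(tape, end, "first");
    end = append_framed(tape, end, "second");

    /* read the trailing length word of the last tuple ... */
    memcpy(&len, tape + end - sizeof(len), sizeof(len));

    /* ... which locates its body without scanning forward from offset 0 */
    printf("last tuple: %.*s\n",
           (int) len, (const char *) (tape + end - sizeof(len) - len));
    return 0;
}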
1688 :
1689 :
1690 : /*
1691 : * Advance over N tuples in either forward or back direction,
1692 : * without returning any data. N==0 is a no-op.
1693 : * Returns true if successful, false if ran out of tuples.
1694 : */
1695 : bool
1696 392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1697 : {
1698 : MemoryContext oldcontext;
1699 :
1700 : /*
1701 : * We don't actually support backwards skip yet, because no callers need
1702 : * it. The API is designed to allow for that later, though.
1703 : */
1704 : Assert(forward);
1705 : Assert(ntuples >= 0);
1706 : Assert(!WORKER(state));
1707 :
1708 392 : switch (state->status)
1709 : {
1710 368 : case TSS_SORTEDINMEM:
1711 368 : if (state->memtupcount - state->current >= ntuples)
1712 : {
1713 368 : state->current += ntuples;
1714 368 : return true;
1715 : }
1716 0 : state->current = state->memtupcount;
1717 0 : state->eof_reached = true;
1718 :
1719 : /*
1720 : * Complain if caller tries to retrieve more tuples than
1721 : * originally asked for in a bounded sort. This is because
1722 : * returning EOF here might be the wrong thing.
1723 : */
1724 0 : if (state->bounded && state->current >= state->bound)
1725 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1726 :
1727 0 : return false;
1728 :
1729 24 : case TSS_SORTEDONTAPE:
1730 : case TSS_FINALMERGE:
1731 :
1732 : /*
1733 : * We could probably optimize these cases better, but for now it's
1734 : * not worth the trouble.
1735 : */
1736 24 : oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1737 240132 : while (ntuples-- > 0)
1738 : {
1739 : SortTuple stup;
1740 :
1741 240108 : if (!tuplesort_gettuple_common(state, forward, &stup))
1742 : {
1743 0 : MemoryContextSwitchTo(oldcontext);
1744 0 : return false;
1745 : }
1746 240108 : CHECK_FOR_INTERRUPTS();
1747 : }
1748 24 : MemoryContextSwitchTo(oldcontext);
1749 24 : return true;
1750 :
1751 0 : default:
1752 0 : elog(ERROR, "invalid tuplesort state");
1753 : return false; /* keep compiler quiet */
1754 : }
1755 : }
1756 :
1757 : /*
1758 : * tuplesort_merge_order - report merge order we'll use for given memory
1759 : * (note: "merge order" just means the number of input tapes in the merge).
1760 : *
1761 : * This is exported for use by the planner. allowedMem is in bytes.
1762 : */
1763 : int
1764 18648 : tuplesort_merge_order(int64 allowedMem)
1765 : {
1766 : int mOrder;
1767 :
1768 : /*----------
1769 : * In the merge phase, we need buffer space for each input and output tape.
1770 : * Each pass in the balanced merge algorithm reads from M input tapes, and
1771 : * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1772 : * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1773 : * input tape.
1774 : *
1775 : * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1776 : * N * TAPE_BUFFER_OVERHEAD
1777 : *
1778 : * Except for the last and next-to-last merge passes, where there can be
1779 : * fewer tapes left to process, M = N. We choose M so that we have the
1780 : * desired amount of memory available for the input buffers
1781 : * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1782 : * available for the tape buffers (allowedMem).
1783 : *
1784 : * Note: you might be thinking we need to account for the memtuples[]
1785 : * array in this calculation, but we effectively treat that as part of the
1786 : * MERGE_BUFFER_SIZE workspace.
1787 : *----------
1788 : */
1789 18648 : mOrder = allowedMem /
1790 : (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1791 :
1792 : /*
1793 : * Even in minimum memory, use at least a MINORDER merge. On the other
1794 : * hand, even when we have lots of memory, do not use more than a MAXORDER
1795 : * merge. Tapes are pretty cheap, but they're not entirely free. Each
1796 : * additional tape reduces the amount of memory available to build runs,
1797 : * which in turn can cause the same sort to need more runs, which makes
1798 : * merging slower even if it can still be done in a single pass. Also,
1799 : * high order merges are quite slow due to CPU cache effects; it can be
1800 : * faster to pay the I/O cost of a multi-pass merge than to perform a
1801 : * single merge pass across many hundreds of tapes.
1802 : */
1803 18648 : mOrder = Max(mOrder, MINORDER);
1804 18648 : mOrder = Min(mOrder, MAXORDER);
1805 :
1806 18648 : return mOrder;
1807 : }
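/*
 * Illustrative sketch (not part of tuplesort.c): how an allowedMem budget
 * turns into a merge order under the formula above.  The buffer-size and
 * order-limit constants below are assumed placeholders, not necessarily the
 * values this build of PostgreSQL uses.
 */
#include <stdio.h>

#define EXAMPLE_TAPE_BUFFER_OVERHEAD  (8192 * 3)   /* assumed per-tape buffer cost */
#define EXAMPLE_MERGE_BUFFER_SIZE     (8192 * 32)  /* assumed per-input workspace */
#define EXAMPLE_MINORDER              6
#define EXAMPLE_MAXORDER              500

static int
example_merge_order(long long allowedMem)
{
    /* M = allowedMem / (input buffer + workspace + matching output buffer) */
    long long mOrder = allowedMem /
        (2 * EXAMPLE_TAPE_BUFFER_OVERHEAD + EXAMPLE_MERGE_BUFFER_SIZE);

    if (mOrder < EXAMPLE_MINORDER)
        mOrder = EXAMPLE_MINORDER;
    if (mOrder > EXAMPLE_MAXORDER)
        mOrder = EXAMPLE_MAXORDER;
    return (int) mOrder;
}

int
main(void)
{
    /* print the order chosen for a 4 MB budget with the assumed constants */
    printf("merge order: %d\n", example_merge_order(4LL * 1024 * 1024));
    return 0;
}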
1808 :
1809 : /*
1810 : * Helper function to calculate how much memory to allocate for the read buffer
1811 : * of each input tape in a merge pass.
1812 : *
1813 : * 'avail_mem' is the amount of memory available for the buffers of all the
1814 : * tapes, both input and output.
1815 : * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1816 : * 'maxOutputTapes' is the max. number of output tapes we should produce.
1817 : */
1818 : static int64
1819 332 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1820 : int maxOutputTapes)
1821 : {
1822 : int nOutputRuns;
1823 : int nOutputTapes;
1824 :
1825 : /*
1826 : * How many output tapes will we produce in this pass?
1827 : *
1828 : * This is nInputRuns / nInputTapes, rounded up.
1829 : */
1830 332 : nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1831 :
1832 332 : nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1833 :
1834 : /*
1835 : * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1836 : * remaining memory is divided evenly between the input tapes.
1837 : *
1838 : * This also follows from the formula in tuplesort_merge_order, but here
1839 : * we derive the input buffer size from the amount of memory available,
1840 : * and M and N.
1841 : */
1842 332 : return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1843 : }
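/*
 * Illustrative sketch (not part of tuplesort.c): the same split of an
 * assumed memory budget between output-tape overhead and input read buffers
 * as the function above, carried out on plain integers.  The per-tape
 * overhead constant is a placeholder value.
 */
#include <stdio.h>

#define EXAMPLE_TAPE_BUFFER_OVERHEAD  (8192 * 3)   /* assumed per-tape overhead */

static long long
example_read_buffer_size(long long avail_mem, int nInputTapes, int nInputRuns,
                         int maxOutputTapes)
{
    int       nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes; /* ceil */
    int       nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns : maxOutputTapes;
    long long remaining = avail_mem -
        (long long) EXAMPLE_TAPE_BUFFER_OVERHEAD * nOutputTapes;

    return remaining > 0 ? remaining / nInputTapes : 0;
}

int
main(void)
{
    /* 1 MB left over, 6 input tapes holding 13 runs -> 3 output tapes this pass */
    printf("%lld bytes per input tape\n",
           example_read_buffer_size(1024 * 1024, 6, 13, 6));
    return 0;
}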
1844 :
1845 : /*
1846 : * inittapes - initialize for tape sorting.
1847 : *
1848 : * This is called only if we have found we won't sort in memory.
1849 : */
1850 : static void
1851 608 : inittapes(Tuplesortstate *state, bool mergeruns)
1852 : {
1853 : Assert(!LEADER(state));
1854 :
1855 608 : if (mergeruns)
1856 : {
1857 : /* Compute number of input tapes to use when merging */
1858 140 : state->maxTapes = tuplesort_merge_order(state->allowedMem);
1859 : }
1860 : else
1861 : {
1862 : /* Workers can sometimes produce a single run, output without a merge */
1863 : Assert(WORKER(state));
1864 468 : state->maxTapes = MINORDER;
1865 : }
1866 :
1867 608 : if (trace_sort)
1868 0 : elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1869 : state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1870 :
1871 : /* Create the tape set */
1872 608 : inittapestate(state, state->maxTapes);
1873 608 : state->tapeset =
1874 608 : LogicalTapeSetCreate(false,
1875 608 : state->shared ? &state->shared->fileset : NULL,
1876 : state->worker);
1877 :
1878 608 : state->currentRun = 0;
1879 :
1880 : /*
1881 : * Initialize logical tape arrays.
1882 : */
1883 608 : state->inputTapes = NULL;
1884 608 : state->nInputTapes = 0;
1885 608 : state->nInputRuns = 0;
1886 :
1887 608 : state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1888 608 : state->nOutputTapes = 0;
1889 608 : state->nOutputRuns = 0;
1890 :
1891 608 : state->status = TSS_BUILDRUNS;
1892 :
1893 608 : selectnewtape(state);
1894 608 : }
1895 :
1896 : /*
1897 : * inittapestate - initialize generic tape management state
1898 : */
1899 : static void
1900 770 : inittapestate(Tuplesortstate *state, int maxTapes)
1901 : {
1902 : int64 tapeSpace;
1903 :
1904 : /*
1905 : * Decrease availMem to reflect the space needed for tape buffers; but
1906 : * don't decrease it to the point that we have no room for tuples. (That
1907 : * case is only likely to occur if sorting pass-by-value Datums; in all
1908 : * other scenarios the memtuples[] array is unlikely to occupy more than
1909 : * half of allowedMem. In the pass-by-value case it's not important to
1910 : * account for tuple space, so we don't care if LACKMEM becomes
1911 : * inaccurate.)
1912 : */
1913 770 : tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1914 :
1915 770 : if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1916 648 : USEMEM(state, tapeSpace);
1917 :
1918 : /*
1919 : * Make sure that the temp file(s) underlying the tape set are created in
1920 : * suitable temp tablespaces. For parallel sorts, this should have been
1921 : * called already, but it doesn't matter if it is called a second time.
1922 : */
1923 770 : PrepareTempTablespaces();
1924 770 : }
1925 :
1926 : /*
1927 : * selectnewtape -- select next tape to output to.
1928 : *
1929 : * This is called after finishing a run when we know another run
1930 : * must be started. This is used both when building the initial
1931 : * runs, and during merge passes.
1932 : */
1933 : static void
1934 1712 : selectnewtape(Tuplesortstate *state)
1935 : {
1936 : /*
1937 : * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1938 : * both zero. On each call, we create a new output tape to hold the next
1939 : * run, until maxTapes is reached. After that, we assign new runs to the
1940 : * existing tapes in a round robin fashion.
1941 : */
1942 1712 : if (state->nOutputTapes < state->maxTapes)
1943 : {
1944 : /* Create a new tape to hold the next run */
1945 : Assert(state->outputTapes[state->nOutputRuns] == NULL);
1946 : Assert(state->nOutputRuns == state->nOutputTapes);
1947 1130 : state->destTape = LogicalTapeCreate(state->tapeset);
1948 1130 : state->outputTapes[state->nOutputTapes] = state->destTape;
1949 1130 : state->nOutputTapes++;
1950 1130 : state->nOutputRuns++;
1951 : }
1952 : else
1953 : {
1954 : /*
1955 : * We have reached the max number of tapes. Append to an existing
1956 : * tape.
1957 : */
1958 582 : state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1959 582 : state->nOutputRuns++;
1960 : }
1961 1712 : }
1962 :
1963 : /*
1964 : * Initialize the slab allocation arena, for the given number of slots.
1965 : */
1966 : static void
1967 302 : init_slab_allocator(Tuplesortstate *state, int numSlots)
1968 : {
1969 302 : if (numSlots > 0)
1970 : {
1971 : char *p;
1972 : int i;
1973 :
1974 274 : state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1975 274 : state->slabMemoryEnd = state->slabMemoryBegin +
1976 274 : numSlots * SLAB_SLOT_SIZE;
1977 274 : state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1978 274 : USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1979 :
1980 274 : p = state->slabMemoryBegin;
1981 1042 : for (i = 0; i < numSlots - 1; i++)
1982 : {
1983 768 : ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1984 768 : p += SLAB_SLOT_SIZE;
1985 : }
1986 274 : ((SlabSlot *) p)->nextfree = NULL;
1987 : }
1988 : else
1989 : {
1990 28 : state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1991 28 : state->slabFreeHead = NULL;
1992 : }
1993 302 : state->slabAllocatorUsed = true;
1994 302 : }
1995 :
1996 : /*
1997 : * mergeruns -- merge all the completed initial runs.
1998 : *
1999 : * This implements the Balanced k-Way Merge Algorithm. All input data has
2000 : * already been written to initial runs on tape (see dumptuples).
2001 : */
2002 : static void
2003 302 : mergeruns(Tuplesortstate *state)
2004 : {
2005 : int tapenum;
2006 :
2007 : Assert(state->status == TSS_BUILDRUNS);
2008 : Assert(state->memtupcount == 0);
2009 :
2010 302 : if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2011 : {
2012 : /*
2013 : * If there are multiple runs to be merged, when we go to read back
2014 : * tuples from disk, abbreviated keys will not have been stored, and
2015 : * we don't care to regenerate them. Disable abbreviation from this
2016 : * point on.
2017 : */
2018 30 : state->base.sortKeys->abbrev_converter = NULL;
2019 30 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2020 :
2021 : /* Not strictly necessary, but be tidy */
2022 30 : state->base.sortKeys->abbrev_abort = NULL;
2023 30 : state->base.sortKeys->abbrev_full_comparator = NULL;
2024 : }
2025 :
2026 : /*
2027 : * Reset tuple memory. We've freed all the tuples that we previously
2028 : * allocated. We will use the slab allocator from now on.
2029 : */
2030 302 : MemoryContextResetOnly(state->base.tuplecontext);
2031 :
2032 : /*
2033 : * We no longer need a large memtuples array. (We will allocate a smaller
2034 : * one for the heap later.)
2035 : */
2036 302 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2037 302 : pfree(state->memtuples);
2038 302 : state->memtuples = NULL;
2039 :
2040 : /*
2041 : * Initialize the slab allocator. We need one slab slot per input tape,
2042 : * for the tuples in the heap, plus one to hold the tuple last returned
2043 : * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2044 : * however, we don't need to allocate anything.)
2045 : *
2046 : * In a multi-pass merge, we could shrink this allocation for the last
2047 : * merge pass, if it has fewer tapes than previous passes, but we don't
2048 : * bother.
2049 : *
2050 : * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2051 : * to track memory usage of individual tuples.
2052 : */
2053 302 : if (state->base.tuples)
2054 274 : init_slab_allocator(state, state->nOutputTapes + 1);
2055 : else
2056 28 : init_slab_allocator(state, 0);
2057 :
2058 : /*
2059 : * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2060 : * from each input tape.
2061 : *
2062 : * We could shrink this, too, between passes in a multi-pass merge, but we
2063 : * don't bother. (The initial input tapes are still in outputTapes. The
2064 : * number of input tapes will not increase between passes.)
2065 : */
2066 302 : state->memtupsize = state->nOutputTapes;
2067 604 : state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2068 302 : state->nOutputTapes * sizeof(SortTuple));
2069 302 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2070 :
2071 : /*
2072 : * Use all the remaining memory we have available for tape buffers among
2073 : * all the input tapes. At the beginning of each merge pass, we will
2074 : * divide this memory between the input and output tapes in the pass.
2075 : */
2076 302 : state->tape_buffer_mem = state->availMem;
2077 302 : USEMEM(state, state->tape_buffer_mem);
2078 302 : if (trace_sort)
2079 0 : elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2080 : state->worker, state->tape_buffer_mem / 1024);
2081 :
2082 : for (;;)
2083 : {
2084 : /*
2085 : * On the first iteration, or if we have read all the runs from the
2086 : * input tapes in a multi-pass merge, it's time to start a new pass.
2087 : * Rewind all the output tapes, and make them inputs for the next
2088 : * pass.
2089 : */
2090 440 : if (state->nInputRuns == 0)
2091 : {
2092 : int64 input_buffer_size;
2093 :
2094 : /* Close the old, emptied, input tapes */
2095 332 : if (state->nInputTapes > 0)
2096 : {
2097 210 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2098 180 : LogicalTapeClose(state->inputTapes[tapenum]);
2099 30 : pfree(state->inputTapes);
2100 : }
2101 :
2102 : /* Previous pass's outputs become next pass's inputs. */
2103 332 : state->inputTapes = state->outputTapes;
2104 332 : state->nInputTapes = state->nOutputTapes;
2105 332 : state->nInputRuns = state->nOutputRuns;
2106 :
2107 : /*
2108 : * Reset output tape variables. The actual LogicalTapes will be
2109 : * created as needed, here we only allocate the array to hold
2110 : * them.
2111 : */
2112 332 : state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2113 332 : state->nOutputTapes = 0;
2114 332 : state->nOutputRuns = 0;
2115 :
2116 : /*
2117 : * Redistribute the memory allocated for tape buffers, among the
2118 : * new input and output tapes.
2119 : */
2120 332 : input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2121 : state->nInputTapes,
2122 : state->nInputRuns,
2123 : state->maxTapes);
2124 :
2125 332 : if (trace_sort)
2126 0 : elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2127 : state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2128 : pg_rusage_show(&state->ru_start));
2129 :
2130 : /* Prepare the new input tapes for merge pass. */
2131 1300 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2132 968 : LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2133 :
2134 : /*
2135 : * If there's just one run left on each input tape, then only one
2136 : * merge pass remains. If we don't have to produce a materialized
2137 : * sorted tape, we can stop at this point and do the final merge
2138 : * on-the-fly.
2139 : */
2140 332 : if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2141 310 : && state->nInputRuns <= state->nInputTapes
2142 280 : && !WORKER(state))
2143 : {
2144 : /* Tell logtape.c we won't be writing anymore */
2145 280 : LogicalTapeSetForgetFreeSpace(state->tapeset);
2146 : /* Initialize for the final merge pass */
2147 280 : beginmerge(state);
2148 280 : state->status = TSS_FINALMERGE;
2149 280 : return;
2150 : }
2151 : }
2152 :
2153 : /* Select an output tape */
2154 160 : selectnewtape(state);
2155 :
2156 : /* Merge one run from each input tape. */
2157 160 : mergeonerun(state);
2158 :
2159 : /*
2160 : * If the input tapes are empty, and we output only one output run,
2161 : * we're done. The current output tape contains the final result.
2162 : */
2163 160 : if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2164 22 : break;
2165 : }
2166 :
2167 : /*
2168 : * Done. The result is on a single run on a single tape.
2169 : */
2170 22 : state->result_tape = state->outputTapes[0];
2171 22 : if (!WORKER(state))
2172 22 : LogicalTapeFreeze(state->result_tape, NULL);
2173 : else
2174 0 : worker_freeze_result_tape(state);
2175 22 : state->status = TSS_SORTEDONTAPE;
2176 :
2177 : /* Close all the now-empty input tapes, to release their read buffers. */
2178 114 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2179 92 : LogicalTapeClose(state->inputTapes[tapenum]);
2180 : }
2181 :
2182 : /*
2183 : * Merge one run from each input tape.
2184 : */
2185 : static void
2186 160 : mergeonerun(Tuplesortstate *state)
2187 : {
2188 : int srcTapeIndex;
2189 : LogicalTape *srcTape;
2190 :
2191 : /*
2192 : * Start the merge by loading one tuple from each active source tape into
2193 : * the heap.
2194 : */
2195 160 : beginmerge(state);
2196 :
2197 : Assert(state->slabAllocatorUsed);
2198 :
2199 : /*
2200 : * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2201 : * out, and replacing it with next tuple from same tape (if there is
2202 : * another one).
2203 : */
2204 875592 : while (state->memtupcount > 0)
2205 : {
2206 : SortTuple stup;
2207 :
2208 : /* write the tuple to destTape */
2209 875432 : srcTapeIndex = state->memtuples[0].srctape;
2210 875432 : srcTape = state->inputTapes[srcTapeIndex];
2211 875432 : WRITETUP(state, state->destTape, &state->memtuples[0]);
2212 :
2213 : /* recycle the slot of the tuple we just wrote out, for the next read */
2214 875432 : if (state->memtuples[0].tuple)
2215 735348 : RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2216 :
2217 : /*
2218 : * pull next tuple from the tape, and replace the written-out tuple in
2219 : * the heap with it.
2220 : */
2221 875432 : if (mergereadnext(state, srcTape, &stup))
2222 : {
2223 874578 : stup.srctape = srcTapeIndex;
2224 874578 : tuplesort_heap_replace_top(state, &stup);
2225 : }
2226 : else
2227 : {
2228 854 : tuplesort_heap_delete_top(state);
2229 854 : state->nInputRuns--;
2230 : }
2231 : }
2232 :
2233 : /*
2234 : * When the heap empties, we're done. Write an end-of-run marker on the
2235 : * output tape.
2236 : */
2237 160 : markrunend(state->destTape);
2238 160 : }
2239 :
2240 : /*
2241 : * beginmerge - initialize for a merge pass
2242 : *
2243 : * Fill the merge heap with the first tuple from each input tape.
2244 : */
2245 : static void
2246 440 : beginmerge(Tuplesortstate *state)
2247 : {
2248 : int activeTapes;
2249 : int srcTapeIndex;
2250 :
2251 : /* Heap should be empty here */
2252 : Assert(state->memtupcount == 0);
2253 :
2254 440 : activeTapes = Min(state->nInputTapes, state->nInputRuns);
2255 :
2256 1990 : for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2257 : {
2258 : SortTuple tup;
2259 :
2260 1550 : if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2261 : {
2262 1290 : tup.srctape = srcTapeIndex;
2263 1290 : tuplesort_heap_insert(state, &tup);
2264 : }
2265 : }
2266 440 : }
2267 :
2268 : /*
2269 : * mergereadnext - read next tuple from one merge input tape
2270 : *
2271 : * Returns false on EOF.
2272 : */
2273 : static bool
2274 5521334 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2275 : {
2276 : unsigned int tuplen;
2277 :
2278 : /* read next tuple, if any */
2279 5521334 : if ((tuplen = getlen(srcTape, true)) == 0)
2280 1502 : return false;
2281 5519832 : READTUP(state, stup, srcTape, tuplen);
2282 :
2283 5519832 : return true;
2284 : }
2285 :
2286 : /*
2287 : * dumptuples - remove tuples from memtuples and write initial run to tape
2288 : *
2289 : * When alltuples = true, dump everything currently in memory. (This case is
2290 : * only used at end of input data.)
2291 : */
2292 : static void
2293 1113034 : dumptuples(Tuplesortstate *state, bool alltuples)
2294 : {
2295 : int memtupwrite;
2296 : int i;
2297 :
2298 : /*
2299 : * Nothing to do if we still fit in available memory and have array slots,
2300 : * unless this is the final call during initial run generation.
2301 : */
2302 1113034 : if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2303 1112090 : !alltuples)
2304 1111482 : return;
2305 :
2306 : /*
2307 : * Final call might require no sorting, in rare cases where we just so
2308 : * happen to have previously LACKMEM()'d at the point where exactly all
2309 : * remaining tuples are loaded into memory, just before input was
2310 : * exhausted. In general, short final runs are quite possible, but avoid
2311 : * creating a completely empty run. In a worker, though, we must produce
2312 : * at least one tape, even if it's empty.
2313 : */
2314 1552 : if (state->memtupcount == 0 && state->currentRun > 0)
2315 0 : return;
2316 :
2317 : Assert(state->status == TSS_BUILDRUNS);
2318 :
2319 : /*
2320 : * It seems unlikely that this limit will ever be exceeded, but take no
2321 : * chances
2322 : */
2323 1552 : if (state->currentRun == INT_MAX)
2324 0 : ereport(ERROR,
2325 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2326 : errmsg("cannot have more than %d runs for an external sort",
2327 : INT_MAX)));
2328 :
2329 1552 : if (state->currentRun > 0)
2330 944 : selectnewtape(state);
2331 :
2332 1552 : state->currentRun++;
2333 :
2334 1552 : if (trace_sort)
2335 0 : elog(LOG, "worker %d starting quicksort of run %d: %s",
2336 : state->worker, state->currentRun,
2337 : pg_rusage_show(&state->ru_start));
2338 :
2339 : /*
2340 : * Sort all tuples accumulated within the allowed amount of memory for
2341 : * this run using quicksort
2342 : */
2343 1552 : tuplesort_sort_memtuples(state);
2344 :
2345 1552 : if (trace_sort)
2346 0 : elog(LOG, "worker %d finished quicksort of run %d: %s",
2347 : state->worker, state->currentRun,
2348 : pg_rusage_show(&state->ru_start));
2349 :
2350 1552 : memtupwrite = state->memtupcount;
2351 5161156 : for (i = 0; i < memtupwrite; i++)
2352 : {
2353 5159604 : SortTuple *stup = &state->memtuples[i];
2354 :
2355 5159604 : WRITETUP(state, state->destTape, stup);
2356 : }
2357 :
2358 1552 : state->memtupcount = 0;
2359 :
2360 : /*
2361 : * Reset tuple memory. We've freed all of the tuples that we previously
2362 : * allocated. It's important to avoid fragmentation when there is a stark
2363 : * change in the sizes of incoming tuples. In bounded sorts,
2364 : * fragmentation due to AllocSetFree's bucketing by size class might be
2365 : * particularly bad if this step wasn't taken.
2366 : */
2367 1552 : MemoryContextReset(state->base.tuplecontext);
2368 :
2369 : /*
2370 : * Now update the memory accounting to subtract the memory used by the
2371 : * tuple.
2372 : */
2373 1552 : FREEMEM(state, state->tupleMem);
2374 1552 : state->tupleMem = 0;
2375 :
2376 1552 : markrunend(state->destTape);
2377 :
2378 1552 : if (trace_sort)
2379 0 : elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2380 : state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2381 : pg_rusage_show(&state->ru_start));
2382 : }
2383 :
2384 : /*
2385 : * tuplesort_rescan - rewind and replay the scan
2386 : */
2387 : void
2388 48 : tuplesort_rescan(Tuplesortstate *state)
2389 : {
2390 48 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2391 :
2392 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2393 :
2394 48 : switch (state->status)
2395 : {
2396 40 : case TSS_SORTEDINMEM:
2397 40 : state->current = 0;
2398 40 : state->eof_reached = false;
2399 40 : state->markpos_offset = 0;
2400 40 : state->markpos_eof = false;
2401 40 : break;
2402 8 : case TSS_SORTEDONTAPE:
2403 8 : LogicalTapeRewindForRead(state->result_tape, 0);
2404 8 : state->eof_reached = false;
2405 8 : state->markpos_block = 0L;
2406 8 : state->markpos_offset = 0;
2407 8 : state->markpos_eof = false;
2408 8 : break;
2409 0 : default:
2410 0 : elog(ERROR, "invalid tuplesort state");
2411 : break;
2412 : }
2413 :
2414 48 : MemoryContextSwitchTo(oldcontext);
2415 48 : }
2416 :
2417 : /*
2418 : * tuplesort_markpos - saves current position in the merged sort file
2419 : */
2420 : void
2421 583640 : tuplesort_markpos(Tuplesortstate *state)
2422 : {
2423 583640 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2424 :
2425 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2426 :
2427 583640 : switch (state->status)
2428 : {
2429 574832 : case TSS_SORTEDINMEM:
2430 574832 : state->markpos_offset = state->current;
2431 574832 : state->markpos_eof = state->eof_reached;
2432 574832 : break;
2433 8808 : case TSS_SORTEDONTAPE:
2434 8808 : LogicalTapeTell(state->result_tape,
2435 : &state->markpos_block,
2436 : &state->markpos_offset);
2437 8808 : state->markpos_eof = state->eof_reached;
2438 8808 : break;
2439 0 : default:
2440 0 : elog(ERROR, "invalid tuplesort state");
2441 : break;
2442 : }
2443 :
2444 583640 : MemoryContextSwitchTo(oldcontext);
2445 583640 : }
2446 :
2447 : /*
2448 : * tuplesort_restorepos - restores current position in merged sort file to
2449 : * last saved position
2450 : */
2451 : void
2452 37660 : tuplesort_restorepos(Tuplesortstate *state)
2453 : {
2454 37660 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2455 :
2456 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2457 :
2458 37660 : switch (state->status)
2459 : {
2460 31468 : case TSS_SORTEDINMEM:
2461 31468 : state->current = state->markpos_offset;
2462 31468 : state->eof_reached = state->markpos_eof;
2463 31468 : break;
2464 6192 : case TSS_SORTEDONTAPE:
2465 6192 : LogicalTapeSeek(state->result_tape,
2466 : state->markpos_block,
2467 : state->markpos_offset);
2468 6192 : state->eof_reached = state->markpos_eof;
2469 6192 : break;
2470 0 : default:
2471 0 : elog(ERROR, "invalid tuplesort state");
2472 : break;
2473 : }
2474 :
2475 37660 : MemoryContextSwitchTo(oldcontext);
2476 37660 : }
2477 :
2478 : /*
2479 : * tuplesort_get_stats - extract summary statistics
2480 : *
2481 : * This can be called after tuplesort_performsort() finishes to obtain
2482 : * printable summary information about how the sort was performed.
2483 : */
2484 : void
2485 396 : tuplesort_get_stats(Tuplesortstate *state,
2486 : TuplesortInstrumentation *stats)
2487 : {
2488 : /*
2489 : * Note: it might seem we should provide both memory and disk usage for a
2490 : * disk-based sort. However, the current code doesn't track memory space
2491 : * accurately once we have begun to return tuples to the caller (since we
2492 : * don't account for pfree's the caller is expected to do), so we cannot
2493 : * rely on availMem in a disk sort. This does not seem worth the overhead
2494 : * to fix. Is it worth creating an API for the memory context code to
2495 : * tell us how much is actually used in sortcontext?
2496 : */
2497 396 : tuplesort_updatemax(state);
2498 :
2499 396 : if (state->isMaxSpaceDisk)
2500 6 : stats->spaceType = SORT_SPACE_TYPE_DISK;
2501 : else
2502 390 : stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2503 396 : stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2504 :
2505 396 : switch (state->maxSpaceStatus)
2506 : {
2507 390 : case TSS_SORTEDINMEM:
2508 390 : if (state->boundUsed)
2509 42 : stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2510 : else
2511 348 : stats->sortMethod = SORT_TYPE_QUICKSORT;
2512 390 : break;
2513 0 : case TSS_SORTEDONTAPE:
2514 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2515 0 : break;
2516 6 : case TSS_FINALMERGE:
2517 6 : stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2518 6 : break;
2519 0 : default:
2520 0 : stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2521 0 : break;
2522 : }
2523 396 : }
2524 :
2525 : /*
2526 : * Convert TuplesortMethod to a string.
2527 : */
2528 : const char *
2529 294 : tuplesort_method_name(TuplesortMethod m)
2530 : {
2531 294 : switch (m)
2532 : {
2533 0 : case SORT_TYPE_STILL_IN_PROGRESS:
2534 0 : return "still in progress";
2535 42 : case SORT_TYPE_TOP_N_HEAPSORT:
2536 42 : return "top-N heapsort";
2537 246 : case SORT_TYPE_QUICKSORT:
2538 246 : return "quicksort";
2539 0 : case SORT_TYPE_EXTERNAL_SORT:
2540 0 : return "external sort";
2541 6 : case SORT_TYPE_EXTERNAL_MERGE:
2542 6 : return "external merge";
2543 : }
2544 :
2545 0 : return "unknown";
2546 : }
2547 :
2548 : /*
2549 : * Convert TuplesortSpaceType to a string.
2550 : */
2551 : const char *
2552 258 : tuplesort_space_type_name(TuplesortSpaceType t)
2553 : {
2554 : Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2555 258 : return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2556 : }
2557 :
2558 :
2559 : /*
2560 : * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2561 : */
2562 :
2563 : /*
2564 : * Convert the existing unordered array of SortTuples to a bounded heap,
2565 : * discarding all but the smallest "state->bound" tuples.
2566 : *
2567 : * When working with a bounded heap, we want to keep the largest entry
2568 : * at the root (array entry zero), instead of the smallest as in the normal
2569 : * sort case. This allows us to discard the largest entry cheaply.
2570 : * Therefore, we temporarily reverse the sort direction.
2571 : */
2572 : static void
2573 408 : make_bounded_heap(Tuplesortstate *state)
2574 : {
2575 408 : int tupcount = state->memtupcount;
2576 : int i;
2577 :
2578 : Assert(state->status == TSS_INITIAL);
2579 : Assert(state->bounded);
2580 : Assert(tupcount >= state->bound);
2581 : Assert(SERIAL(state));
2582 :
2583 : /* Reverse sort direction so largest entry will be at root */
2584 408 : reversedirection(state);
2585 :
2586 408 : state->memtupcount = 0; /* make the heap empty */
2587 37980 : for (i = 0; i < tupcount; i++)
2588 : {
2589 37572 : if (state->memtupcount < state->bound)
2590 : {
2591 : /* Insert next tuple into heap */
2592 : /* Must copy source tuple to avoid possible overwrite */
2593 18582 : SortTuple stup = state->memtuples[i];
2594 :
2595 18582 : tuplesort_heap_insert(state, &stup);
2596 : }
2597 : else
2598 : {
2599 : /*
2600 : * The heap is full. Replace the largest entry with the new
2601 : * tuple, or just discard it, if it's larger than anything already
2602 : * in the heap.
2603 : */
2604 18990 : if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2605 : {
2606 9794 : free_sort_tuple(state, &state->memtuples[i]);
2607 9794 : CHECK_FOR_INTERRUPTS();
2608 : }
2609 : else
2610 9196 : tuplesort_heap_replace_top(state, &state->memtuples[i]);
2611 : }
2612 : }
2613 :
2614 : Assert(state->memtupcount == state->bound);
2615 408 : state->status = TSS_BOUNDED;
2616 408 : }
2617 :
2618 : /*
2619 : * Convert the bounded heap to a properly-sorted array
2620 : */
2621 : static void
2622 408 : sort_bounded_heap(Tuplesortstate *state)
2623 : {
2624 408 : int tupcount = state->memtupcount;
2625 :
2626 : Assert(state->status == TSS_BOUNDED);
2627 : Assert(state->bounded);
2628 : Assert(tupcount == state->bound);
2629 : Assert(SERIAL(state));
2630 :
2631 : /*
2632 : * We can unheapify in place because each delete-top call will remove the
2633 : * largest entry, which we can promptly store in the newly freed slot at
2634 : * the end. Once we're down to a single-entry heap, we're done.
2635 : */
2636 18582 : while (state->memtupcount > 1)
2637 : {
2638 18174 : SortTuple stup = state->memtuples[0];
2639 :
2640 : /* this sifts-up the next-largest entry and decreases memtupcount */
2641 18174 : tuplesort_heap_delete_top(state);
2642 18174 : state->memtuples[state->memtupcount] = stup;
2643 : }
2644 408 : state->memtupcount = tupcount;
2645 :
2646 : /*
2647 : * Reverse sort direction back to the original state. This is not
2648 : * actually necessary but seems like a good idea for tidiness.
2649 : */
2650 408 : reversedirection(state);
2651 :
2652 408 : state->status = TSS_SORTEDINMEM;
2653 408 : state->boundUsed = true;
2654 408 : }
2655 :
2656 : /*
2657 : * Sort all memtuples using specialized qsort() routines.
2658 : *
2659 : * Quicksort is used for small in-memory sorts, and external sort runs.
2660 : */
2661 : static void
2662 236418 : tuplesort_sort_memtuples(Tuplesortstate *state)
2663 : {
2664 : Assert(!LEADER(state));
2665 :
2666 236418 : if (state->memtupcount > 1)
2667 : {
2668 : /*
2669 : * Do we have the leading column's value or abbreviation in datum1,
2670 : * and is there a specialization for its comparator?
2671 : */
2672 67950 : if (state->base.haveDatum1 && state->base.sortKeys)
2673 : {
2674 67914 : if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2675 : {
2676 3356 : qsort_tuple_unsigned(state->memtuples,
2677 3356 : state->memtupcount,
2678 : state);
2679 3340 : return;
2680 : }
2681 64558 : else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2682 : {
2683 1340 : qsort_tuple_signed(state->memtuples,
2684 1340 : state->memtupcount,
2685 : state);
2686 1340 : return;
2687 : }
2688 63218 : else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2689 : {
2690 39732 : qsort_tuple_int32(state->memtuples,
2691 39732 : state->memtupcount,
2692 : state);
2693 39672 : return;
2694 : }
2695 : }
2696 :
2697 : /* Can we use the single-key sort function? */
2698 23522 : if (state->base.onlyKey != NULL)
2699 : {
2700 10064 : qsort_ssup(state->memtuples, state->memtupcount,
2701 10064 : state->base.onlyKey);
2702 : }
2703 : else
2704 : {
2705 13458 : qsort_tuple(state->memtuples,
2706 13458 : state->memtupcount,
2707 : state->base.comparetup,
2708 : state);
2709 : }
2710 : }
2711 : }
2712 :
2713 : /*
2714 : * Insert a new tuple into an empty or existing heap, maintaining the
2715 : * heap invariant. Caller is responsible for ensuring there's room.
2716 : *
2717 : * Note: For some callers, tuple points to a memtuples[] entry above the
2718 : * end of the heap. This is safe as long as it's not immediately adjacent
2719 : * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2720 : * is, it might get overwritten before being moved into the heap!
2721 : */
2722 : static void
2723 19872 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2724 : {
2725 : SortTuple *memtuples;
2726 : int j;
2727 :
2728 19872 : memtuples = state->memtuples;
2729 : Assert(state->memtupcount < state->memtupsize);
2730 :
2731 19872 : CHECK_FOR_INTERRUPTS();
2732 :
2733 : /*
2734 : * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2735 : * using 1-based array indexes, not 0-based.
2736 : */
2737 19872 : j = state->memtupcount++;
2738 57742 : while (j > 0)
2739 : {
2740 50940 : int i = (j - 1) >> 1;
2741 :
2742 50940 : if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2743 13070 : break;
2744 37870 : memtuples[j] = memtuples[i];
2745 37870 : j = i;
2746 : }
2747 19872 : memtuples[j] = *tuple;
2748 19872 : }
2749 :
2750 : /*
2751 : * Remove the tuple at state->memtuples[0] from the heap. Decrement
2752 : * memtupcount, and sift up to maintain the heap invariant.
2753 : *
2754 : * The caller has already free'd the tuple the top node points to,
2755 : * if necessary.
2756 : */
2757 : static void
2758 19416 : tuplesort_heap_delete_top(Tuplesortstate *state)
2759 : {
2760 19416 : SortTuple *memtuples = state->memtuples;
2761 : SortTuple *tuple;
2762 :
2763 19416 : if (--state->memtupcount <= 0)
2764 302 : return;
2765 :
2766 : /*
2767 : * Remove the last tuple in the heap, and re-insert it, by replacing the
2768 : * current top node with it.
2769 : */
2770 19114 : tuple = &memtuples[state->memtupcount];
2771 19114 : tuplesort_heap_replace_top(state, tuple);
2772 : }
2773 :
2774 : /*
2775 : * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2776 : * maintain the heap invariant.
2777 : *
2778 : * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2779 : * Heapsort, steps H3-H8).
2780 : */
2781 : static void
2782 6049818 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2783 : {
2784 6049818 : SortTuple *memtuples = state->memtuples;
2785 : unsigned int i,
2786 : n;
2787 :
2788 : Assert(state->memtupcount >= 1);
2789 :
2790 6049818 : CHECK_FOR_INTERRUPTS();
2791 :
2792 : /*
2793 : * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2794 : * This prevents overflow in the "2 * i + 1" calculation, since at the top
2795 : * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2796 : */
2797 6049818 : n = state->memtupcount;
2798 6049818 : i = 0; /* i is where the "hole" is */
2799 : for (;;)
2800 1827310 : {
2801 7877128 : unsigned int j = 2 * i + 1;
2802 :
2803 7877128 : if (j >= n)
2804 1250928 : break;
2805 9066576 : if (j + 1 < n &&
2806 2440376 : COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2807 962962 : j++;
2808 6626200 : if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2809 4798890 : break;
2810 1827310 : memtuples[i] = memtuples[j];
2811 1827310 : i = j;
2812 : }
2813 6049818 : memtuples[i] = *tuple;
2814 6049818 : }
2815 :
2816 : /*
2817 : * Function to reverse the sort direction from its current state
2818 : *
2819 : * It is not safe to call this when performing hash tuplesorts
2820 : */
2821 : static void
2822 816 : reversedirection(Tuplesortstate *state)
2823 : {
2824 816 : SortSupport sortKey = state->base.sortKeys;
2825 : int nkey;
2826 :
2827 1992 : for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2828 : {
2829 1176 : sortKey->ssup_reverse = !sortKey->ssup_reverse;
2830 1176 : sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2831 : }
2832 816 : }
2833 :
2834 :
2835 : /*
2836 : * Tape interface routines
2837 : */
2838 :
2839 : static unsigned int
2840 5824346 : getlen(LogicalTape *tape, bool eofOK)
2841 : {
2842 : unsigned int len;
2843 :
2844 5824346 : if (LogicalTapeRead(tape,
2845 : &len, sizeof(len)) != sizeof(len))
2846 0 : elog(ERROR, "unexpected end of tape");
2847 5824346 : if (len == 0 && !eofOK)
2848 0 : elog(ERROR, "unexpected end of data");
2849 5824346 : return len;
2850 : }
2851 :
2852 : static void
2853 1712 : markrunend(LogicalTape *tape)
2854 : {
2855 1712 : unsigned int len = 0;
2856 :
2857 1712 : LogicalTapeWrite(tape, &len, sizeof(len));
2858 1712 : }
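/*
 * Illustrative sketch (not part of tuplesort.c): the framing implied by
 * getlen() and markrunend() -- each tuple preceded by an unsigned length
 * word, with a zero length word terminating the run.  An in-memory byte
 * buffer stands in for the logical tape here.
 */
#include <stdio.h>
#include <string.h>

int
main(void)
{
    unsigned char tape[64];
    size_t        pos = 0;
    unsigned int  zero = 0;
    const char   *tuples[] = {"aaa", "bbbb"};

    /* write two framed "tuples", then the end-of-run marker */
    for (int i = 0; i < 2; i++)
    {
        unsigned int len = (unsigned int) strlen(tuples[i]);

        memcpy(tape + pos, &len, sizeof(len));
        pos += sizeof(len);
        memcpy(tape + pos, tuples[i], len);
        pos += len;
    }
    memcpy(tape + pos, &zero, sizeof(zero));

    /* read back until the zero length word */
    pos = 0;
    for (;;)
    {
        unsigned int len;

        memcpy(&len, tape + pos, sizeof(len));
        pos += sizeof(len);
        if (len == 0)
            break;                      /* end of run */
        printf("tuple of %u bytes\n", len);
        pos += len;
    }
    return 0;
}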
2859 :
2860 : /*
2861 : * Get memory for tuple from within READTUP() routine.
2862 : *
2863 : * We use next free slot from the slab allocator, or palloc() if the tuple
2864 : * is too large for that.
2865 : */
2866 : void *
2867 5412568 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2868 : {
2869 : SlabSlot *buf;
2870 :
2871 : /*
2872 : * We pre-allocate enough slots in the slab arena that we should never run
2873 : * out.
2874 : */
2875 : Assert(state->slabFreeHead);
2876 :
2877 5412568 : if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2878 0 : return MemoryContextAlloc(state->base.sortcontext, tuplen);
2879 : else
2880 : {
2881 5412568 : buf = state->slabFreeHead;
2882 : /* Reuse this slot */
2883 5412568 : state->slabFreeHead = buf->nextfree;
2884 :
2885 5412568 : return buf;
2886 : }
2887 : }
2888 :
2889 :
2890 : /*
2891 : * Parallel sort routines
2892 : */
2893 :
2894 : /*
2895 : * tuplesort_estimate_shared - estimate required shared memory allocation
2896 : *
2897 : * nWorkers is an estimate of the number of workers (it's the number that
2898 : * will be requested).
2899 : */
2900 : Size
2901 164 : tuplesort_estimate_shared(int nWorkers)
2902 : {
2903 : Size tapesSize;
2904 :
2905 : Assert(nWorkers > 0);
2906 :
2907 : /* Make sure that BufFile shared state is MAXALIGN'd */
2908 164 : tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2909 164 : tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2910 :
2911 164 : return tapesSize;
2912 : }
2913 :
2914 : /*
2915 : * tuplesort_initialize_shared - initialize shared tuplesort state
2916 : *
2917 : * Must be called from leader process before workers are launched, to
2918 : * establish state needed up-front for worker tuplesortstates. nWorkers
2919 : * should match the argument passed to tuplesort_estimate_shared().
2920 : */
2921 : void
2922 234 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2923 : {
2924 : int i;
2925 :
2926 : Assert(nWorkers > 0);
2927 :
2928 234 : SpinLockInit(&shared->mutex);
2929 234 : shared->currentWorker = 0;
2930 234 : shared->workersFinished = 0;
2931 234 : SharedFileSetInit(&shared->fileset, seg);
2932 234 : shared->nTapes = nWorkers;
2933 710 : for (i = 0; i < nWorkers; i++)
2934 : {
2935 476 : shared->tapes[i].firstblocknumber = 0L;
2936 : }
2937 234 : }
2938 :
2939 : /*
2940 : * tuplesort_attach_shared - attach to shared tuplesort state
2941 : *
2942 : * Must be called by all worker processes.
2943 : */
2944 : void
2945 236 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
2946 : {
2947 : /* Attach to SharedFileSet */
2948 236 : SharedFileSetAttach(&shared->fileset, seg);
2949 236 : }
2950 :
2951 : /*
2952 : * worker_get_identifier - Assign and return ordinal identifier for worker
2953 : *
2954 : * The order in which these are assigned is not well defined, and should not
2955 : * matter; worker numbers across parallel sort participants need only be
2956 : * distinct and gapless. logtape.c requires this.
2957 : *
2958 : * Note that the identifiers assigned from here have no relation to
2959 : * ParallelWorkerNumber, to avoid making any assumption about the
2960 : * caller's requirements. However, we do follow the ParallelWorkerNumber
2961 : * convention of representing a non-worker with worker number -1. This
2962 : * includes the leader, as well as serial Tuplesort processes.
2963 : */
2964 : static int
2965 468 : worker_get_identifier(Tuplesortstate *state)
2966 : {
2967 468 : Sharedsort *shared = state->shared;
2968 : int worker;
2969 :
2970 : Assert(WORKER(state));
2971 :
2972 468 : SpinLockAcquire(&shared->mutex);
2973 468 : worker = shared->currentWorker++;
2974 468 : SpinLockRelease(&shared->mutex);
2975 :
2976 468 : return worker;
2977 : }
2978 :
2979 : /*
2980 : * worker_freeze_result_tape - freeze worker's result tape for leader
2981 : *
2982 : * This is called by workers just after the result tape has been determined,
2983 : * instead of calling LogicalTapeFreeze() directly. They do so because
2984 : * workers require a few additional steps over similar serial
2985 : * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
2986 : * steps are around freeing now unneeded resources, and representing to
2987 : * leader that worker's input run is available for its merge.
2988 : *
2989 : * There should only be one final output run for each worker, which consists
2990 : * of all tuples that were originally input into worker.
2991 : */
2992 : static void
2993 468 : worker_freeze_result_tape(Tuplesortstate *state)
2994 : {
2995 468 : Sharedsort *shared = state->shared;
2996 : TapeShare output;
2997 :
2998 : Assert(WORKER(state));
2999 : Assert(state->result_tape != NULL);
3000 : Assert(state->memtupcount == 0);
3001 :
3002 : /*
3003 : * Free most remaining memory, in case caller is sensitive to our holding
3004 : * on to it. memtuples may not be a tiny merge heap at this point.
3005 : */
3006 468 : pfree(state->memtuples);
3007 : /* Be tidy */
3008 468 : state->memtuples = NULL;
3009 468 : state->memtupsize = 0;
3010 :
3011 : /*
3012 : * Parallel worker requires result tape metadata, which is to be stored in
3013 : * shared memory for leader
3014 : */
3015 468 : LogicalTapeFreeze(state->result_tape, &output);
3016 :
3017 : /* Store properties of output tape, and update finished worker count */
3018 468 : SpinLockAcquire(&shared->mutex);
3019 468 : shared->tapes[state->worker] = output;
3020 468 : shared->workersFinished++;
3021 468 : SpinLockRelease(&shared->mutex);
3022 468 : }
3023 :
3024 : /*
3025 : * worker_nomergeruns - dump memtuples in worker, without merging
3026 : *
3027 : * This called as an alternative to mergeruns() with a worker when no
3028 : * merging is required.
3029 : */
3030 : static void
3031 468 : worker_nomergeruns(Tuplesortstate *state)
3032 : {
3033 : Assert(WORKER(state));
3034 : Assert(state->result_tape == NULL);
3035 : Assert(state->nOutputRuns == 1);
3036 :
3037 468 : state->result_tape = state->destTape;
3038 468 : worker_freeze_result_tape(state);
3039 468 : }
3040 :
3041 : /*
3042 : * leader_takeover_tapes - create tapeset for leader from worker tapes
3043 : *
3044 : * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3045 : * sorting has occurred in workers, all of which must have already returned
3046 : * from tuplesort_performsort().
3047 : *
3048 : * When this returns, leader process is left in a state that is virtually
3049 : * indistinguishable from it having generated runs as a serial external sort
3050 : * might have.
3051 : */
3052 : static void
3053 162 : leader_takeover_tapes(Tuplesortstate *state)
3054 : {
3055 162 : Sharedsort *shared = state->shared;
3056 162 : int nParticipants = state->nParticipants;
3057 : int workersFinished;
3058 : int j;
3059 :
3060 : Assert(LEADER(state));
3061 : Assert(nParticipants >= 1);
3062 :
3063 162 : SpinLockAcquire(&shared->mutex);
3064 162 : workersFinished = shared->workersFinished;
3065 162 : SpinLockRelease(&shared->mutex);
3066 :
3067 162 : if (nParticipants != workersFinished)
3068 0 : elog(ERROR, "cannot take over tapes before all workers finish");
3069 :
3070 : /*
3071 : * Create the tapeset from worker tapes, including a leader-owned tape at
3072 : * the end. Parallel workers are far more expensive than logical tapes,
3073 : * so the number of tapes allocated here should never be excessive.
3074 : */
3075 162 : inittapestate(state, nParticipants);
3076 162 : state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3077 :
3078 : /*
3079 : * Set currentRun to reflect the number of runs we will merge (it's not
3080 : * used for anything, this is just pro forma)
3081 : */
3082 162 : state->currentRun = nParticipants;
3083 :
3084 : /*
3085 : * Initialize the state to look the same as after building the initial
3086 : * runs.
3087 : *
3088 : * There will always be exactly 1 run per worker, and exactly one input
3089 : * tape per run, because workers always output exactly 1 run, even when
3090 : * there were no input tuples for workers to sort.
3091 : */
3092 162 : state->inputTapes = NULL;
3093 162 : state->nInputTapes = 0;
3094 162 : state->nInputRuns = 0;
3095 :
3096 162 : state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3097 162 : state->nOutputTapes = nParticipants;
3098 162 : state->nOutputRuns = nParticipants;
3099 :
3100 490 : for (j = 0; j < nParticipants; j++)
3101 : {
3102 328 : state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3103 : }
3104 :
3105 162 : state->status = TSS_BUILDRUNS;
3106 162 : }
3107 :
3108 : /*
3109 : * Convenience routine to free a tuple previously loaded into sort memory
3110 : */
3111 : static void
3112 3760000 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3113 : {
3114 3760000 : if (stup->tuple)
3115 : {
3116 3599722 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3117 3599722 : pfree(stup->tuple);
3118 3599722 : stup->tuple = NULL;
3119 : }
3120 3760000 : }
3121 :
3122 : int
3123 0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3124 : {
3125 0 : if (x < y)
3126 0 : return -1;
3127 0 : else if (x > y)
3128 0 : return 1;
3129 : else
3130 0 : return 0;
3131 : }
3132 :
3133 : int
3134 1096186 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3135 : {
3136 1096186 : int64 xx = DatumGetInt64(x);
3137 1096186 : int64 yy = DatumGetInt64(y);
3138 :
3139 1096186 : if (xx < yy)
3140 398422 : return -1;
3141 697764 : else if (xx > yy)
3142 353162 : return 1;
3143 : else
3144 344602 : return 0;
3145 : }
3146 :
3147 : int
3148 175706596 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3149 : {
3150 175706596 : int32 xx = DatumGetInt32(x);
3151 175706596 : int32 yy = DatumGetInt32(y);
3152 :
3153 175706596 : if (xx < yy)
3154 42496654 : return -1;
3155 133209942 : else if (xx > yy)
3156 39877852 : return 1;
3157 : else
3158 93332090 : return 0;
3159 : }
|