Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplesort.c
4 : * Generalized tuple sorting routines.
5 : *
6 : * This module provides a generalized facility for tuple sorting, which can be
7 : * applied to different kinds of sortable objects. Implementation of
8 : * the particular sorting variants is given in tuplesortvariants.c.
9 : * This module works efficiently for both small and large amounts
10 : * of data. Small amounts are sorted in-memory. Large amounts are
11 : * sorted using temporary files and a standard external sort
12 : * algorithm.
13 : *
14 : * See Knuth, volume 3, for more than you want to know about external
15 : * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 : * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 : * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 : * merge is better. Knuth is assuming that tape drives are expensive
19 : * beasts, and in particular that there will always be many more runs than
20 : * tape drives. The polyphase merge algorithm was good at keeping all the
21 : * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 : * much more than a few Kb of memory buffers, so we can afford to have
23 : * lots of them. In particular, if we can have as many tape drives as
24 : * sorted runs, we can eliminate any repeated I/O at all.
25 : *
26 : * Historically, we divided the input into sorted runs using replacement
27 : * selection, in the form of a priority tree implemented as a heap
28 : * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 : * or radix sort for run generation.
30 : *
31 : * The approximate amount of memory allowed for any one sort operation
32 : * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 : * we absorb tuples and simply store them in an unsorted array as long as
34 : * we haven't exceeded workMem. If we reach the end of the input without
35 : * exceeding workMem, we sort the array in memory and subsequently return
36 : * tuples just by scanning the tuple array sequentially. If we do exceed
37 : * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 : * When tuples are dumped in batch after in-memory sorting, we begin a new run
39 : * with a new output tape. If we reach the max number of tapes, we write
40 : * subsequent runs on the existing tapes in a round-robin fashion. We will
41 : * need multiple merge passes to finish the merge in that case. After the
42 : * end of the input is reached, we dump out remaining tuples in memory into
43 : * a final run, then merge the runs.
44 : *
45 : * When merging runs, we use a heap containing just the frontmost tuple from
46 : * each source run; we repeatedly output the smallest tuple and replace it
47 : * with the next tuple from its source tape (if any). When the heap empties,
48 : * the merge is complete. The basic merge algorithm thus needs very little
49 : * memory --- only M tuples for an M-way merge, and M is constrained to a
50 : * small number. However, we can still make good use of our full workMem
51 : * allocation by pre-reading additional blocks from each source tape. Without
52 : * prereading, our access pattern to the temporary file would be very erratic;
53 : * on average we'd read one block from each of M source tapes during the same
54 : * time that we're writing M blocks to the output tape, so there is no
55 : * sequentiality of access at all, defeating the read-ahead methods used by
56 : * most Unix kernels. Worse, the output tape gets written into a very random
57 : * sequence of blocks of the temp file, ensuring that things will be even
58 : * worse when it comes time to read that tape. A straightforward merge pass
59 : * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 : * by prereading from each source tape sequentially, loading about workMem/M
61 : * bytes from each tape in turn, and making the sequential blocks immediately
62 : * available for reuse. This approach helps to localize both read and write
63 : * accesses. The pre-reading is handled by logtape.c; we just tell it how
64 : * much memory to use for the buffers.
65 : *
66 : * In the current code we determine the number of input tapes M on the basis
67 : * of workMem: we want workMem/M to be large enough that we read a fair
68 : * amount of data each time we read from a tape, so as to maintain the
69 : * locality of access described above. Nonetheless, with large workMem we
70 : * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 : * which avoids space wastage by recycling disk space as soon as each block
72 : * is read from its "tape".
73 : *
74 : * When the caller requests random access to the sort result, we form
75 : * the final sorted run on a logical tape which is then "frozen", so
76 : * that we can access it randomly. When the caller does not need random
77 : * access, we return from tuplesort_performsort() as soon as we are down
78 : * to one run per logical tape. The final merge is then performed
79 : * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 : * saves one cycle of writing all the data out to disk and reading it in.
81 : *
82 : * This module supports parallel sorting. Parallel sorts involve coordination
83 : * among one or more worker processes, and a leader process, each with its own
84 : * tuplesort state. The leader process (or, more accurately, the
85 : * Tuplesortstate associated with a leader process) creates a full tapeset
86 : * consisting of worker tapes with one run to merge; a run for every
87 : * worker process. This is then merged. Worker processes are guaranteed to
88 : * produce exactly one output run from their partial input.
89 : *
90 : *
91 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
92 : * Portions Copyright (c) 1994, Regents of the University of California
93 : *
94 : * IDENTIFICATION
95 : * src/backend/utils/sort/tuplesort.c
96 : *
97 : *-------------------------------------------------------------------------
98 : */
99 :
100 : #include "postgres.h"
101 :
102 : #include <limits.h>
103 :
104 : #include "commands/tablespace.h"
105 : #include "miscadmin.h"
106 : #include "pg_trace.h"
107 : #include "storage/shmem.h"
108 : #include "utils/guc.h"
109 : #include "utils/memutils.h"
110 : #include "utils/pg_rusage.h"
111 : #include "utils/tuplesort.h"
112 :
113 : /*
114 : * Initial size of memtuples array, in elements. The array's size in bytes
115 : * must be more than ALLOCSET_SEPARATE_THRESHOLD; see comments in
116 : * grow_memtuples(). Use at least 1024 elements to avoid excessive reallocs.
117 : */
118 : #define INITIAL_MEMTUPSIZE Max(1024, \
119 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
120 :
121 : /* GUC variables */
122 : bool trace_sort = false; /* e.g. makes tuplesort_begin_common() initialize ru_start */
123 :
124 : #ifdef DEBUG_BOUNDED_SORT
125 : bool optimize_bounded_sort = true; /* when false, tuplesort_set_bound() returns without setting a bound */
126 : #endif
127 :
128 :
129 : /*
130 : * During merge, we use a pre-allocated set of fixed-size slots to hold
131 : * tuples, to avoid palloc/pfree overhead.
132 : *
133 : * Merge doesn't require a lot of memory, so we can afford to waste some,
134 : * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
135 : * palloc() overhead is not significant anymore.
136 : *
137 : * 'nextfree' is valid when this chunk is in the free list. When in use, the
138 : * slot holds a tuple.
139 : */
140 : #define SLAB_SLOT_SIZE 1024 /* size of one slot's buffer, in bytes */
141 :
142 : typedef union SlabSlot
143 : {
144 : union SlabSlot *nextfree;
145 : char buffer[SLAB_SLOT_SIZE];
146 : } SlabSlot;
147 :
148 : /*
149 : * Possible states of a Tuplesort object. These denote the states that
150 : * persist between calls of Tuplesort routines. Each batch starts out in TSS_INITIAL (see tuplesort_begin_batch()).
151 : */
152 : typedef enum
153 : {
154 : TSS_INITIAL, /* Loading tuples; still within memory limit */
155 : TSS_BOUNDED, /* Loading tuples into bounded-size heap */
156 : TSS_BUILDRUNS, /* Loading tuples; writing to tape */
157 : TSS_SORTEDINMEM, /* Sort completed entirely in memory */
158 : TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
159 : TSS_FINALMERGE, /* Performing final merge on-the-fly */
160 : } TupSortStatus;
161 :
162 : /*
163 : * Parameters for calculation of number of tapes to use --- see inittapes()
164 : * and tuplesort_merge_order().
165 : *
166 : * In this calculation we assume that each tape will cost us about 1 block's
167 : * worth of buffer space. This ignores the overhead of all the other data
168 : * structures needed for each tape, but it's probably close enough.
169 : *
170 : * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
171 : * input tape, for pre-reading (see discussion at top of file). This is *in
172 : * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
173 : */
174 : #define MINORDER 6 /* minimum merge order */
175 : #define MAXORDER 500 /* maximum merge order */
176 : #define TAPE_BUFFER_OVERHEAD BLCKSZ /* 1 block per tape */
177 : #define MERGE_BUFFER_SIZE (BLCKSZ * 32) /* 32 blocks per input tape */
178 :
179 :
180 : /*
181 : * Private state of a Tuplesort operation.
182 : */
183 : struct Tuplesortstate
184 : {
185 : TuplesortPublic base; /* public part: sortopt, memory contexts, sortKeys and the variant callbacks */
186 : TupSortStatus status; /* enumerated value as shown above */
187 : bool bounded; /* did caller specify a maximum number of
188 : * tuples to return? */
189 : bool boundUsed; /* true if we made use of a bounded heap */
190 : int bound; /* if bounded, the maximum number of tuples */
191 : int64 tupleMem; /* memory consumed by individual tuples.
192 : * storing this separately from what we track
193 : * in availMem allows us to subtract the
194 : * memory consumed by all tuples when dumping
195 : * tuples to tape */
196 : int64 availMem; /* remaining memory available, in bytes */
197 : int64 allowedMem; /* total memory allowed, in bytes */
198 : int maxTapes; /* max number of input tapes to merge in each
199 : * pass */
200 : int64 maxSpace; /* maximum amount of space occupied among the
201 : * sorts of all groups, either in-memory or on-disk */
202 : bool isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
203 : * false means in-memory */
204 : TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
205 : LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
206 :
207 : /*
208 : * This array holds the tuples now in sort memory. If we are in state
209 : * INITIAL, the tuples are in no particular order; if we are in state
210 : * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
211 : * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
212 : * H. In state SORTEDONTAPE, the array is not used.
213 : */
214 : SortTuple *memtuples; /* array of SortTuple structs */
215 : int memtupcount; /* number of tuples currently present */
216 : int memtupsize; /* allocated length of memtuples array */
217 : bool growmemtuples; /* memtuples' growth still underway? */
218 :
219 : /*
220 : * Memory for tuples is sometimes allocated using a simple slab allocator,
221 : * rather than with palloc(). Currently, we switch to slab allocation
222 : * when we start merging. Merging only needs to keep a small, fixed
223 : * number of tuples in memory at any time, so we can avoid the
224 : * palloc/pfree overhead by recycling a fixed number of fixed-size slots
225 : * to hold the tuples.
226 : *
227 : * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
228 : * slots. The allocation is sized to have one slot per tape, plus one
229 : * additional slot. We need that many slots to hold all the tuples kept
230 : * in the heap during merge, plus the one we have last returned from the
231 : * sort, with tuplesort_gettuple.
232 : *
233 : * Initially, all the slots are kept in a linked list of free slots. When
234 : * a tuple is read from a tape, it is put to the next available slot, if
235 : * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
236 : * instead.
237 : *
238 : * When we're done processing a tuple, we return the slot back to the free
239 : * list, or pfree() if it was palloc'd. We know that a tuple was
240 : * allocated from the slab, if its pointer value is between
241 : * slabMemoryBegin and -End.
242 : *
243 : * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
244 : * tracking memory usage is not used.
245 : */
246 : bool slabAllocatorUsed;
247 :
248 : char *slabMemoryBegin; /* beginning of slab memory arena */
249 : char *slabMemoryEnd; /* end of slab memory arena */
250 : SlabSlot *slabFreeHead; /* head of free list */
251 :
252 : /* Memory used for input and output tape buffers. */
253 : size_t tape_buffer_mem;
254 :
255 : /*
256 : * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
257 : * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
258 : * modes), we remember the tuple in 'lastReturnedTuple', so that we can
259 : * recycle the memory on next gettuple call.
260 : */
261 : void *lastReturnedTuple;
262 :
263 : /*
264 : * While building initial runs, this is the current output run number.
265 : * Afterwards, it is the number of initial runs we made.
266 : */
267 : int currentRun;
268 :
269 : /*
270 : * Logical tapes, for merging.
271 : *
272 : * The initial runs are written in the output tapes. In each merge pass,
273 : * the output tapes of the previous pass become the input tapes, and new
274 : * output tapes are created as needed. When nInputTapes equals
275 : * nInputRuns, there is only one merge pass left.
276 : */
277 : LogicalTape **inputTapes;
278 : int nInputTapes;
279 : int nInputRuns;
280 :
281 : LogicalTape **outputTapes;
282 : int nOutputTapes;
283 : int nOutputRuns;
284 :
285 : LogicalTape *destTape; /* current output tape */
286 :
287 : /*
288 : * These variables are used after completion of sorting to keep track of
289 : * the next tuple to return. (In the tape case, the tape's current read
290 : * position is also critical state.)
291 : */
292 : LogicalTape *result_tape; /* actual tape of finished output */
293 : int current; /* array index (only used if SORTEDINMEM) */
294 : bool eof_reached; /* reached EOF (needed for cursors) */
295 :
296 : /* markpos_xxx holds marked position for mark and restore */
297 : int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
298 : int markpos_offset; /* saved "current", or offset in tape block */
299 : bool markpos_eof; /* saved "eof_reached" */
300 :
301 : /*
302 : * These variables are used during parallel sorting.
303 : *
304 : * worker is our worker identifier. Follows the general convention that
305 : * -1 value relates to a leader tuplesort, and values >= 0 worker
306 : * tuplesorts. (-1 can also be a serial tuplesort.)
307 : *
308 : * shared is mutable shared memory state, which is used to coordinate
309 : * parallel sorts.
310 : *
311 : * nParticipants is the number of worker Tuplesortstates known by the
312 : * leader to have actually been launched, which implies that they must
313 : * finish a run that the leader needs to merge. Typically includes a
314 : * worker state held by the leader process itself. Set in the leader
315 : * Tuplesortstate only.
316 : */
317 : int worker;
318 : Sharedsort *shared;
319 : int nParticipants;
320 :
321 : /*
322 : * Additional state for managing "abbreviated key" sortsupport routines
323 : * (which currently may be used by all cases except the hash index case).
324 : * Tracks the intervals at which the optimization's effectiveness is
325 : * tested.
326 : */
327 : int64 abbrevNext; /* Tuple # at which to next check
328 : * applicability */
329 :
330 : /*
331 : * Resource snapshot for time of sort start.
332 : */
333 : PGRUsage ru_start;
334 : };
335 :
336 : /*
337 : * Private mutable state of a parallel tuplesort operation. This is allocated
338 : * in shared memory.
339 : */
340 : struct Sharedsort
341 : {
342 : /* mutex protects all fields prior to tapes */
343 : slock_t mutex;
344 :
345 : /*
346 : * currentWorker generates ordinal identifier numbers for parallel sort
347 : * workers. These start from 0, and are always gapless.
348 : *
349 : * Workers increment workersFinished to indicate having finished. If this
350 : * is equal to state.nParticipants within the leader, leader is ready to
351 : * merge worker runs.
352 : */
353 : int currentWorker;
354 : int workersFinished;
355 :
356 : /* Temporary file space */
357 : SharedFileSet fileset;
358 :
359 : /* Number of elements in the tapes flexible array */
360 : int nTapes;
361 :
362 : /*
363 : * Tapes array used by workers to report back information needed by the
364 : * leader to concatenate all worker tapes into one for merging.
365 : */
366 : TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
367 : };
368 :
369 : /*
370 : * Is the given tuple allocated from the slab memory arena? That is, does it point into [slabMemoryBegin, slabMemoryEnd)? Both macro arguments are evaluated twice.
371 : */
372 : #define IS_SLAB_SLOT(state, tuple) \
373 : ((char *) (tuple) >= (state)->slabMemoryBegin && \
374 : (char *) (tuple) < (state)->slabMemoryEnd)
375 :
376 : /*
377 : * Return the given tuple to the slab memory free list, or free it
378 : * if it was palloc'd. 'tuple' is evaluated once (it is captured in a local, and parenthesized so that any expression is cast as a whole); 'state' is evaluated more than once.
379 : */
380 : #define RELEASE_SLAB_SLOT(state, tuple) \
381 : do { \
382 : SlabSlot *buf = (SlabSlot *) (tuple); \
383 : \
384 : if (IS_SLAB_SLOT((state), buf)) \
385 : { \
386 : buf->nextfree = (state)->slabFreeHead; \
387 : (state)->slabFreeHead = buf; \
388 : } else \
389 : pfree(buf); \
390 : } while(0)
391 :
392 : #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count)) /* this and the next three dispatch to callbacks stored in state->base */
393 : #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
394 : #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
395 : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
396 : #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0) /* freestate callback may be NULL */
397 : #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) /* over budget? never true while the slab allocator is in use */
398 : #define USEMEM(state,amt) ((state)->availMem -= (amt)) /* charge amt against the remaining budget */
399 : #define FREEMEM(state,amt) ((state)->availMem += (amt)) /* give amt back to the remaining budget */
400 : #define SERIAL(state) ((state)->shared == NULL) /* no shared state attached */
401 : #define WORKER(state) ((state)->shared && (state)->worker != -1) /* shared state attached, worker id assigned */
402 : #define LEADER(state) ((state)->shared && (state)->worker == -1) /* shared state attached, worker id is -1 */
403 :
404 : /*
405 : * NOTES about on-tape representation of tuples:
406 : *
407 : * We require the first "unsigned int" of a stored tuple to be the total size
408 : * on-tape of the tuple, including itself (so it is never zero; an all-zero
409 : * unsigned int is used to delimit runs). The remainder of the stored tuple
410 : * may or may not match the in-memory representation of the tuple ---
411 : * any conversion needed is the job of the writetup and readtup routines.
412 : *
413 : * If state->base.sortopt contains TUPLESORT_RANDOMACCESS, then the stored
414 : * representation of the tuple must be followed by another "unsigned int" that
415 : * is a copy of the length --- so the total tape space used is actually
416 : * sizeof(unsigned int) more than the stored length value. This allows
417 : * read-backwards. When the random access flag was not specified, the
418 : * write/read routines may omit the extra length word.
419 : *
420 : * writetup is expected to write both length words as well as the tuple
421 : * data. When readtup is called, the tape is positioned just after the
422 : * front length word; readtup must read the tuple data and advance past
423 : * the back length word (if present).
424 : *
425 : * The write/read routines can make use of the tuple description data
426 : * stored in the Tuplesortstate record, if needed. They are also expected
427 : * to adjust state->availMem by the amount of memory space (not tape space!)
428 : * released or consumed. There is no error return from either writetup
429 : * or readtup; they should ereport() on failure.
430 : *
431 : *
432 : * NOTES about memory consumption calculations:
433 : *
434 : * We count space allocated for tuples against the workMem limit, plus
435 : * the space used by the variable-size memtuples array. Fixed-size space
436 : * is not counted; it's small enough to not be interesting.
437 : *
438 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
439 : * rather than the originally-requested size. This is important since
440 : * palloc can add substantial overhead. It's not a complete answer since
441 : * we won't count any wasted space in palloc allocation blocks, but it's
442 : * a lot better than what we were doing before 7.3. As of 9.6, a
443 : * separate memory context is used for caller passed tuples. Resetting
444 : * it at certain key increments significantly ameliorates fragmentation.
445 : * readtup routines use the slab allocator (they cannot use
446 : * the reset context because it gets deleted at the point that merging
447 : * begins).
448 : */
449 :
450 :
451 : static void tuplesort_begin_batch(Tuplesortstate *state); /* (re)initializes the per-batch state */
452 : static bool consider_abort_common(Tuplesortstate *state);
453 : static void inittapes(Tuplesortstate *state, bool mergeruns);
454 : static void inittapestate(Tuplesortstate *state, int maxTapes);
455 : static void selectnewtape(Tuplesortstate *state);
456 : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
457 : static void mergeruns(Tuplesortstate *state);
458 : static void mergeonerun(Tuplesortstate *state);
459 : static void beginmerge(Tuplesortstate *state);
460 : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
461 : static void dumptuples(Tuplesortstate *state, bool alltuples);
462 : static void make_bounded_heap(Tuplesortstate *state);
463 : static void sort_bounded_heap(Tuplesortstate *state);
464 : static void tuplesort_sort_memtuples(Tuplesortstate *state);
465 : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
466 : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
467 : static void tuplesort_heap_delete_top(Tuplesortstate *state);
468 : static void reversedirection(Tuplesortstate *state);
469 : static unsigned int getlen(LogicalTape *tape, bool eofOK);
470 : static void markrunend(LogicalTape *tape);
471 : static int worker_get_identifier(Tuplesortstate *state); /* supplies state->worker for parallel workers */
472 : static void worker_freeze_result_tape(Tuplesortstate *state);
473 : static void worker_nomergeruns(Tuplesortstate *state);
474 : static void leader_takeover_tapes(Tuplesortstate *state);
475 : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
476 : static void tuplesort_free(Tuplesortstate *state);
477 : static void tuplesort_updatemax(Tuplesortstate *state);
478 :
479 :
480 : /*
481 : * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
482 : * any variant of SortTuples, using the appropriate comparetup function.
483 : * qsort_ssup() is specialized for the case where the comparetup function
484 : * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
485 : * and Datum sorts. Both are instantiated from lib/sort_template.h with static scope; qsort_ssup() compares datum1/isnull1 only.
486 : */
487 :
488 : #define ST_SORT qsort_tuple
489 : #define ST_ELEMENT_TYPE SortTuple
490 : #define ST_COMPARE_RUNTIME_POINTER
491 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
492 : #define ST_CHECK_FOR_INTERRUPTS
493 : #define ST_SCOPE static
494 : #define ST_DECLARE
495 : #define ST_DEFINE
496 : #include "lib/sort_template.h"
497 :
498 : #define ST_SORT qsort_ssup
499 : #define ST_ELEMENT_TYPE SortTuple
500 : #define ST_COMPARE(a, b, ssup) \
501 : ApplySortComparator((a)->datum1, (a)->isnull1, \
502 : (b)->datum1, (b)->isnull1, (ssup))
503 : #define ST_COMPARE_ARG_TYPE SortSupportData
504 : #define ST_CHECK_FOR_INTERRUPTS
505 : #define ST_SCOPE static
506 : #define ST_DEFINE
507 : #include "lib/sort_template.h"
508 :
509 : /* state for radix sort; count and offset share storage (anonymous union) */
510 : typedef struct RadixSortInfo
511 : {
512 : union
513 : {
514 : size_t count;
515 : size_t offset;
516 : };
517 : size_t next_offset;
518 : } RadixSortInfo;
519 :
520 : /*
521 : * Threshold below which qsort_tuple() is generally faster than a radix sort (presumably a number of tuples -- TODO confirm at the point of use).
522 : */
523 : #define QSORT_THRESHOLD 40
524 :
525 :
526 : /*
527 : * tuplesort_begin_xxx
528 : *
529 : * Initialize for a tuple sort operation.
530 : *
531 : * After calling tuplesort_begin, the caller should call tuplesort_putXXX
532 : * zero or more times, then call tuplesort_performsort when all the tuples
533 : * have been supplied. After performsort, retrieve the tuples in sorted
534 : * order by calling tuplesort_getXXX until it returns false/NULL. (If random
535 : * access was requested, rescan, markpos, and restorepos can also be called.)
536 : * Call tuplesort_end to terminate the operation and release memory/disk space.
537 : *
538 : * Each variant of tuplesort_begin has a workMem parameter specifying the
539 : * maximum number of kilobytes of RAM to use before spilling data to disk.
540 : * (The normal value of this parameter is work_mem, but some callers use
541 : * other values.) Each variant also has a sortopt which is a bitmask of
542 : * sort options. See TUPLESORT_* definitions in tuplesort.h
543 : */
544 :
545 : Tuplesortstate *
546 141303 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
547 : {
548 : Tuplesortstate *state;
549 : MemoryContext maincontext;
550 : MemoryContext sortcontext;
551 : MemoryContext oldcontext;
552 :
553 : /* See leader_takeover_tapes() remarks on random access support */
554 141303 : if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
555 0 : elog(ERROR, "random access disallowed under parallel sort");
556 :
557 : /*
558 : * Memory context surviving tuplesort_reset. This memory context holds
559 : * data which is useful to keep while sorting multiple similar batches.
560 : */
561 141303 : maincontext = AllocSetContextCreate(CurrentMemoryContext,
562 : "TupleSort main",
563 : ALLOCSET_DEFAULT_SIZES);
564 :
565 : /*
566 : * Create a working memory context for one sort operation. The content of
567 : * this context is deleted by tuplesort_reset.
568 : */
569 141303 : sortcontext = AllocSetContextCreate(maincontext,
570 : "TupleSort sort",
571 : ALLOCSET_DEFAULT_SIZES);
572 :
573 : /*
574 : * Additionally a working memory context for tuples is set up in
575 : * tuplesort_begin_batch.
576 : */
577 :
578 : /*
579 : * Make the Tuplesortstate within the per-sortstate context. This way, we
580 : * don't need a separate pfree() operation for it at shutdown.
581 : */
582 141303 : oldcontext = MemoryContextSwitchTo(maincontext);
583 :
584 141303 : state = palloc0_object(Tuplesortstate);
585 :
586 141303 : if (trace_sort)
587 0 : pg_rusage_init(&state->ru_start);
588 :
589 141303 : state->base.sortopt = sortopt;
590 141303 : state->base.tuples = true;
591 141303 : state->abbrevNext = 10; /* tuple # at which abbreviation is first checked */
592 :
593 : /*
594 : * workMem is forced to be at least 64KB, the current minimum valid value
595 : * for the work_mem GUC. This is a defense against parallel sort callers
596 : * that divide out memory among many workers in a way that leaves each
597 : * with very little memory.
598 : */
599 141303 : state->allowedMem = Max(workMem, 64) * (int64) 1024; /* kilobytes -> bytes, multiplied as int64 */
600 141303 : state->base.sortcontext = sortcontext;
601 141303 : state->base.maincontext = maincontext;
602 :
603 141303 : state->memtupsize = INITIAL_MEMTUPSIZE;
604 141303 : state->memtuples = NULL; /* the array itself is allocated by tuplesort_begin_batch() */
605 :
606 : /*
607 : * After all of the other non-parallel-related state, we set up all of the
608 : * state needed for each batch.
609 : */
610 141303 : tuplesort_begin_batch(state);
611 :
612 : /*
613 : * Initialize parallel-related state based on coordination information
614 : * from caller
615 : */
616 141303 : if (!coordinate)
617 : {
618 : /* Serial sort */
619 140889 : state->shared = NULL;
620 140889 : state->worker = -1;
621 140889 : state->nParticipants = -1;
622 : }
623 414 : else if (coordinate->isWorker)
624 : {
625 : /* Parallel worker produces exactly one final run from all input */
626 282 : state->shared = coordinate->sharedsort;
627 282 : state->worker = worker_get_identifier(state);
628 282 : state->nParticipants = -1;
629 : }
630 : else
631 : {
632 : /* Parallel leader state only used for final merge */
633 132 : state->shared = coordinate->sharedsort;
634 132 : state->worker = -1;
635 132 : state->nParticipants = coordinate->nParticipants;
636 : Assert(state->nParticipants >= 1);
637 : }
638 :
639 141303 : MemoryContextSwitchTo(oldcontext);
640 :
641 141303 : return state;
642 : }
643 :
644 : /*
645 : * tuplesort_begin_batch
646 : *
647 : * Set up, or reset, all state needed for processing a new set of tuples with this
648 : * sort state. Called both from tuplesort_begin_common (the first time sorting
649 : * with this sort state) and tuplesort_reset (for subsequent usages).
650 : */
651 : static void
652 143010 : tuplesort_begin_batch(Tuplesortstate *state)
653 : {
654 : MemoryContext oldcontext;
655 :
656 143010 : oldcontext = MemoryContextSwitchTo(state->base.maincontext);
657 :
658 : /*
659 : * Caller tuple (e.g. IndexTuple) memory context.
660 : *
661 : * A dedicated child context used exclusively for caller passed tuples
662 : * eases memory management. Resetting at key points reduces
663 : * fragmentation. Note that the memtuples array of SortTuples is allocated
664 : * in the parent context, not this context, because there is no need to
665 : * free memtuples early. For bounded sorts, tuples may be pfreed in any
666 : * order, so we use a regular aset.c context so that it can make use of
667 : * free'd memory. When the sort is not bounded, we make use of a bump.c
668 : * context as this keeps allocations more compact with less wastage.
669 : * Allocations are also slightly more CPU efficient.
670 : */
671 143010 : if (TupleSortUseBumpTupleCxt(state->base.sortopt))
672 142324 : state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
673 : "Caller tuples",
674 : ALLOCSET_DEFAULT_SIZES);
675 : else
676 686 : state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
677 : "Caller tuples",
678 : ALLOCSET_DEFAULT_SIZES);
679 :
680 :
681 143010 : state->status = TSS_INITIAL;
682 143010 : state->bounded = false;
683 143010 : state->boundUsed = false;
684 :
685 143010 : state->availMem = state->allowedMem; /* start the batch with the full budget */
686 :
687 143010 : state->tapeset = NULL;
688 :
689 143010 : state->memtupcount = 0;
690 :
691 143010 : state->growmemtuples = true;
692 143010 : state->slabAllocatorUsed = false;
693 143010 : if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE) /* discard an array whose size is no longer the initial one */
694 : {
695 36 : pfree(state->memtuples);
696 36 : state->memtuples = NULL;
697 36 : state->memtupsize = INITIAL_MEMTUPSIZE;
698 : }
699 143010 : if (state->memtuples == NULL) /* NOTE(review): an array retained from a previous batch is not re-charged via USEMEM although availMem was reset above -- confirm this is intended */
700 : {
701 141339 : state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
702 141339 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
703 : }
704 :
705 : /* workMem must be large enough for the minimal memtuples array */
706 143010 : if (LACKMEM(state))
707 0 : elog(ERROR, "insufficient memory allowed for sort");
708 :
709 143010 : state->currentRun = 0;
710 :
711 : /*
712 : * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
713 : * inittapes(), if needed.
714 : */
715 :
716 143010 : state->result_tape = NULL; /* flag that result tape has not been formed */
717 :
718 143010 : MemoryContextSwitchTo(oldcontext);
719 143010 : }
720 :
721 : /*
722 : * tuplesort_set_bound
723 : *
724 : * Advise tuplesort that at most the first N result tuples are required.
725 : *
726 : * Must be called before inserting any tuples. (Actually, we could allow it
727 : * as long as the sort hasn't spilled to disk, but there seems no need for
728 : * delayed calls at the moment.)
729 : *
730 : * This is a hint only. The tuplesort may still return more tuples than
731 : * requested. Parallel leader tuplesorts will always ignore the hint, and a bound larger than INT_MAX / 2 is silently ignored as well.
732 : */
733 : void
734 619 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
735 : {
736 : /* Assert we're called before loading any tuples */
737 : Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
738 : /* Assert we allow bounded sorts */
739 : Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
740 : /* Can't set the bound twice, either */
741 : Assert(!state->bounded);
742 : /* Also, this shouldn't be called in a parallel worker */
743 : Assert(!WORKER(state));
744 :
745 : /* Parallel leader allows but ignores hint */
746 619 : if (LEADER(state))
747 0 : return;
748 :
749 : #ifdef DEBUG_BOUNDED_SORT
750 : /* Honor GUC setting that disables the feature (for easy testing) */
751 : if (!optimize_bounded_sort)
752 : return;
753 : #endif
754 :
755 : /* We want to be able to compute bound * 2, so limit the setting */
756 619 : if (bound > (int64) (INT_MAX / 2))
757 0 : return;
758 :
759 619 : state->bounded = true;
760 619 : state->bound = (int) bound; /* bound <= INT_MAX / 2 was checked above */
761 :
762 : /*
763 : * Bounded sorts are not an effective target for abbreviated key
764 : * optimization. Disable by setting state to be consistent with no
765 : * abbreviation support.
766 : */
767 619 : state->base.sortKeys->abbrev_converter = NULL;
768 619 : if (state->base.sortKeys->abbrev_full_comparator)
769 8 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
770 :
771 : /* Not strictly necessary, but be tidy */
772 619 : state->base.sortKeys->abbrev_abort = NULL;
773 619 : state->base.sortKeys->abbrev_full_comparator = NULL;
774 : }
775 :
776 : /*
777 : * tuplesort_used_bound
778 : *
779 : * Allow callers to find out if the sort state was able to use a bound.
780 : */
781 : bool
782 190 : tuplesort_used_bound(Tuplesortstate *state)
783 : {
784 190 : return state->boundUsed;
785 : }
786 :
787 : /*
788 : * tuplesort_free
789 : *
790 : * Internal routine for freeing resources of tuplesort.
791 : */
792 : static void
793 142879 : tuplesort_free(Tuplesortstate *state)
794 : {
795 : /* context swap probably not needed, but let's be safe */
796 142879 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
797 : int64 spaceUsed;
798 :
799 142879 : if (state->tapeset)
800 449 : spaceUsed = LogicalTapeSetBlocks(state->tapeset);
801 : else
802 142430 : spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
803 :
804 : /*
805 : * Delete temporary "tape" files, if any.
806 : *
807 : * We don't bother to destroy the individual tapes here. They will go away
808 : * with the sortcontext. (In TSS_FINALMERGE state, we have closed
809 : * finished tapes already.)
810 : */
811 142879 : if (state->tapeset)
812 449 : LogicalTapeSetClose(state->tapeset);
813 :
814 142879 : if (trace_sort)
815 : {
816 0 : if (state->tapeset)
817 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
818 : SERIAL(state) ? "external sort" : "parallel external sort",
819 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
820 : else
821 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
822 : SERIAL(state) ? "internal sort" : "unperformed parallel sort",
823 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
824 : }
825 :
826 : TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
827 :
828 142879 : FREESTATE(state);
829 142879 : MemoryContextSwitchTo(oldcontext);
830 :
831 : /*
832 : * Free the per-sort memory context, thereby releasing all working memory.
833 : */
834 142879 : MemoryContextReset(state->base.sortcontext);
835 142879 : }
836 :
837 : /*
838 : * tuplesort_end
839 : *
840 : * Release resources and clean up.
841 : *
842 : * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
843 : * pointing to garbage. Be careful not to attempt to use or free such
844 : * pointers afterwards!
845 : */
846 : void
847 141172 : tuplesort_end(Tuplesortstate *state)
848 : {
849 141172 : tuplesort_free(state);
850 :
851 : /*
852 : * Free the main memory context, including the Tuplesortstate struct
853 : * itself.
854 : */
855 141172 : MemoryContextDelete(state->base.maincontext);
856 141172 : }
857 :
858 : /*
859 : * tuplesort_updatemax
860 : *
861 : * Update maximum resource usage statistics.
862 : */
863 : static void
864 1905 : tuplesort_updatemax(Tuplesortstate *state)
865 : {
866 : int64 spaceUsed;
867 : bool isSpaceDisk;
868 :
869 : /*
870 : * Note: it might seem we should provide both memory and disk usage for a
871 : * disk-based sort. However, the current code doesn't track memory space
872 : * accurately once we have begun to return tuples to the caller (since we
873 : * don't account for pfree's the caller is expected to do), so we cannot
874 : * rely on availMem in a disk sort. This does not seem worth the overhead
875 : * to fix. Is it worth creating an API for the memory context code to
876 : * tell us how much is actually used in sortcontext?
877 : */
878 1905 : if (state->tapeset)
879 : {
880 3 : isSpaceDisk = true;
881 3 : spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
882 : }
883 : else
884 : {
885 1902 : isSpaceDisk = false;
886 1902 : spaceUsed = state->allowedMem - state->availMem;
887 : }
888 :
889 : /*
890 : * Sort evicts data to the disk when it wasn't able to fit that data into
891 : * main memory. This is why we assume space used on the disk to be more
892 : * important for tracking resource usage than space used in memory. Note
893 : * that the amount of space occupied by some tupleset on the disk might be
894 : * less than amount of space occupied by the same tupleset in memory due
895 : * to more compact representation.
896 : */
897 1905 : if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
898 1902 : (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
899 : {
900 253 : state->maxSpace = spaceUsed;
901 253 : state->isMaxSpaceDisk = isSpaceDisk;
902 253 : state->maxSpaceStatus = state->status;
903 : }
904 1905 : }
905 :
906 : /*
907 : * tuplesort_reset
908 : *
909 : * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
910 : * meta-information in. After tuplesort_reset, tuplesort is ready to start
911 : * a new sort. This allows avoiding recreation of tuple sort states (and
912 : * save resources) when sorting multiple small batches.
913 : */
914 : void
915 1707 : tuplesort_reset(Tuplesortstate *state)
916 : {
917 1707 : tuplesort_updatemax(state);
918 1707 : tuplesort_free(state);
919 :
920 : /*
921 : * After we've freed up per-batch memory, re-setup all of the state common
922 : * to both the first batch and any subsequent batch.
923 : */
924 1707 : tuplesort_begin_batch(state);
925 :
926 1707 : state->lastReturnedTuple = NULL;
927 1707 : state->slabMemoryBegin = NULL;
928 1707 : state->slabMemoryEnd = NULL;
929 1707 : state->slabFreeHead = NULL;
930 1707 : }
931 :
932 : /*
933 : * Grow the memtuples[] array, if possible within our memory constraint. We
934 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
935 : * limit. Return true if we were able to enlarge the array, false if not.
936 : *
937 : * Normally, at each increment we double the size of the array. When doing
938 : * that would exceed a limit, we attempt one last, smaller increase (and then
939 : * clear the growmemtuples flag so we don't try any more). That allows us to
940 : * use memory as fully as permitted; sticking to the pure doubling rule could
941 : * result in almost half going unused. Because availMem moves around with
942 : * tuple addition/removal, we need some rule to prevent making repeated small
943 : * increases in memtupsize, which would just be useless thrashing. The
944 : * growmemtuples flag accomplishes that and also prevents useless
945 : * recalculations in this function.
946 : */
947 : static bool
948 4520 : grow_memtuples(Tuplesortstate *state)
949 : {
950 : int newmemtupsize;
951 4520 : int memtupsize = state->memtupsize;
952 4520 : int64 memNowUsed = state->allowedMem - state->availMem;
953 :
954 : /* Forget it if we've already maxed out memtuples, per comment above */
955 4520 : if (!state->growmemtuples)
956 70 : return false;
957 :
958 : /* Select new value of memtupsize */
959 4450 : if (memNowUsed <= state->availMem)
960 : {
961 : /*
962 : * We've used no more than half of allowedMem; double our usage,
963 : * clamping at INT_MAX tuples.
964 : */
965 4377 : if (memtupsize < INT_MAX / 2)
966 4377 : newmemtupsize = memtupsize * 2;
967 : else
968 : {
969 0 : newmemtupsize = INT_MAX;
970 0 : state->growmemtuples = false;
971 : }
972 : }
973 : else
974 : {
975 : /*
976 : * This will be the last increment of memtupsize. Abandon doubling
977 : * strategy and instead increase as much as we safely can.
978 : *
979 : * To stay within allowedMem, we can't increase memtupsize by more
980 : * than availMem / sizeof(SortTuple) elements. In practice, we want
981 : * to increase it by considerably less, because we need to leave some
982 : * space for the tuples to which the new array slots will refer. We
983 : * assume the new tuples will be about the same size as the tuples
984 : * we've already seen, and thus we can extrapolate from the space
985 : * consumption so far to estimate an appropriate new size for the
986 : * memtuples array. The optimal value might be higher or lower than
987 : * this estimate, but it's hard to know that in advance. We again
988 : * clamp at INT_MAX tuples.
989 : *
990 : * This calculation is safe against enlarging the array so much that
991 : * LACKMEM becomes true, because the memory currently used includes
992 : * the present array; thus, there would be enough allowedMem for the
993 : * new array elements even if no other memory were currently used.
994 : *
995 : * We do the arithmetic in float8, because otherwise the product of
996 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
997 : * result should be insignificant; but even if we computed a
998 : * completely insane result, the checks below will prevent anything
999 : * really bad from happening.
1000 : */
1001 : double grow_ratio;
1002 :
1003 73 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1004 73 : if (memtupsize * grow_ratio < INT_MAX)
1005 73 : newmemtupsize = (int) (memtupsize * grow_ratio);
1006 : else
1007 0 : newmemtupsize = INT_MAX;
1008 :
1009 : /* We won't make any further enlargement attempts */
1010 73 : state->growmemtuples = false;
1011 : }
1012 :
1013 : /* Must enlarge array by at least one element, else report failure */
1014 4450 : if (newmemtupsize <= memtupsize)
1015 0 : goto noalloc;
1016 :
1017 : /*
1018 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1019 : * to ensure our request won't be rejected. Note that we can easily
1020 : * exhaust address space before facing this outcome. (This is presently
1021 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1022 : * don't rely on that at this distance.)
1023 : */
1024 4450 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1025 : {
1026 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1027 0 : state->growmemtuples = false; /* can't grow any more */
1028 : }
1029 :
1030 : /*
1031 : * We need to be sure that we do not cause LACKMEM to become true, else
1032 : * the space management algorithm will go nuts. The code above should
1033 : * never generate a dangerous request, but to be safe, check explicitly
1034 : * that the array growth fits within availMem. (We could still cause
1035 : * LACKMEM if the memory chunk overhead associated with the memtuples
1036 : * array were to increase. That shouldn't happen because we chose the
1037 : * initial array size large enough to ensure that palloc will be treating
1038 : * both old and new arrays as separate chunks. But we'll check LACKMEM
1039 : * explicitly below just in case.)
1040 : */
1041 4450 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1042 0 : goto noalloc;
1043 :
1044 : /* OK, do it */
1045 4450 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1046 4450 : state->memtupsize = newmemtupsize;
1047 4450 : state->memtuples = (SortTuple *)
1048 4450 : repalloc_huge(state->memtuples,
1049 4450 : state->memtupsize * sizeof(SortTuple));
1050 4450 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1051 4450 : if (LACKMEM(state))
1052 0 : elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1053 4450 : return true;
1054 :
1055 0 : noalloc:
1056 : /* If for any reason we didn't realloc, shut off future attempts */
1057 0 : state->growmemtuples = false;
1058 0 : return false;
1059 : }
1060 :
1061 : /*
1062 : * Shared code for tuple and datum cases.
1063 : */
1064 : void
1065 16180503 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1066 : bool useAbbrev, Size tuplen)
1067 : {
1068 16180503 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1069 :
1070 : Assert(!LEADER(state));
1071 :
1072 : /* account for the memory used for this tuple */
1073 16180503 : USEMEM(state, tuplen);
1074 16180503 : state->tupleMem += tuplen;
1075 :
1076 16180503 : if (!useAbbrev)
1077 : {
1078 : /*
1079 : * Leave ordinary Datum representation, or NULL value. If there is a
1080 : * converter it won't expect NULL values, and cost model is not
1081 : * required to account for NULL, so in that case we avoid calling
1082 : * converter and just set datum1 to zeroed representation (to be
1083 : * consistent, and to support cheap inequality tests for NULL
1084 : * abbreviated keys).
1085 : */
1086 : }
1087 2222975 : else if (!consider_abort_common(state))
1088 : {
1089 : /* Store abbreviated key representation */
1090 2222927 : tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1091 : state->base.sortKeys);
1092 : }
1093 : else
1094 : {
1095 : /*
1096 : * Set state to be consistent with never trying abbreviation.
1097 : *
1098 : * Alter datum1 representation in already-copied tuples, so as to
1099 : * ensure a consistent representation (current tuple was just
1100 : * handled). It does not matter if some dumped tuples are already
1101 : * sorted on tape, since serialized tuples lack abbreviated keys
1102 : * (TSS_BUILDRUNS state prevents control reaching here in any case).
1103 : */
1104 48 : REMOVEABBREV(state, state->memtuples, state->memtupcount);
1105 : }
1106 :
1107 16180503 : switch (state->status)
1108 : {
1109 13749288 : case TSS_INITIAL:
1110 :
1111 : /*
1112 : * Save the tuple into the unsorted array. First, grow the array
1113 : * as needed. Note that we try to grow the array when there is
1114 : * still one free slot remaining --- if we fail, there'll still be
1115 : * room to store the incoming tuple, and then we'll switch to
1116 : * tape-based operation.
1117 : */
1118 13749288 : if (state->memtupcount >= state->memtupsize - 1)
1119 : {
1120 4520 : (void) grow_memtuples(state);
1121 : Assert(state->memtupcount < state->memtupsize);
1122 : }
1123 13749288 : state->memtuples[state->memtupcount++] = *tuple;
1124 :
1125 : /*
1126 : * Check if it's time to switch over to a bounded heapsort. We do
1127 : * so if the input tuple count exceeds twice the desired tuple
1128 : * count (this is a heuristic for where heapsort becomes cheaper
1129 : * than a quicksort), or if we've just filled workMem and have
1130 : * enough tuples to meet the bound.
1131 : *
1132 : * Note that once we enter TSS_BOUNDED state we will always try to
1133 : * complete the sort that way. In the worst case, if later input
1134 : * tuples are larger than earlier ones, this might cause us to
1135 : * exceed workMem significantly.
1136 : */
1137 13749288 : if (state->bounded &&
1138 23683 : (state->memtupcount > state->bound * 2 ||
1139 23475 : (state->memtupcount > state->bound && LACKMEM(state))))
1140 : {
1141 208 : if (trace_sort)
1142 0 : elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1143 : state->memtupcount,
1144 : pg_rusage_show(&state->ru_start));
1145 208 : make_bounded_heap(state);
1146 208 : MemoryContextSwitchTo(oldcontext);
1147 208 : return;
1148 : }
1149 :
1150 : /*
1151 : * Done if we still fit in available memory and have array slots.
1152 : */
1153 13749080 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1154 : {
1155 13749010 : MemoryContextSwitchTo(oldcontext);
1156 13749010 : return;
1157 : }
1158 :
1159 : /*
1160 : * Nope; time to switch to tape-based operation.
1161 : */
1162 70 : inittapes(state, true);
1163 :
1164 : /*
1165 : * Dump all tuples.
1166 : */
1167 70 : dumptuples(state, false);
1168 70 : break;
1169 :
1170 1875072 : case TSS_BOUNDED:
1171 :
1172 : /*
1173 : * We don't want to grow the array here, so check whether the new
1174 : * tuple can be discarded before putting it in. This should be a
1175 : * good speed optimization, too, since when there are many more
1176 : * input tuples than the bound, most input tuples can be discarded
1177 : * with just this one comparison. Note that because we currently
1178 : * have the sort direction reversed, we must check for <= not >=.
1179 : */
1180 1875072 : if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1181 : {
1182 : /* new tuple <= top of the heap, so we can discard it */
1183 1623542 : free_sort_tuple(state, tuple);
1184 1623542 : CHECK_FOR_INTERRUPTS();
1185 : }
1186 : else
1187 : {
1188 : /* discard top of heap, replacing it with the new tuple */
1189 251530 : free_sort_tuple(state, &state->memtuples[0]);
1190 251530 : tuplesort_heap_replace_top(state, tuple);
1191 : }
1192 1875072 : break;
1193 :
1194 556143 : case TSS_BUILDRUNS:
1195 :
1196 : /*
1197 : * Save the tuple into the unsorted array (there must be space)
1198 : */
1199 556143 : state->memtuples[state->memtupcount++] = *tuple;
1200 :
1201 : /*
1202 : * If we are over the memory limit, dump all tuples.
1203 : */
1204 556143 : dumptuples(state, false);
1205 556143 : break;
1206 :
1207 0 : default:
1208 0 : elog(ERROR, "invalid tuplesort state");
1209 : break;
1210 : }
1211 2431285 : MemoryContextSwitchTo(oldcontext);
1212 : }
1213 :
1214 : static bool
1215 2222975 : consider_abort_common(Tuplesortstate *state)
1216 : {
1217 : Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1218 : Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1219 : Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1220 :
1221 : /*
1222 : * Check effectiveness of abbreviation optimization. Consider aborting
1223 : * when still within memory limit.
1224 : */
1225 2222975 : if (state->status == TSS_INITIAL &&
1226 1986760 : state->memtupcount >= state->abbrevNext)
1227 : {
1228 2572 : state->abbrevNext *= 2;
1229 :
1230 : /*
1231 : * Check opclass-supplied abbreviation abort routine. It may indicate
1232 : * that abbreviation should not proceed.
1233 : */
1234 2572 : if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1235 : state->base.sortKeys))
1236 2524 : return false;
1237 :
1238 : /*
1239 : * Finally, restore authoritative comparator, and indicate that
1240 : * abbreviation is not in play by setting abbrev_converter to NULL
1241 : */
1242 48 : state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1243 48 : state->base.sortKeys[0].abbrev_converter = NULL;
1244 : /* Not strictly necessary, but be tidy */
1245 48 : state->base.sortKeys[0].abbrev_abort = NULL;
1246 48 : state->base.sortKeys[0].abbrev_full_comparator = NULL;
1247 :
1248 : /* Give up - expect original pass-by-value representation */
1249 48 : return true;
1250 : }
1251 :
1252 2220403 : return false;
1253 : }
1254 :
1255 : /*
1256 : * All tuples have been provided; finish the sort.
1257 : */
1258 : void
1259 121508 : tuplesort_performsort(Tuplesortstate *state)
1260 : {
1261 121508 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1262 :
1263 121508 : if (trace_sort)
1264 0 : elog(LOG, "performsort of worker %d starting: %s",
1265 : state->worker, pg_rusage_show(&state->ru_start));
1266 :
1267 121508 : switch (state->status)
1268 : {
1269 121230 : case TSS_INITIAL:
1270 :
1271 : /*
1272 : * We were able to accumulate all the tuples within the allowed
1273 : * amount of memory, or leader to take over worker tapes
1274 : */
1275 121230 : if (SERIAL(state))
1276 : {
1277 : /* Sort in memory and we're done */
1278 120851 : tuplesort_sort_memtuples(state);
1279 120806 : state->status = TSS_SORTEDINMEM;
1280 : }
1281 379 : else if (WORKER(state))
1282 : {
1283 : /*
1284 : * Parallel workers must still dump out tuples to tape. No
1285 : * merge is required to produce single output run, though.
1286 : */
1287 282 : inittapes(state, false);
1288 282 : dumptuples(state, true);
1289 282 : worker_nomergeruns(state);
1290 282 : state->status = TSS_SORTEDONTAPE;
1291 : }
1292 : else
1293 : {
1294 : /*
1295 : * Leader will take over worker tapes and merge worker runs.
1296 : * Note that mergeruns sets the correct state->status.
1297 : */
1298 97 : leader_takeover_tapes(state);
1299 97 : mergeruns(state);
1300 : }
1301 121185 : state->current = 0;
1302 121185 : state->eof_reached = false;
1303 121185 : state->markpos_block = 0L;
1304 121185 : state->markpos_offset = 0;
1305 121185 : state->markpos_eof = false;
1306 121185 : break;
1307 :
1308 208 : case TSS_BOUNDED:
1309 :
1310 : /*
1311 : * We were able to accumulate all the tuples required for output
1312 : * in memory, using a heap to eliminate excess tuples. Now we
1313 : * have to transform the heap to a properly-sorted array. Note
1314 : * that sort_bounded_heap sets the correct state->status.
1315 : */
1316 208 : sort_bounded_heap(state);
1317 208 : state->current = 0;
1318 208 : state->eof_reached = false;
1319 208 : state->markpos_offset = 0;
1320 208 : state->markpos_eof = false;
1321 208 : break;
1322 :
1323 70 : case TSS_BUILDRUNS:
1324 :
1325 : /*
1326 : * Finish tape-based sort. First, flush all tuples remaining in
1327 : * memory out to tape; then merge until we have a single remaining
1328 : * run (or, if !randomAccess and !WORKER(), one run per tape).
1329 : * Note that mergeruns sets the correct state->status.
1330 : */
1331 70 : dumptuples(state, true);
1332 70 : mergeruns(state);
1333 70 : state->eof_reached = false;
1334 70 : state->markpos_block = 0L;
1335 70 : state->markpos_offset = 0;
1336 70 : state->markpos_eof = false;
1337 70 : break;
1338 :
1339 0 : default:
1340 0 : elog(ERROR, "invalid tuplesort state");
1341 : break;
1342 : }
1343 :
1344 121463 : if (trace_sort)
1345 : {
1346 0 : if (state->status == TSS_FINALMERGE)
1347 0 : elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1348 : state->worker, state->nInputTapes,
1349 : pg_rusage_show(&state->ru_start));
1350 : else
1351 0 : elog(LOG, "performsort of worker %d done: %s",
1352 : state->worker, pg_rusage_show(&state->ru_start));
1353 : }
1354 :
1355 121463 : MemoryContextSwitchTo(oldcontext);
1356 121463 : }
1357 :
1358 : /*
1359 : * Internal routine to fetch the next tuple in either forward or back
1360 : * direction into *stup. Returns false if no more tuples.
1361 : * Returned tuple belongs to tuplesort memory context, and must not be freed
1362 : * by caller. Note that fetched tuple is stored in memory that may be
1363 : * recycled by any future fetch.
1364 : */
1365 : bool
1366 14932698 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1367 : SortTuple *stup)
1368 : {
1369 : unsigned int tuplen;
1370 : size_t nmoved;
1371 :
1372 : Assert(!WORKER(state));
1373 :
1374 14932698 : switch (state->status)
1375 : {
1376 12439312 : case TSS_SORTEDINMEM:
1377 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1378 : Assert(!state->slabAllocatorUsed);
1379 12439312 : if (forward)
1380 : {
1381 12439279 : if (state->current < state->memtupcount)
1382 : {
1383 12319048 : *stup = state->memtuples[state->current++];
1384 12319048 : return true;
1385 : }
1386 120231 : state->eof_reached = true;
1387 :
1388 : /*
1389 : * Complain if caller tries to retrieve more tuples than
1390 : * originally asked for in a bounded sort. This is because
1391 : * returning EOF here might be the wrong thing.
1392 : */
1393 120231 : if (state->bounded && state->current >= state->bound)
1394 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1395 :
1396 120231 : return false;
1397 : }
1398 : else
1399 : {
1400 33 : if (state->current <= 0)
1401 0 : return false;
1402 :
1403 : /*
1404 : * if all tuples are fetched already then we return last
1405 : * tuple, else - tuple before last returned.
1406 : */
1407 33 : if (state->eof_reached)
1408 6 : state->eof_reached = false;
1409 : else
1410 : {
1411 27 : state->current--; /* last returned tuple */
1412 27 : if (state->current <= 0)
1413 3 : return false;
1414 : }
1415 30 : *stup = state->memtuples[state->current - 1];
1416 30 : return true;
1417 : }
1418 : break;
1419 :
1420 151500 : case TSS_SORTEDONTAPE:
1421 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1422 : Assert(state->slabAllocatorUsed);
1423 :
1424 : /*
1425 : * The slot that held the tuple that we returned in previous
1426 : * gettuple call can now be reused.
1427 : */
1428 151500 : if (state->lastReturnedTuple)
1429 : {
1430 76425 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1431 76425 : state->lastReturnedTuple = NULL;
1432 : }
1433 :
1434 151500 : if (forward)
1435 : {
1436 151485 : if (state->eof_reached)
1437 0 : return false;
1438 :
1439 151485 : if ((tuplen = getlen(state->result_tape, true)) != 0)
1440 : {
1441 151470 : READTUP(state, stup, state->result_tape, tuplen);
1442 :
1443 : /*
1444 : * Remember the tuple we return, so that we can recycle
1445 : * its memory on next call. (This can be NULL, in the
1446 : * !state->tuples case).
1447 : */
1448 151470 : state->lastReturnedTuple = stup->tuple;
1449 :
1450 151470 : return true;
1451 : }
1452 : else
1453 : {
1454 15 : state->eof_reached = true;
1455 15 : return false;
1456 : }
1457 : }
1458 :
1459 : /*
1460 : * Backward.
1461 : *
1462 : * if all tuples are fetched already then we return last tuple,
1463 : * else - tuple before last returned.
1464 : */
1465 15 : if (state->eof_reached)
1466 : {
1467 : /*
1468 : * Seek position is pointing just past the zero tuplen at the
1469 : * end of file; back up to fetch last tuple's ending length
1470 : * word. If seek fails we must have a completely empty file.
1471 : */
1472 6 : nmoved = LogicalTapeBackspace(state->result_tape,
1473 : 2 * sizeof(unsigned int));
1474 6 : if (nmoved == 0)
1475 0 : return false;
1476 6 : else if (nmoved != 2 * sizeof(unsigned int))
1477 0 : elog(ERROR, "unexpected tape position");
1478 6 : state->eof_reached = false;
1479 : }
1480 : else
1481 : {
1482 : /*
1483 : * Back up and fetch previously-returned tuple's ending length
1484 : * word. If seek fails, assume we are at start of file.
1485 : */
1486 9 : nmoved = LogicalTapeBackspace(state->result_tape,
1487 : sizeof(unsigned int));
1488 9 : if (nmoved == 0)
1489 0 : return false;
1490 9 : else if (nmoved != sizeof(unsigned int))
1491 0 : elog(ERROR, "unexpected tape position");
1492 9 : tuplen = getlen(state->result_tape, false);
1493 :
1494 : /*
1495 : * Back up to get ending length word of tuple before it.
1496 : */
1497 9 : nmoved = LogicalTapeBackspace(state->result_tape,
1498 : tuplen + 2 * sizeof(unsigned int));
1499 9 : if (nmoved == tuplen + sizeof(unsigned int))
1500 : {
1501 : /*
1502 : * We backed up over the previous tuple, but there was no
1503 : * ending length word before it. That means that the prev
1504 : * tuple is the first tuple in the file. It is now the
1505 : * next to read in forward direction (not obviously right,
1506 : * but that is what in-memory case does).
1507 : */
1508 3 : return false;
1509 : }
1510 6 : else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1511 0 : elog(ERROR, "bogus tuple length in backward scan");
1512 : }
1513 :
1514 12 : tuplen = getlen(state->result_tape, false);
1515 :
1516 : /*
1517 : * Now we have the length of the prior tuple, back up and read it.
1518 : * Note: READTUP expects we are positioned after the initial
1519 : * length word of the tuple, so back up to that point.
1520 : */
1521 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1522 : tuplen);
1523 12 : if (nmoved != tuplen)
1524 0 : elog(ERROR, "bogus tuple length in backward scan");
1525 12 : READTUP(state, stup, state->result_tape, tuplen);
1526 :
1527 : /*
1528 : * Remember the tuple we return, so that we can recycle its memory
1529 : * on next call. (This can be NULL, in the Datum case).
1530 : */
1531 12 : state->lastReturnedTuple = stup->tuple;
1532 :
1533 12 : return true;
1534 :
1535 2341886 : case TSS_FINALMERGE:
1536 : Assert(forward);
1537 : /* We are managing memory ourselves, with the slab allocator. */
1538 : Assert(state->slabAllocatorUsed);
1539 :
1540 : /*
1541 : * The slab slot holding the tuple that we returned in previous
1542 : * gettuple call can now be reused.
1543 : */
1544 2341886 : if (state->lastReturnedTuple)
1545 : {
1546 2281715 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1547 2281715 : state->lastReturnedTuple = NULL;
1548 : }
1549 :
1550 : /*
1551 : * This code should match the inner loop of mergeonerun().
1552 : */
1553 2341886 : if (state->memtupcount > 0)
1554 : {
1555 2341739 : int srcTapeIndex = state->memtuples[0].srctape;
1556 2341739 : LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1557 : SortTuple newtup;
1558 :
1559 2341739 : *stup = state->memtuples[0];
1560 :
1561 : /*
1562 : * Remember the tuple we return, so that we can recycle its
1563 : * memory on next call. (This can be NULL, in the Datum case).
1564 : */
1565 2341739 : state->lastReturnedTuple = stup->tuple;
1566 :
1567 : /*
1568 : * Pull next tuple from tape, and replace the returned tuple
1569 : * at top of the heap with it.
1570 : */
1571 2341739 : if (!mergereadnext(state, srcTape, &newtup))
1572 : {
1573 : /*
1574 : * If no more data, we've reached end of run on this tape.
1575 : * Remove the top node from the heap.
1576 : */
1577 216 : tuplesort_heap_delete_top(state);
1578 216 : state->nInputRuns--;
1579 :
1580 : /*
1581 : * Close the tape. It'd go away at the end of the sort
1582 : * anyway, but better to release the memory early.
1583 : */
1584 216 : LogicalTapeClose(srcTape);
1585 216 : return true;
1586 : }
1587 2341523 : newtup.srctape = srcTapeIndex;
1588 2341523 : tuplesort_heap_replace_top(state, &newtup);
1589 2341523 : return true;
1590 : }
1591 147 : return false;
1592 :
1593 0 : default:
1594 0 : elog(ERROR, "invalid tuplesort state");
1595 : return false; /* keep compiler quiet */
1596 : }
1597 : }
1598 :
1599 :
1600 : /*
1601 : * Advance over N tuples in either forward or back direction,
1602 : * without returning any data. N==0 is a no-op.
1603 : * Returns true if successful, false if ran out of tuples.
1604 : */
1605 : bool
1606 196 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1607 : {
1608 : MemoryContext oldcontext;
1609 :
1610 : /*
1611 : * We don't actually support backwards skip yet, because no callers need
1612 : * it. The API is designed to allow for that later, though.
1613 : */
1614 : Assert(forward);
1615 : Assert(ntuples >= 0);
1616 : Assert(!WORKER(state));
1617 :
1618 196 : switch (state->status)
1619 : {
1620 184 : case TSS_SORTEDINMEM:
1621 184 : if (state->memtupcount - state->current >= ntuples)
1622 : {
1623 184 : state->current += ntuples;
1624 184 : return true;
1625 : }
1626 0 : state->current = state->memtupcount;
1627 0 : state->eof_reached = true;
1628 :
1629 : /*
1630 : * Complain if caller tries to retrieve more tuples than
1631 : * originally asked for in a bounded sort. This is because
1632 : * returning EOF here might be the wrong thing.
1633 : */
1634 0 : if (state->bounded && state->current >= state->bound)
1635 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1636 :
1637 0 : return false;
1638 :
1639 12 : case TSS_SORTEDONTAPE:
1640 : case TSS_FINALMERGE:
1641 :
1642 : /*
1643 : * We could probably optimize these cases better, but for now it's
1644 : * not worth the trouble.
1645 : */
1646 12 : oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1647 120066 : while (ntuples-- > 0)
1648 : {
1649 : SortTuple stup;
1650 :
1651 120054 : if (!tuplesort_gettuple_common(state, forward, &stup))
1652 : {
1653 0 : MemoryContextSwitchTo(oldcontext);
1654 0 : return false;
1655 : }
1656 120054 : CHECK_FOR_INTERRUPTS();
1657 : }
1658 12 : MemoryContextSwitchTo(oldcontext);
1659 12 : return true;
1660 :
1661 0 : default:
1662 0 : elog(ERROR, "invalid tuplesort state");
1663 : return false; /* keep compiler quiet */
1664 : }
1665 : }
1666 :
1667 : /*
1668 : * tuplesort_merge_order - report merge order we'll use for given memory
1669 : * (note: "merge order" just means the number of input tapes in the merge).
1670 : *
1671 : * This is exported for use by the planner. allowedMem is in bytes.
1672 : */
1673 : int
1674 10295 : tuplesort_merge_order(int64 allowedMem)
1675 : {
1676 : int mOrder;
1677 :
1678 : /*----------
1679 : * In the merge phase, we need buffer space for each input and output tape.
1680 : * Each pass in the balanced merge algorithm reads from M input tapes, and
1681 : * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1682 : * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1683 : * input tape.
1684 : *
1685 : * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1686 : * N * TAPE_BUFFER_OVERHEAD
1687 : *
1688 : * Except for the last and next-to-last merge passes, where there can be
1689 : * fewer tapes left to process, M = N. We choose M so that we have the
1690 : * desired amount of memory available for the input buffers
1691 : * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1692 : * available for the tape buffers (allowedMem).
1693 : *
1694 : * Note: you might be thinking we need to account for the memtuples[]
1695 : * array in this calculation, but we effectively treat that as part of the
1696 : * MERGE_BUFFER_SIZE workspace.
1697 : *----------
1698 : */
1699 10295 : mOrder = allowedMem /
1700 : (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1701 :
1702 : /*
1703 : * Even in minimum memory, use at least a MINORDER merge. On the other
1704 : * hand, even when we have lots of memory, do not use more than a MAXORDER
1705 : * merge. Tapes are pretty cheap, but they're not entirely free. Each
1706 : * additional tape reduces the amount of memory available to build runs,
1707 : * which in turn can cause the same sort to need more runs, which makes
1708 : * merging slower even if it can still be done in a single pass. Also,
1709 : * high order merges are quite slow due to CPU cache effects; it can be
1710 : * faster to pay the I/O cost of a multi-pass merge than to perform a
1711 : * single merge pass across many hundreds of tapes.
1712 : */
1713 10295 : mOrder = Max(mOrder, MINORDER);
1714 10295 : mOrder = Min(mOrder, MAXORDER);
1715 :
1716 10295 : return mOrder;
1717 : }
1718 :
1719 : /*
1720 : * Helper function to calculate how much memory to allocate for the read buffer
1721 : * of each input tape in a merge pass.
1722 : *
1723 : * 'avail_mem' is the amount of memory available for the buffers of all the
1724 : * tapes, both input and output.
1725 : * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1726 : * 'maxOutputTapes' is the max. number of output tapes we should produce.
1727 : */
1728 : static int64
1729 182 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1730 : int maxOutputTapes)
1731 : {
1732 : int nOutputRuns;
1733 : int nOutputTapes;
1734 :
1735 : /*
1736 : * How many output tapes will we produce in this pass?
1737 : *
1738 : * This is nInputRuns / nInputTapes, rounded up.
1739 : */
1740 182 : nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1741 :
1742 182 : nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1743 :
1744 : /*
1745 : * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1746 : * remaining memory is divided evenly between the input tapes.
1747 : *
1748 : * This also follows from the formula in tuplesort_merge_order, but here
1749 : * we derive the input buffer size from the amount of memory available,
1750 : * and M and N.
1751 : */
1752 182 : return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1753 : }
1754 :
1755 : /*
1756 : * inittapes - initialize for tape sorting.
1757 : *
1758 : * This is called only if we have found we won't sort in memory.
1759 : */
1760 : static void
1761 352 : inittapes(Tuplesortstate *state, bool mergeruns)
1762 : {
1763 : Assert(!LEADER(state));
1764 :
1765 352 : if (mergeruns)
1766 : {
1767 : /* Compute number of input tapes to use when merging */
1768 70 : state->maxTapes = tuplesort_merge_order(state->allowedMem);
1769 : }
1770 : else
1771 : {
1772 : /* Workers can sometimes produce single run, output without merge */
1773 : Assert(WORKER(state));
1774 282 : state->maxTapes = MINORDER;
1775 : }
1776 :
1777 352 : if (trace_sort)
1778 0 : elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1779 : state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1780 :
1781 : /* Create the tape set */
1782 352 : inittapestate(state, state->maxTapes);
1783 352 : state->tapeset =
1784 352 : LogicalTapeSetCreate(false,
1785 352 : state->shared ? &state->shared->fileset : NULL,
1786 : state->worker);
1787 :
1788 352 : state->currentRun = 0;
1789 :
1790 : /*
1791 : * Initialize logical tape arrays.
1792 : */
1793 352 : state->inputTapes = NULL;
1794 352 : state->nInputTapes = 0;
1795 352 : state->nInputRuns = 0;
1796 :
1797 352 : state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1798 352 : state->nOutputTapes = 0;
1799 352 : state->nOutputRuns = 0;
1800 :
1801 352 : state->status = TSS_BUILDRUNS;
1802 :
1803 352 : selectnewtape(state);
1804 352 : }
1805 :
1806 : /*
1807 : * inittapestate - initialize generic tape management state
1808 : */
1809 : static void
1810 449 : inittapestate(Tuplesortstate *state, int maxTapes)
1811 : {
1812 : int64 tapeSpace;
1813 :
1814 : /*
1815 : * Decrease availMem to reflect the space needed for tape buffers; but
1816 : * don't decrease it to the point that we have no room for tuples. (That
1817 : * case is only likely to occur if sorting pass-by-value Datums; in all
1818 : * other scenarios the memtuples[] array is unlikely to occupy more than
1819 : * half of allowedMem. In the pass-by-value case it's not important to
1820 : * account for tuple space, so we don't care if LACKMEM becomes
1821 : * inaccurate.)
1822 : */
1823 449 : tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1824 :
1825 449 : if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1826 388 : USEMEM(state, tapeSpace);
1827 :
1828 : /*
1829 : * Make sure that the temp file(s) underlying the tape set are created in
1830 : * suitable temp tablespaces. For parallel sorts, this should have been
1831 : * called already, but it doesn't matter if it is called a second time.
1832 : */
1833 449 : PrepareTempTablespaces();
1834 449 : }
1835 :
1836 : /*
1837 : * selectnewtape -- select next tape to output to.
1838 : *
1839 : * This is called after finishing a run when we know another run
1840 : * must be started. This is used both when building the initial
1841 : * runs, and during merge passes.
1842 : */
1843 : static void
1844 904 : selectnewtape(Tuplesortstate *state)
1845 : {
1846 : /*
1847 : * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1848 : * both zero. On each call, we create a new output tape to hold the next
1849 : * run, until maxTapes is reached. After that, we assign new runs to the
1850 : * existing tapes in a round robin fashion.
1851 : */
1852 904 : if (state->nOutputTapes < state->maxTapes)
1853 : {
1854 : /* Create a new tape to hold the next run */
1855 : Assert(state->outputTapes[state->nOutputRuns] == NULL);
1856 : Assert(state->nOutputRuns == state->nOutputTapes);
1857 613 : state->destTape = LogicalTapeCreate(state->tapeset);
1858 613 : state->outputTapes[state->nOutputTapes] = state->destTape;
1859 613 : state->nOutputTapes++;
1860 613 : state->nOutputRuns++;
1861 : }
1862 : else
1863 : {
1864 : /*
1865 : * We have reached the max number of tapes. Append to an existing
1866 : * tape.
1867 : */
1868 291 : state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1869 291 : state->nOutputRuns++;
1870 : }
1871 904 : }
1872 :
1873 : /*
1874 : * Initialize the slab allocation arena, for the given number of slots.
1875 : */
1876 : static void
1877 167 : init_slab_allocator(Tuplesortstate *state, int numSlots)
1878 : {
1879 167 : if (numSlots > 0)
1880 : {
1881 : char *p;
1882 : int i;
1883 :
1884 153 : state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1885 153 : state->slabMemoryEnd = state->slabMemoryBegin +
1886 153 : numSlots * SLAB_SLOT_SIZE;
1887 153 : state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1888 153 : USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1889 :
1890 153 : p = state->slabMemoryBegin;
1891 585 : for (i = 0; i < numSlots - 1; i++)
1892 : {
1893 432 : ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1894 432 : p += SLAB_SLOT_SIZE;
1895 : }
1896 153 : ((SlabSlot *) p)->nextfree = NULL;
1897 : }
1898 : else
1899 : {
1900 14 : state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1901 14 : state->slabFreeHead = NULL;
1902 : }
1903 167 : state->slabAllocatorUsed = true;
1904 167 : }
1905 :
1906 : /*
1907 : * mergeruns -- merge all the completed initial runs.
1908 : *
1909 : * This implements the Balanced k-Way Merge Algorithm. All input data has
1910 : * already been written to initial runs on tape (see dumptuples).
1911 : */
1912 : static void
1913 167 : mergeruns(Tuplesortstate *state)
1914 : {
1915 : int tapenum;
1916 :
1917 : Assert(state->status == TSS_BUILDRUNS);
1918 : Assert(state->memtupcount == 0);
1919 :
1920 167 : if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
1921 : {
1922 : /*
1923 : * If there are multiple runs to be merged, when we go to read back
1924 : * tuples from disk, abbreviated keys will not have been stored, and
1925 : * we don't care to regenerate them. Disable abbreviation from this
1926 : * point on.
1927 : */
1928 15 : state->base.sortKeys->abbrev_converter = NULL;
1929 15 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
1930 :
1931 : /* Not strictly necessary, but be tidy */
1932 15 : state->base.sortKeys->abbrev_abort = NULL;
1933 15 : state->base.sortKeys->abbrev_full_comparator = NULL;
1934 : }
1935 :
1936 : /*
1937 : * Reset tuple memory. We've freed all the tuples that we previously
1938 : * allocated. We will use the slab allocator from now on.
1939 : */
1940 167 : MemoryContextResetOnly(state->base.tuplecontext);
1941 :
1942 : /*
1943 : * We no longer need a large memtuples array. (We will allocate a smaller
1944 : * one for the heap later.)
1945 : */
1946 167 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1947 167 : pfree(state->memtuples);
1948 167 : state->memtuples = NULL;
1949 :
1950 : /*
1951 : * Initialize the slab allocator. We need one slab slot per input tape,
1952 : * for the tuples in the heap, plus one to hold the tuple last returned
1953 : * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
1954 : * however, we don't need to do allocate anything.)
1955 : *
1956 : * In a multi-pass merge, we could shrink this allocation for the last
1957 : * merge pass, if it has fewer tapes than previous passes, but we don't
1958 : * bother.
1959 : *
1960 : * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
1961 : * to track memory usage of individual tuples.
1962 : */
1963 167 : if (state->base.tuples)
1964 153 : init_slab_allocator(state, state->nOutputTapes + 1);
1965 : else
1966 14 : init_slab_allocator(state, 0);
1967 :
1968 : /*
1969 : * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
1970 : * from each input tape.
1971 : *
1972 : * We could shrink this, too, between passes in a multi-pass merge, but we
1973 : * don't bother. (The initial input tapes are still in outputTapes. The
1974 : * number of input tapes will not increase between passes.)
1975 : */
1976 167 : state->memtupsize = state->nOutputTapes;
1977 334 : state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
1978 167 : state->nOutputTapes * sizeof(SortTuple));
1979 167 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1980 :
1981 : /*
1982 : * Use all the remaining memory we have available for tape buffers among
1983 : * all the input tapes. At the beginning of each merge pass, we will
1984 : * divide this memory between the input and output tapes in the pass.
1985 : */
1986 167 : state->tape_buffer_mem = state->availMem;
1987 167 : USEMEM(state, state->tape_buffer_mem);
1988 167 : if (trace_sort)
1989 0 : elog(LOG, "worker %d using %zu KB of memory for tape buffers",
1990 : state->worker, state->tape_buffer_mem / 1024);
1991 :
1992 : for (;;)
1993 : {
1994 : /*
1995 : * On the first iteration, or if we have read all the runs from the
1996 : * input tapes in a multi-pass merge, it's time to start a new pass.
1997 : * Rewind all the output tapes, and make them inputs for the next
1998 : * pass.
1999 : */
2000 236 : if (state->nInputRuns == 0)
2001 : {
2002 : int64 input_buffer_size;
2003 :
2004 : /* Close the old, emptied, input tapes */
2005 182 : if (state->nInputTapes > 0)
2006 : {
2007 105 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2008 90 : LogicalTapeClose(state->inputTapes[tapenum]);
2009 15 : pfree(state->inputTapes);
2010 : }
2011 :
2012 : /* Previous pass's outputs become next pass's inputs. */
2013 182 : state->inputTapes = state->outputTapes;
2014 182 : state->nInputTapes = state->nOutputTapes;
2015 182 : state->nInputRuns = state->nOutputRuns;
2016 :
2017 : /*
2018 : * Reset output tape variables. The actual LogicalTapes will be
2019 : * created as needed, here we only allocate the array to hold
2020 : * them.
2021 : */
2022 182 : state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2023 182 : state->nOutputTapes = 0;
2024 182 : state->nOutputRuns = 0;
2025 :
2026 : /*
2027 : * Redistribute the memory allocated for tape buffers, among the
2028 : * new input and output tapes.
2029 : */
2030 182 : input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2031 : state->nInputTapes,
2032 : state->nInputRuns,
2033 : state->maxTapes);
2034 :
2035 182 : if (trace_sort)
2036 0 : elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2037 : state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2038 : pg_rusage_show(&state->ru_start));
2039 :
2040 : /* Prepare the new input tapes for merge pass. */
2041 714 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2042 532 : LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2043 :
2044 : /*
2045 : * If there's just one run left on each input tape, then only one
2046 : * merge pass remains. If we don't have to produce a materialized
2047 : * sorted tape, we can stop at this point and do the final merge
2048 : * on-the-fly.
2049 : */
2050 182 : if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2051 171 : && state->nInputRuns <= state->nInputTapes
2052 156 : && !WORKER(state))
2053 : {
2054 : /* Tell logtape.c we won't be writing anymore */
2055 156 : LogicalTapeSetForgetFreeSpace(state->tapeset);
2056 : /* Initialize for the final merge pass */
2057 156 : beginmerge(state);
2058 156 : state->status = TSS_FINALMERGE;
2059 156 : return;
2060 : }
2061 : }
2062 :
2063 : /* Select an output tape */
2064 80 : selectnewtape(state);
2065 :
2066 : /* Merge one run from each input tape. */
2067 80 : mergeonerun(state);
2068 :
2069 : /*
2070 : * If the input tapes are empty, and we output only one output run,
2071 : * we're done. The current output tape contains the final result.
2072 : */
2073 80 : if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2074 11 : break;
2075 : }
2076 :
2077 : /*
2078 : * Done. The result is on a single run on a single tape.
2079 : */
2080 11 : state->result_tape = state->outputTapes[0];
2081 11 : if (!WORKER(state))
2082 11 : LogicalTapeFreeze(state->result_tape, NULL);
2083 : else
2084 0 : worker_freeze_result_tape(state);
2085 11 : state->status = TSS_SORTEDONTAPE;
2086 :
2087 : /* Close all the now-empty input tapes, to release their read buffers. */
2088 57 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2089 46 : LogicalTapeClose(state->inputTapes[tapenum]);
2090 : }
2091 :
2092 : /*
2093 : * Merge one run from each input tape.
2094 : */
2095 : static void
2096 80 : mergeonerun(Tuplesortstate *state)
2097 : {
2098 : int srcTapeIndex;
2099 : LogicalTape *srcTape;
2100 :
2101 : /*
2102 : * Start the merge by loading one tuple from each active source tape into
2103 : * the heap.
2104 : */
2105 80 : beginmerge(state);
2106 :
2107 : Assert(state->slabAllocatorUsed);
2108 :
2109 : /*
2110 : * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2111 : * out, and replacing it with next tuple from same tape (if there is
2112 : * another one).
2113 : */
2114 437796 : while (state->memtupcount > 0)
2115 : {
2116 : SortTuple stup;
2117 :
2118 : /* write the tuple to destTape */
2119 437716 : srcTapeIndex = state->memtuples[0].srctape;
2120 437716 : srcTape = state->inputTapes[srcTapeIndex];
2121 437716 : WRITETUP(state, state->destTape, &state->memtuples[0]);
2122 :
2123 : /* recycle the slot of the tuple we just wrote out, for the next read */
2124 437716 : if (state->memtuples[0].tuple)
2125 367674 : RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2126 :
2127 : /*
2128 : * pull next tuple from the tape, and replace the written-out tuple in
2129 : * the heap with it.
2130 : */
2131 437716 : if (mergereadnext(state, srcTape, &stup))
2132 : {
2133 437289 : stup.srctape = srcTapeIndex;
2134 437289 : tuplesort_heap_replace_top(state, &stup);
2135 : }
2136 : else
2137 : {
2138 427 : tuplesort_heap_delete_top(state);
2139 427 : state->nInputRuns--;
2140 : }
2141 : }
2142 :
2143 : /*
2144 : * When the heap empties, we're done. Write an end-of-run marker on the
2145 : * output tape.
2146 : */
2147 80 : markrunend(state->destTape);
2148 80 : }
2149 :
2150 : /*
2151 : * beginmerge - initialize for a merge pass
2152 : *
2153 : * Fill the merge heap with the first tuple from each input tape.
2154 : */
2155 : static void
2156 236 : beginmerge(Tuplesortstate *state)
2157 : {
2158 : int activeTapes;
2159 : int srcTapeIndex;
2160 :
2161 : /* Heap should be empty here */
2162 : Assert(state->memtupcount == 0);
2163 :
2164 236 : activeTapes = Min(state->nInputTapes, state->nInputRuns);
2165 :
2166 1059 : for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2167 : {
2168 : SortTuple tup;
2169 :
2170 823 : if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2171 : {
2172 667 : tup.srctape = srcTapeIndex;
2173 667 : tuplesort_heap_insert(state, &tup);
2174 : }
2175 : }
2176 236 : }
2177 :
2178 : /*
2179 : * mergereadnext - read next tuple from one merge input tape
2180 : *
2181 : * Returns false on EOF.
2182 : */
2183 : static bool
2184 2780278 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2185 : {
2186 : unsigned int tuplen;
2187 :
2188 : /* read next tuple, if any */
2189 2780278 : if ((tuplen = getlen(srcTape, true)) == 0)
2190 799 : return false;
2191 2779479 : READTUP(state, stup, srcTape, tuplen);
2192 :
2193 2779479 : return true;
2194 : }
2195 :
2196 : /*
2197 : * dumptuples - remove tuples from memtuples and write initial run to tape
2198 : *
2199 : * When alltuples = true, dump everything currently in memory. (This case is
2200 : * only used at end of input data.)
2201 : */
2202 : static void
2203 556565 : dumptuples(Tuplesortstate *state, bool alltuples)
2204 : {
2205 : int memtupwrite;
2206 : int i;
2207 :
2208 : /*
2209 : * Nothing to do if we still fit in available memory and have array slots,
2210 : * unless this is the final call during initial run generation.
2211 : */
2212 556565 : if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2213 556093 : !alltuples)
2214 555741 : return;
2215 :
2216 : /*
2217 : * Final call might require no sorting, in rare cases where we just so
2218 : * happen to have previously LACKMEM()'d at the point where exactly all
2219 : * remaining tuples are loaded into memory, just before input was
2220 : * exhausted. In general, short final runs are quite possible, but avoid
2221 : * creating a completely empty run. In a worker, though, we must produce
2222 : * at least one tape, even if it's empty.
2223 : */
2224 824 : if (state->memtupcount == 0 && state->currentRun > 0)
2225 0 : return;
2226 :
2227 : Assert(state->status == TSS_BUILDRUNS);
2228 :
2229 : /*
2230 : * It seems unlikely that this limit will ever be exceeded, but take no
2231 : * chances
2232 : */
2233 824 : if (state->currentRun == INT_MAX)
2234 0 : ereport(ERROR,
2235 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2236 : errmsg("cannot have more than %d runs for an external sort",
2237 : INT_MAX)));
2238 :
2239 824 : if (state->currentRun > 0)
2240 472 : selectnewtape(state);
2241 :
2242 824 : state->currentRun++;
2243 :
2244 824 : if (trace_sort)
2245 0 : elog(LOG, "worker %d starting quicksort of run %d: %s",
2246 : state->worker, state->currentRun,
2247 : pg_rusage_show(&state->ru_start));
2248 :
2249 : /*
2250 : * Sort all tuples accumulated within the allowed amount of memory for
2251 : * this run.
2252 : */
2253 824 : tuplesort_sort_memtuples(state);
2254 :
2255 824 : if (trace_sort)
2256 0 : elog(LOG, "worker %d finished quicksort of run %d: %s",
2257 : state->worker, state->currentRun,
2258 : pg_rusage_show(&state->ru_start));
2259 :
2260 824 : memtupwrite = state->memtupcount;
2261 2600189 : for (i = 0; i < memtupwrite; i++)
2262 : {
2263 2599365 : SortTuple *stup = &state->memtuples[i];
2264 :
2265 2599365 : WRITETUP(state, state->destTape, stup);
2266 : }
2267 :
2268 824 : state->memtupcount = 0;
2269 :
2270 : /*
2271 : * Reset tuple memory. We've freed all of the tuples that we previously
2272 : * allocated. It's important to avoid fragmentation when there is a stark
2273 : * change in the sizes of incoming tuples. In bounded sorts,
2274 : * fragmentation due to AllocSetFree's bucketing by size class might be
2275 : * particularly bad if this step wasn't taken.
2276 : */
2277 824 : MemoryContextReset(state->base.tuplecontext);
2278 :
2279 : /*
2280 : * Now update the memory accounting to subtract the memory used by the
2281 : * tuple.
2282 : */
2283 824 : FREEMEM(state, state->tupleMem);
2284 824 : state->tupleMem = 0;
2285 :
2286 824 : markrunend(state->destTape);
2287 :
2288 824 : if (trace_sort)
2289 0 : elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2290 : state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2291 : pg_rusage_show(&state->ru_start));
2292 : }
2293 :
2294 : /*
2295 : * tuplesort_rescan - rewind and replay the scan
2296 : */
2297 : void
2298 24 : tuplesort_rescan(Tuplesortstate *state)
2299 : {
2300 24 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2301 :
2302 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2303 :
2304 24 : switch (state->status)
2305 : {
2306 20 : case TSS_SORTEDINMEM:
2307 20 : state->current = 0;
2308 20 : state->eof_reached = false;
2309 20 : state->markpos_offset = 0;
2310 20 : state->markpos_eof = false;
2311 20 : break;
2312 4 : case TSS_SORTEDONTAPE:
2313 4 : LogicalTapeRewindForRead(state->result_tape, 0);
2314 4 : state->eof_reached = false;
2315 4 : state->markpos_block = 0L;
2316 4 : state->markpos_offset = 0;
2317 4 : state->markpos_eof = false;
2318 4 : break;
2319 0 : default:
2320 0 : elog(ERROR, "invalid tuplesort state");
2321 : break;
2322 : }
2323 :
2324 24 : MemoryContextSwitchTo(oldcontext);
2325 24 : }
2326 :
2327 : /*
2328 : * tuplesort_markpos - saves current position in the merged sort file
2329 : */
2330 : void
2331 292960 : tuplesort_markpos(Tuplesortstate *state)
2332 : {
2333 292960 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2334 :
2335 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2336 :
2337 292960 : switch (state->status)
2338 : {
2339 288556 : case TSS_SORTEDINMEM:
2340 288556 : state->markpos_offset = state->current;
2341 288556 : state->markpos_eof = state->eof_reached;
2342 288556 : break;
2343 4404 : case TSS_SORTEDONTAPE:
2344 4404 : LogicalTapeTell(state->result_tape,
2345 : &state->markpos_block,
2346 : &state->markpos_offset);
2347 4404 : state->markpos_eof = state->eof_reached;
2348 4404 : break;
2349 0 : default:
2350 0 : elog(ERROR, "invalid tuplesort state");
2351 : break;
2352 : }
2353 :
2354 292960 : MemoryContextSwitchTo(oldcontext);
2355 292960 : }
2356 :
2357 : /*
2358 : * tuplesort_restorepos - restores current position in merged sort file to
2359 : * last saved position
2360 : */
2361 : void
2362 18841 : tuplesort_restorepos(Tuplesortstate *state)
2363 : {
2364 18841 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2365 :
2366 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2367 :
2368 18841 : switch (state->status)
2369 : {
2370 15745 : case TSS_SORTEDINMEM:
2371 15745 : state->current = state->markpos_offset;
2372 15745 : state->eof_reached = state->markpos_eof;
2373 15745 : break;
2374 3096 : case TSS_SORTEDONTAPE:
2375 3096 : LogicalTapeSeek(state->result_tape,
2376 : state->markpos_block,
2377 : state->markpos_offset);
2378 3096 : state->eof_reached = state->markpos_eof;
2379 3096 : break;
2380 0 : default:
2381 0 : elog(ERROR, "invalid tuplesort state");
2382 : break;
2383 : }
2384 :
2385 18841 : MemoryContextSwitchTo(oldcontext);
2386 18841 : }
2387 :
2388 : /*
2389 : * tuplesort_get_stats - extract summary statistics
2390 : *
2391 : * This can be called after tuplesort_performsort() finishes to obtain
2392 : * printable summary information about how the sort was performed.
2393 : */
2394 : void
2395 198 : tuplesort_get_stats(Tuplesortstate *state,
2396 : TuplesortInstrumentation *stats)
2397 : {
2398 : /*
2399 : * Note: it might seem we should provide both memory and disk usage for a
2400 : * disk-based sort. However, the current code doesn't track memory space
2401 : * accurately once we have begun to return tuples to the caller (since we
2402 : * don't account for pfree's the caller is expected to do), so we cannot
2403 : * rely on availMem in a disk sort. This does not seem worth the overhead
2404 : * to fix. Is it worth creating an API for the memory context code to
2405 : * tell us how much is actually used in sortcontext?
2406 : */
2407 198 : tuplesort_updatemax(state);
2408 :
2409 198 : if (state->isMaxSpaceDisk)
2410 3 : stats->spaceType = SORT_SPACE_TYPE_DISK;
2411 : else
2412 195 : stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2413 198 : stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2414 :
2415 198 : switch (state->maxSpaceStatus)
2416 : {
2417 195 : case TSS_SORTEDINMEM:
2418 195 : if (state->boundUsed)
2419 21 : stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2420 : else
2421 174 : stats->sortMethod = SORT_TYPE_QUICKSORT;
2422 195 : break;
2423 0 : case TSS_SORTEDONTAPE:
2424 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2425 0 : break;
2426 3 : case TSS_FINALMERGE:
2427 3 : stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2428 3 : break;
2429 0 : default:
2430 0 : stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2431 0 : break;
2432 : }
2433 198 : }
2434 :
2435 : /*
2436 : * Convert TuplesortMethod to a string.
2437 : */
2438 : const char *
2439 147 : tuplesort_method_name(TuplesortMethod m)
2440 : {
2441 147 : switch (m)
2442 : {
2443 0 : case SORT_TYPE_STILL_IN_PROGRESS:
2444 0 : return "still in progress";
2445 21 : case SORT_TYPE_TOP_N_HEAPSORT:
2446 21 : return "top-N heapsort";
2447 123 : case SORT_TYPE_QUICKSORT:
2448 123 : return "quicksort";
2449 0 : case SORT_TYPE_EXTERNAL_SORT:
2450 0 : return "external sort";
2451 3 : case SORT_TYPE_EXTERNAL_MERGE:
2452 3 : return "external merge";
2453 : }
2454 :
2455 0 : return "unknown";
2456 : }
2457 :
2458 : /*
2459 : * Convert TuplesortSpaceType to a string.
2460 : */
2461 : const char *
2462 129 : tuplesort_space_type_name(TuplesortSpaceType t)
2463 : {
2464 : Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2465 129 : return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2466 : }
2467 :
2468 :
2469 : /*
2470 : * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2471 : */
2472 :
2473 : /*
2474 : * Convert the existing unordered array of SortTuples to a bounded heap,
2475 : * discarding all but the smallest "state->bound" tuples.
2476 : *
2477 : * When working with a bounded heap, we want to keep the largest entry
2478 : * at the root (array entry zero), instead of the smallest as in the normal
2479 : * sort case. This allows us to discard the largest entry cheaply.
2480 : * Therefore, we temporarily reverse the sort direction.
2481 : */
2482 : static void
2483 208 : make_bounded_heap(Tuplesortstate *state)
2484 : {
2485 208 : int tupcount = state->memtupcount;
2486 : int i;
2487 :
2488 : Assert(state->status == TSS_INITIAL);
2489 : Assert(state->bounded);
2490 : Assert(tupcount >= state->bound);
2491 : Assert(SERIAL(state));
2492 :
2493 : /* Reverse sort direction so largest entry will be at root */
2494 208 : reversedirection(state);
2495 :
2496 208 : state->memtupcount = 0; /* make the heap empty */
2497 19018 : for (i = 0; i < tupcount; i++)
2498 : {
2499 18810 : if (state->memtupcount < state->bound)
2500 : {
2501 : /* Insert next tuple into heap */
2502 : /* Must copy source tuple to avoid possible overwrite */
2503 9301 : SortTuple stup = state->memtuples[i];
2504 :
2505 9301 : tuplesort_heap_insert(state, &stup);
2506 : }
2507 : else
2508 : {
2509 : /*
2510 : * The heap is full. Replace the largest entry with the new
2511 : * tuple, or just discard it, if it's larger than anything already
2512 : * in the heap.
2513 : */
2514 9509 : if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2515 : {
2516 4902 : free_sort_tuple(state, &state->memtuples[i]);
2517 4902 : CHECK_FOR_INTERRUPTS();
2518 : }
2519 : else
2520 4607 : tuplesort_heap_replace_top(state, &state->memtuples[i]);
2521 : }
2522 : }
2523 :
2524 : Assert(state->memtupcount == state->bound);
2525 208 : state->status = TSS_BOUNDED;
2526 208 : }
2527 :
2528 : /*
2529 : * Convert the bounded heap to a properly-sorted array
2530 : */
2531 : static void
2532 208 : sort_bounded_heap(Tuplesortstate *state)
2533 : {
2534 208 : int tupcount = state->memtupcount;
2535 :
2536 : Assert(state->status == TSS_BOUNDED);
2537 : Assert(state->bounded);
2538 : Assert(tupcount == state->bound);
2539 : Assert(SERIAL(state));
2540 :
2541 : /*
2542 : * We can unheapify in place because each delete-top call will remove the
2543 : * largest entry, which we can promptly store in the newly freed slot at
2544 : * the end. Once we're down to a single-entry heap, we're done.
2545 : */
2546 9301 : while (state->memtupcount > 1)
2547 : {
2548 9093 : SortTuple stup = state->memtuples[0];
2549 :
2550 : /* this sifts-up the next-largest entry and decreases memtupcount */
2551 9093 : tuplesort_heap_delete_top(state);
2552 9093 : state->memtuples[state->memtupcount] = stup;
2553 : }
2554 208 : state->memtupcount = tupcount;
2555 :
2556 : /*
2557 : * Reverse sort direction back to the original state. This is not
2558 : * actually necessary but seems like a good idea for tidiness.
2559 : */
2560 208 : reversedirection(state);
2561 :
2562 208 : state->status = TSS_SORTEDINMEM;
2563 208 : state->boundUsed = true;
2564 208 : }
2565 :
2566 :
2567 : /* radix sort routines */
2568 :
2569 : /*
2570 : * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
2571 : */
2572 : static inline uint8
2573 27671507 : current_byte(Datum key, int level)
2574 : {
2575 27671507 : int shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
2576 :
2577 27671507 : return (key >> shift) & 0xFF;
2578 : }
2579 :
2580 : /*
2581 : * Normalize datum such that unsigned comparison is order-preserving,
2582 : * taking ASC/DESC into account as well.
2583 : */
2584 : static inline Datum
2585 27671507 : normalize_datum(Datum orig, SortSupport ssup)
2586 : {
2587 : Datum norm_datum1;
2588 :
2589 27671507 : if (ssup->comparator == ssup_datum_signed_cmp)
2590 : {
2591 2073340 : norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
2592 : }
2593 25598167 : else if (ssup->comparator == ssup_datum_int32_cmp)
2594 : {
2595 : /*
2596 : * First truncate to uint32. Technically, we don't need to do this,
2597 : * but it forces the upper half of the datum to be zero regardless of
2598 : * sign.
2599 : */
2600 19507753 : uint32 u32 = DatumGetUInt32(orig) + ((uint32) PG_INT32_MAX) + 1;
2601 :
2602 19507753 : norm_datum1 = UInt32GetDatum(u32);
2603 : }
2604 : else
2605 : {
2606 : Assert(ssup->comparator == ssup_datum_unsigned_cmp);
2607 6090414 : norm_datum1 = orig;
2608 : }
2609 :
2610 27671507 : if (ssup->ssup_reverse)
2611 1617873 : norm_datum1 = ~norm_datum1;
2612 :
2613 27671507 : return norm_datum1;
2614 : }
2615 :
2616 : /*
2617 : * radix_sort_recursive
2618 : *
2619 : * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
2620 : * for tiebreaks.
2621 : *
2622 : * This is a modification of
2623 : * ska_byte_sort() from https://github.com/skarupke/ska_sort
2624 : * The original copyright notice follows:
2625 : *
2626 : * Copyright Malte Skarupke 2016.
2627 : * Distributed under the Boost Software License, Version 1.0.
2628 : *
2629 : * Boost Software License - Version 1.0 - August 17th, 2003
2630 : *
2631 : * Permission is hereby granted, free of charge, to any person or organization
2632 : * obtaining a copy of the software and accompanying documentation covered by
2633 : * this license (the "Software") to use, reproduce, display, distribute,
2634 : * execute, and transmit the Software, and to prepare derivative works of the
2635 : * Software, and to permit third-parties to whom the Software is furnished to
2636 : * do so, all subject to the following:
2637 : *
2638 : * The copyright notices in the Software and this entire statement, including
2639 : * the above license grant, this restriction and the following disclaimer,
2640 : * must be included in all copies of the Software, in whole or in part, and
2641 : * all derivative works of the Software, unless such copies or derivative
2642 : * works are solely in the form of machine-executable object code generated by
2643 : * a source language processor.
2644 : *
2645 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2646 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2647 : * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
2648 : * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
2649 : * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
2650 : * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2651 : * DEALINGS IN THE SOFTWARE.
2652 : */
2653 : static void
2654 32281 : radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
2655 : {
2656 32281 : RadixSortInfo partitions[256] = {0};
2657 : uint8 remaining_partitions[256];
2658 32281 : size_t total = 0;
2659 32281 : int num_partitions = 0;
2660 : int num_remaining;
2661 32281 : SortSupport ssup = &state->base.sortKeys[0];
2662 32281 : size_t start_offset = 0;
2663 32281 : SortTuple *partition_begin = begin;
2664 :
2665 : /* count number of occurrences of each byte */
2666 27703788 : for (SortTuple *st = begin; st < begin + n_elems; st++)
2667 : {
2668 : uint8 this_partition;
2669 :
2670 : /* extract the byte for this level from the normalized datum */
2671 27671507 : this_partition = current_byte(normalize_datum(st->datum1, ssup),
2672 : level);
2673 :
2674 : /* save it for the permutation step */
2675 27671507 : st->curbyte = this_partition;
2676 :
2677 27671507 : partitions[this_partition].count++;
2678 :
2679 27671507 : CHECK_FOR_INTERRUPTS();
2680 : }
2681 :
2682 : /* compute partition offsets */
2683 8296217 : for (int i = 0; i < 256; i++)
2684 : {
2685 8263936 : size_t count = partitions[i].count;
2686 :
2687 8263936 : if (count != 0)
2688 : {
2689 1457233 : partitions[i].offset = total;
2690 1457233 : total += count;
2691 1457233 : remaining_partitions[num_partitions] = i;
2692 1457233 : num_partitions++;
2693 : }
2694 8263936 : partitions[i].next_offset = total;
2695 : }
2696 :
2697 : /*
2698 : * Swap tuples to correct partition.
2699 : *
2700 : * In traditional American flag sort, a swap sends the current element to
2701 : * the correct partition, but the array pointer only advances if the
2702 : * partner of the swap happens to be an element that belongs in the
2703 : * current partition. That only requires one pass through the array, but
2704 : * the disadvantage is we don't know if the pointer can advance until the
2705 : * swap completes. Here lies the most interesting innovation from the
2706 : * upstream ska_byte_sort: After initiating the swap, we immediately
2707 : * proceed to the next element. This makes better use of CPU pipelining,
2708 : * but also means that we will often need multiple iterations of this
2709 : * loop. ska_byte_sort() maintains a separate list of which partitions
2710 : * haven't finished, which is updated every loop iteration. Here we simply
2711 : * check each partition during every iteration.
2712 : *
2713 : * If we started with a single partition, there is nothing to do. If a
2714 : * previous loop iteration results in only one partition that hasn't been
2715 : * counted as sorted, we know it's actually sorted and can exit the loop.
2716 : */
2717 32281 : num_remaining = num_partitions;
2718 99761 : while (num_remaining > 1)
2719 : {
2720 : /* start the count over */
2721 67480 : num_remaining = num_partitions;
2722 :
2723 6006244 : for (int i = 0; i < num_partitions; i++)
2724 : {
2725 5938764 : uint8 idx = remaining_partitions[i];
2726 :
2727 5938764 : for (SortTuple *st = begin + partitions[idx].offset;
2728 13948678 : st < begin + partitions[idx].next_offset;
2729 8009914 : st++)
2730 : {
2731 8009914 : size_t offset = partitions[st->curbyte].offset++;
2732 : SortTuple tmp;
2733 :
2734 : /* swap current tuple with destination position */
2735 : Assert(offset < n_elems);
2736 8009914 : tmp = *st;
2737 8009914 : *st = begin[offset];
2738 8009914 : begin[offset] = tmp;
2739 :
2740 8009914 : CHECK_FOR_INTERRUPTS();
2741 : };
2742 :
2743 : /* Is this partition sorted? */
2744 5938764 : if (partitions[idx].offset == partitions[idx].next_offset)
2745 4528730 : num_remaining--;
2746 : }
2747 : }
2748 :
2749 : /* recurse */
2750 32281 : for (uint8 *rp = remaining_partitions;
2751 1489514 : rp < remaining_partitions + num_partitions;
2752 1457233 : rp++)
2753 : {
2754 1457233 : size_t end_offset = partitions[*rp].next_offset;
2755 1457233 : SortTuple *partition_end = begin + end_offset;
2756 1457233 : size_t num_elements = end_offset - start_offset;
2757 :
2758 1457233 : if (num_elements > 1)
2759 : {
2760 575215 : if (level < sizeof(Datum) - 1)
2761 : {
2762 491867 : if (num_elements < QSORT_THRESHOLD)
2763 : {
2764 461278 : qsort_tuple(partition_begin,
2765 : num_elements,
2766 : state->base.comparetup,
2767 : state);
2768 : }
2769 : else
2770 : {
2771 30589 : radix_sort_recursive(partition_begin,
2772 : num_elements,
2773 : level + 1,
2774 : state);
2775 : }
2776 : }
2777 83348 : else if (state->base.onlyKey == NULL)
2778 : {
2779 : /*
2780 : * We've finished radix sort on all bytes of the pass-by-value
2781 : * datum (possibly abbreviated), now sort using the tiebreak
2782 : * comparator.
2783 : */
2784 18143 : qsort_tuple(partition_begin,
2785 : num_elements,
2786 : state->base.comparetup_tiebreak,
2787 : state);
2788 : }
2789 : }
2790 :
2791 1457233 : start_offset = end_offset;
2792 1457233 : partition_begin = partition_end;
2793 : }
2794 32281 : }
2795 :
2796 : /*
2797 : * Entry point for radix_sort_recursive
2798 : *
2799 : * Partition tuples by isnull1, then sort both partitions, using
2800 : * radix sort on the NOT NULL partition if it's large enough.
2801 : */
2802 : static void
2803 2858 : radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
2804 : {
2805 2858 : bool nulls_first = state->base.sortKeys[0].ssup_nulls_first;
2806 : SortTuple *null_start;
2807 : SortTuple *not_null_start;
2808 2858 : size_t d1 = 0,
2809 : d2,
2810 : null_count,
2811 : not_null_count;
2812 :
2813 : /*
2814 : * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
2815 : * This also serves as a quick check for the common case where all tuples
2816 : * are NOT NULL in the first sort key.
2817 : */
2818 7314428 : while (d1 < n && data[d1].isnull1 == nulls_first)
2819 : {
2820 7311570 : d1++;
2821 7311570 : CHECK_FOR_INTERRUPTS();
2822 : }
2823 :
2824 : /*
2825 : * If we have more than one tuple left after the quick check, partition
2826 : * the remainder using branchless cyclic permutation, based on
2827 : * https://orlp.net/blog/branchless-lomuto-partitioning/
2828 : */
2829 : Assert(n > 0);
2830 2858 : if (d1 < n - 1)
2831 : {
2832 578 : size_t i = d1,
2833 578 : j = d1;
2834 578 : SortTuple tmp = data[d1]; /* create gap at front */
2835 :
2836 341703 : while (j < n - 1)
2837 : {
2838 : /* gap is at j, move i's element to gap */
2839 341125 : data[j] = data[i];
2840 : /* advance j to the first unknown element */
2841 341125 : j += 1;
2842 : /* move the first unknown element back to i */
2843 341125 : data[i] = data[j];
2844 : /* advance i if this element belongs in the left partition */
2845 341125 : i += (data[i].isnull1 == nulls_first);
2846 :
2847 341125 : CHECK_FOR_INTERRUPTS();
2848 : }
2849 :
2850 : /* place gap between left and right partitions */
2851 578 : data[j] = data[i];
2852 : /* restore the saved element */
2853 578 : data[i] = tmp;
2854 : /* assign it to the correct partition */
2855 578 : i += (data[i].isnull1 == nulls_first);
2856 :
2857 : /* d1 is now the number of elements in the left partition */
2858 578 : d1 = i;
2859 : }
2860 :
2861 2858 : d2 = n - d1;
2862 :
2863 : /* set pointers and counts for each partition */
2864 2858 : if (nulls_first)
2865 : {
2866 495 : null_start = data;
2867 495 : null_count = d1;
2868 495 : not_null_start = data + d1;
2869 495 : not_null_count = d2;
2870 : }
2871 : else
2872 : {
2873 2363 : not_null_start = data;
2874 2363 : not_null_count = d1;
2875 2363 : null_start = data + d1;
2876 2363 : null_count = d2;
2877 : }
2878 :
2879 2858 : for (SortTuple *st = null_start;
2880 5068 : st < null_start + null_count;
2881 2210 : st++)
2882 : Assert(st->isnull1 == true);
2883 2858 : for (SortTuple *st = not_null_start;
2884 7653924 : st < not_null_start + not_null_count;
2885 7651066 : st++)
2886 : Assert(st->isnull1 == false);
2887 :
2888 : /*
2889 : * Sort the NULL partition using tiebreak comparator, if necessary.
2890 : */
2891 2858 : if (state->base.onlyKey == NULL && null_count > 1)
2892 : {
2893 87 : qsort_tuple(null_start,
2894 : null_count,
2895 : state->base.comparetup_tiebreak,
2896 : state);
2897 : }
2898 :
2899 : /*
2900 : * Sort the NOT NULL partition, using radix sort if large enough,
2901 : * otherwise fall back to quicksort.
2902 : */
2903 2858 : if (not_null_count < QSORT_THRESHOLD)
2904 : {
2905 14 : qsort_tuple(not_null_start,
2906 : not_null_count,
2907 : state->base.comparetup,
2908 : state);
2909 : }
2910 : else
2911 : {
2912 2844 : bool presorted = true;
2913 :
2914 2844 : for (SortTuple *st = not_null_start + 1;
2915 3805561 : st < not_null_start + not_null_count;
2916 3802717 : st++)
2917 : {
2918 3804409 : if (COMPARETUP(state, st - 1, st) > 0)
2919 : {
2920 1692 : presorted = false;
2921 1692 : break;
2922 : }
2923 :
2924 3802717 : CHECK_FOR_INTERRUPTS();
2925 : }
2926 :
2927 2844 : if (presorted)
2928 1152 : return;
2929 : else
2930 : {
2931 1692 : radix_sort_recursive(not_null_start,
2932 : not_null_count,
2933 : 0,
2934 : state);
2935 : }
2936 : }
2937 : }
2938 :
2939 : /* Verify in-memory sort using standard comparator. */
2940 : static void
2941 2858 : verify_memtuples_sorted(Tuplesortstate *state)
2942 : {
2943 : #ifdef USE_ASSERT_CHECKING
2944 : for (SortTuple *st = state->memtuples + 1;
2945 : st < state->memtuples + state->memtupcount;
2946 : st++)
2947 : Assert(COMPARETUP(state, st - 1, st) <= 0);
2948 : #endif
2949 2858 : }
2950 :
2951 : /*
2952 : * Sort all memtuples using specialized routines.
2953 : *
2954 : * Quicksort or radix sort is used for small in-memory sorts,
2955 : * and external sort runs.
2956 : */
2957 : static void
2958 121675 : tuplesort_sort_memtuples(Tuplesortstate *state)
2959 : {
2960 : Assert(!LEADER(state));
2961 :
2962 121675 : if (state->memtupcount > 1)
2963 : {
2964 : /*
2965 : * Do we have the leading column's value or abbreviation in datum1?
2966 : */
2967 35174 : if (state->base.haveDatum1 && state->base.sortKeys)
2968 : {
2969 35117 : SortSupport ssup = &state->base.sortKeys[0];
2970 :
2971 : /* Does it compare as an integer? */
2972 35117 : if (state->memtupcount >= QSORT_THRESHOLD &&
2973 7334 : (ssup->comparator == ssup_datum_unsigned_cmp ||
2974 6927 : ssup->comparator == ssup_datum_signed_cmp ||
2975 6746 : ssup->comparator == ssup_datum_int32_cmp))
2976 : {
2977 2858 : radix_sort_tuple(state->memtuples,
2978 2858 : state->memtupcount,
2979 : state);
2980 2858 : verify_memtuples_sorted(state);
2981 2858 : return;
2982 : }
2983 : }
2984 :
2985 : /* Can we use the single-key sort function? */
2986 32316 : if (state->base.onlyKey != NULL)
2987 : {
2988 21254 : qsort_ssup(state->memtuples, state->memtupcount,
2989 21254 : state->base.onlyKey);
2990 : }
2991 : else
2992 : {
2993 11062 : qsort_tuple(state->memtuples,
2994 11062 : state->memtupcount,
2995 : state->base.comparetup,
2996 : state);
2997 : }
2998 : }
2999 : }
3000 :
3001 : /*
3002 : * Insert a new tuple into an empty or existing heap, maintaining the
3003 : * heap invariant. Caller is responsible for ensuring there's room.
3004 : *
3005 : * Note: For some callers, tuple points to a memtuples[] entry above the
3006 : * end of the heap. This is safe as long as it's not immediately adjacent
3007 : * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3008 : * is, it might get overwritten before being moved into the heap!
3009 : */
3010 : static void
3011 9968 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3012 : {
3013 : SortTuple *memtuples;
3014 : int j;
3015 :
3016 9968 : memtuples = state->memtuples;
3017 : Assert(state->memtupcount < state->memtupsize);
3018 :
3019 9968 : CHECK_FOR_INTERRUPTS();
3020 :
3021 : /*
3022 : * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3023 : * using 1-based array indexes, not 0-based.
3024 : */
3025 9968 : j = state->memtupcount++;
3026 28913 : while (j > 0)
3027 : {
3028 25485 : int i = (j - 1) >> 1;
3029 :
3030 25485 : if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3031 6540 : break;
3032 18945 : memtuples[j] = memtuples[i];
3033 18945 : j = i;
3034 : }
3035 9968 : memtuples[j] = *tuple;
3036 9968 : }
3037 :
3038 : /*
3039 : * Remove the tuple at state->memtuples[0] from the heap. Decrement
3040 : * memtupcount, and sift up to maintain the heap invariant.
3041 : *
3042 : * The caller has already free'd the tuple the top node points to,
3043 : * if necessary.
3044 : */
3045 : static void
3046 9736 : tuplesort_heap_delete_top(Tuplesortstate *state)
3047 : {
3048 9736 : SortTuple *memtuples = state->memtuples;
3049 : SortTuple *tuple;
3050 :
3051 9736 : if (--state->memtupcount <= 0)
3052 166 : return;
3053 :
3054 : /*
3055 : * Remove the last tuple in the heap, and re-insert it, by replacing the
3056 : * current top node with it.
3057 : */
3058 9570 : tuple = &memtuples[state->memtupcount];
3059 9570 : tuplesort_heap_replace_top(state, tuple);
3060 : }
3061 :
3062 : /*
3063 : * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3064 : * maintain the heap invariant.
3065 : *
3066 : * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3067 : * Heapsort, steps H3-H8).
3068 : */
3069 : static void
3070 3044519 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3071 : {
3072 3044519 : SortTuple *memtuples = state->memtuples;
3073 : unsigned int i,
3074 : n;
3075 :
3076 : Assert(state->memtupcount >= 1);
3077 :
3078 3044519 : CHECK_FOR_INTERRUPTS();
3079 :
3080 : /*
3081 : * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3082 : * This prevents overflow in the "2 * i + 1" calculation, since at the top
3083 : * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3084 : */
3085 3044519 : n = state->memtupcount;
3086 3044519 : i = 0; /* i is where the "hole" is */
3087 : for (;;)
3088 873919 : {
3089 3918438 : unsigned int j = 2 * i + 1;
3090 :
3091 3918438 : if (j >= n)
3092 575677 : break;
3093 4569068 : if (j + 1 < n &&
3094 1226307 : COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3095 484509 : j++;
3096 3342761 : if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3097 2468842 : break;
3098 873919 : memtuples[i] = memtuples[j];
3099 873919 : i = j;
3100 : }
3101 3044519 : memtuples[i] = *tuple;
3102 3044519 : }
3103 :
3104 : /*
3105 : * Function to reverse the sort direction from its current state
3106 : *
3107 : * It is not safe to call this when performing hash tuplesorts
3108 : */
3109 : static void
3110 416 : reversedirection(Tuplesortstate *state)
3111 : {
3112 416 : SortSupport sortKey = state->base.sortKeys;
3113 : int nkey;
3114 :
3115 1012 : for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
3116 : {
3117 596 : sortKey->ssup_reverse = !sortKey->ssup_reverse;
3118 596 : sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3119 : }
3120 416 : }
3121 :
3122 :
3123 : /*
3124 : * Tape interface routines
3125 : */
3126 :
3127 : static unsigned int
3128 2931784 : getlen(LogicalTape *tape, bool eofOK)
3129 : {
3130 : unsigned int len;
3131 :
3132 2931784 : if (LogicalTapeRead(tape,
3133 : &len, sizeof(len)) != sizeof(len))
3134 0 : elog(ERROR, "unexpected end of tape");
3135 2931784 : if (len == 0 && !eofOK)
3136 0 : elog(ERROR, "unexpected end of data");
3137 2931784 : return len;
3138 : }
3139 :
3140 : static void
3141 904 : markrunend(LogicalTape *tape)
3142 : {
3143 904 : unsigned int len = 0;
3144 :
3145 904 : LogicalTapeWrite(tape, &len, sizeof(len));
3146 904 : }
3147 :
3148 : /*
3149 : * Get memory for tuple from within READTUP() routine.
3150 : *
3151 : * We use next free slot from the slab allocator, or palloc() if the tuple
3152 : * is too large for that.
3153 : */
3154 : void *
3155 2725847 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
3156 : {
3157 : SlabSlot *buf;
3158 :
3159 : /*
3160 : * We pre-allocate enough slots in the slab arena that we should never run
3161 : * out.
3162 : */
3163 : Assert(state->slabFreeHead);
3164 :
3165 2725847 : if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3166 3 : return MemoryContextAlloc(state->base.sortcontext, tuplen);
3167 : else
3168 : {
3169 2725844 : buf = state->slabFreeHead;
3170 : /* Reuse this slot */
3171 2725844 : state->slabFreeHead = buf->nextfree;
3172 :
3173 2725844 : return buf;
3174 : }
3175 : }
3176 :
3177 :
3178 : /*
3179 : * Parallel sort routines
3180 : */
3181 :
3182 : /*
3183 : * tuplesort_estimate_shared - estimate required shared memory allocation
3184 : *
3185 : * nWorkers is an estimate of the number of workers (it's the number that
3186 : * will be requested).
3187 : */
3188 : Size
3189 98 : tuplesort_estimate_shared(int nWorkers)
3190 : {
3191 : Size tapesSize;
3192 :
3193 : Assert(nWorkers > 0);
3194 :
3195 : /* Make sure that BufFile shared state is MAXALIGN'd */
3196 98 : tapesSize = mul_size(sizeof(TapeShare), nWorkers);
3197 98 : tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
3198 :
3199 98 : return tapesSize;
3200 : }
3201 :
3202 : /*
3203 : * tuplesort_initialize_shared - initialize shared tuplesort state
3204 : *
3205 : * Must be called from leader process before workers are launched, to
3206 : * establish state needed up-front for worker tuplesortstates. nWorkers
3207 : * should match the argument passed to tuplesort_estimate_shared().
3208 : */
3209 : void
3210 133 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
3211 : {
3212 : int i;
3213 :
3214 : Assert(nWorkers > 0);
3215 :
3216 133 : SpinLockInit(&shared->mutex);
3217 133 : shared->currentWorker = 0;
3218 133 : shared->workersFinished = 0;
3219 133 : SharedFileSetInit(&shared->fileset, seg);
3220 133 : shared->nTapes = nWorkers;
3221 419 : for (i = 0; i < nWorkers; i++)
3222 : {
3223 286 : shared->tapes[i].firstblocknumber = 0L;
3224 : }
3225 133 : }
3226 :
3227 : /*
3228 : * tuplesort_attach_shared - attach to shared tuplesort state
3229 : *
3230 : * Must be called by all worker processes.
3231 : */
3232 : void
3233 150 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3234 : {
3235 : /* Attach to SharedFileSet */
3236 150 : SharedFileSetAttach(&shared->fileset, seg);
3237 150 : }
3238 :
3239 : /*
3240 : * worker_get_identifier - Assign and return ordinal identifier for worker
3241 : *
3242 : * The order in which these are assigned is not well defined, and should not
3243 : * matter; worker numbers across parallel sort participants need only be
3244 : * distinct and gapless. logtape.c requires this.
3245 : *
3246 : * Note that the identifiers assigned from here have no relation to
3247 : * ParallelWorkerNumber, to avoid making any assumption about
3248 : * caller's requirements. However, we do follow the ParallelWorkerNumber
3249 : * convention of representing a non-worker with worker number -1. This
3250 : * includes the leader, as well as serial Tuplesort processes.
3251 : */
3252 : static int
3253 282 : worker_get_identifier(Tuplesortstate *state)
3254 : {
3255 282 : Sharedsort *shared = state->shared;
3256 : int worker;
3257 :
3258 : Assert(WORKER(state));
3259 :
3260 282 : SpinLockAcquire(&shared->mutex);
3261 282 : worker = shared->currentWorker++;
3262 282 : SpinLockRelease(&shared->mutex);
3263 :
3264 282 : return worker;
3265 : }
3266 :
3267 : /*
3268 : * worker_freeze_result_tape - freeze worker's result tape for leader
3269 : *
3270 : * This is called by workers just after the result tape has been determined,
3271 : * instead of calling LogicalTapeFreeze() directly. They do so because
3272 : * workers require a few additional steps over similar serial
3273 : * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3274 : * steps are around freeing now unneeded resources, and representing to
3275 : * leader that worker's input run is available for its merge.
3276 : *
3277 : * There should only be one final output run for each worker, which consists
3278 : * of all tuples that were originally input into worker.
3279 : */
3280 : static void
3281 282 : worker_freeze_result_tape(Tuplesortstate *state)
3282 : {
3283 282 : Sharedsort *shared = state->shared;
3284 : TapeShare output;
3285 :
3286 : Assert(WORKER(state));
3287 : Assert(state->result_tape != NULL);
3288 : Assert(state->memtupcount == 0);
3289 :
3290 : /*
3291 : * Free most remaining memory, in case caller is sensitive to our holding
3292 : * on to it. memtuples may not be a tiny merge heap at this point.
3293 : */
3294 282 : pfree(state->memtuples);
3295 : /* Be tidy */
3296 282 : state->memtuples = NULL;
3297 282 : state->memtupsize = 0;
3298 :
3299 : /*
3300 : * Parallel worker requires result tape metadata, which is to be stored in
3301 : * shared memory for leader
3302 : */
3303 282 : LogicalTapeFreeze(state->result_tape, &output);
3304 :
3305 : /* Store properties of output tape, and update finished worker count */
3306 282 : SpinLockAcquire(&shared->mutex);
3307 282 : shared->tapes[state->worker] = output;
3308 282 : shared->workersFinished++;
3309 282 : SpinLockRelease(&shared->mutex);
3310 282 : }
3311 :
3312 : /*
3313 : * worker_nomergeruns - dump memtuples in worker, without merging
3314 : *
3315 : * This is called as an alternative to mergeruns() in a worker when no
3316 : * merging is required.
3317 : */
3318 : static void
3319 282 : worker_nomergeruns(Tuplesortstate *state)
3320 : {
3321 : Assert(WORKER(state));
3322 : Assert(state->result_tape == NULL);
3323 : Assert(state->nOutputRuns == 1);
3324 :
3325 282 : state->result_tape = state->destTape;
3326 282 : worker_freeze_result_tape(state);
3327 282 : }
3328 :
3329 : /*
3330 : * leader_takeover_tapes - create tapeset for leader from worker tapes
3331 : *
3332 : * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3333 : * sorting has occurred in workers, all of which must have already returned
3334 : * from tuplesort_performsort().
3335 : *
3336 : * When this returns, leader process is left in a state that is virtually
3337 : * indistinguishable from it having generated runs as a serial external sort
3338 : * might have.
3339 : */
3340 : static void
3341 97 : leader_takeover_tapes(Tuplesortstate *state)
3342 : {
3343 97 : Sharedsort *shared = state->shared;
3344 97 : int nParticipants = state->nParticipants;
3345 : int workersFinished;
3346 : int j;
3347 :
3348 : Assert(LEADER(state));
3349 : Assert(nParticipants >= 1);
3350 :
3351 97 : SpinLockAcquire(&shared->mutex);
3352 97 : workersFinished = shared->workersFinished;
3353 97 : SpinLockRelease(&shared->mutex);
3354 :
3355 97 : if (nParticipants != workersFinished)
3356 0 : elog(ERROR, "cannot take over tapes before all workers finish");
3357 :
3358 : /*
3359 : * Create the tapeset from worker tapes, including a leader-owned tape at
3360 : * the end. Parallel workers are far more expensive than logical tapes,
3361 : * so the number of tapes allocated here should never be excessive.
3362 : */
3363 97 : inittapestate(state, nParticipants);
3364 97 : state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3365 :
3366 : /*
3367 : * Set currentRun to reflect the number of runs we will merge (it's not
3368 : * used for anything, this is just pro forma)
3369 : */
3370 97 : state->currentRun = nParticipants;
3371 :
3372 : /*
3373 : * Initialize the state to look the same as after building the initial
3374 : * runs.
3375 : *
3376 : * There will always be exactly 1 run per worker, and exactly one input
3377 : * tape per run, because workers always output exactly 1 run, even when
3378 : * there were no input tuples for workers to sort.
3379 : */
3380 97 : state->inputTapes = NULL;
3381 97 : state->nInputTapes = 0;
3382 97 : state->nInputRuns = 0;
3383 :
3384 97 : state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3385 97 : state->nOutputTapes = nParticipants;
3386 97 : state->nOutputRuns = nParticipants;
3387 :
3388 309 : for (j = 0; j < nParticipants; j++)
3389 : {
3390 212 : state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3391 : }
3392 :
3393 97 : state->status = TSS_BUILDRUNS;
3394 97 : }
3395 :
3396 : /*
3397 : * Convenience routine to free a tuple previously loaded into sort memory
3398 : */
3399 : static void
3400 1879974 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3401 : {
3402 1879974 : if (stup->tuple)
3403 : {
3404 1799848 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3405 1799848 : pfree(stup->tuple);
3406 1799848 : stup->tuple = NULL;
3407 : }
3408 1879974 : }
3409 :
3410 : int
3411 1995287 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3412 : {
3413 1995287 : if (x < y)
3414 682720 : return -1;
3415 1312567 : else if (x > y)
3416 523138 : return 1;
3417 : else
3418 789429 : return 0;
3419 : }
3420 :
3421 : int
3422 1388845 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3423 : {
3424 1388845 : int64 xx = DatumGetInt64(x);
3425 1388845 : int64 yy = DatumGetInt64(y);
3426 :
3427 1388845 : if (xx < yy)
3428 1028830 : return -1;
3429 360015 : else if (xx > yy)
3430 180449 : return 1;
3431 : else
3432 179566 : return 0;
3433 : }
3434 :
3435 : int
3436 91546889 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3437 : {
3438 91546889 : int32 xx = DatumGetInt32(x);
3439 91546889 : int32 yy = DatumGetInt32(y);
3440 :
3441 91546889 : if (xx < yy)
3442 23648694 : return -1;
3443 67898195 : else if (xx > yy)
3444 20331801 : return 1;
3445 : else
3446 47566394 : return 0;
3447 : }
|