Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplesort.c
4 : * Generalized tuple sorting routines.
5 : *
6 : * This module provides a generalized facility for tuple sorting, which can be
7 : * applied to different kinds of sortable objects. Implementation of
8 : * the particular sorting variants is given in tuplesortvariants.c.
9 : * This module works efficiently for both small and large amounts
10 : * of data. Small amounts are sorted in-memory. Large amounts are
11 : * sorted using temporary files and a standard external sort
12 : * algorithm.
13 : *
14 : * See Knuth, volume 3, for more than you want to know about external
15 : * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 : * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 : * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 : * merge is better. Knuth is assuming that tape drives are expensive
19 : * beasts, and in particular that there will always be many more runs than
20 : * tape drives. The polyphase merge algorithm was good at keeping all the
21 : * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 : * much more than a few Kb of memory buffers, so we can afford to have
23 : * lots of them. In particular, if we can have as many tape drives as
24 : * sorted runs, we can eliminate any repeated I/O at all.
25 : *
26 : * Historically, we divided the input into sorted runs using replacement
27 : * selection, in the form of a priority tree implemented as a heap
28 : * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 : * or radix sort for run generation.
30 : *
31 : * The approximate amount of memory allowed for any one sort operation
32 : * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 : * we absorb tuples and simply store them in an unsorted array as long as
34 : * we haven't exceeded workMem. If we reach the end of the input without
35 : * exceeding workMem, we sort the array in memory and subsequently return
36 : * tuples just by scanning the tuple array sequentially. If we do exceed
37 : * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 : * When tuples are dumped in batch after in-memory sorting, we begin a new run
39 : * with a new output tape. If we reach the max number of tapes, we write
40 : * subsequent runs on the existing tapes in a round-robin fashion. We will
41 : * need multiple merge passes to finish the merge in that case. After the
42 : * end of the input is reached, we dump out remaining tuples in memory into
43 : * a final run, then merge the runs.
44 : *
45 : * When merging runs, we use a heap containing just the frontmost tuple from
46 : * each source run; we repeatedly output the smallest tuple and replace it
47 : * with the next tuple from its source tape (if any). When the heap empties,
48 : * the merge is complete. The basic merge algorithm thus needs very little
49 : * memory --- only M tuples for an M-way merge, and M is constrained to a
50 : * small number. However, we can still make good use of our full workMem
51 : * allocation by pre-reading additional blocks from each source tape. Without
52 : * prereading, our access pattern to the temporary file would be very erratic;
53 : * on average we'd read one block from each of M source tapes during the same
54 : * time that we're writing M blocks to the output tape, so there is no
55 : * sequentiality of access at all, defeating the read-ahead methods used by
56 : * most Unix kernels. Worse, the output tape gets written into a very random
57 : * sequence of blocks of the temp file, ensuring that things will be even
58 : * worse when it comes time to read that tape. A straightforward merge pass
59 : * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 : * by prereading from each source tape sequentially, loading about workMem/M
61 : * bytes from each tape in turn, and making the sequential blocks immediately
62 : * available for reuse. This approach helps to localize both read and write
63 : * accesses. The pre-reading is handled by logtape.c; we just tell it how
64 : * much memory to use for the buffers.
65 : *
66 : * In the current code we determine the number of input tapes M on the basis
67 : * of workMem: we want workMem/M to be large enough that we read a fair
68 : * amount of data each time we read from a tape, so as to maintain the
69 : * locality of access described above. Nonetheless, with large workMem we
70 : * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 : * which avoids space wastage by recycling disk space as soon as each block
72 : * is read from its "tape".
73 : *
74 : * When the caller requests random access to the sort result, we form
75 : * the final sorted run on a logical tape which is then "frozen", so
76 : * that we can access it randomly. When the caller does not need random
77 : * access, we return from tuplesort_performsort() as soon as we are down
78 : * to one run per logical tape. The final merge is then performed
79 : * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 : * saves one cycle of writing all the data out to disk and reading it in.
81 : *
82 : * This module supports parallel sorting. Parallel sorts involve coordination
83 : * among one or more worker processes, and a leader process, each with its own
84 : * tuplesort state. The leader process (or, more accurately, the
85 : * Tuplesortstate associated with a leader process) creates a full tapeset
86 : * consisting of worker tapes with one run to merge; a run for every
87 : * worker process. This is then merged. Worker processes are guaranteed to
88 : * produce exactly one output run from their partial input.
89 : *
90 : *
91 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
92 : * Portions Copyright (c) 1994, Regents of the University of California
93 : *
94 : * IDENTIFICATION
95 : * src/backend/utils/sort/tuplesort.c
96 : *
97 : *-------------------------------------------------------------------------
98 : */
99 :
100 : #include "postgres.h"
101 :
102 : #include <limits.h>
103 :
104 : #include "commands/tablespace.h"
105 : #include "miscadmin.h"
106 : #include "pg_trace.h"
107 : #include "port/pg_bitutils.h"
108 : #include "storage/shmem.h"
109 : #include "utils/guc.h"
110 : #include "utils/memutils.h"
111 : #include "utils/pg_rusage.h"
112 : #include "utils/tuplesort.h"
113 :
114 : /*
115 : * Initial number of elements in the memtuples array. The array must occupy
116 : * more than ALLOCSET_SEPARATE_THRESHOLD bytes; see comments in grow_memtuples().
117 : * Never start with fewer than 1024 elements, to avoid excessive reallocs.
118 : */
119 : #define INITIAL_MEMTUPSIZE Max(1024, \
120 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
121 :
122 : /* GUC variables */
123 : bool trace_sort = false; /* when set, tuplesort_begin_common() snapshots resource usage at sort start */
124 :
125 : #ifdef DEBUG_BOUNDED_SORT
126 : bool optimize_bounded_sort = true; /* when false, tuplesort_set_bound() ignores the bound (testing aid) */
127 : #endif
128 :
129 :
130 : /*
131 : * During merge, we use a pre-allocated set of fixed-size slots to hold
132 : * tuples, in order to avoid palloc/pfree overhead.
133 : *
134 : * Merge doesn't require a lot of memory, so we can afford to waste some,
135 : * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
136 : * palloc() overhead is not significant anymore.
137 : *
138 : * 'nextfree' is valid when this slot is in the free list. When in use, the
139 : * slot holds a tuple.
140 : */
141 : #define SLAB_SLOT_SIZE 1024 /* size of one slot's buffer, in bytes */
142 :
143 : typedef union SlabSlot
144 : {
145 : union SlabSlot *nextfree; /* next free slot; overlays the tuple storage */
146 : char buffer[SLAB_SLOT_SIZE]; /* tuple storage while the slot is in use */
147 : } SlabSlot;
148 :
149 : /*
150 : * Possible states of a Tuplesort object. These denote the states that
151 : * persist between calls of Tuplesort routines. tuplesort_begin_batch() starts every batch in TSS_INITIAL.
152 : */
153 : typedef enum
154 : {
155 : TSS_INITIAL, /* Loading tuples; still within memory limit */
156 : TSS_BOUNDED, /* Loading tuples into bounded-size heap */
157 : TSS_BUILDRUNS, /* Loading tuples; writing to tape */
158 : TSS_SORTEDINMEM, /* Sort completed entirely in memory */
159 : TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
160 : TSS_FINALMERGE, /* Performing final merge on-the-fly */
161 : } TupSortStatus;
162 :
163 : /*
164 : * Parameters for calculation of number of tapes to use --- see inittapes()
165 : * and tuplesort_merge_order().
166 : *
167 : * In this calculation we assume that each tape will cost us about one block's
168 : * worth of buffer space. This ignores the overhead of all the other data
169 : * structures needed for each tape, but it's probably close enough.
170 : *
171 : * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
172 : * input tape, for pre-reading (see discussion at top of file). This is *in
173 : * addition to* the one block already included in TAPE_BUFFER_OVERHEAD.
174 : */
175 : #define MINORDER 6 /* minimum merge order */
176 : #define MAXORDER 500 /* maximum merge order */
177 : #define TAPE_BUFFER_OVERHEAD BLCKSZ /* one block of buffer per tape */
178 : #define MERGE_BUFFER_SIZE (BLCKSZ * 32) /* 32 blocks of pre-read buffer per input tape */
179 :
180 :
181 : /*
182 : * Private state of a Tuplesort operation.
183 : */
184 : struct Tuplesortstate
185 : {
186 : TuplesortPublic base;
187 : TupSortStatus status; /* enumerated value as shown above */
188 : bool bounded; /* did caller specify a maximum number of
189 : * tuples to return? */
190 : bool boundUsed; /* true if we made use of a bounded heap */
191 : int bound; /* if bounded, the maximum number of tuples */
192 : int64 tupleMem; /* memory consumed by individual tuples.
193 : * Storing this separately from what we track
194 : * in availMem allows us to subtract the
195 : * memory consumed by all tuples when dumping
196 : * tuples to tape */
197 : int64 availMem; /* remaining memory available, in bytes */
198 : int64 allowedMem; /* total memory allowed, in bytes */
199 : int maxTapes; /* max number of input tapes to merge in each
200 : * pass */
201 : int64 maxSpace; /* maximum amount of space occupied among the
202 : * sorts of groups, either in-memory or on-disk */
203 : bool isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
204 : * false means in-memory */
205 : TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
206 : LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
207 :
208 : /*
209 : * This array holds the tuples now in sort memory. If we are in state
210 : * INITIAL, the tuples are in no particular order; if we are in state
211 : * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
212 : * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
213 : * H. In state SORTEDONTAPE, the array is not used.
214 : */
215 : SortTuple *memtuples; /* array of SortTuple structs */
216 : int memtupcount; /* number of tuples currently present */
217 : int memtupsize; /* allocated length of memtuples array */
218 : bool growmemtuples; /* memtuples' growth still underway? */
219 :
220 : /*
221 : * Memory for tuples is sometimes allocated using a simple slab allocator,
222 : * rather than with palloc(). Currently, we switch to slab allocation
223 : * when we start merging. Merging only needs to keep a small, fixed
224 : * number of tuples in memory at any time, so we can avoid the
225 : * palloc/pfree overhead by recycling a fixed number of fixed-size slots
226 : * to hold the tuples.
227 : *
228 : * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
229 : * slots. The allocation is sized to have one slot per tape, plus one
230 : * additional slot. We need that many slots to hold all the tuples kept
231 : * in the heap during merge, plus the one we have last returned from the
232 : * sort, with tuplesort_gettuple.
233 : *
234 : * Initially, all the slots are kept in a linked list of free slots. When
235 : * a tuple is read from a tape, it is put into the next available slot, if
236 : * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
237 : * instead.
238 : *
239 : * When we're done processing a tuple, we return the slot back to the free
240 : * list, or pfree() it if it was palloc'd. We know that a tuple was
241 : * allocated from the slab if its pointer value is at or after
242 : * slabMemoryBegin and before slabMemoryEnd (see IS_SLAB_SLOT).
243 : *
244 : * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
245 : * tracking memory usage is not used.
246 : */
247 : bool slabAllocatorUsed; /* when true, LACKMEM() always reports false */
248 :
249 : char *slabMemoryBegin; /* beginning of slab memory arena */
250 : char *slabMemoryEnd; /* end of slab memory arena (exclusive) */
251 : SlabSlot *slabFreeHead; /* head of free list */
252 :
253 : /* Memory used for input and output tape buffers. */
254 : size_t tape_buffer_mem;
255 :
256 : /*
257 : * When we return a tuple to the caller in tuplesort_gettuple_XXX that
258 : * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
259 : * modes), we remember the tuple in 'lastReturnedTuple', so that we can
260 : * recycle the memory on the next gettuple call.
261 : */
262 : void *lastReturnedTuple;
263 :
264 : /*
265 : * While building initial runs, this is the current output run number.
266 : * Afterwards, it is the number of initial runs we made.
267 : */
268 : int currentRun;
269 :
270 : /*
271 : * Logical tapes, for merging.
272 : *
273 : * The initial runs are written in the output tapes. In each merge pass,
274 : * the output tapes of the previous pass become the input tapes, and new
275 : * output tapes are created as needed. When nInputTapes equals
276 : * nInputRuns, there is only one merge pass left.
277 : */
278 : LogicalTape **inputTapes;
279 : int nInputTapes;
280 : int nInputRuns;
281 :
282 : LogicalTape **outputTapes;
283 : int nOutputTapes;
284 : int nOutputRuns;
285 :
286 : LogicalTape *destTape; /* current output tape */
287 :
288 : /*
289 : * These variables are used after completion of sorting to keep track of
290 : * the next tuple to return. (In the tape case, the tape's current read
291 : * position is also critical state.)
292 : */
293 : LogicalTape *result_tape; /* actual tape of finished output; NULL until formed */
294 : int current; /* array index (only used if SORTEDINMEM) */
295 : bool eof_reached; /* reached EOF (needed for cursors) */
296 :
297 : /* markpos_xxx holds marked position for mark and restore */
298 : int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
299 : int markpos_offset; /* saved "current", or offset in tape block */
300 : bool markpos_eof; /* saved "eof_reached" */
301 :
302 : /*
303 : * These variables are used during parallel sorting.
304 : *
305 : * worker is our worker identifier. Follows the general convention that
306 : * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
307 : * tuplesorts. (-1 can also be a serial tuplesort.)
308 : *
309 : * shared is mutable shared memory state, which is used to coordinate
310 : * parallel sorts. It is NULL for a serial sort.
311 : *
312 : * nParticipants is the number of worker Tuplesortstates known by the
313 : * leader to have actually been launched, which implies that they must
314 : * finish a run that the leader needs to merge. Typically includes a
315 : * worker state held by the leader process itself. Set in the leader
316 : * Tuplesortstate only; -1 in serial and worker states.
317 : */
318 : int worker;
319 : Sharedsort *shared;
320 : int nParticipants;
321 :
322 : /*
323 : * Additional state for managing "abbreviated key" sortsupport routines
324 : * (which currently may be used by all cases except the hash index case).
325 : * Tracks the intervals at which the optimization's effectiveness is
326 : * tested.
327 : */
328 : int64 abbrevNext; /* Tuple # at which to next check
329 : * applicability */
330 :
331 : /*
332 : * Resource snapshot for time of sort start (taken only if trace_sort).
333 : */
334 : PGRUsage ru_start;
335 : };
336 :
337 : /*
338 : * Private mutable state of a parallel tuplesort operation. This is allocated
339 : * in shared memory.
340 : */
341 : struct Sharedsort
342 : {
343 : /* mutex protects all fields prior to tapes */
344 : slock_t mutex;
345 :
346 : /*
347 : * currentWorker generates ordinal identifier numbers for parallel sort
348 : * workers. These start from 0, and are always gapless.
349 : *
350 : * Workers increment workersFinished to indicate having finished. If this
351 : * is equal to state->nParticipants within the leader, the leader is ready to
352 : * merge worker runs.
353 : */
354 : int currentWorker;
355 : int workersFinished;
356 :
357 : /* Temporary file space */
358 : SharedFileSet fileset;
359 :
360 : /* Size of tapes flexible array */
361 : int nTapes;
362 :
363 : /*
364 : * Tapes array used by workers to report back information needed by the
365 : * leader to concatenate all worker tapes into one for merging.
366 : */
367 : TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
368 : };
369 :
370 : /*
371 : * Is the given tuple allocated from the slab memory arena? True when the pointer lies in [slabMemoryBegin, slabMemoryEnd).
372 : */
373 : #define IS_SLAB_SLOT(state, tuple) \
374 : ((char *) (tuple) >= (state)->slabMemoryBegin && \
375 : (char *) (tuple) < (state)->slabMemoryEnd)
376 :
377 : /*
378 : * Return the given tuple to the slab memory free list (pushing it at the head), or pfree() it
379 : * if it was palloc'd. NOTE(review): 'tuple' is expanded without parentheses in the cast below, so pass a simple expression.
380 : */
381 : #define RELEASE_SLAB_SLOT(state, tuple) \
382 : do { \
383 : SlabSlot *buf = (SlabSlot *) tuple; \
384 : \
385 : if (IS_SLAB_SLOT((state), buf)) \
386 : { \
387 : buf->nextfree = (state)->slabFreeHead; \
388 : (state)->slabFreeHead = buf; \
389 : } else \
390 : pfree(buf); \
391 : } while(0)
392 :
393 : #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count)) /* dispatch to the variant's removeabbrev callback */
394 : #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state)) /* dispatch to the variant's comparetup callback */
395 : #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup)) /* dispatch to the variant's writetup callback */
396 : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len)) /* dispatch to the variant's readtup callback */
397 : #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0) /* freestate callback is optional */
398 : #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) /* over budget? never true once the slab allocator is in use */
399 : #define USEMEM(state,amt) ((state)->availMem -= (amt)) /* charge amt bytes against the budget */
400 : #define FREEMEM(state,amt) ((state)->availMem += (amt)) /* give amt bytes back to the budget */
401 : #define SERIAL(state) ((state)->shared == NULL) /* no shared state: serial sort */
402 : #define WORKER(state) ((state)->shared && (state)->worker != -1) /* parallel worker */
403 : #define LEADER(state) ((state)->shared && (state)->worker == -1) /* parallel leader */
404 :
405 : /*
406 : * NOTES about on-tape representation of tuples:
407 : *
408 : * We require the first "unsigned int" of a stored tuple to be the total size
409 : * on-tape of the tuple, including itself (so it is never zero; an all-zero
410 : * unsigned int is used to delimit runs). The remainder of the stored tuple
411 : * may or may not match the in-memory representation of the tuple ---
412 : * any conversion needed is the job of the writetup and readtup routines.
413 : *
414 : * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
415 : * representation of the tuple must be followed by another "unsigned int" that
416 : * is a copy of the length --- so the total tape space used is actually
417 : * sizeof(unsigned int) more than the stored length value. This allows
418 : * read-backwards. When the random access flag was not specified, the
419 : * write/read routines may omit the extra length word.
420 : *
421 : * writetup is expected to write both length words as well as the tuple
422 : * data. When readtup is called, the tape is positioned just after the
423 : * front length word; readtup must read the tuple data and advance past
424 : * the back length word (if present).
425 : *
426 : * The write/read routines can make use of the tuple description data
427 : * stored in the Tuplesortstate record, if needed. They are also expected
428 : * to adjust state->availMem by the amount of memory space (not tape space!)
429 : * released or consumed. There is no error return from either writetup
430 : * or readtup; they should ereport() on failure.
431 : *
432 : *
433 : * NOTES about memory consumption calculations:
434 : *
435 : * We count space allocated for tuples against the workMem limit, plus
436 : * the space used by the variable-size memtuples array. Fixed-size space
437 : * is not counted; it's small enough to not be interesting.
438 : *
439 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
440 : * rather than the originally-requested size. This is important since
441 : * palloc can add substantial overhead. It's not a complete answer since
442 : * we won't count any wasted space in palloc allocation blocks, but it's
443 : * a lot better than what we were doing before 7.3. As of 9.6, a
444 : * separate memory context is used for caller passed tuples. Resetting
445 : * it at certain key increments significantly ameliorates fragmentation.
446 : * readtup routines use the slab allocator (they cannot use
447 : * the reset context because it gets deleted at the point that merging
448 : * begins).
449 : */
450 :
451 :
452 : static void tuplesort_begin_batch(Tuplesortstate *state); /* set up or reset per-batch state */
453 : static bool consider_abort_common(Tuplesortstate *state);
454 : static void inittapes(Tuplesortstate *state, bool mergeruns);
455 : static void inittapestate(Tuplesortstate *state, int maxTapes);
456 : static void selectnewtape(Tuplesortstate *state);
457 : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
458 : static void mergeruns(Tuplesortstate *state);
459 : static void mergeonerun(Tuplesortstate *state);
460 : static void beginmerge(Tuplesortstate *state);
461 : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
462 : static void dumptuples(Tuplesortstate *state, bool alltuples);
463 : static void make_bounded_heap(Tuplesortstate *state);
464 : static void sort_bounded_heap(Tuplesortstate *state);
465 : static void tuplesort_sort_memtuples(Tuplesortstate *state);
466 : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
467 : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
468 : static void tuplesort_heap_delete_top(Tuplesortstate *state);
469 : static void reversedirection(Tuplesortstate *state);
470 : static unsigned int getlen(LogicalTape *tape, bool eofOK);
471 : static void markrunend(LogicalTape *tape);
472 : static int worker_get_identifier(Tuplesortstate *state); /* result is stored as state->worker in parallel workers */
473 : static void worker_freeze_result_tape(Tuplesortstate *state);
474 : static void worker_nomergeruns(Tuplesortstate *state);
475 : static void leader_takeover_tapes(Tuplesortstate *state);
476 : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
477 : static void tuplesort_free(Tuplesortstate *state); /* internal routine for freeing resources of tuplesort */
478 : static void tuplesort_updatemax(Tuplesortstate *state);
479 :
480 :
481 : /*
482 : * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
483 : * any variant of SortTuples, using the appropriate comparetup function.
484 : * qsort_ssup() is specialized for the case where the comparetup function
485 : * reduces to ApplySortComparator(), that is, single-key MinimalTuple sorts
486 : * and Datum sorts. Both are generated by including lib/sort_template.h with the ST_* parameters set.
487 : */
488 :
489 : #define ST_SORT qsort_tuple /* name of the generated sort function */
490 : #define ST_ELEMENT_TYPE SortTuple
491 : #define ST_COMPARE_RUNTIME_POINTER /* presumably: comparator is supplied by the caller at run time -- confirm in sort_template.h */
492 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
493 : #define ST_CHECK_FOR_INTERRUPTS
494 : #define ST_SCOPE static
495 : #define ST_DECLARE
496 : #define ST_DEFINE
497 : #include "lib/sort_template.h"
498 :
499 : #define ST_SORT qsort_ssup /* name of the generated sort function */
500 : #define ST_ELEMENT_TYPE SortTuple
501 : #define ST_COMPARE(a, b, ssup) \
502 : ApplySortComparator((a)->datum1, (a)->isnull1, \
503 : (b)->datum1, (b)->isnull1, (ssup)) /* compares only the leading datum1/isnull1 of each SortTuple */
504 : #define ST_COMPARE_ARG_TYPE SortSupportData
505 : #define ST_CHECK_FOR_INTERRUPTS
506 : #define ST_SCOPE static
507 : #define ST_DEFINE
508 : #include "lib/sort_template.h"
509 :
510 : /* state for radix sort; NOTE(review): presumably one entry per radix bucket -- the users are not visible here, confirm */
511 : typedef struct RadixSortInfo
512 : {
513 : union
514 : {
515 : size_t count; /* shares storage with offset */
516 : size_t offset; /* shares storage with count */
517 : };
518 : size_t next_offset;
519 : } RadixSortInfo;
520 :
521 : /*
522 : * Threshold below which qsort_tuple() is generally faster than a radix sort. NOTE(review): presumably a tuple count -- confirm at the use site.
523 : */
524 : #define QSORT_THRESHOLD 40
525 :
526 :
527 : /*
528 : * tuplesort_begin_xxx
529 : *
530 : * Initialize for a tuple sort operation.
531 : *
532 : * After calling tuplesort_begin, the caller should call tuplesort_putXXX
533 : * zero or more times, then call tuplesort_performsort when all the tuples
534 : * have been supplied. After performsort, retrieve the tuples in sorted
535 : * order by calling tuplesort_getXXX until it returns false/NULL. (If random
536 : * access was requested, rescan, markpos, and restorepos can also be called.)
537 : * Call tuplesort_end to terminate the operation and release memory/disk space.
538 : *
539 : * Each variant of tuplesort_begin has a workMem parameter specifying the
540 : * maximum number of kilobytes of RAM to use before spilling data to disk.
541 : * (The normal value of this parameter is work_mem, but some callers use
542 : * other values.) Each variant also has a sortopt which is a bitmask of
543 : * sort options (see TUPLESORT_* definitions in tuplesort.h). tuplesort_begin_common also takes a coordinate, which is NULL for a serial sort; it returns the new state, allocated in its own "TupleSort main" context.
544 : */
545 :
546 : Tuplesortstate *
547 172875 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
548 : {
549 : Tuplesortstate *state;
550 : MemoryContext maincontext;
551 : MemoryContext sortcontext;
552 : MemoryContext oldcontext;
553 :
554 : /* See leader_takeover_tapes() remarks on random access support */
555 172875 : if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
556 0 : elog(ERROR, "random access disallowed under parallel sort");
557 :
558 : /*
559 : * Memory context surviving tuplesort_reset. This memory context holds
560 : * data which is useful to keep while sorting multiple similar batches.
561 : */
562 172875 : maincontext = AllocSetContextCreate(CurrentMemoryContext,
563 : "TupleSort main",
564 : ALLOCSET_DEFAULT_SIZES);
565 :
566 : /*
567 : * Create a working memory context for one sort operation. The content of
568 : * this context is deleted by tuplesort_reset.
569 : */
570 172875 : sortcontext = AllocSetContextCreate(maincontext,
571 : "TupleSort sort",
572 : ALLOCSET_DEFAULT_SIZES);
573 :
574 : /*
575 : * Additionally, a working memory context for tuples is set up in
576 : * tuplesort_begin_batch.
577 : */
578 :
579 : /*
580 : * Make the Tuplesortstate within the main context. This way, we
581 : * don't need a separate pfree() operation for it at shutdown.
582 : */
583 172875 : oldcontext = MemoryContextSwitchTo(maincontext);
584 :
585 172875 : state = palloc0_object(Tuplesortstate);
586 :
587 172875 : if (trace_sort)
588 0 : pg_rusage_init(&state->ru_start);
589 :
590 172875 : state->base.sortopt = sortopt;
591 172875 : state->base.tuples = true;
592 172875 : state->abbrevNext = 10; /* tuple # at which abbreviation is first re-evaluated */
593 :
594 : /*
595 : * workMem is forced to be at least 64KB, the current minimum valid value
596 : * for the work_mem GUC. This is a defense against parallel sort callers
597 : * that divide out memory among many workers in a way that leaves each
598 : * with very little memory.
599 : */
600 172875 : state->allowedMem = Max(workMem, 64) * (int64) 1024; /* kilobytes -> bytes, in 64-bit arithmetic */
601 172875 : state->base.sortcontext = sortcontext;
602 172875 : state->base.maincontext = maincontext;
603 :
604 172875 : state->memtupsize = INITIAL_MEMTUPSIZE;
605 172875 : state->memtuples = NULL; /* allocated by tuplesort_begin_batch() */
606 :
607 : /*
608 : * After all of the other non-parallel-related state, we set up all of the
609 : * state needed for each batch.
610 : */
611 172875 : tuplesort_begin_batch(state);
612 :
613 : /*
614 : * Initialize parallel-related state based on coordination information
615 : * from caller
616 : */
617 172875 : if (!coordinate)
618 : {
619 : /* Serial sort */
620 172318 : state->shared = NULL;
621 172318 : state->worker = -1;
622 172318 : state->nParticipants = -1;
623 : }
624 557 : else if (coordinate->isWorker)
625 : {
626 : /* Parallel worker produces exactly one final run from all input */
627 379 : state->shared = coordinate->sharedsort;
628 379 : state->worker = worker_get_identifier(state);
629 379 : state->nParticipants = -1;
630 : }
631 : else
632 : {
633 : /* Parallel leader state only used for final merge */
634 178 : state->shared = coordinate->sharedsort;
635 178 : state->worker = -1;
636 178 : state->nParticipants = coordinate->nParticipants;
637 : Assert(state->nParticipants >= 1);
638 : }
639 :
640 172875 : MemoryContextSwitchTo(oldcontext);
641 :
642 172875 : return state;
643 : }
644 :
645 : /*
646 : * tuplesort_begin_batch
647 : *
648 : * Set up, or reset, all state needed for processing a new set of tuples with this
649 : * sort state. Called both from tuplesort_begin_common (the first time sorting
650 : * with this sort state) and tuplesort_reset (for subsequent usages).
651 : */
652 : static void
653 174776 : tuplesort_begin_batch(Tuplesortstate *state)
654 : {
655 : MemoryContext oldcontext;
656 :
657 174776 : oldcontext = MemoryContextSwitchTo(state->base.maincontext);
658 :
659 : /*
660 : * Caller tuple (e.g. IndexTuple) memory context.
661 : *
662 : * A dedicated child context used exclusively for caller passed tuples
663 : * eases memory management. Resetting at key points reduces
664 : * fragmentation. Note that the memtuples array of SortTuples is allocated
665 : * in the main context, not this context, because there is no need to
666 : * free memtuples early. For bounded sorts, tuples may be pfreed in any
667 : * order, so we use a regular aset.c context so that it can make use of
668 : * free'd memory. When the sort is not bounded, we make use of a bump.c
669 : * context as this keeps allocations more compact with less wastage.
670 : * Allocations are also slightly more CPU efficient.
671 : */
672 174776 : if (TupleSortUseBumpTupleCxt(state->base.sortopt))
673 173907 : state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
674 : "Caller tuples",
675 : ALLOCSET_DEFAULT_SIZES);
676 : else
677 869 : state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
678 : "Caller tuples",
679 : ALLOCSET_DEFAULT_SIZES);
680 :
681 :
682 174776 : state->status = TSS_INITIAL;
683 174776 : state->bounded = false;
684 174776 : state->boundUsed = false;
685 :
686 174776 : state->availMem = state->allowedMem; /* start each batch with the full budget */
687 :
688 174776 : state->tapeset = NULL;
689 :
690 174776 : state->memtupcount = 0;
691 :
692 174776 : state->growmemtuples = true;
693 174776 : state->slabAllocatorUsed = false;
694 174776 : if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE) /* array left over at a non-initial size: discard it */
695 : {
696 48 : pfree(state->memtuples);
697 48 : state->memtuples = NULL;
698 48 : state->memtupsize = INITIAL_MEMTUPSIZE;
699 : }
700 174776 : if (state->memtuples == NULL) /* NOTE(review): an array retained from a previous batch is not charged against the freshly reset availMem -- confirm this is intended */
701 : {
702 172923 : state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
703 172923 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
704 : }
705 :
706 : /* workMem must be large enough for the minimal memtuples array */
707 174776 : if (LACKMEM(state))
708 0 : elog(ERROR, "insufficient memory allowed for sort");
709 :
710 174776 : state->currentRun = 0;
711 :
712 : /*
713 : * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
714 : * inittapes(), if needed.
715 : */
716 :
717 174776 : state->result_tape = NULL; /* flag that result tape has not been formed */
718 :
719 174776 : MemoryContextSwitchTo(oldcontext);
720 174776 : }
721 :
722 : /*
723 : * tuplesort_set_bound
724 : *
725 : * Advise tuplesort that at most the first N result tuples are required.
726 : *
727 : * Must be called before inserting any tuples. (Actually, we could allow it
728 : * as long as the sort hasn't spilled to disk, but there seems no need for
729 : * delayed calls at the moment.)
730 : *
731 : * This is a hint only. The tuplesort may still return more tuples than
732 : * requested. Parallel leader tuplesorts will always ignore the hint, as will any sort given a bound above INT_MAX / 2.
733 : */
734 : void
735 780 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
736 : {
737 : /* Assert we're called before loading any tuples */
738 : Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
739 : /* Assert we allow bounded sorts */
740 : Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
741 : /* Can't set the bound twice, either */
742 : Assert(!state->bounded);
743 : /* Also, this shouldn't be called in a parallel worker */
744 : Assert(!WORKER(state));
745 :
746 : /* Parallel leader allows but ignores hint */
747 780 : if (LEADER(state))
748 0 : return;
749 :
750 : #ifdef DEBUG_BOUNDED_SORT
751 : /* Honor GUC setting that disables the feature (for easy testing) */
752 : if (!optimize_bounded_sort)
753 : return;
754 : #endif
755 :
756 : /* We want to be able to compute bound * 2 in an int, so ignore larger settings */
757 780 : if (bound > (int64) (INT_MAX / 2))
758 0 : return;
759 :
760 780 : state->bounded = true;
761 780 : state->bound = (int) bound; /* cannot truncate: bound <= INT_MAX / 2 was checked above */
762 :
763 : /*
764 : * Bounded sorts are not an effective target for abbreviated key
765 : * optimization. Disable by setting state to be consistent with no
766 : * abbreviation support. Only the first sort key's support data is changed.
767 : */
768 780 : state->base.sortKeys->abbrev_converter = NULL;
769 780 : if (state->base.sortKeys->abbrev_full_comparator)
770 10 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
771 :
772 : /* Not strictly necessary, but be tidy */
773 780 : state->base.sortKeys->abbrev_abort = NULL;
774 780 : state->base.sortKeys->abbrev_full_comparator = NULL;
775 : }
776 :
777 : /*
778 : * tuplesort_used_bound
779 : *
780 : * Allow callers to find out if the sort state was able to use a bound.
 *
 * Returns the current value of state->boundUsed. The function only reads
 * the flag and changes nothing in the sort state.
781 : */
782 : bool
783 243 : tuplesort_used_bound(Tuplesortstate *state)
784 : {
785 243 : return state->boundUsed;
786 : }
787 :
788 : /*
789 : * tuplesort_free
790 : *
791 : * Internal routine for freeing resources of tuplesort.
 *
 * Steps, in order: measure the space used (in disk blocks when a tape set
 * exists, otherwise in kilobytes of memory, rounded up), close the tape
 * set, emit the trace_sort log line and the SORT_DONE trace point, run
 * FREESTATE, and finally reset state->base.sortcontext.
 *
 * The measurement has to come first because it is read from the tape set
 * that is closed right after. Past that point state->tapeset is only
 * tested for being non-NULL and is not passed to any further call.
 *
 * This routine does not touch state->base.maincontext; tuplesort_end()
 * deletes that one.
792 : */
793 : static void
794 174602 : tuplesort_free(Tuplesortstate *state)
795 : {
796 : /* context swap probably not needed, but let's be safe */
797 174602 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
798 : int64 spaceUsed;
799 :
800 174602 : if (state->tapeset)
801 599 : spaceUsed = LogicalTapeSetBlocks(state->tapeset);
802 : else
803 174003 : spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
804 :
805 : /*
806 : * Delete temporary "tape" files, if any.
807 : *
808 : * We don't bother to destroy the individual tapes here. They will go away
809 : * with the sortcontext. (In TSS_FINALMERGE state, we have closed
810 : * finished tapes already.)
811 : */
812 174602 : if (state->tapeset)
813 599 : LogicalTapeSetClose(state->tapeset);
814 :
815 174602 : if (trace_sort)
816 : {
817 0 : if (state->tapeset)
818 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
819 : SERIAL(state) ? "external sort" : "parallel external sort",
820 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
821 : else
822 0 : elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
823 : SERIAL(state) ? "internal sort" : "unperformed parallel sort",
824 : state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
825 : }
826 :
827 : TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
828 :
829 174602 : FREESTATE(state);
830 174602 : MemoryContextSwitchTo(oldcontext);
831 :
832 : /*
833 : * Reset the per-sort memory context, thereby releasing all working memory.
 * The context is reset here, not deleted.
834 : */
835 174602 : MemoryContextReset(state->base.sortcontext);
836 174602 : }
837 :
838 : /*
839 : * tuplesort_end
840 : *
841 : * Release resources and clean up.
 *
 * Calls tuplesort_free() first and then deletes state->base.maincontext.
 * Since the Tuplesortstate struct is released along with that context, the
 * state pointer itself must not be used after this call either.
842 : *
843 : * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
844 : * pointing to garbage. Be careful not to attempt to use or free such
845 : * pointers afterwards!
846 : */
847 : void
848 172701 : tuplesort_end(Tuplesortstate *state)
849 : {
850 172701 : tuplesort_free(state);
851 :
852 : /*
853 : * Free the main memory context, including the Tuplesortstate struct
854 : * itself.
855 : */
856 172701 : MemoryContextDelete(state->base.maincontext);
857 172701 : }
858 :
859 : /*
860 : * tuplesort_updatemax
861 : *
862 : * Update maximum resource usage statistics.
 *
 * Measures the current usage (tape set blocks times BLCKSZ when a tape set
 * exists, otherwise allowedMem minus availMem) and, if it beats the stored
 * record, writes it to state->maxSpace, state->isMaxSpaceDisk and
 * state->maxSpaceStatus.
 *
 * A disk measurement always replaces a stored memory measurement. Between
 * two measurements of the same kind the larger one wins. A memory
 * measurement never replaces a stored disk measurement.
863 : */
864 : static void
865 2165 : tuplesort_updatemax(Tuplesortstate *state)
866 : {
867 : int64 spaceUsed;
868 : bool isSpaceDisk;
869 :
870 : /*
871 : * Note: it might seem we should provide both memory and disk usage for a
872 : * disk-based sort. However, the current code doesn't track memory space
873 : * accurately once we have begun to return tuples to the caller (since we
874 : * don't account for pfree's the caller is expected to do), so we cannot
875 : * rely on availMem in a disk sort. This does not seem worth the overhead
876 : * to fix. Is it worth creating an API for the memory context code to
877 : * tell us how much is actually used in sortcontext?
878 : */
879 2165 : if (state->tapeset)
880 : {
881 4 : isSpaceDisk = true;
882 4 : spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
883 : }
884 : else
885 : {
886 2161 : isSpaceDisk = false;
887 2161 : spaceUsed = state->allowedMem - state->availMem;
888 : }
889 :
890 : /*
891 : * Sort evicts data to the disk when it wasn't able to fit that data into
892 : * main memory. This is why we assume space used on the disk to be more
893 : * important for tracking resource usage than space used in memory. Note
894 : * that the amount of space occupied by some tupleset on the disk might be
895 : * less than amount of space occupied by the same tupleset in memory due
896 : * to more compact representation.
897 : */
898 2165 : if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
899 2161 : (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
900 : {
901 325 : state->maxSpace = spaceUsed;
902 325 : state->isMaxSpaceDisk = isSpaceDisk;
903 325 : state->maxSpaceStatus = state->status;
904 : }
905 2165 : }
906 :
907 : /*
908 : * tuplesort_reset
909 : *
910 : * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
911 : * meta-information in. After tuplesort_reset, tuplesort is ready to start
912 : * a new sort. This allows avoiding recreation of tuple sort states (and
913 : * save resources) when sorting multiple small batches.
 *
 * The order of the calls matters: tuplesort_updatemax() folds the usage of
 * the finished batch into the running maximum before tuplesort_free()
 * closes the tape set that the measurement is read from. After
 * tuplesort_begin_batch() has set up the next batch, the last-returned
 * tuple pointer and the slab allocator pointers are cleared.
914 : */
915 : void
916 1901 : tuplesort_reset(Tuplesortstate *state)
917 : {
918 1901 : tuplesort_updatemax(state);
919 1901 : tuplesort_free(state);
920 :
921 : /*
922 : * After we've freed up per-batch memory, re-setup all of the state common
923 : * to both the first batch and any subsequent batch.
924 : */
925 1901 : tuplesort_begin_batch(state);
926 :
927 1901 : state->lastReturnedTuple = NULL;
928 1901 : state->slabMemoryBegin = NULL;
929 1901 : state->slabMemoryEnd = NULL;
930 1901 : state->slabFreeHead = NULL;
931 1901 : }
932 :
933 : /*
934 : * Grow the memtuples[] array, if possible within our memory constraint. We
935 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
936 : * limit. Return true if we were able to enlarge the array, false if not.
937 : *
938 : * Normally, at each increment we double the size of the array. When doing
939 : * that would exceed a limit, we attempt one last, smaller increase (and then
940 : * clear the growmemtuples flag so we don't try any more). That allows us to
941 : * use memory as fully as permitted; sticking to the pure doubling rule could
942 : * result in almost half going unused. Because availMem moves around with
943 : * tuple addition/removal, we need some rule to prevent making repeated small
944 : * increases in memtupsize, which would just be useless thrashing. The
945 : * growmemtuples flag accomplishes that and also prevents useless
946 : * recalculations in this function.
 *
 * Every path that returns false leaves memtuples[] and memtupsize as they
 * were and leaves growmemtuples cleared, so later calls return false at
 * once. If LACKMEM is true right after the array has been reallocated, an
 * error is raised via elog(ERROR) instead of returning.
947 : */
948 : static bool
949 5316 : grow_memtuples(Tuplesortstate *state)
950 : {
951 : int newmemtupsize;
952 5316 : int memtupsize = state->memtupsize;
953 5316 : int64 memNowUsed = state->allowedMem - state->availMem;
954 :
955 : /* Forget it if we've already maxed out memtuples, per comment above */
956 5316 : if (!state->growmemtuples)
957 90 : return false;
958 :
959 : /* Select new value of memtupsize */
960 5226 : if (memNowUsed <= state->availMem)
961 : {
962 : /*
963 : * We've used no more than half of allowedMem; double our usage,
964 : * clamping at INT_MAX tuples.
965 : */
966 5133 : if (memtupsize < INT_MAX / 2)
967 5133 : newmemtupsize = memtupsize * 2;
968 : else
969 : {
970 0 : newmemtupsize = INT_MAX;
971 0 : state->growmemtuples = false;
972 : }
973 : }
974 : else
975 : {
976 : /*
977 : * This will be the last increment of memtupsize. Abandon doubling
978 : * strategy and instead increase as much as we safely can.
979 : *
980 : * To stay within allowedMem, we can't increase memtupsize by more
981 : * than availMem / sizeof(SortTuple) elements. In practice, we want
982 : * to increase it by considerably less, because we need to leave some
983 : * space for the tuples to which the new array slots will refer. We
984 : * assume the new tuples will be about the same size as the tuples
985 : * we've already seen, and thus we can extrapolate from the space
986 : * consumption so far to estimate an appropriate new size for the
987 : * memtuples array. The optimal value might be higher or lower than
988 : * this estimate, but it's hard to know that in advance. We again
989 : * clamp at INT_MAX tuples.
990 : *
991 : * This calculation is safe against enlarging the array so much that
992 : * LACKMEM becomes true, because the memory currently used includes
993 : * the present array; thus, there would be enough allowedMem for the
994 : * new array elements even if no other memory were currently used.
995 : *
996 : * We do the arithmetic in float8, because otherwise the product of
997 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
998 : * result should be insignificant; but even if we computed a
999 : * completely insane result, the checks below will prevent anything
1000 : * really bad from happening.
1001 : */
1002 : double grow_ratio;
1003 :
1004 93 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1005 93 : if (memtupsize * grow_ratio < INT_MAX)
1006 93 : newmemtupsize = (int) (memtupsize * grow_ratio);
1007 : else
1008 0 : newmemtupsize = INT_MAX;
1009 :
1010 : /* We won't make any further enlargement attempts */
1011 93 : state->growmemtuples = false;
1012 : }
1013 :
1014 : /* Must enlarge array by at least one element, else report failure */
1015 5226 : if (newmemtupsize <= memtupsize)
1016 0 : goto noalloc;
1017 :
1018 : /*
1019 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1020 : * to ensure our request won't be rejected. Note that we can easily
1021 : * exhaust address space before facing this outcome. (This is presently
1022 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1023 : * don't rely on that at this distance.)
1024 : */
1025 5226 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1026 : {
1027 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1028 0 : state->growmemtuples = false; /* can't grow any more */
1029 : }
1030 :
1031 : /*
1032 : * We need to be sure that we do not cause LACKMEM to become true, else
1033 : * the space management algorithm will go nuts. The code above should
1034 : * never generate a dangerous request, but to be safe, check explicitly
1035 : * that the array growth fits within availMem. (We could still cause
1036 : * LACKMEM if the memory chunk overhead associated with the memtuples
1037 : * array were to increase. That shouldn't happen because we chose the
1038 : * initial array size large enough to ensure that palloc will be treating
1039 : * both old and new arrays as separate chunks. But we'll check LACKMEM
1040 : * explicitly below just in case.)
1041 : */
1042 5226 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1043 0 : goto noalloc;
1044 :
1045 : /* OK, do it: un-account the old array chunk, reallocate, account the new one */
1046 5226 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1047 5226 : state->memtupsize = newmemtupsize;
1048 5226 : state->memtuples = (SortTuple *)
1049 5226 : repalloc_huge(state->memtuples,
1050 5226 : state->memtupsize * sizeof(SortTuple));
1051 5226 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1052 5226 : if (LACKMEM(state))
1053 0 : elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1054 5226 : return true;
1055 :
1056 0 : noalloc:
1057 : /* If for any reason we didn't realloc, shut off future attempts */
1058 0 : state->growmemtuples = false;
1059 0 : return false;
1060 : }
1061 :
1062 : /*
1063 : * Shared code for tuple and datum cases.
 *
 * Adds *tuple to the sort. tuplen is charged against the memory budget
 * with USEMEM and added to state->tupleMem. When useAbbrev is true, datum1
 * is replaced by its abbreviated key, unless consider_abort_common()
 * decides to give up on abbreviation; in that case REMOVEABBREV is applied
 * to the tuples already held in memtuples[].
 *
 * What happens next depends on state->status:
 *   TSS_INITIAL   - append to memtuples[], growing it when one free slot
 *                   is left; then possibly switch to a bounded heap, or to
 *                   tape-based operation when memory or slots run out.
 *   TSS_BOUNDED   - either discard the new tuple or put it in place of
 *                   the top of the heap.
 *   TSS_BUILDRUNS - append to memtuples[] and call dumptuples().
 * Any other status raises an error.
 *
 * Must not be called in a parallel leader (asserted). The work is done in
 * state->base.sortcontext; the caller's memory context is restored on
 * every return path.
1064 : */
1065 : void
1066 19362619 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1067 : bool useAbbrev, Size tuplen)
1068 : {
1069 19362619 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1070 :
1071 : Assert(!LEADER(state));
1072 :
1073 : /* account for the memory used for this tuple */
1074 19362619 : USEMEM(state, tuplen);
1075 19362619 : state->tupleMem += tuplen;
1076 :
1077 19362619 : if (!useAbbrev)
1078 : {
1079 : /*
1080 : * Leave ordinary Datum representation, or NULL value. If there is a
1081 : * converter it won't expect NULL values, and cost model is not
1082 : * required to account for NULL, so in that case we avoid calling
1083 : * converter and just set datum1 to zeroed representation (to be
1084 : * consistent, and to support cheap inequality tests for NULL
1085 : * abbreviated keys).
1086 : */
1087 : }
1088 2899252 : else if (!consider_abort_common(state))
1089 : {
1090 : /* Store abbreviated key representation */
1091 2899188 : tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1092 : state->base.sortKeys);
1093 : }
1094 : else
1095 : {
1096 : /*
1097 : * Set state to be consistent with never trying abbreviation.
1098 : *
1099 : * Alter datum1 representation in already-copied tuples, so as to
1100 : * ensure a consistent representation (current tuple was just
1101 : * handled). It does not matter if some dumped tuples are already
1102 : * sorted on tape, since serialized tuples lack abbreviated keys
1103 : * (TSS_BUILDRUNS state prevents control reaching here in any case).
1104 : */
1105 64 : REMOVEABBREV(state, state->memtuples, state->memtupcount);
1106 : }
1107 :
1108 19362619 : switch (state->status)
1109 : {
1110 16489625 : case TSS_INITIAL:
1111 :
1112 : /*
1113 : * Save the tuple into the unsorted array. First, grow the array
1114 : * as needed. Note that we try to grow the array when there is
1115 : * still one free slot remaining --- if we fail, there'll still be
1116 : * room to store the incoming tuple, and then we'll switch to
1117 : * tape-based operation.
1118 : */
1119 16489625 : if (state->memtupcount >= state->memtupsize - 1)
1120 : {
1121 5316 : (void) grow_memtuples(state);
1122 : Assert(state->memtupcount < state->memtupsize);
1123 : }
1124 16489625 : state->memtuples[state->memtupcount++] = *tuple;
1125 :
1126 : /*
1127 : * Check if it's time to switch over to a bounded heapsort. We do
1128 : * so if the input tuple count exceeds twice the desired tuple
1129 : * count (this is a heuristic for where heapsort becomes cheaper
1130 : * than a quicksort), or if we've just filled workMem and have
1131 : * enough tuples to meet the bound.
1132 : *
1133 : * Note that once we enter TSS_BOUNDED state we will always try to
1134 : * complete the sort that way. In the worst case, if later input
1135 : * tuples are larger than earlier ones, this might cause us to
1136 : * exceed workMem significantly.
1137 : */
1138 16489625 : if (state->bounded &&
1139 35488 : (state->memtupcount > state->bound * 2 ||
1140 35233 : (state->memtupcount > state->bound && LACKMEM(state))))
1141 : {
1142 255 : if (trace_sort)
1143 0 : elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1144 : state->memtupcount,
1145 : pg_rusage_show(&state->ru_start));
1146 255 : make_bounded_heap(state);
1147 255 : MemoryContextSwitchTo(oldcontext);
1148 255 : return;
1149 : }
1150 :
1151 : /*
1152 : * Done if we still fit in available memory and have array slots.
1153 : */
1154 16489370 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1155 : {
1156 16489280 : MemoryContextSwitchTo(oldcontext);
1157 16489280 : return;
1158 : }
1159 :
1160 : /*
1161 : * Nope; time to switch to tape-based operation.
1162 : */
1163 90 : inittapes(state, true);
1164 :
1165 : /*
1166 : * Dump all tuples.
1167 : */
1168 90 : dumptuples(state, false);
1169 90 : break;
1170 :
1171 2138921 : case TSS_BOUNDED:
1172 :
1173 : /*
1174 : * We don't want to grow the array here, so check whether the new
1175 : * tuple can be discarded before putting it in. This should be a
1176 : * good speed optimization, too, since when there are many more
1177 : * input tuples than the bound, most input tuples can be discarded
1178 : * with just this one comparison. Note that because we currently
1179 : * have the sort direction reversed, we must check for <= not >=.
1180 : */
1181 2138921 : if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1182 : {
1183 : /* new tuple <= top of the heap, so we can discard it */
1184 1804256 : free_sort_tuple(state, tuple);
1185 1804256 : CHECK_FOR_INTERRUPTS();
1186 : }
1187 : else
1188 : {
1189 : /* discard top of heap, replacing it with the new tuple */
1190 334665 : free_sort_tuple(state, &state->memtuples[0]);
1191 334665 : tuplesort_heap_replace_top(state, tuple);
1192 : }
1193 2138921 : break;
1194 :
1195 734073 : case TSS_BUILDRUNS:
1196 :
1197 : /*
1198 : * Save the tuple into the unsorted array (there must be space)
1199 : */
1200 734073 : state->memtuples[state->memtupcount++] = *tuple;
1201 :
1202 : /*
1203 : * If we are over the memory limit, dump all tuples.
1204 : */
1205 734073 : dumptuples(state, false);
1206 734073 : break;
1207 :
1208 0 : default:
1209 0 : elog(ERROR, "invalid tuplesort state");
1210 : break;
1211 : }
1212 2873084 : MemoryContextSwitchTo(oldcontext);
1213 : }
1214 :
1215 : static bool
1216 2899252 : consider_abort_common(Tuplesortstate *state)
1217 : {
1218 : Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1219 : Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1220 : Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1221 :
1222 : /*
1223 : * Check effectiveness of abbreviation optimization. Consider aborting
1224 : * when still within memory limit.
 *
 * Return value: true means abbreviation has just been abandoned and the
 * leading sort key has been switched back to its full comparator; false
 * means abbreviation stays in use.
 *
 * The abort routine is consulted only while in TSS_INITIAL state and only
 * once memtupcount has reached abbrevNext. abbrevNext is doubled before
 * each consultation, so the checks get exponentially rarer as tuples
 * accumulate.
1225 : */
1226 2899252 : if (state->status == TSS_INITIAL &&
1227 2597219 : state->memtupcount >= state->abbrevNext)
1228 : {
1229 3280 : state->abbrevNext *= 2;
1230 :
1231 : /*
1232 : * Check opclass-supplied abbreviation abort routine. It may indicate
1233 : * that abbreviation should not proceed.
1234 : */
1235 3280 : if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1236 : state->base.sortKeys))
1237 3216 : return false;
1238 :
1239 : /*
1240 : * Finally, restore authoritative comparator, and indicate that
1241 : * abbreviation is not in play by setting abbrev_converter to NULL
1242 : */
1243 64 : state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1244 64 : state->base.sortKeys[0].abbrev_converter = NULL;
1245 : /* Not strictly necessary, but be tidy */
1246 64 : state->base.sortKeys[0].abbrev_abort = NULL;
1247 64 : state->base.sortKeys[0].abbrev_full_comparator = NULL;
1248 :
1249 : /* Give up - expect original pass-by-value representation */
1250 64 : return true;
1251 : }
1252 :
1253 2895972 : return false;
1254 : }
1255 :
1256 : /*
1257 : * All tuples have been provided; finish the sort.
 *
 * The work depends on the state the input phase ended in:
 *   TSS_INITIAL   - a serial sort is sorted in memory and becomes
 *                   TSS_SORTEDINMEM; a parallel worker dumps its tuples to
 *                   tape and becomes TSS_SORTEDONTAPE; a parallel leader
 *                   takes over the worker tapes and merges them.
 *   TSS_BOUNDED   - the bounded heap is turned into a sorted array by
 *                   sort_bounded_heap().
 *   TSS_BUILDRUNS - the remaining in-memory tuples are dumped and the runs
 *                   are merged by mergeruns().
 * Any other status raises an error.
 *
 * Each branch also resets read-position and mark fields for the fetch
 * phase. The work is done in state->base.sortcontext and the caller's
 * context is restored before returning.
 *
 * NOTE(review): the three branches do not reset the same set of fields
 * (TSS_BOUNDED leaves markpos_block alone, TSS_BUILDRUNS leaves current
 * alone). Presumably the skipped field is unused for that kind of result;
 * confirm before relying on it.
1258 : */
1259 : void
1260 149639 : tuplesort_performsort(Tuplesortstate *state)
1261 : {
1262 149639 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1263 :
1264 149639 : if (trace_sort)
1265 0 : elog(LOG, "performsort of worker %d starting: %s",
1266 : state->worker, pg_rusage_show(&state->ru_start));
1267 :
1268 149639 : switch (state->status)
1269 : {
1270 149294 : case TSS_INITIAL:
1271 :
1272 : /*
1273 : * We were able to accumulate all the tuples within the allowed
1274 : * amount of memory, or leader to take over worker tapes
1275 : */
1276 149294 : if (SERIAL(state))
1277 : {
1278 : /* Sort in memory and we're done */
1279 148785 : tuplesort_sort_memtuples(state);
1280 148725 : state->status = TSS_SORTEDINMEM;
1281 : }
1282 509 : else if (WORKER(state))
1283 : {
1284 : /*
1285 : * Parallel workers must still dump out tuples to tape. No
1286 : * merge is required to produce single output run, though.
1287 : */
1288 379 : inittapes(state, false);
1289 379 : dumptuples(state, true);
1290 379 : worker_nomergeruns(state);
1291 379 : state->status = TSS_SORTEDONTAPE;
1292 : }
1293 : else
1294 : {
1295 : /*
1296 : * Leader will take over worker tapes and merge worker runs.
1297 : * Note that mergeruns sets the correct state->status.
1298 : */
1299 130 : leader_takeover_tapes(state);
1300 130 : mergeruns(state);
1301 : }
1302 149234 : state->current = 0;
1303 149234 : state->eof_reached = false;
1304 149234 : state->markpos_block = 0L;
1305 149234 : state->markpos_offset = 0;
1306 149234 : state->markpos_eof = false;
1307 149234 : break;
1308 :
1309 255 : case TSS_BOUNDED:
1310 :
1311 : /*
1312 : * We were able to accumulate all the tuples required for output
1313 : * in memory, using a heap to eliminate excess tuples. Now we
1314 : * have to transform the heap to a properly-sorted array. Note
1315 : * that sort_bounded_heap sets the correct state->status.
1316 : */
1317 255 : sort_bounded_heap(state);
1318 255 : state->current = 0;
1319 255 : state->eof_reached = false;
1320 255 : state->markpos_offset = 0;
1321 255 : state->markpos_eof = false;
1322 255 : break;
1323 :
1324 90 : case TSS_BUILDRUNS:
1325 :
1326 : /*
1327 : * Finish tape-based sort. First, flush all tuples remaining in
1328 : * memory out to tape; then merge until we have a single remaining
1329 : * run (or, if !randomAccess and !WORKER(), one run per tape).
1330 : * Note that mergeruns sets the correct state->status.
1331 : */
1332 90 : dumptuples(state, true);
1333 90 : mergeruns(state);
1334 90 : state->eof_reached = false;
1335 90 : state->markpos_block = 0L;
1336 90 : state->markpos_offset = 0;
1337 90 : state->markpos_eof = false;
1338 90 : break;
1339 :
1340 0 : default:
1341 0 : elog(ERROR, "invalid tuplesort state");
1342 : break;
1343 : }
1344 :
1345 149579 : if (trace_sort)
1346 : {
1347 0 : if (state->status == TSS_FINALMERGE)
1348 0 : elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1349 : state->worker, state->nInputTapes,
1350 : pg_rusage_show(&state->ru_start));
1351 : else
1352 0 : elog(LOG, "performsort of worker %d done: %s",
1353 : state->worker, pg_rusage_show(&state->ru_start));
1354 : }
1355 :
1356 149579 : MemoryContextSwitchTo(oldcontext);
1357 149579 : }
1358 :
1359 : /*
1360 : * Internal routine to fetch the next tuple in either forward or back
1361 : * direction into *stup. Returns false if no more tuples.
1362 : * Returned tuple belongs to tuplesort memory context, and must not be freed
1363 : * by caller. Note that fetched tuple is stored in memory that may be
1364 : * recycled by any future fetch.
 *
 * The source of the tuple depends on state->status:
 *   TSS_SORTEDINMEM  - read from memtuples[] at state->current.
 *   TSS_SORTEDONTAPE - read from result_tape; a backward fetch uses the
 *                      length words stored around each tuple to step back.
 *   TSS_FINALMERGE   - forward only (asserted); return the top of the
 *                      merge heap and refill the heap from the tape that
 *                      tuple came from.
 * Any other status raises an error.
 *
 * In the two tape-based states the slab slot of the previously returned
 * tuple is released at the start of the call, which is why an earlier
 * result must not be used after the next fetch.
 *
 * Must not be called in a parallel worker (asserted). This routine does
 * not switch memory contexts itself.
1365 : */
1366 : bool
1367 17725561 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1368 : SortTuple *stup)
1369 : {
1370 : unsigned int tuplen;
1371 : size_t nmoved;
1372 :
1373 : Assert(!WORKER(state));
1374 :
1375 17725561 : switch (state->status)
1376 : {
1377 14784536 : case TSS_SORTEDINMEM:
1378 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1379 : Assert(!state->slabAllocatorUsed);
1380 14784536 : if (forward)
1381 : {
1382 14784492 : if (state->current < state->memtupcount)
1383 : {
1384 14636762 : *stup = state->memtuples[state->current++];
1385 14636762 : return true;
1386 : }
1387 147730 : state->eof_reached = true;
1388 :
1389 : /*
1390 : * Complain if caller tries to retrieve more tuples than
1391 : * originally asked for in a bounded sort. This is because
1392 : * returning EOF here might be the wrong thing.
1393 : */
1394 147730 : if (state->bounded && state->current >= state->bound)
1395 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1396 :
1397 147730 : return false;
1398 : }
1399 : else
1400 : {
1401 44 : if (state->current <= 0)
1402 0 : return false;
1403 :
1404 : /*
1405 : * if all tuples are fetched already then we return last
1406 : * tuple, else - tuple before last returned.
1407 : */
1408 44 : if (state->eof_reached)
1409 8 : state->eof_reached = false;
1410 : else
1411 : {
1412 36 : state->current--; /* last returned tuple */
1413 36 : if (state->current <= 0)
1414 4 : return false;
1415 : }
1416 40 : *stup = state->memtuples[state->current - 1];
1417 40 : return true;
1418 : }
1419 : break;
1420 :
1421 196999 : case TSS_SORTEDONTAPE:
1422 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1423 : Assert(state->slabAllocatorUsed);
1424 :
1425 : /*
1426 : * The slot that held the tuple that we returned in previous
1427 : * gettuple call can now be reused.
1428 : */
1429 196999 : if (state->lastReturnedTuple)
1430 : {
1431 101900 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1432 101900 : state->lastReturnedTuple = NULL;
1433 : }
1434 :
1435 196999 : if (forward)
1436 : {
1437 196979 : if (state->eof_reached)
1438 0 : return false;
1439 :
1440 196979 : if ((tuplen = getlen(state->result_tape, true)) != 0)
1441 : {
1442 196960 : READTUP(state, stup, state->result_tape, tuplen);
1443 :
1444 : /*
1445 : * Remember the tuple we return, so that we can recycle
1446 : * its memory on next call. (This can be NULL, in the
1447 : * !state->tuples case).
1448 : */
1449 196960 : state->lastReturnedTuple = stup->tuple;
1450 :
1451 196960 : return true;
1452 : }
1453 : else
1454 : {
1455 19 : state->eof_reached = true;
1456 19 : return false;
1457 : }
1458 : }
1459 :
1460 : /*
1461 : * Backward.
1462 : *
1463 : * if all tuples are fetched already then we return last tuple,
1464 : * else - tuple before last returned.
1465 : */
1466 20 : if (state->eof_reached)
1467 : {
1468 : /*
1469 : * Seek position is pointing just past the zero tuplen at the
1470 : * end of file; back up to fetch last tuple's ending length
1471 : * word. If seek fails we must have a completely empty file.
1472 : */
1473 8 : nmoved = LogicalTapeBackspace(state->result_tape,
1474 : 2 * sizeof(unsigned int));
1475 8 : if (nmoved == 0)
1476 0 : return false;
1477 8 : else if (nmoved != 2 * sizeof(unsigned int))
1478 0 : elog(ERROR, "unexpected tape position");
1479 8 : state->eof_reached = false;
1480 : }
1481 : else
1482 : {
1483 : /*
1484 : * Back up and fetch previously-returned tuple's ending length
1485 : * word. If seek fails, assume we are at start of file.
1486 : */
1487 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1488 : sizeof(unsigned int));
1489 12 : if (nmoved == 0)
1490 0 : return false;
1491 12 : else if (nmoved != sizeof(unsigned int))
1492 0 : elog(ERROR, "unexpected tape position");
1493 12 : tuplen = getlen(state->result_tape, false);
1494 :
1495 : /*
1496 : * Back up to get ending length word of tuple before it.
1497 : */
1498 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1499 : tuplen + 2 * sizeof(unsigned int));
1500 12 : if (nmoved == tuplen + sizeof(unsigned int))
1501 : {
1502 : /*
1503 : * We backed up over the previous tuple, but there was no
1504 : * ending length word before it. That means that the prev
1505 : * tuple is the first tuple in the file. It is now the
1506 : * next to read in forward direction (not obviously right,
1507 : * but that is what in-memory case does).
1508 : */
1509 4 : return false;
1510 : }
1511 8 : else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1512 0 : elog(ERROR, "bogus tuple length in backward scan");
1513 : }
1514 :
1515 16 : tuplen = getlen(state->result_tape, false);
1516 :
1517 : /*
1518 : * Now we have the length of the prior tuple, back up and read it.
1519 : * Note: READTUP expects we are positioned after the initial
1520 : * length word of the tuple, so back up to that point.
1521 : */
1522 16 : nmoved = LogicalTapeBackspace(state->result_tape,
1523 : tuplen);
1524 16 : if (nmoved != tuplen)
1525 0 : elog(ERROR, "bogus tuple length in backward scan");
1526 16 : READTUP(state, stup, state->result_tape, tuplen);
1527 :
1528 : /*
1529 : * Remember the tuple we return, so that we can recycle its memory
1530 : * on next call. (This can be NULL, in the Datum case).
1531 : */
1532 16 : state->lastReturnedTuple = stup->tuple;
1533 :
1534 16 : return true;
1535 :
1536 2744026 : case TSS_FINALMERGE:
1537 : Assert(forward);
1538 : /* We are managing memory ourselves, with the slab allocator. */
1539 : Assert(state->slabAllocatorUsed);
1540 :
1541 : /*
1542 : * The slab slot holding the tuple that we returned in previous
1543 : * gettuple call can now be reused.
1544 : */
1545 2744026 : if (state->lastReturnedTuple)
1546 : {
1547 2673800 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1548 2673800 : state->lastReturnedTuple = NULL;
1549 : }
1550 :
1551 : /*
1552 : * This code should match the inner loop of mergeonerun().
1553 : */
1554 2744026 : if (state->memtupcount > 0)
1555 : {
1556 2743832 : int srcTapeIndex = state->memtuples[0].srctape;
1557 2743832 : LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1558 : SortTuple newtup;
1559 :
1560 2743832 : *stup = state->memtuples[0];
1561 :
1562 : /*
1563 : * Remember the tuple we return, so that we can recycle its
1564 : * memory on next call. (This can be NULL, in the Datum case).
1565 : */
1566 2743832 : state->lastReturnedTuple = stup->tuple;
1567 :
1568 : /*
1569 : * Pull next tuple from tape, and replace the returned tuple
1570 : * at top of the heap with it.
1571 : */
1572 2743832 : if (!mergereadnext(state, srcTape, &newtup))
1573 : {
1574 : /*
1575 : * If no more data, we've reached end of run on this tape.
1576 : * Remove the top node from the heap.
1577 : */
1578 283 : tuplesort_heap_delete_top(state);
1579 283 : state->nInputRuns--;
1580 :
1581 : /*
1582 : * Close the tape. It'd go away at the end of the sort
1583 : * anyway, but better to release the memory early.
1584 : */
1585 283 : LogicalTapeClose(srcTape);
1586 283 : return true;
1587 : }
1588 2743549 : newtup.srctape = srcTapeIndex;
1589 2743549 : tuplesort_heap_replace_top(state, &newtup);
1590 2743549 : return true;
1591 : }
1592 194 : return false;
1593 :
1594 0 : default:
1595 0 : elog(ERROR, "invalid tuplesort state");
1596 : return false; /* keep compiler quiet */
1597 : }
1598 : }
1599 :
1600 :
1601 : /*
1602 : * Advance over N tuples in either forward or back direction,
1603 : * without returning any data. N==0 is a no-op.
1604 : * Returns true if successful, false if ran out of tuples.
 *
 * In TSS_SORTEDINMEM state the skip is done by adjusting state->current.
 * If fewer than ntuples remain, current is moved to the end, eof_reached
 * is set, and false is returned (or an error is raised when that exceeds
 * the bound of a bounded sort).
 *
 * In TSS_SORTEDONTAPE and TSS_FINALMERGE states the tuples are fetched one
 * by one with tuplesort_gettuple_common() and thrown away, working in
 * state->base.sortcontext and checking for interrupts after each fetch.
 *
 * Any other status raises an error. Must not be called in a parallel
 * worker (asserted).
1605 : */
1606 : bool
1607 258 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1608 : {
1609 : MemoryContext oldcontext;
1610 :
1611 : /*
1612 : * We don't actually support backwards skip yet, because no callers need
1613 : * it. The API is designed to allow for that later, though.
1614 : */
1615 : Assert(forward);
1616 : Assert(ntuples >= 0);
1617 : Assert(!WORKER(state));
1618 :
1619 258 : switch (state->status)
1620 : {
1621 242 : case TSS_SORTEDINMEM:
1622 242 : if (state->memtupcount - state->current >= ntuples)
1623 : {
1624 242 : state->current += ntuples;
1625 242 : return true;
1626 : }
1627 0 : state->current = state->memtupcount;
1628 0 : state->eof_reached = true;
1629 :
1630 : /*
1631 : * Complain if caller tries to retrieve more tuples than
1632 : * originally asked for in a bounded sort. This is because
1633 : * returning EOF here might be the wrong thing.
1634 : */
1635 0 : if (state->bounded && state->current >= state->bound)
1636 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1637 :
1638 0 : return false;
1639 :
1640 16 : case TSS_SORTEDONTAPE:
1641 : case TSS_FINALMERGE:
1642 :
1643 : /*
1644 : * We could probably optimize these cases better, but for now it's
1645 : * not worth the trouble.
1646 : */
1647 16 : oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1648 160088 : while (ntuples-- > 0)
1649 : {
1650 : SortTuple stup;
1651 :
1652 160072 : if (!tuplesort_gettuple_common(state, forward, &stup))
1653 : {
1654 0 : MemoryContextSwitchTo(oldcontext);
1655 0 : return false;
1656 : }
1657 160072 : CHECK_FOR_INTERRUPTS();
1658 : }
1659 16 : MemoryContextSwitchTo(oldcontext);
1660 16 : return true;
1661 :
1662 0 : default:
1663 0 : elog(ERROR, "invalid tuplesort state");
1664 : return false; /* keep compiler quiet */
1665 : }
1666 : }
1667 :
1668 : /*
1669 : * tuplesort_merge_order - report merge order we'll use for given memory
1670 : * (note: "merge order" just means the number of input tapes in the merge).
1671 : *
1672 : * This is exported for use by the planner. allowedMem is in bytes.
1673 : */
1674 : int
1675 12478 : tuplesort_merge_order(int64 allowedMem)
1676 : {
1677 : int mOrder;
1678 :
1679 : /*----------
1680 : * In the merge phase, we need buffer space for each input and output tape.
1681 : * Each pass in the balanced merge algorithm reads from M input tapes, and
1682 : * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1683 : * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1684 : * input tape.
1685 : *
1686 : * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1687 : * N * TAPE_BUFFER_OVERHEAD
1688 : *
1689 : * Except for the last and next-to-last merge passes, where there can be
1690 : * fewer tapes left to process, M = N. We choose M so that we have the
1691 : * desired amount of memory available for the input buffers
1692 : * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1693 : * available for the tape buffers (allowedMem).
1694 : *
1695 : * Note: you might be thinking we need to account for the memtuples[]
1696 : * array in this calculation, but we effectively treat that as part of the
1697 : * MERGE_BUFFER_SIZE workspace.
1698 : *----------
1699 : */
1700 12478 : mOrder = allowedMem /
1701 : (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1702 :
1703 : /*
1704 : * Even in minimum memory, use at least a MINORDER merge. On the other
1705 : * hand, even when we have lots of memory, do not use more than a MAXORDER
1706 : * merge. Tapes are pretty cheap, but they're not entirely free. Each
1707 : * additional tape reduces the amount of memory available to build runs,
1708 : * which in turn can cause the same sort to need more runs, which makes
1709 : * merging slower even if it can still be done in a single pass. Also,
1710 : * high order merges are quite slow due to CPU cache effects; it can be
1711 : * faster to pay the I/O cost of a multi-pass merge than to perform a
1712 : * single merge pass across many hundreds of tapes.
1713 : */
1714 12478 : mOrder = Max(mOrder, MINORDER);
1715 12478 : mOrder = Min(mOrder, MAXORDER);
1716 :
1717 12478 : return mOrder;
1718 : }
1719 :
1720 : /*
1721 : * Helper function to calculate how much memory to allocate for the read buffer
1722 : * of each input tape in a merge pass.
1723 : *
1724 : * 'avail_mem' is the amount of memory available for the buffers of all the
1725 : * tapes, both input and output.
1726 : * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1727 : * 'maxOutputTapes' is the max. number of output tapes we should produce.
1728 : */
1729 : static int64
1730 240 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1731 : int maxOutputTapes)
1732 : {
1733 : int nOutputRuns;
1734 : int nOutputTapes;
1735 :
1736 : /*
1737 : * How many output tapes will we produce in this pass?
1738 : *
1739 : * This is nInputRuns / nInputTapes, rounded up.
1740 : */
1741 240 : nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1742 :
1743 240 : nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1744 :
1745 : /*
1746 : * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1747 : * remaining memory is divided evenly between the input tapes.
1748 : *
1749 : * This also follows from the formula in tuplesort_merge_order, but here
1750 : * we derive the input buffer size from the amount of memory available,
1751 : * and M and N.
1752 : */
1753 240 : return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1754 : }
1755 :
1756 : /*
1757 : * inittapes - initialize for tape sorting.
1758 : *
1759 : * This is called only if we have found we won't sort in memory.
1760 : */
1761 : static void
1762 469 : inittapes(Tuplesortstate *state, bool mergeruns)
1763 : {
1764 : Assert(!LEADER(state));
1765 :
1766 469 : if (mergeruns)
1767 : {
1768 : /* Compute number of input tapes to use when merging */
1769 90 : state->maxTapes = tuplesort_merge_order(state->allowedMem);
1770 : }
1771 : else
1772 : {
1773 : /* Workers can sometimes produce single run, output without merge */
1774 : Assert(WORKER(state));
1775 379 : state->maxTapes = MINORDER;
1776 : }
1777 :
1778 469 : if (trace_sort)
1779 0 : elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1780 : state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1781 :
1782 : /* Create the tape set */
1783 469 : inittapestate(state, state->maxTapes);
1784 469 : state->tapeset =
1785 469 : LogicalTapeSetCreate(false,
1786 469 : state->shared ? &state->shared->fileset : NULL,
1787 : state->worker);
1788 :
1789 469 : state->currentRun = 0;
1790 :
1791 : /*
1792 : * Initialize logical tape arrays.
1793 : */
1794 469 : state->inputTapes = NULL;
1795 469 : state->nInputTapes = 0;
1796 469 : state->nInputRuns = 0;
1797 :
1798 469 : state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1799 469 : state->nOutputTapes = 0;
1800 469 : state->nOutputRuns = 0;
1801 :
1802 469 : state->status = TSS_BUILDRUNS;
1803 :
1804 469 : selectnewtape(state);
1805 469 : }
1806 :
1807 : /*
1808 : * inittapestate - initialize generic tape management state
1809 : */
1810 : static void
1811 599 : inittapestate(Tuplesortstate *state, int maxTapes)
1812 : {
1813 : int64 tapeSpace;
1814 :
1815 : /*
1816 : * Decrease availMem to reflect the space needed for tape buffers; but
1817 : * don't decrease it to the point that we have no room for tuples. (That
1818 : * case is only likely to occur if sorting pass-by-value Datums; in all
1819 : * other scenarios the memtuples[] array is unlikely to occupy more than
1820 : * half of allowedMem. In the pass-by-value case it's not important to
1821 : * account for tuple space, so we don't care if LACKMEM becomes
1822 : * inaccurate.)
1823 : */
1824 599 : tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1825 :
1826 599 : if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1827 521 : USEMEM(state, tapeSpace);
1828 :
1829 : /*
1830 : * Make sure that the temp file(s) underlying the tape set are created in
1831 : * suitable temp tablespaces. For parallel sorts, this should have been
1832 : * called already, but it doesn't matter if it is called a second time.
1833 : */
1834 599 : PrepareTempTablespaces();
1835 599 : }
1836 :
1837 : /*
1838 : * selectnewtape -- select next tape to output to.
1839 : *
1840 : * This is called after finishing a run when we know another run
1841 : * must be started. This is used both when building the initial
1842 : * runs, and during merge passes.
1843 : */
1844 : static void
1845 1200 : selectnewtape(Tuplesortstate *state)
1846 : {
1847 : /*
1848 : * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1849 : * both zero. On each call, we create a new output tape to hold the next
1850 : * run, until maxTapes is reached. After that, we assign new runs to the
1851 : * existing tapes in a round robin fashion.
1852 : */
1853 1200 : if (state->nOutputTapes < state->maxTapes)
1854 : {
1855 : /* Create a new tape to hold the next run */
1856 : Assert(state->outputTapes[state->nOutputRuns] == NULL);
1857 : Assert(state->nOutputRuns == state->nOutputTapes);
1858 812 : state->destTape = LogicalTapeCreate(state->tapeset);
1859 812 : state->outputTapes[state->nOutputTapes] = state->destTape;
1860 812 : state->nOutputTapes++;
1861 812 : state->nOutputRuns++;
1862 : }
1863 : else
1864 : {
1865 : /*
1866 : * We have reached the max number of tapes. Append to an existing
1867 : * tape.
1868 : */
1869 388 : state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1870 388 : state->nOutputRuns++;
1871 : }
1872 1200 : }
1873 :
1874 : /*
1875 : * Initialize the slab allocation arena, for the given number of slots.
1876 : */
1877 : static void
1878 220 : init_slab_allocator(Tuplesortstate *state, int numSlots)
1879 : {
1880 220 : if (numSlots > 0)
1881 : {
1882 : char *p;
1883 : int i;
1884 :
1885 204 : state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1886 204 : state->slabMemoryEnd = state->slabMemoryBegin +
1887 204 : numSlots * SLAB_SLOT_SIZE;
1888 204 : state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1889 204 : USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1890 :
1891 204 : p = state->slabMemoryBegin;
1892 778 : for (i = 0; i < numSlots - 1; i++)
1893 : {
1894 574 : ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1895 574 : p += SLAB_SLOT_SIZE;
1896 : }
1897 204 : ((SlabSlot *) p)->nextfree = NULL;
1898 : }
1899 : else
1900 : {
1901 16 : state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1902 16 : state->slabFreeHead = NULL;
1903 : }
1904 220 : state->slabAllocatorUsed = true;
1905 220 : }
1906 :
1907 : /*
1908 : * mergeruns -- merge all the completed initial runs.
1909 : *
1910 : * This implements the Balanced k-Way Merge Algorithm. All input data has
1911 : * already been written to initial runs on tape (see dumptuples).
1912 : */
1913 : static void
1914 220 : mergeruns(Tuplesortstate *state)
1915 : {
1916 : int tapenum;
1917 :
1918 : Assert(state->status == TSS_BUILDRUNS);
1919 : Assert(state->memtupcount == 0);
1920 :
1921 220 : if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
1922 : {
1923 : /*
1924 : * If there are multiple runs to be merged, when we go to read back
1925 : * tuples from disk, abbreviated keys will not have been stored, and
1926 : * we don't care to regenerate them. Disable abbreviation from this
1927 : * point on.
1928 : */
1929 19 : state->base.sortKeys->abbrev_converter = NULL;
1930 19 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
1931 :
1932 : /* Not strictly necessary, but be tidy */
1933 19 : state->base.sortKeys->abbrev_abort = NULL;
1934 19 : state->base.sortKeys->abbrev_full_comparator = NULL;
1935 : }
1936 :
1937 : /*
1938 : * Reset tuple memory. We've freed all the tuples that we previously
1939 : * allocated. We will use the slab allocator from now on.
1940 : */
1941 220 : MemoryContextResetOnly(state->base.tuplecontext);
1942 :
1943 : /*
1944 : * We no longer need a large memtuples array. (We will allocate a smaller
1945 : * one for the heap later.)
1946 : */
1947 220 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1948 220 : pfree(state->memtuples);
1949 220 : state->memtuples = NULL;
1950 :
1951 : /*
1952 : * Initialize the slab allocator. We need one slab slot per input tape,
1953 : * for the tuples in the heap, plus one to hold the tuple last returned
1954 : * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
1955 : * however, we don't need to do allocate anything.)
1956 : *
1957 : * In a multi-pass merge, we could shrink this allocation for the last
1958 : * merge pass, if it has fewer tapes than previous passes, but we don't
1959 : * bother.
1960 : *
1961 : * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
1962 : * to track memory usage of individual tuples.
1963 : */
1964 220 : if (state->base.tuples)
1965 204 : init_slab_allocator(state, state->nOutputTapes + 1);
1966 : else
1967 16 : init_slab_allocator(state, 0);
1968 :
1969 : /*
1970 : * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
1971 : * from each input tape.
1972 : *
1973 : * We could shrink this, too, between passes in a multi-pass merge, but we
1974 : * don't bother. (The initial input tapes are still in outputTapes. The
1975 : * number of input tapes will not increase between passes.)
1976 : */
1977 220 : state->memtupsize = state->nOutputTapes;
1978 440 : state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
1979 220 : state->nOutputTapes * sizeof(SortTuple));
1980 220 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1981 :
1982 : /*
1983 : * Use all the remaining memory we have available for tape buffers among
1984 : * all the input tapes. At the beginning of each merge pass, we will
1985 : * divide this memory between the input and output tapes in the pass.
1986 : */
1987 220 : state->tape_buffer_mem = state->availMem;
1988 220 : USEMEM(state, state->tape_buffer_mem);
1989 220 : if (trace_sort)
1990 0 : elog(LOG, "worker %d using %zu KB of memory for tape buffers",
1991 : state->worker, state->tape_buffer_mem / 1024);
1992 :
1993 : for (;;)
1994 : {
1995 : /*
1996 : * On the first iteration, or if we have read all the runs from the
1997 : * input tapes in a multi-pass merge, it's time to start a new pass.
1998 : * Rewind all the output tapes, and make them inputs for the next
1999 : * pass.
2000 : */
2001 312 : if (state->nInputRuns == 0)
2002 : {
2003 : int64 input_buffer_size;
2004 :
2005 : /* Close the old, emptied, input tapes */
2006 240 : if (state->nInputTapes > 0)
2007 : {
2008 140 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2009 120 : LogicalTapeClose(state->inputTapes[tapenum]);
2010 20 : pfree(state->inputTapes);
2011 : }
2012 :
2013 : /* Previous pass's outputs become next pass's inputs. */
2014 240 : state->inputTapes = state->outputTapes;
2015 240 : state->nInputTapes = state->nOutputTapes;
2016 240 : state->nInputRuns = state->nOutputRuns;
2017 :
2018 : /*
2019 : * Reset output tape variables. The actual LogicalTapes will be
2020 : * created as needed, here we only allocate the array to hold
2021 : * them.
2022 : */
2023 240 : state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2024 240 : state->nOutputTapes = 0;
2025 240 : state->nOutputRuns = 0;
2026 :
2027 : /*
2028 : * Redistribute the memory allocated for tape buffers, among the
2029 : * new input and output tapes.
2030 : */
2031 240 : input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2032 : state->nInputTapes,
2033 : state->nInputRuns,
2034 : state->maxTapes);
2035 :
2036 240 : if (trace_sort)
2037 0 : elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2038 : state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2039 : pg_rusage_show(&state->ru_start));
2040 :
2041 : /* Prepare the new input tapes for merge pass. */
2042 942 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2043 702 : LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2044 :
2045 : /*
2046 : * If there's just one run left on each input tape, then only one
2047 : * merge pass remains. If we don't have to produce a materialized
2048 : * sorted tape, we can stop at this point and do the final merge
2049 : * on-the-fly.
2050 : */
2051 240 : if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2052 226 : && state->nInputRuns <= state->nInputTapes
2053 206 : && !WORKER(state))
2054 : {
2055 : /* Tell logtape.c we won't be writing anymore */
2056 206 : LogicalTapeSetForgetFreeSpace(state->tapeset);
2057 : /* Initialize for the final merge pass */
2058 206 : beginmerge(state);
2059 206 : state->status = TSS_FINALMERGE;
2060 206 : return;
2061 : }
2062 : }
2063 :
2064 : /* Select an output tape */
2065 106 : selectnewtape(state);
2066 :
2067 : /* Merge one run from each input tape. */
2068 106 : mergeonerun(state);
2069 :
2070 : /*
2071 : * If the input tapes are empty, and we output only one output run,
2072 : * we're done. The current output tape contains the final result.
2073 : */
2074 106 : if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2075 14 : break;
2076 : }
2077 :
2078 : /*
2079 : * Done. The result is on a single run on a single tape.
2080 : */
2081 14 : state->result_tape = state->outputTapes[0];
2082 14 : if (!WORKER(state))
2083 14 : LogicalTapeFreeze(state->result_tape, NULL);
2084 : else
2085 0 : worker_freeze_result_tape(state);
2086 14 : state->status = TSS_SORTEDONTAPE;
2087 :
2088 : /* Close all the now-empty input tapes, to release their read buffers. */
2089 74 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2090 60 : LogicalTapeClose(state->inputTapes[tapenum]);
2091 : }
2092 :
2093 : /*
2094 : * Merge one run from each input tape.
2095 : */
2096 : static void
2097 106 : mergeonerun(Tuplesortstate *state)
2098 : {
2099 : int srcTapeIndex;
2100 : LogicalTape *srcTape;
2101 :
2102 : /*
2103 : * Start the merge by loading one tuple from each active source tape into
2104 : * the heap.
2105 : */
2106 106 : beginmerge(state);
2107 :
2108 : Assert(state->slabAllocatorUsed);
2109 :
2110 : /*
2111 : * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2112 : * out, and replacing it with next tuple from same tape (if there is
2113 : * another one).
2114 : */
2115 580394 : while (state->memtupcount > 0)
2116 : {
2117 : SortTuple stup;
2118 :
2119 : /* write the tuple to destTape */
2120 580288 : srcTapeIndex = state->memtuples[0].srctape;
2121 580288 : srcTape = state->inputTapes[srcTapeIndex];
2122 580288 : WRITETUP(state, state->destTape, &state->memtuples[0]);
2123 :
2124 : /* recycle the slot of the tuple we just wrote out, for the next read */
2125 580288 : if (state->memtuples[0].tuple)
2126 490232 : RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2127 :
2128 : /*
2129 : * pull next tuple from the tape, and replace the written-out tuple in
2130 : * the heap with it.
2131 : */
2132 580288 : if (mergereadnext(state, srcTape, &stup))
2133 : {
2134 579720 : stup.srctape = srcTapeIndex;
2135 579720 : tuplesort_heap_replace_top(state, &stup);
2136 : }
2137 : else
2138 : {
2139 568 : tuplesort_heap_delete_top(state);
2140 568 : state->nInputRuns--;
2141 : }
2142 : }
2143 :
2144 : /*
2145 : * When the heap empties, we're done. Write an end-of-run marker on the
2146 : * output tape.
2147 : */
2148 106 : markrunend(state->destTape);
2149 106 : }
2150 :
2151 : /*
2152 : * beginmerge - initialize for a merge pass
2153 : *
2154 : * Fill the merge heap with the first tuple from each input tape.
2155 : */
2156 : static void
2157 312 : beginmerge(Tuplesortstate *state)
2158 : {
2159 : int activeTapes;
2160 : int srcTapeIndex;
2161 :
2162 : /* Heap should be empty here */
2163 : Assert(state->memtupcount == 0);
2164 :
2165 312 : activeTapes = Min(state->nInputTapes, state->nInputRuns);
2166 :
2167 1402 : for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2168 : {
2169 : SortTuple tup;
2170 :
2171 1090 : if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2172 : {
2173 883 : tup.srctape = srcTapeIndex;
2174 883 : tuplesort_heap_insert(state, &tup);
2175 : }
2176 : }
2177 312 : }
2178 :
2179 : /*
2180 : * mergereadnext - read next tuple from one merge input tape
2181 : *
2182 : * Returns false on EOF.
2183 : */
2184 : static bool
2185 3325210 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2186 : {
2187 : unsigned int tuplen;
2188 :
2189 : /* read next tuple, if any */
2190 3325210 : if ((tuplen = getlen(srcTape, true)) == 0)
2191 1058 : return false;
2192 3324152 : READTUP(state, stup, srcTape, tuplen);
2193 :
2194 3324152 : return true;
2195 : }
2196 :
2197 : /*
2198 : * dumptuples - remove tuples from memtuples and write initial run to tape
2199 : *
2200 : * When alltuples = true, dump everything currently in memory. (This case is
2201 : * only used at end of input data.)
2202 : */
2203 : static void
2204 734632 : dumptuples(Tuplesortstate *state, bool alltuples)
2205 : {
2206 : int memtupwrite;
2207 : int i;
2208 :
2209 : /*
2210 : * Nothing to do if we still fit in available memory and have array slots,
2211 : * unless this is the final call during initial run generation.
2212 : */
2213 734632 : if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2214 734007 : !alltuples)
2215 733538 : return;
2216 :
2217 : /*
2218 : * Final call might require no sorting, in rare cases where we just so
2219 : * happen to have previously LACKMEM()'d at the point where exactly all
2220 : * remaining tuples are loaded into memory, just before input was
2221 : * exhausted. In general, short final runs are quite possible, but avoid
2222 : * creating a completely empty run. In a worker, though, we must produce
2223 : * at least one tape, even if it's empty.
2224 : */
2225 1094 : if (state->memtupcount == 0 && state->currentRun > 0)
2226 0 : return;
2227 :
2228 : Assert(state->status == TSS_BUILDRUNS);
2229 :
2230 : /*
2231 : * It seems unlikely that this limit will ever be exceeded, but take no
2232 : * chances
2233 : */
2234 1094 : if (state->currentRun == INT_MAX)
2235 0 : ereport(ERROR,
2236 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2237 : errmsg("cannot have more than %d runs for an external sort",
2238 : INT_MAX)));
2239 :
2240 1094 : if (state->currentRun > 0)
2241 625 : selectnewtape(state);
2242 :
2243 1094 : state->currentRun++;
2244 :
2245 1094 : if (trace_sort)
2246 0 : elog(LOG, "worker %d starting quicksort of run %d: %s",
2247 : state->worker, state->currentRun,
2248 : pg_rusage_show(&state->ru_start));
2249 :
2250 : /*
2251 : * Sort all tuples accumulated within the allowed amount of memory for
2252 : * this run.
2253 : */
2254 1094 : tuplesort_sort_memtuples(state);
2255 :
2256 1094 : if (trace_sort)
2257 0 : elog(LOG, "worker %d finished quicksort of run %d: %s",
2258 : state->worker, state->currentRun,
2259 : pg_rusage_show(&state->ru_start));
2260 :
2261 1094 : memtupwrite = state->memtupcount;
2262 3085094 : for (i = 0; i < memtupwrite; i++)
2263 : {
2264 3084000 : SortTuple *stup = &state->memtuples[i];
2265 :
2266 3084000 : WRITETUP(state, state->destTape, stup);
2267 : }
2268 :
2269 1094 : state->memtupcount = 0;
2270 :
2271 : /*
2272 : * Reset tuple memory. We've freed all of the tuples that we previously
2273 : * allocated. It's important to avoid fragmentation when there is a stark
2274 : * change in the sizes of incoming tuples. In bounded sorts,
2275 : * fragmentation due to AllocSetFree's bucketing by size class might be
2276 : * particularly bad if this step wasn't taken.
2277 : */
2278 1094 : MemoryContextReset(state->base.tuplecontext);
2279 :
2280 : /*
2281 : * Now update the memory accounting to subtract the memory used by the
2282 : * tuple.
2283 : */
2284 1094 : FREEMEM(state, state->tupleMem);
2285 1094 : state->tupleMem = 0;
2286 :
2287 1094 : markrunend(state->destTape);
2288 :
2289 1094 : if (trace_sort)
2290 0 : elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2291 : state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2292 : pg_rusage_show(&state->ru_start));
2293 : }
2294 :
2295 : /*
2296 : * tuplesort_rescan - rewind and replay the scan
2297 : */
2298 : void
2299 29 : tuplesort_rescan(Tuplesortstate *state)
2300 : {
2301 29 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2302 :
2303 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2304 :
2305 29 : switch (state->status)
2306 : {
2307 24 : case TSS_SORTEDINMEM:
2308 24 : state->current = 0;
2309 24 : state->eof_reached = false;
2310 24 : state->markpos_offset = 0;
2311 24 : state->markpos_eof = false;
2312 24 : break;
2313 5 : case TSS_SORTEDONTAPE:
2314 5 : LogicalTapeRewindForRead(state->result_tape, 0);
2315 5 : state->eof_reached = false;
2316 5 : state->markpos_block = 0L;
2317 5 : state->markpos_offset = 0;
2318 5 : state->markpos_eof = false;
2319 5 : break;
2320 0 : default:
2321 0 : elog(ERROR, "invalid tuplesort state");
2322 : break;
2323 : }
2324 :
2325 29 : MemoryContextSwitchTo(oldcontext);
2326 29 : }
2327 :
2328 : /*
2329 : * tuplesort_markpos - saves current position in the merged sort file
2330 : */
2331 : void
2332 386309 : tuplesort_markpos(Tuplesortstate *state)
2333 : {
2334 386309 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2335 :
2336 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2337 :
2338 386309 : switch (state->status)
2339 : {
2340 380437 : case TSS_SORTEDINMEM:
2341 380437 : state->markpos_offset = state->current;
2342 380437 : state->markpos_eof = state->eof_reached;
2343 380437 : break;
2344 5872 : case TSS_SORTEDONTAPE:
2345 5872 : LogicalTapeTell(state->result_tape,
2346 : &state->markpos_block,
2347 : &state->markpos_offset);
2348 5872 : state->markpos_eof = state->eof_reached;
2349 5872 : break;
2350 0 : default:
2351 0 : elog(ERROR, "invalid tuplesort state");
2352 : break;
2353 : }
2354 :
2355 386309 : MemoryContextSwitchTo(oldcontext);
2356 386309 : }
2357 :
2358 : /*
2359 : * tuplesort_restorepos - restores current position in merged sort file to
2360 : * last saved position
2361 : */
2362 : void
2363 24037 : tuplesort_restorepos(Tuplesortstate *state)
2364 : {
2365 24037 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2366 :
2367 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2368 :
2369 24037 : switch (state->status)
2370 : {
2371 19909 : case TSS_SORTEDINMEM:
2372 19909 : state->current = state->markpos_offset;
2373 19909 : state->eof_reached = state->markpos_eof;
2374 19909 : break;
2375 4128 : case TSS_SORTEDONTAPE:
2376 4128 : LogicalTapeSeek(state->result_tape,
2377 : state->markpos_block,
2378 : state->markpos_offset);
2379 4128 : state->eof_reached = state->markpos_eof;
2380 4128 : break;
2381 0 : default:
2382 0 : elog(ERROR, "invalid tuplesort state");
2383 : break;
2384 : }
2385 :
2386 24037 : MemoryContextSwitchTo(oldcontext);
2387 24037 : }
2388 :
2389 : /*
2390 : * tuplesort_get_stats - extract summary statistics
2391 : *
2392 : * This can be called after tuplesort_performsort() finishes to obtain
2393 : * printable summary information about how the sort was performed.
2394 : */
2395 : void
2396 264 : tuplesort_get_stats(Tuplesortstate *state,
2397 : TuplesortInstrumentation *stats)
2398 : {
2399 : /*
2400 : * Note: it might seem we should provide both memory and disk usage for a
2401 : * disk-based sort. However, the current code doesn't track memory space
2402 : * accurately once we have begun to return tuples to the caller (since we
2403 : * don't account for pfree's the caller is expected to do), so we cannot
2404 : * rely on availMem in a disk sort. This does not seem worth the overhead
2405 : * to fix. Is it worth creating an API for the memory context code to
2406 : * tell us how much is actually used in sortcontext?
2407 : */
2408 264 : tuplesort_updatemax(state);
2409 :
2410 264 : if (state->isMaxSpaceDisk)
2411 4 : stats->spaceType = SORT_SPACE_TYPE_DISK;
2412 : else
2413 260 : stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2414 264 : stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2415 :
2416 264 : switch (state->maxSpaceStatus)
2417 : {
2418 260 : case TSS_SORTEDINMEM:
2419 260 : if (state->boundUsed)
2420 28 : stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2421 : else
2422 232 : stats->sortMethod = SORT_TYPE_QUICKSORT;
2423 260 : break;
2424 0 : case TSS_SORTEDONTAPE:
2425 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2426 0 : break;
2427 4 : case TSS_FINALMERGE:
2428 4 : stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2429 4 : break;
2430 0 : default:
2431 0 : stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2432 0 : break;
2433 : }
2434 264 : }
2435 :
2436 : /*
2437 : * Convert TuplesortMethod to a string.
2438 : */
2439 : const char *
2440 196 : tuplesort_method_name(TuplesortMethod m)
2441 : {
2442 196 : switch (m)
2443 : {
2444 0 : case SORT_TYPE_STILL_IN_PROGRESS:
2445 0 : return "still in progress";
2446 28 : case SORT_TYPE_TOP_N_HEAPSORT:
2447 28 : return "top-N heapsort";
2448 164 : case SORT_TYPE_QUICKSORT:
2449 164 : return "quicksort";
2450 0 : case SORT_TYPE_EXTERNAL_SORT:
2451 0 : return "external sort";
2452 4 : case SORT_TYPE_EXTERNAL_MERGE:
2453 4 : return "external merge";
2454 : }
2455 :
2456 0 : return "unknown";
2457 : }
2458 :
2459 : /*
2460 : * Convert TuplesortSpaceType to a string.
2461 : */
2462 : const char *
2463 172 : tuplesort_space_type_name(TuplesortSpaceType t)
2464 : {
2465 : Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2466 172 : return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2467 : }
2468 :
2469 :
2470 : /*
2471 : * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2472 : */
2473 :
2474 : /*
2475 : * Convert the existing unordered array of SortTuples to a bounded heap,
2476 : * discarding all but the smallest "state->bound" tuples.
2477 : *
2478 : * When working with a bounded heap, we want to keep the largest entry
2479 : * at the root (array entry zero), instead of the smallest as in the normal
2480 : * sort case. This allows us to discard the largest entry cheaply.
2481 : * Therefore, we temporarily reverse the sort direction.
2482 : */
2483 : static void
2484 255 : make_bounded_heap(Tuplesortstate *state)
2485 : {
2486 255 : int tupcount = state->memtupcount;
2487 : int i;
2488 :
2489 : Assert(state->status == TSS_INITIAL);
2490 : Assert(state->bounded);
2491 : Assert(tupcount >= state->bound);
2492 : Assert(SERIAL(state));
2493 :
2494 : /* Reverse sort direction so largest entry will be at root */
2495 255 : reversedirection(state);
2496 :
2497 255 : state->memtupcount = 0; /* make the heap empty */
2498 30240 : for (i = 0; i < tupcount; i++)
2499 : {
2500 29985 : if (state->memtupcount < state->bound)
2501 : {
2502 : /* Insert next tuple into heap */
2503 : /* Must copy source tuple to avoid possible overwrite */
2504 14865 : SortTuple stup = state->memtuples[i];
2505 :
2506 14865 : tuplesort_heap_insert(state, &stup);
2507 : }
2508 : else
2509 : {
2510 : /*
2511 : * The heap is full. Replace the largest entry with the new
2512 : * tuple, or just discard it, if it's larger than anything already
2513 : * in the heap.
2514 : */
2515 15120 : if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2516 : {
2517 6969 : free_sort_tuple(state, &state->memtuples[i]);
2518 6969 : CHECK_FOR_INTERRUPTS();
2519 : }
2520 : else
2521 8151 : tuplesort_heap_replace_top(state, &state->memtuples[i]);
2522 : }
2523 : }
2524 :
2525 : Assert(state->memtupcount == state->bound);
2526 255 : state->status = TSS_BOUNDED;
2527 255 : }
2528 :
2529 : /*
2530 : * Convert the bounded heap to a properly-sorted array
2531 : */
2532 : static void
2533 255 : sort_bounded_heap(Tuplesortstate *state)
2534 : {
2535 255 : int tupcount = state->memtupcount;
2536 :
2537 : Assert(state->status == TSS_BOUNDED);
2538 : Assert(state->bounded);
2539 : Assert(tupcount == state->bound);
2540 : Assert(SERIAL(state));
2541 :
2542 : /*
2543 : * We can unheapify in place because each delete-top call will remove the
2544 : * largest entry, which we can promptly store in the newly freed slot at
2545 : * the end. Once we're down to a single-entry heap, we're done.
2546 : */
2547 14865 : while (state->memtupcount > 1)
2548 : {
2549 14610 : SortTuple stup = state->memtuples[0];
2550 :
2551 : /* this sifts-up the next-largest entry and decreases memtupcount */
2552 14610 : tuplesort_heap_delete_top(state);
2553 14610 : state->memtuples[state->memtupcount] = stup;
2554 : }
2555 255 : state->memtupcount = tupcount;
2556 :
2557 : /*
2558 : * Reverse sort direction back to the original state. This is not
2559 : * actually necessary but seems like a good idea for tidiness.
2560 : */
2561 255 : reversedirection(state);
2562 :
2563 255 : state->status = TSS_SORTEDINMEM;
2564 255 : state->boundUsed = true;
2565 255 : }
2566 :
2567 :
2568 : /* radix sort routines */
2569 :
2570 : /*
2571 : * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
2572 : */
2573 : static inline uint8
2574 15756245 : current_byte(Datum key, int level)
2575 : {
2576 15756245 : int shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
2577 :
2578 15756245 : return (key >> shift) & 0xFF;
2579 : }
2580 :
2581 : /*
2582 : * Normalize datum such that unsigned comparison is order-preserving,
2583 : * taking ASC/DESC into account as well.
2584 : */
2585 : static inline Datum
2586 15784903 : normalize_datum(Datum orig, SortSupport ssup)
2587 : {
2588 : Datum norm_datum1;
2589 :
2590 15784903 : if (ssup->comparator == ssup_datum_signed_cmp)
2591 : {
2592 1107346 : norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
2593 : }
2594 14677557 : else if (ssup->comparator == ssup_datum_int32_cmp)
2595 : {
2596 : /*
2597 : * First truncate to uint32. Technically, we don't need to do this,
2598 : * but it forces the upper half of the datum to be zero regardless of
2599 : * sign.
2600 : */
2601 7713043 : uint32 u32 = DatumGetUInt32(orig) + ((uint32) PG_INT32_MAX) + 1;
2602 :
2603 7713043 : norm_datum1 = UInt32GetDatum(u32);
2604 : }
2605 : else
2606 : {
2607 : Assert(ssup->comparator == ssup_datum_unsigned_cmp);
2608 6964514 : norm_datum1 = orig;
2609 : }
2610 :
2611 15784903 : if (ssup->ssup_reverse)
2612 804395 : norm_datum1 = ~norm_datum1;
2613 :
2614 15784903 : return norm_datum1;
2615 : }
2616 :
2617 : /*
2618 : * radix_sort_recursive
2619 : *
2620 : * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
2621 : * for tiebreaks.
2622 : *
2623 : * This is a modification of
2624 : * ska_byte_sort() from https://github.com/skarupke/ska_sort
2625 : * The original copyright notice follows:
2626 : *
2627 : * Copyright Malte Skarupke 2016.
2628 : * Distributed under the Boost Software License, Version 1.0.
2629 : *
2630 : * Boost Software License - Version 1.0 - August 17th, 2003
2631 : *
2632 : * Permission is hereby granted, free of charge, to any person or organization
2633 : * obtaining a copy of the software and accompanying documentation covered by
2634 : * this license (the "Software") to use, reproduce, display, distribute,
2635 : * execute, and transmit the Software, and to prepare derivative works of the
2636 : * Software, and to permit third-parties to whom the Software is furnished to
2637 : * do so, all subject to the following:
2638 : *
2639 : * The copyright notices in the Software and this entire statement, including
2640 : * the above license grant, this restriction and the following disclaimer,
2641 : * must be included in all copies of the Software, in whole or in part, and
2642 : * all derivative works of the Software, unless such copies or derivative
2643 : * works are solely in the form of machine-executable object code generated by
2644 : * a source language processor.
2645 : *
2646 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2647 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2648 : * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
2649 : * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
2650 : * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
2651 : * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2652 : * DEALINGS IN THE SOFTWARE.
2653 : */
2654 : static void
2655 28658 : radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
2656 : {
2657 28658 : RadixSortInfo partitions[256] = {0};
2658 : uint8 remaining_partitions[256];
2659 28658 : size_t total = 0;
2660 28658 : int num_partitions = 0;
2661 : int num_remaining;
2662 28658 : SortSupport ssup = &state->base.sortKeys[0];
2663 : Datum ref_datum;
2664 28658 : Datum common_upper_bits = 0;
2665 28658 : size_t start_offset = 0;
2666 28658 : SortTuple *partition_begin = begin;
2667 : int next_level;
2668 :
2669 : /* count number of occurrences of each byte */
2670 28658 : ref_datum = normalize_datum(begin[0].datum1, ssup);
2671 15784903 : for (SortTuple *st = begin; st < begin + n_elems; st++)
2672 : {
2673 : Datum this_datum;
2674 : uint8 this_partition;
2675 :
2676 15756245 : this_datum = normalize_datum(st->datum1, ssup);
2677 : /* accumulate bits different from the reference datum */
2678 15756245 : common_upper_bits |= ref_datum ^ this_datum;
2679 :
2680 : /* extract the byte for this level from the normalized datum */
2681 15756245 : this_partition = current_byte(this_datum, level);
2682 :
2683 : /* save it for the permutation step */
2684 15756245 : st->curbyte = this_partition;
2685 :
2686 15756245 : partitions[this_partition].count++;
2687 :
2688 15756245 : CHECK_FOR_INTERRUPTS();
2689 : }
2690 :
2691 : /* compute partition offsets */
2692 7365106 : for (int i = 0; i < 256; i++)
2693 : {
2694 7336448 : size_t count = partitions[i].count;
2695 :
2696 7336448 : if (count != 0)
2697 : {
2698 1813770 : partitions[i].offset = total;
2699 1813770 : total += count;
2700 1813770 : remaining_partitions[num_partitions] = i;
2701 1813770 : num_partitions++;
2702 : }
2703 7336448 : partitions[i].next_offset = total;
2704 : }
2705 :
2706 : /*
2707 : * Swap tuples to correct partition.
2708 : *
2709 : * In traditional American flag sort, a swap sends the current element to
2710 : * the correct partition, but the array pointer only advances if the
2711 : * partner of the swap happens to be an element that belongs in the
2712 : * current partition. That only requires one pass through the array, but
2713 : * the disadvantage is we don't know if the pointer can advance until the
2714 : * swap completes. Here lies the most interesting innovation from the
2715 : * upstream ska_byte_sort: After initiating the swap, we immediately
2716 : * proceed to the next element. This makes better use of CPU pipelining,
2717 : * but also means that we will often need multiple iterations of this
2718 : * loop. ska_byte_sort() maintains a separate list of which partitions
2719 : * haven't finished, which is updated every loop iteration. Here we simply
2720 : * check each partition during every iteration.
2721 : *
2722 : * If we started with a single partition, there is nothing to do. If a
2723 : * previous loop iteration results in only one partition that hasn't been
2724 : * counted as sorted, we know it's actually sorted and can exit the loop.
2725 : */
2726 28658 : num_remaining = num_partitions;
2727 114549 : while (num_remaining > 1)
2728 : {
2729 : /* start the count over */
2730 85891 : num_remaining = num_partitions;
2731 :
2732 7577056 : for (int i = 0; i < num_partitions; i++)
2733 : {
2734 7491165 : uint8 idx = remaining_partitions[i];
2735 :
2736 7491165 : for (SortTuple *st = begin + partitions[idx].offset;
2737 17821761 : st < begin + partitions[idx].next_offset;
2738 10330596 : st++)
2739 : {
2740 10330596 : size_t offset = partitions[st->curbyte].offset++;
2741 : SortTuple tmp;
2742 :
2743 : /* swap current tuple with destination position */
2744 : Assert(offset < n_elems);
2745 10330596 : tmp = *st;
2746 10330596 : *st = begin[offset];
2747 10330596 : begin[offset] = tmp;
2748 :
2749 10330596 : CHECK_FOR_INTERRUPTS();
2750 : };
2751 :
2752 : /* Is this partition sorted? */
2753 7491165 : if (partitions[idx].offset == partitions[idx].next_offset)
2754 5685941 : num_remaining--;
2755 : }
2756 : }
2757 :
2758 : /* recurse */
2759 :
2760 28658 : if (num_partitions == 1)
2761 : {
2762 : /*
2763 : * There is only one distinct byte at the current level. It can happen
2764 : * that some subsequent bytes are also the same for all input values,
2765 : * such as the upper bytes of small integers. To skip unproductive
2766 : * passes for that case, we compute the level where the input has more
2767 : * than one distinct byte, so that the next recursion can start there.
2768 : */
2769 5437 : if (common_upper_bits == 0)
2770 352 : next_level = sizeof(Datum);
2771 : else
2772 : {
2773 : int diffpos;
2774 :
2775 : /*
2776 : * The upper bits of common_upper_bits are zero where all datums
2777 : * have the same bits.
2778 : */
2779 5085 : diffpos = pg_leftmost_one_pos64(DatumGetUInt64(common_upper_bits));
2780 5085 : next_level = sizeof(Datum) - 1 - (diffpos / BITS_PER_BYTE);
2781 : }
2782 : }
2783 : else
2784 23221 : next_level = level + 1;
2785 :
2786 28658 : for (uint8 *rp = remaining_partitions;
2787 1842428 : rp < remaining_partitions + num_partitions;
2788 1813770 : rp++)
2789 : {
2790 1813770 : size_t end_offset = partitions[*rp].next_offset;
2791 1813770 : SortTuple *partition_end = begin + end_offset;
2792 1813770 : size_t num_elements = end_offset - start_offset;
2793 :
2794 1813770 : if (num_elements > 1)
2795 : {
2796 744436 : if (next_level < sizeof(Datum))
2797 : {
2798 638919 : if (num_elements < QSORT_THRESHOLD)
2799 : {
2800 612441 : qsort_tuple(partition_begin,
2801 : num_elements,
2802 : state->base.comparetup,
2803 : state);
2804 : }
2805 : else
2806 : {
2807 26478 : radix_sort_recursive(partition_begin,
2808 : num_elements,
2809 : next_level,
2810 : state);
2811 : }
2812 : }
2813 105517 : else if (state->base.onlyKey == NULL)
2814 : {
2815 : /*
2816 : * We've finished radix sort on all bytes of the pass-by-value
2817 : * datum (possibly abbreviated), now sort using the tiebreak
2818 : * comparator.
2819 : */
2820 24099 : qsort_tuple(partition_begin,
2821 : num_elements,
2822 : state->base.comparetup_tiebreak,
2823 : state);
2824 : }
2825 : }
2826 :
2827 1813770 : start_offset = end_offset;
2828 1813770 : partition_begin = partition_end;
2829 : }
2830 28658 : }
2831 :
2832 : /*
2833 : * Entry point for radix_sort_recursive
2834 : *
2835 : * Partition tuples by isnull1, then sort both partitions, using
2836 : * radix sort on the NOT NULL partition if it's large enough.
2837 : */
2838 : static void
2839 3629 : radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
2840 : {
2841 3629 : bool nulls_first = state->base.sortKeys[0].ssup_nulls_first;
2842 : SortTuple *null_start;
2843 : SortTuple *not_null_start;
2844 3629 : size_t d1 = 0,
2845 : d2,
2846 : null_count,
2847 : not_null_count;
2848 :
2849 : /*
2850 : * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
2851 : * This also serves as a quick check for the common case where all tuples
2852 : * are NOT NULL in the first sort key.
2853 : */
2854 9122056 : while (d1 < n && data[d1].isnull1 == nulls_first)
2855 : {
2856 9118427 : d1++;
2857 9118427 : CHECK_FOR_INTERRUPTS();
2858 : }
2859 :
2860 : /*
2861 : * If we have more than one tuple left after the quick check, partition
2862 : * the remainder using branchless cyclic permutation, based on
2863 : * https://orlp.net/blog/branchless-lomuto-partitioning/
2864 : */
2865 : Assert(n > 0);
2866 3629 : if (d1 < n - 1)
2867 : {
2868 735 : size_t i = d1,
2869 735 : j = d1;
2870 735 : SortTuple tmp = data[d1]; /* create gap at front */
2871 :
2872 443880 : while (j < n - 1)
2873 : {
2874 : /* gap is at j, move i's element to gap */
2875 443145 : data[j] = data[i];
2876 : /* advance j to the first unknown element */
2877 443145 : j += 1;
2878 : /* move the first unknown element back to i */
2879 443145 : data[i] = data[j];
2880 : /* advance i if this element belongs in the left partition */
2881 443145 : i += (data[i].isnull1 == nulls_first);
2882 :
2883 443145 : CHECK_FOR_INTERRUPTS();
2884 : }
2885 :
2886 : /* place gap between left and right partitions */
2887 735 : data[j] = data[i];
2888 : /* restore the saved element */
2889 735 : data[i] = tmp;
2890 : /* assign it to the correct partition */
2891 735 : i += (data[i].isnull1 == nulls_first);
2892 :
2893 : /* d1 is now the number of elements in the left partition */
2894 735 : d1 = i;
2895 : }
2896 :
2897 3629 : d2 = n - d1;
2898 :
2899 : /* set pointers and counts for each partition */
2900 3629 : if (nulls_first)
2901 : {
2902 630 : null_start = data;
2903 630 : null_count = d1;
2904 630 : not_null_start = data + d1;
2905 630 : not_null_count = d2;
2906 : }
2907 : else
2908 : {
2909 2999 : not_null_start = data;
2910 2999 : not_null_count = d1;
2911 2999 : null_start = data + d1;
2912 2999 : null_count = d2;
2913 : }
2914 :
2915 3629 : for (SortTuple *st = null_start;
2916 6036 : st < null_start + null_count;
2917 2407 : st++)
2918 : Assert(st->isnull1 == true);
2919 3629 : for (SortTuple *st = not_null_start;
2920 9563533 : st < not_null_start + not_null_count;
2921 9559904 : st++)
2922 : Assert(st->isnull1 == false);
2923 :
2924 : /*
2925 : * Sort the NULL partition using tiebreak comparator, if necessary.
2926 : */
2927 3629 : if (state->base.onlyKey == NULL && null_count > 1)
2928 : {
2929 111 : qsort_tuple(null_start,
2930 : null_count,
2931 : state->base.comparetup_tiebreak,
2932 : state);
2933 : }
2934 :
2935 : /*
2936 : * Sort the NOT NULL partition, using radix sort if large enough,
2937 : * otherwise fall back to quicksort.
2938 : */
2939 3629 : if (not_null_count < QSORT_THRESHOLD)
2940 : {
2941 14 : qsort_tuple(not_null_start,
2942 : not_null_count,
2943 : state->base.comparetup,
2944 : state);
2945 : }
2946 : else
2947 : {
2948 3615 : bool presorted = true;
2949 :
2950 3615 : for (SortTuple *st = not_null_start + 1;
2951 4519127 : st < not_null_start + not_null_count;
2952 4515512 : st++)
2953 : {
2954 4517692 : if (COMPARETUP(state, st - 1, st) > 0)
2955 : {
2956 2180 : presorted = false;
2957 2180 : break;
2958 : }
2959 :
2960 4515512 : CHECK_FOR_INTERRUPTS();
2961 : }
2962 :
2963 3615 : if (presorted)
2964 1435 : return;
2965 : else
2966 : {
2967 2180 : radix_sort_recursive(not_null_start,
2968 : not_null_count,
2969 : 0,
2970 : state);
2971 : }
2972 : }
2973 : }
2974 :
2975 : /* Verify in-memory sort using standard comparator. */
2976 : static void
2977 3629 : verify_memtuples_sorted(Tuplesortstate *state)
2978 : {
2979 : #ifdef USE_ASSERT_CHECKING
2980 : for (SortTuple *st = state->memtuples + 1;
2981 : st < state->memtuples + state->memtupcount;
2982 : st++)
2983 : Assert(COMPARETUP(state, st - 1, st) <= 0);
2984 : #endif
2985 3629 : }
2986 :
2987 : /*
2988 : * Sort all memtuples using specialized routines.
2989 : *
2990 : * Quicksort or radix sort is used for small in-memory sorts,
2991 : * and external sort runs.
2992 : */
2993 : static void
2994 149879 : tuplesort_sort_memtuples(Tuplesortstate *state)
2995 : {
2996 : Assert(!LEADER(state));
2997 :
2998 149879 : if (state->memtupcount > 1)
2999 : {
3000 : /*
3001 : * Do we have the leading column's value or abbreviation in datum1?
3002 : */
3003 44651 : if (state->base.haveDatum1 && state->base.sortKeys)
3004 : {
3005 44581 : SortSupport ssup = &state->base.sortKeys[0];
3006 :
3007 : /* Does it compare as an integer? */
3008 44581 : if (state->memtupcount >= QSORT_THRESHOLD &&
3009 8319 : (ssup->comparator == ssup_datum_unsigned_cmp ||
3010 7802 : ssup->comparator == ssup_datum_signed_cmp ||
3011 7612 : ssup->comparator == ssup_datum_int32_cmp))
3012 : {
3013 3629 : radix_sort_tuple(state->memtuples,
3014 3629 : state->memtupcount,
3015 : state);
3016 3629 : verify_memtuples_sorted(state);
3017 3629 : return;
3018 : }
3019 : }
3020 :
3021 : /* Can we use the single-key sort function? */
3022 41022 : if (state->base.onlyKey != NULL)
3023 : {
3024 27517 : qsort_ssup(state->memtuples, state->memtupcount,
3025 27517 : state->base.onlyKey);
3026 : }
3027 : else
3028 : {
3029 13505 : qsort_tuple(state->memtuples,
3030 13505 : state->memtupcount,
3031 : state->base.comparetup,
3032 : state);
3033 : }
3034 : }
3035 : }
3036 :
3037 : /*
3038 : * Insert a new tuple into an empty or existing heap, maintaining the
3039 : * heap invariant. Caller is responsible for ensuring there's room.
3040 : *
3041 : * Note: For some callers, tuple points to a memtuples[] entry above the
3042 : * end of the heap. This is safe as long as it's not immediately adjacent
3043 : * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3044 : * is, it might get overwritten before being moved into the heap!
3045 : */
3046 : static void
3047 15748 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3048 : {
3049 : SortTuple *memtuples;
3050 : int j;
3051 :
3052 15748 : memtuples = state->memtuples;
3053 : Assert(state->memtupcount < state->memtupsize);
3054 :
3055 15748 : CHECK_FOR_INTERRUPTS();
3056 :
3057 : /*
3058 : * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3059 : * using 1-based array indexes, not 0-based.
3060 : */
3061 15748 : j = state->memtupcount++;
3062 42682 : while (j > 0)
3063 : {
3064 38540 : int i = (j - 1) >> 1;
3065 :
3066 38540 : if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3067 11606 : break;
3068 26934 : memtuples[j] = memtuples[i];
3069 26934 : j = i;
3070 : }
3071 15748 : memtuples[j] = *tuple;
3072 15748 : }
3073 :
3074 : /*
3075 : * Remove the tuple at state->memtuples[0] from the heap. Decrement
3076 : * memtupcount, and sift up to maintain the heap invariant.
3077 : *
3078 : * The caller has already free'd the tuple the top node points to,
3079 : * if necessary.
3080 : */
3081 : static void
3082 15461 : tuplesort_heap_delete_top(Tuplesortstate *state)
3083 : {
3084 15461 : SortTuple *memtuples = state->memtuples;
3085 : SortTuple *tuple;
3086 :
3087 15461 : if (--state->memtupcount <= 0)
3088 217 : return;
3089 :
3090 : /*
3091 : * Remove the last tuple in the heap, and re-insert it, by replacing the
3092 : * current top node with it.
3093 : */
3094 15244 : tuple = &memtuples[state->memtupcount];
3095 15244 : tuplesort_heap_replace_top(state, tuple);
3096 : }
3097 :
3098 : /*
3099 : * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3100 : * maintain the heap invariant.
3101 : *
3102 : * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3103 : * Heapsort, steps H3-H8).
3104 : */
3105 : static void
3106 3681329 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3107 : {
3108 3681329 : SortTuple *memtuples = state->memtuples;
3109 : unsigned int i,
3110 : n;
3111 :
3112 : Assert(state->memtupcount >= 1);
3113 :
3114 3681329 : CHECK_FOR_INTERRUPTS();
3115 :
3116 : /*
3117 : * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3118 : * This prevents overflow in the "2 * i + 1" calculation, since at the top
3119 : * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3120 : */
3121 3681329 : n = state->memtupcount;
3122 3681329 : i = 0; /* i is where the "hole" is */
3123 : for (;;)
3124 1229408 : {
3125 4910737 : unsigned int j = 2 * i + 1;
3126 :
3127 4910737 : if (j >= n)
3128 778817 : break;
3129 5803030 : if (j + 1 < n &&
3130 1671110 : COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3131 664210 : j++;
3132 4131920 : if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3133 2902512 : break;
3134 1229408 : memtuples[i] = memtuples[j];
3135 1229408 : i = j;
3136 : }
3137 3681329 : memtuples[i] = *tuple;
3138 3681329 : }
3139 :
3140 : /*
3141 : * Function to reverse the sort direction from its current state
3142 : *
3143 : * It is not safe to call this when performing hash tuplesorts
3144 : */
3145 : static void
3146 510 : reversedirection(Tuplesortstate *state)
3147 : {
3148 510 : SortSupport sortKey = state->base.sortKeys;
3149 : int nkey;
3150 :
3151 1244 : for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
3152 : {
3153 734 : sortKey->ssup_reverse = !sortKey->ssup_reverse;
3154 734 : sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3155 : }
3156 510 : }
3157 :
3158 :
3159 : /*
3160 : * Tape interface routines
3161 : */
3162 :
3163 : static unsigned int
3164 3522217 : getlen(LogicalTape *tape, bool eofOK)
3165 : {
3166 : unsigned int len;
3167 :
3168 3522217 : if (LogicalTapeRead(tape,
3169 : &len, sizeof(len)) != sizeof(len))
3170 0 : elog(ERROR, "unexpected end of tape");
3171 3522217 : if (len == 0 && !eofOK)
3172 0 : elog(ERROR, "unexpected end of data");
3173 3522217 : return len;
3174 : }
3175 :
3176 : static void
3177 1200 : markrunend(LogicalTape *tape)
3178 : {
3179 1200 : unsigned int len = 0;
3180 :
3181 1200 : LogicalTapeWrite(tape, &len, sizeof(len));
3182 1200 : }
3183 :
3184 : /*
3185 : * Get memory for tuple from within READTUP() routine.
3186 : *
3187 : * We use next free slot from the slab allocator, or palloc() if the tuple
3188 : * is too large for that.
3189 : */
3190 : void *
3191 3265976 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
3192 : {
3193 : SlabSlot *buf;
3194 :
3195 : /*
3196 : * We pre-allocate enough slots in the slab arena that we should never run
3197 : * out.
3198 : */
3199 : Assert(state->slabFreeHead);
3200 :
3201 3265976 : if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3202 4 : return MemoryContextAlloc(state->base.sortcontext, tuplen);
3203 : else
3204 : {
3205 3265972 : buf = state->slabFreeHead;
3206 : /* Reuse this slot */
3207 3265972 : state->slabFreeHead = buf->nextfree;
3208 :
3209 3265972 : return buf;
3210 : }
3211 : }
3212 :
3213 :
3214 : /*
3215 : * Parallel sort routines
3216 : */
3217 :
3218 : /*
3219 : * tuplesort_estimate_shared - estimate required shared memory allocation
3220 : *
3221 : * nWorkers is an estimate of the number of workers (it's the number that
3222 : * will be requested).
3223 : */
3224 : Size
3225 131 : tuplesort_estimate_shared(int nWorkers)
3226 : {
3227 : Size tapesSize;
3228 :
3229 : Assert(nWorkers > 0);
3230 :
3231 : /* Make sure that BufFile shared state is MAXALIGN'd */
3232 131 : tapesSize = mul_size(sizeof(TapeShare), nWorkers);
3233 131 : tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
3234 :
3235 131 : return tapesSize;
3236 : }
3237 :
3238 : /*
3239 : * tuplesort_initialize_shared - initialize shared tuplesort state
3240 : *
3241 : * Must be called from leader process before workers are launched, to
3242 : * establish state needed up-front for worker tuplesortstates. nWorkers
3243 : * should match the argument passed to tuplesort_estimate_shared().
3244 : */
3245 : void
3246 179 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
3247 : {
3248 : int i;
3249 :
3250 : Assert(nWorkers > 0);
3251 :
3252 179 : SpinLockInit(&shared->mutex);
3253 179 : shared->currentWorker = 0;
3254 179 : shared->workersFinished = 0;
3255 179 : SharedFileSetInit(&shared->fileset, seg);
3256 179 : shared->nTapes = nWorkers;
3257 562 : for (i = 0; i < nWorkers; i++)
3258 : {
3259 383 : shared->tapes[i].firstblocknumber = 0L;
3260 : }
3261 179 : }
3262 :
3263 : /*
3264 : * tuplesort_attach_shared - attach to shared tuplesort state
3265 : *
3266 : * Must be called by all worker processes.
3267 : */
3268 : void
3269 201 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3270 : {
3271 : /* Attach to SharedFileSet */
3272 201 : SharedFileSetAttach(&shared->fileset, seg);
3273 201 : }
3274 :
3275 : /*
3276 : * worker_get_identifier - Assign and return ordinal identifier for worker
3277 : *
3278 : * The order in which these are assigned is not well defined, and should not
3279 : * matter; worker numbers across parallel sort participants need only be
3280 : * distinct and gapless. logtape.c requires this.
3281 : *
3282 : * Note that the identifiers assigned from here have no relation to
3283 : * ParallelWorkerNumber number, to avoid making any assumption about
3284 : * caller's requirements. However, we do follow the ParallelWorkerNumber
3285 : * convention of representing a non-worker with worker number -1. This
3286 : * includes the leader, as well as serial Tuplesort processes.
3287 : */
3288 : static int
3289 379 : worker_get_identifier(Tuplesortstate *state)
3290 : {
3291 379 : Sharedsort *shared = state->shared;
3292 : int worker;
3293 :
3294 : Assert(WORKER(state));
3295 :
3296 379 : SpinLockAcquire(&shared->mutex);
3297 379 : worker = shared->currentWorker++;
3298 379 : SpinLockRelease(&shared->mutex);
3299 :
3300 379 : return worker;
3301 : }
3302 :
3303 : /*
3304 : * worker_freeze_result_tape - freeze worker's result tape for leader
3305 : *
3306 : * This is called by workers just after the result tape has been determined,
3307 : * instead of calling LogicalTapeFreeze() directly. They do so because
3308 : * workers require a few additional steps over similar serial
3309 : * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3310 : * steps are around freeing now unneeded resources, and representing to
3311 : * leader that worker's input run is available for its merge.
3312 : *
3313 : * There should only be one final output run for each worker, which consists
3314 : * of all tuples that were originally input into worker.
3315 : */
3316 : static void
3317 379 : worker_freeze_result_tape(Tuplesortstate *state)
3318 : {
3319 379 : Sharedsort *shared = state->shared;
3320 : TapeShare output;
3321 :
3322 : Assert(WORKER(state));
3323 : Assert(state->result_tape != NULL);
3324 : Assert(state->memtupcount == 0);
3325 :
3326 : /*
3327 : * Free most remaining memory, in case caller is sensitive to our holding
3328 : * on to it. memtuples may not be a tiny merge heap at this point.
3329 : */
3330 379 : pfree(state->memtuples);
3331 : /* Be tidy */
3332 379 : state->memtuples = NULL;
3333 379 : state->memtupsize = 0;
3334 :
3335 : /*
3336 : * Parallel worker requires result tape metadata, which is to be stored in
3337 : * shared memory for leader
3338 : */
3339 379 : LogicalTapeFreeze(state->result_tape, &output);
3340 :
3341 : /* Store properties of output tape, and update finished worker count */
3342 379 : SpinLockAcquire(&shared->mutex);
3343 379 : shared->tapes[state->worker] = output;
3344 379 : shared->workersFinished++;
3345 379 : SpinLockRelease(&shared->mutex);
3346 379 : }
3347 :
3348 : /*
3349 : * worker_nomergeruns - dump memtuples in worker, without merging
3350 : *
3351 : * This called as an alternative to mergeruns() with a worker when no
3352 : * merging is required.
3353 : */
3354 : static void
3355 379 : worker_nomergeruns(Tuplesortstate *state)
3356 : {
3357 : Assert(WORKER(state));
3358 : Assert(state->result_tape == NULL);
3359 : Assert(state->nOutputRuns == 1);
3360 :
3361 379 : state->result_tape = state->destTape;
3362 379 : worker_freeze_result_tape(state);
3363 379 : }
3364 :
3365 : /*
3366 : * leader_takeover_tapes - create tapeset for leader from worker tapes
3367 : *
3368 : * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3369 : * sorting has occurred in workers, all of which must have already returned
3370 : * from tuplesort_performsort().
3371 : *
3372 : * When this returns, leader process is left in a state that is virtually
3373 : * indistinguishable from it having generated runs as a serial external sort
3374 : * might have.
3375 : */
3376 : static void
3377 130 : leader_takeover_tapes(Tuplesortstate *state)
3378 : {
3379 130 : Sharedsort *shared = state->shared;
3380 130 : int nParticipants = state->nParticipants;
3381 : int workersFinished;
3382 : int j;
3383 :
3384 : Assert(LEADER(state));
3385 : Assert(nParticipants >= 1);
3386 :
3387 130 : SpinLockAcquire(&shared->mutex);
3388 130 : workersFinished = shared->workersFinished;
3389 130 : SpinLockRelease(&shared->mutex);
3390 :
3391 130 : if (nParticipants != workersFinished)
3392 0 : elog(ERROR, "cannot take over tapes before all workers finish");
3393 :
3394 : /*
3395 : * Create the tapeset from worker tapes, including a leader-owned tape at
3396 : * the end. Parallel workers are far more expensive than logical tapes,
3397 : * so the number of tapes allocated here should never be excessive.
3398 : */
3399 130 : inittapestate(state, nParticipants);
3400 130 : state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3401 :
3402 : /*
3403 : * Set currentRun to reflect the number of runs we will merge (it's not
3404 : * used for anything, this is just pro forma)
3405 : */
3406 130 : state->currentRun = nParticipants;
3407 :
3408 : /*
3409 : * Initialize the state to look the same as after building the initial
3410 : * runs.
3411 : *
3412 : * There will always be exactly 1 run per worker, and exactly one input
3413 : * tape per run, because workers always output exactly 1 run, even when
3414 : * there were no input tuples for workers to sort.
3415 : */
3416 130 : state->inputTapes = NULL;
3417 130 : state->nInputTapes = 0;
3418 130 : state->nInputRuns = 0;
3419 :
3420 130 : state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3421 130 : state->nOutputTapes = nParticipants;
3422 130 : state->nOutputRuns = nParticipants;
3423 :
3424 413 : for (j = 0; j < nParticipants; j++)
3425 : {
3426 283 : state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3427 : }
3428 :
3429 130 : state->status = TSS_BUILDRUNS;
3430 130 : }
3431 :
3432 : /*
3433 : * Convenience routine to free a tuple previously loaded into sort memory
3434 : */
3435 : static void
3436 2145890 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3437 : {
3438 2145890 : if (stup->tuple)
3439 : {
3440 2044155 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3441 2044155 : pfree(stup->tuple);
3442 2044155 : stup->tuple = NULL;
3443 : }
3444 2145890 : }
3445 :
3446 : int
3447 2536788 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3448 : {
3449 2536788 : if (x < y)
3450 868211 : return -1;
3451 1668577 : else if (x > y)
3452 640190 : return 1;
3453 : else
3454 1028387 : return 0;
3455 : }
3456 :
3457 : int
3458 1712801 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3459 : {
3460 1712801 : int64 xx = DatumGetInt64(x);
3461 1712801 : int64 yy = DatumGetInt64(y);
3462 :
3463 1712801 : if (xx < yy)
3464 1254308 : return -1;
3465 458493 : else if (xx > yy)
3466 228686 : return 1;
3467 : else
3468 229807 : return 0;
3469 : }
3470 :
3471 : int
3472 121018448 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3473 : {
3474 121018448 : int32 xx = DatumGetInt32(x);
3475 121018448 : int32 yy = DatumGetInt32(y);
3476 :
3477 121018448 : if (xx < yy)
3478 30252076 : return -1;
3479 90766372 : else if (xx > yy)
3480 26447884 : return 1;
3481 : else
3482 64318488 : return 0;
3483 : }
|