Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplesort.c
4 : * Generalized tuple sorting routines.
5 : *
6 : * This module provides a generalized facility for tuple sorting, which can be
7 : * applied to different kinds of sortable objects. Implementation of
8 : * the particular sorting variants is given in tuplesortvariants.c.
9 : * This module works efficiently for both small and large amounts
10 : * of data. Small amounts are sorted in-memory using qsort(). Large
11 : * amounts are sorted using temporary files and a standard external sort
12 : * algorithm.
13 : *
14 : * See Knuth, volume 3, for more than you want to know about external
15 : * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 : * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 : * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 : * merge is better. Knuth is assuming that tape drives are expensive
19 : * beasts, and in particular that there will always be many more runs than
20 : * tape drives. The polyphase merge algorithm was good at keeping all the
21 : * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 : * much more than a few Kb of memory buffers, so we can afford to have
23 : * lots of them. In particular, if we can have as many tape drives as
24 : * sorted runs, we can avoid repeated I/O entirely.
25 : *
26 : * Historically, we divided the input into sorted runs using replacement
27 : * selection, in the form of a priority tree implemented as a heap
28 : * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 : * for run generation.
30 : *
31 : * The approximate amount of memory allowed for any one sort operation
32 : * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 : * we absorb tuples and simply store them in an unsorted array as long as
34 : * we haven't exceeded workMem. If we reach the end of the input without
35 : * exceeding workMem, we sort the array using qsort() and subsequently return
36 : * tuples just by scanning the tuple array sequentially. If we do exceed
37 : * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 : * When tuples are dumped in batch after quicksorting, we begin a new run
39 : * with a new output tape. If we reach the max number of tapes, we write
40 : * subsequent runs on the existing tapes in a round-robin fashion. We will
41 : * need multiple merge passes to finish the merge in that case. After the
42 : * end of the input is reached, we dump out remaining tuples in memory into
43 : * a final run, then merge the runs.
44 : *
45 : * When merging runs, we use a heap containing just the frontmost tuple from
46 : * each source run; we repeatedly output the smallest tuple and replace it
47 : * with the next tuple from its source tape (if any). When the heap empties,
48 : * the merge is complete. The basic merge algorithm thus needs very little
49 : * memory --- only M tuples for an M-way merge, and M is constrained to a
50 : * small number. However, we can still make good use of our full workMem
51 : * allocation by pre-reading additional blocks from each source tape. Without
52 : * prereading, our access pattern to the temporary file would be very erratic;
53 : * on average we'd read one block from each of M source tapes during the same
54 : * time that we're writing M blocks to the output tape, so there is no
55 : * sequentiality of access at all, defeating the read-ahead methods used by
56 : * most Unix kernels. Worse, the output tape gets written into a very random
57 : * sequence of blocks of the temp file, ensuring that things will be even
58 : * worse when it comes time to read that tape. A straightforward merge pass
59 : * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 : * by prereading from each source tape sequentially, loading about workMem/M
61 : * bytes from each tape in turn, and making the sequential blocks immediately
62 : * available for reuse. This approach helps to localize both read and write
63 : * accesses. The pre-reading is handled by logtape.c; we just tell it how
64 : * much memory to use for the buffers.
65 : *
66 : * In the current code we determine the number of input tapes M on the basis
67 : * of workMem: we want workMem/M to be large enough that we read a fair
68 : * amount of data each time we read from a tape, so as to maintain the
69 : * locality of access described above. Nonetheless, with large workMem we
70 : * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 : * which avoids space wastage by recycling disk space as soon as each block
72 : * is read from its "tape".
73 : *
74 : * When the caller requests random access to the sort result, we form
75 : * the final sorted run on a logical tape which is then "frozen", so
76 : * that we can access it randomly. When the caller does not need random
77 : * access, we return from tuplesort_performsort() as soon as we are down
78 : * to one run per logical tape. The final merge is then performed
79 : * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 : * saves one cycle of writing all the data out to disk and reading it in.
81 : *
82 : * This module supports parallel sorting. Parallel sorts involve coordination
83 : * among one or more worker processes, and a leader process, each with its own
84 : * tuplesort state. The leader process (or, more accurately, the
85 : * Tuplesortstate associated with a leader process) creates a full tapeset
86 : * consisting of worker tapes with one run to merge; a run for every
87 : * worker process. This is then merged. Worker processes are guaranteed to
88 : * produce exactly one output run from their partial input.
89 : *
90 : *
91 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
92 : * Portions Copyright (c) 1994, Regents of the University of California
93 : *
94 : * IDENTIFICATION
95 : * src/backend/utils/sort/tuplesort.c
96 : *
97 : *-------------------------------------------------------------------------
98 : */
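(Illustrative aside, not part of tuplesort.c: the following self-contained sketch shows the merge pattern described above. A small binary heap holds the frontmost element of each sorted run; the smallest element is repeatedly emitted and replaced by the next element from the same run, until all runs are exhausted. The names and the use of plain ints are invented for the example.)

    #include <stdio.h>

    typedef struct
    {
        int     value;          /* frontmost value of its run */
        int     run;            /* which run it came from */
    } MergeItem;

    static void
    sift_down(MergeItem *heap, int n, int i)
    {
        for (;;)
        {
            int         smallest = i;
            int         l = 2 * i + 1;
            int         r = 2 * i + 2;
            MergeItem   tmp;

            if (l < n && heap[l].value < heap[smallest].value)
                smallest = l;
            if (r < n && heap[r].value < heap[smallest].value)
                smallest = r;
            if (smallest == i)
                break;
            tmp = heap[i];
            heap[i] = heap[smallest];
            heap[smallest] = tmp;
            i = smallest;
        }
    }

    int
    main(void)
    {
        int     run0[] = {1, 4, 9};
        int     run1[] = {2, 3, 8};
        int     run2[] = {5, 6, 7};
        int    *runs[3] = {run0, run1, run2};
        int     len[3] = {3, 3, 3};
        int     pos[3] = {0, 0, 0};
        MergeItem heap[3];
        int     n = 0;

        /* load the frontmost element of each run, then heapify */
        for (int i = 0; i < 3; i++)
            heap[n++] = (MergeItem) {runs[i][pos[i]++], i};
        for (int i = n / 2 - 1; i >= 0; i--)
            sift_down(heap, n, i);

        /* repeatedly emit the smallest and refill from its source run */
        while (n > 0)
        {
            MergeItem   top = heap[0];

            printf("%d ", top.value);
            if (pos[top.run] < len[top.run])
                heap[0] = (MergeItem) {runs[top.run][pos[top.run]++], top.run};
            else
                heap[0] = heap[--n];
            sift_down(heap, n, 0);
        }
        putchar('\n');          /* prints: 1 2 3 4 5 6 7 8 9 */
        return 0;
    }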
99 :
100 : #include "postgres.h"
101 :
102 : #include <limits.h>
103 :
104 : #include "commands/tablespace.h"
105 : #include "miscadmin.h"
106 : #include "pg_trace.h"
107 : #include "storage/shmem.h"
108 : #include "utils/guc.h"
109 : #include "utils/memutils.h"
110 : #include "utils/pg_rusage.h"
111 : #include "utils/tuplesort.h"
112 :
113 : /*
114 : * Initial size of the memtuples array. We try to select a size that keeps
115 : * the array under ALLOCSET_SEPARATE_THRESHOLD, so that the allocation
116 : * overhead can possibly be lowered. However, we don't consider array sizes
117 : * of less than 1024.
118 : *
119 : */
120 : #define INITIAL_MEMTUPSIZE Max(1024, \
121 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
122 :
123 : /* GUC variables */
124 : bool trace_sort = false;
125 :
126 : #ifdef DEBUG_BOUNDED_SORT
127 : bool optimize_bounded_sort = true;
128 : #endif
129 :
130 :
131 : /*
132 : * During merge, we use a pre-allocated set of fixed-size slots to hold
133 : * tuples, to avoid palloc/pfree overhead.
134 : *
135 : * Merge doesn't require a lot of memory, so we can afford to waste some,
136 : * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
137 : * palloc() overhead is not significant anymore.
138 : *
139 : * 'nextfree' is valid when this chunk is in the free list. When in use, the
140 : * slot holds a tuple.
141 : */
142 : #define SLAB_SLOT_SIZE 1024
143 :
144 : typedef union SlabSlot
145 : {
146 : union SlabSlot *nextfree;
147 : char buffer[SLAB_SLOT_SIZE];
148 : } SlabSlot;
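(Hedged aside: the hypothetical example_* helpers below are not part of tuplesort.c; they only show how such a union lets the free list be threaded through the idle slots themselves, so free slots need no bookkeeping memory beyond the arena. This is essentially how the slabFreeHead list described further down is maintained.)

    static SlabSlot example_arena[4];       /* stand-in for the real arena */
    static SlabSlot *example_freehead;

    static void
    example_init_slab(void)
    {
        example_freehead = NULL;
        for (int i = 0; i < 4; i++)
        {
            example_arena[i].nextfree = example_freehead;
            example_freehead = &example_arena[i];
        }
    }

    static SlabSlot *
    example_get_slot(void)
    {
        SlabSlot   *slot = example_freehead;

        if (slot != NULL)
            example_freehead = slot->nextfree;
        return slot;            /* caller would fall back to palloc() on NULL */
    }

    static void
    example_release_slot(SlabSlot *slot)
    {
        slot->nextfree = example_freehead;
        example_freehead = slot;
    }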
149 :
150 : /*
151 : * Possible states of a Tuplesort object. These denote the states that
152 : * persist between calls of Tuplesort routines.
153 : */
154 : typedef enum
155 : {
156 : TSS_INITIAL, /* Loading tuples; still within memory limit */
157 : TSS_BOUNDED, /* Loading tuples into bounded-size heap */
158 : TSS_BUILDRUNS, /* Loading tuples; writing to tape */
159 : TSS_SORTEDINMEM, /* Sort completed entirely in memory */
160 : TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
161 : TSS_FINALMERGE, /* Performing final merge on-the-fly */
162 : } TupSortStatus;
163 :
164 : /*
165 : * Parameters for calculation of number of tapes to use --- see inittapes()
166 : * and tuplesort_merge_order().
167 : *
168 : * In this calculation we assume that each tape will cost us about one
169 : * block's worth of buffer space. This ignores the overhead of all the other data
170 : * structures needed for each tape, but it's probably close enough.
171 : *
172 : * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
173 : * input tape, for pre-reading (see discussion at top of file). This is *in
174 : * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
175 : */
176 : #define MINORDER 6 /* minimum merge order */
177 : #define MAXORDER 500 /* maximum merge order */
178 : #define TAPE_BUFFER_OVERHEAD BLCKSZ
179 : #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
180 :
181 :
182 : /*
183 : * Private state of a Tuplesort operation.
184 : */
185 : struct Tuplesortstate
186 : {
187 : TuplesortPublic base;
188 : TupSortStatus status; /* enumerated value as shown above */
189 : bool bounded; /* did caller specify a maximum number of
190 : * tuples to return? */
191 : bool boundUsed; /* true if we made use of a bounded heap */
192 : int bound; /* if bounded, the maximum number of tuples */
193 : int64 tupleMem; /* memory consumed by individual tuples.
194 : * storing this separately from what we track
195 : * in availMem allows us to subtract the
196 : * memory consumed by all tuples when dumping
197 : * tuples to tape */
198 : int64 availMem; /* remaining memory available, in bytes */
199 : int64 allowedMem; /* total memory allowed, in bytes */
200 : int maxTapes; /* max number of input tapes to merge in each
201 : * pass */
202 : int64 maxSpace; /* maximum amount of space occupied among sort
203 : * of groups, either in-memory or on-disk */
204 : bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
205 : * space, false when it's value for in-memory
206 : * space */
207 : TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
208 : LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
209 :
210 : /*
211 : * This array holds the tuples now in sort memory. If we are in state
212 : * INITIAL, the tuples are in no particular order; if we are in state
213 : * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
214 : * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
215 : * H. In state SORTEDONTAPE, the array is not used.
216 : */
217 : SortTuple *memtuples; /* array of SortTuple structs */
218 : int memtupcount; /* number of tuples currently present */
219 : int memtupsize; /* allocated length of memtuples array */
220 : bool growmemtuples; /* memtuples' growth still underway? */
221 :
222 : /*
223 : * Memory for tuples is sometimes allocated using a simple slab allocator,
224 : * rather than with palloc(). Currently, we switch to slab allocation
225 : * when we start merging. Merging only needs to keep a small, fixed
226 : * number of tuples in memory at any time, so we can avoid the
227 : * palloc/pfree overhead by recycling a fixed number of fixed-size slots
228 : * to hold the tuples.
229 : *
230 : * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
231 : * slots. The allocation is sized to have one slot per tape, plus one
232 : * additional slot. We need that many slots to hold all the tuples kept
233 : * in the heap during merge, plus the one we have last returned from the
234 : * sort, with tuplesort_gettuple.
235 : *
236 : * Initially, all the slots are kept in a linked list of free slots. When
237 : * a tuple is read from a tape, it is put into the next available slot, if
238 : * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
239 : * instead.
240 : *
241 : * When we're done processing a tuple, we return the slot back to the free
242 : * list, or pfree() if it was palloc'd. We know that a tuple was
243 : * allocated from the slab if its pointer value is between
244 : * slabMemoryBegin and -End.
245 : *
246 : * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
247 : * tracking memory usage is not used.
248 : */
249 : bool slabAllocatorUsed;
250 :
251 : char *slabMemoryBegin; /* beginning of slab memory arena */
252 : char *slabMemoryEnd; /* end of slab memory arena */
253 : SlabSlot *slabFreeHead; /* head of free list */
254 :
255 : /* Memory used for input and output tape buffers. */
256 : size_t tape_buffer_mem;
257 :
258 : /*
259 : * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
260 : * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
261 : * modes), we remember the tuple in 'lastReturnedTuple', so that we can
262 : * recycle the memory on next gettuple call.
263 : */
264 : void *lastReturnedTuple;
265 :
266 : /*
267 : * While building initial runs, this is the current output run number.
268 : * Afterwards, it is the number of initial runs we made.
269 : */
270 : int currentRun;
271 :
272 : /*
273 : * Logical tapes, for merging.
274 : *
275 : * The initial runs are written in the output tapes. In each merge pass,
276 : * the output tapes of the previous pass become the input tapes, and new
277 : * output tapes are created as needed. When nInputTapes equals
278 : * nInputRuns, there is only one merge pass left.
279 : */
280 : LogicalTape **inputTapes;
281 : int nInputTapes;
282 : int nInputRuns;
283 :
284 : LogicalTape **outputTapes;
285 : int nOutputTapes;
286 : int nOutputRuns;
287 :
288 : LogicalTape *destTape; /* current output tape */
289 :
290 : /*
291 : * These variables are used after completion of sorting to keep track of
292 : * the next tuple to return. (In the tape case, the tape's current read
293 : * position is also critical state.)
294 : */
295 : LogicalTape *result_tape; /* actual tape of finished output */
296 : int current; /* array index (only used if SORTEDINMEM) */
297 : bool eof_reached; /* reached EOF (needed for cursors) */
298 :
299 : /* markpos_xxx holds marked position for mark and restore */
300 : int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
301 : int markpos_offset; /* saved "current", or offset in tape block */
302 : bool markpos_eof; /* saved "eof_reached" */
303 :
304 : /*
305 : * These variables are used during parallel sorting.
306 : *
307 : * worker is our worker identifier. Follows the general convention that
308 : * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
309 : * tuplesorts. (-1 can also denote a serial tuplesort.)
310 : *
311 : * shared is mutable shared memory state, which is used to coordinate
312 : * parallel sorts.
313 : *
314 : * nParticipants is the number of worker Tuplesortstates known by the
315 : * leader to have actually been launched, which implies that they must
316 : * finish a run that the leader needs to merge. Typically includes a
317 : * worker state held by the leader process itself. Set in the leader
318 : * Tuplesortstate only.
319 : */
320 : int worker;
321 : Sharedsort *shared;
322 : int nParticipants;
323 :
324 : /*
325 : * Additional state for managing "abbreviated key" sortsupport routines
326 : * (which currently may be used by all cases except the hash index case).
327 : * Tracks the intervals at which the optimization's effectiveness is
328 : * tested.
329 : */
330 : int64 abbrevNext; /* Tuple # at which to next check
331 : * applicability */
332 :
333 : /*
334 : * Resource snapshot for time of sort start.
335 : */
336 : PGRUsage ru_start;
337 : };
338 :
339 : /*
340 : * Private mutable state of a parallel tuplesort operation. This is allocated
341 : * in shared memory.
342 : */
343 : struct Sharedsort
344 : {
345 : /* mutex protects all fields prior to tapes */
346 : slock_t mutex;
347 :
348 : /*
349 : * currentWorker generates ordinal identifier numbers for parallel sort
350 : * workers. These start from 0, and are always gapless.
351 : *
352 : * Workers increment workersFinished to indicate having finished. If this
353 : * is equal to state.nParticipants within the leader, the leader is ready to
354 : * merge worker runs.
355 : */
356 : int currentWorker;
357 : int workersFinished;
358 :
359 : /* Temporary file space */
360 : SharedFileSet fileset;
361 :
362 : /* Size of tapes flexible array */
363 : int nTapes;
364 :
365 : /*
366 : * Tapes array used by workers to report back information needed by the
367 : * leader to concatenate all worker tapes into one for merging
368 : */
369 : TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
370 : };
371 :
372 : /*
373 : * Is the given tuple allocated from the slab memory arena?
374 : */
375 : #define IS_SLAB_SLOT(state, tuple) \
376 : ((char *) (tuple) >= (state)->slabMemoryBegin && \
377 : (char *) (tuple) < (state)->slabMemoryEnd)
378 :
379 : /*
380 : * Return the given tuple to the slab memory free list, or free it
381 : * if it was palloc'd.
382 : */
383 : #define RELEASE_SLAB_SLOT(state, tuple) \
384 : do { \
385 : SlabSlot *buf = (SlabSlot *) tuple; \
386 : \
387 : if (IS_SLAB_SLOT((state), buf)) \
388 : { \
389 : buf->nextfree = (state)->slabFreeHead; \
390 : (state)->slabFreeHead = buf; \
391 : } else \
392 : pfree(buf); \
393 : } while(0)
394 :
395 : #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
396 : #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
397 : #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
398 : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
399 : #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
400 : #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
401 : #define USEMEM(state,amt) ((state)->availMem -= (amt))
402 : #define FREEMEM(state,amt) ((state)->availMem += (amt))
403 : #define SERIAL(state) ((state)->shared == NULL)
404 : #define WORKER(state) ((state)->shared && (state)->worker != -1)
405 : #define LEADER(state) ((state)->shared && (state)->worker == -1)
406 :
407 : /*
408 : * NOTES about on-tape representation of tuples:
409 : *
410 : * We require the first "unsigned int" of a stored tuple to be the total size
411 : * on-tape of the tuple, including itself (so it is never zero; an all-zero
412 : * unsigned int is used to delimit runs). The remainder of the stored tuple
413 : * may or may not match the in-memory representation of the tuple ---
414 : * any conversion needed is the job of the writetup and readtup routines.
415 : *
416 : * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
417 : * representation of the tuple must be followed by another "unsigned int" that
418 : * is a copy of the length --- so the total tape space used is actually
419 : * sizeof(unsigned int) more than the stored length value. This allows
420 : * reading backwards. If the random access flag is not specified, the
421 : * write/read routines may omit the extra length word.
422 : *
423 : * writetup is expected to write both length words as well as the tuple
424 : * data. When readtup is called, the tape is positioned just after the
425 : * front length word; readtup must read the tuple data and advance past
426 : * the back length word (if present).
427 : *
428 : * The write/read routines can make use of the tuple description data
429 : * stored in the Tuplesortstate record, if needed. They are also expected
430 : * to adjust state->availMem by the amount of memory space (not tape space!)
431 : * released or consumed. There is no error return from either writetup
432 : * or readtup; they should ereport() on failure.
433 : *
434 : *
435 : * NOTES about memory consumption calculations:
436 : *
437 : * We count space allocated for tuples against the workMem limit, plus
438 : * the space used by the variable-size memtuples array. Fixed-size space
439 : * is not counted; it's small enough to not be interesting.
440 : *
441 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
442 : * rather than the originally-requested size. This is important since
443 : * palloc can add substantial overhead. It's not a complete answer since
444 : * we won't count any wasted space in palloc allocation blocks, but it's
445 : * a lot better than what we were doing before 7.3. As of 9.6, a
446 : * separate memory context is used for caller-passed tuples. Resetting
447 : * it at certain key increments significantly ameliorates fragmentation.
448 : * readtup routines use the slab allocator (they cannot use
449 : * the reset context because it gets deleted at the point that merging
450 : * begins).
451 : */
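(Hedged illustration of the framing described above: the hypothetical helper below writes one tuple image into a plain byte buffer, standing in for a LogicalTape. The leading length word counts itself; the trailing copy is added only when backwards reads, i.e. TUPLESORT_RANDOMACCESS, must be supported. The real writetup/readtup callbacks live in tuplesortvariants.c and write to tapes instead.)

    static size_t
    example_frame_tuple(char *dst, const void *data, unsigned int datalen,
                        bool random_access)
    {
        unsigned int tuplen = sizeof(unsigned int) + datalen;
        char       *p = dst;

        memcpy(p, &tuplen, sizeof(unsigned int));       /* leading length word */
        p += sizeof(unsigned int);
        memcpy(p, data, datalen);                       /* tuple body */
        p += datalen;
        if (random_access)
        {
            memcpy(p, &tuplen, sizeof(unsigned int));   /* trailing copy */
            p += sizeof(unsigned int);
        }
        return p - dst;         /* total tape space consumed */
    }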
452 :
453 :
454 : static void tuplesort_begin_batch(Tuplesortstate *state);
455 : static bool consider_abort_common(Tuplesortstate *state);
456 : static void inittapes(Tuplesortstate *state, bool mergeruns);
457 : static void inittapestate(Tuplesortstate *state, int maxTapes);
458 : static void selectnewtape(Tuplesortstate *state);
459 : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
460 : static void mergeruns(Tuplesortstate *state);
461 : static void mergeonerun(Tuplesortstate *state);
462 : static void beginmerge(Tuplesortstate *state);
463 : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
464 : static void dumptuples(Tuplesortstate *state, bool alltuples);
465 : static void make_bounded_heap(Tuplesortstate *state);
466 : static void sort_bounded_heap(Tuplesortstate *state);
467 : static void tuplesort_sort_memtuples(Tuplesortstate *state);
468 : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
469 : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
470 : static void tuplesort_heap_delete_top(Tuplesortstate *state);
471 : static void reversedirection(Tuplesortstate *state);
472 : static unsigned int getlen(LogicalTape *tape, bool eofOK);
473 : static void markrunend(LogicalTape *tape);
474 : static int worker_get_identifier(Tuplesortstate *state);
475 : static void worker_freeze_result_tape(Tuplesortstate *state);
476 : static void worker_nomergeruns(Tuplesortstate *state);
477 : static void leader_takeover_tapes(Tuplesortstate *state);
478 : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
479 : static void tuplesort_free(Tuplesortstate *state);
480 : static void tuplesort_updatemax(Tuplesortstate *state);
481 :
482 : /*
483 : * Specialized comparators that we can inline into specialized sorts. The goal
484 : * is to try to sort two tuples without having to follow the pointers to the
485 : * comparator or the tuple.
486 : *
487 : * XXX: For now, there is no specialization for cases where datum1 is
488 : * authoritative and we don't even need to fall back to a callback at all (that
489 : * would be true for types like int4/int8/timestamp/date, but not true for
490 : * abbreviations of text or multi-key sorts). There could be! Is it worth it?
491 : */
492 :
493 : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
494 : static pg_attribute_always_inline int
495 46429924 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
496 : {
497 : int compare;
498 :
499 46429924 : compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
500 46429924 : b->datum1, b->isnull1,
501 : &state->base.sortKeys[0]);
502 46429924 : if (compare != 0)
503 41888254 : return compare;
504 :
505 : /*
506 : * No need to waste effort calling the tiebreak function when there are no
507 : * other keys to sort on.
508 : */
509 4541670 : if (state->base.onlyKey != NULL)
510 0 : return 0;
511 :
512 4541670 : return state->base.comparetup_tiebreak(a, b, state);
513 : }
514 :
515 : #if SIZEOF_DATUM >= 8
516 : /* Used if first key's comparator is ssup_datum_signed_cmp */
517 : static pg_attribute_always_inline int
518 5624776 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
519 : {
520 : int compare;
521 :
522 5624776 : compare = ApplySignedSortComparator(a->datum1, a->isnull1,
523 5624776 : b->datum1, b->isnull1,
524 : &state->base.sortKeys[0]);
525 :
526 5624776 : if (compare != 0)
527 5612842 : return compare;
528 :
529 : /*
530 : * No need to waste effort calling the tiebreak function when there are no
531 : * other keys to sort on.
532 : */
533 11934 : if (state->base.onlyKey != NULL)
534 1102 : return 0;
535 :
536 10832 : return state->base.comparetup_tiebreak(a, b, state);
537 : }
538 : #endif
539 :
540 : /* Used if first key's comparator is ssup_datum_int32_cmp */
541 : static pg_attribute_always_inline int
542 52302590 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
543 : {
544 : int compare;
545 :
546 52302590 : compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
547 52302590 : b->datum1, b->isnull1,
548 : &state->base.sortKeys[0]);
549 :
550 52302590 : if (compare != 0)
551 36873066 : return compare;
552 :
553 : /*
554 : * No need to waste effort calling the tiebreak function when there are no
555 : * other keys to sort on.
556 : */
557 15429524 : if (state->base.onlyKey != NULL)
558 1677044 : return 0;
559 :
560 13752480 : return state->base.comparetup_tiebreak(a, b, state);
561 : }
562 :
563 : /*
564 : * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
565 : * any variant of SortTuples, using the appropriate comparetup function.
566 : * qsort_ssup() is specialized for the case where the comparetup function
567 : * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
568 : * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
569 : * common comparison functions on pass-by-value leading datums.
570 : */
571 :
572 : #define ST_SORT qsort_tuple_unsigned
573 : #define ST_ELEMENT_TYPE SortTuple
574 : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
575 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
576 : #define ST_CHECK_FOR_INTERRUPTS
577 : #define ST_SCOPE static
578 : #define ST_DEFINE
579 : #include "lib/sort_template.h"
580 :
581 : #if SIZEOF_DATUM >= 8
582 : #define ST_SORT qsort_tuple_signed
583 : #define ST_ELEMENT_TYPE SortTuple
584 : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
585 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
586 : #define ST_CHECK_FOR_INTERRUPTS
587 : #define ST_SCOPE static
588 : #define ST_DEFINE
589 : #include "lib/sort_template.h"
590 : #endif
591 :
592 : #define ST_SORT qsort_tuple_int32
593 : #define ST_ELEMENT_TYPE SortTuple
594 : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
595 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
596 : #define ST_CHECK_FOR_INTERRUPTS
597 : #define ST_SCOPE static
598 : #define ST_DEFINE
599 : #include "lib/sort_template.h"
600 :
601 : #define ST_SORT qsort_tuple
602 : #define ST_ELEMENT_TYPE SortTuple
603 : #define ST_COMPARE_RUNTIME_POINTER
604 : #define ST_COMPARE_ARG_TYPE Tuplesortstate
605 : #define ST_CHECK_FOR_INTERRUPTS
606 : #define ST_SCOPE static
607 : #define ST_DECLARE
608 : #define ST_DEFINE
609 : #include "lib/sort_template.h"
610 :
611 : #define ST_SORT qsort_ssup
612 : #define ST_ELEMENT_TYPE SortTuple
613 : #define ST_COMPARE(a, b, ssup) \
614 : ApplySortComparator((a)->datum1, (a)->isnull1, \
615 : (b)->datum1, (b)->isnull1, (ssup))
616 : #define ST_COMPARE_ARG_TYPE SortSupportData
617 : #define ST_CHECK_FOR_INTERRUPTS
618 : #define ST_SCOPE static
619 : #define ST_DEFINE
620 : #include "lib/sort_template.h"
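(Hedged aside: the same template mechanism can be instantiated for other element types. The sketch below, using the hypothetical name example_sort_int32 and a comparator that needs no extra argument, is only meant to show the shape of an instantiation; see lib/sort_template.h for the full set of parameters.)

    #define ST_SORT example_sort_int32
    #define ST_ELEMENT_TYPE int32
    #define ST_COMPARE(a, b) (*(a) < *(b) ? -1 : (*(a) > *(b) ? 1 : 0))
    #define ST_SCOPE static
    #define ST_DEFINE
    #include "lib/sort_template.h"

    /* usage (illustrative): example_sort_int32(values, nvalues); */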
621 :
622 : /*
623 : * tuplesort_begin_xxx
624 : *
625 : * Initialize for a tuple sort operation.
626 : *
627 : * After calling tuplesort_begin, the caller should call tuplesort_putXXX
628 : * zero or more times, then call tuplesort_performsort when all the tuples
629 : * have been supplied. After performsort, retrieve the tuples in sorted
630 : * order by calling tuplesort_getXXX until it returns false/NULL. (If random
631 : * access was requested, rescan, markpos, and restorepos can also be called.)
632 : * Call tuplesort_end to terminate the operation and release memory/disk space.
633 : *
634 : * Each variant of tuplesort_begin has a workMem parameter specifying the
635 : * maximum number of kilobytes of RAM to use before spilling data to disk.
636 : * (The normal value of this parameter is work_mem, but some callers use
637 : * other values.) Each variant also has a sortopt which is a bitmask of
638 : * sort options. See TUPLESORT_* definitions in tuplesort.h
639 : */
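(Hedged usage sketch: the hypothetical function below walks the begin / put / performsort / get / end sequence for a serial Datum sort. The entry points are real, declared in tuplesort.h and implemented in tuplesortvariants.c, but the parameter lists shown here are approximate and error handling is omitted; INT4OID, Int4LessOperator and work_mem are assumed to be in scope from the usual catalog and miscadmin headers.)

    static void
    example_sort_some_int4s(void)
    {
        Tuplesortstate *ts;
        Datum       val;
        bool        isnull;

        ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                                   false, work_mem, NULL, TUPLESORT_NONE);

        /* feed the values in some arbitrary order */
        for (int i = 1000; i > 0; i--)
            tuplesort_putdatum(ts, Int32GetDatum(i), false);

        tuplesort_performsort(ts);

        /* fetch them back in ascending order */
        while (tuplesort_getdatum(ts, true, false, &val, &isnull, NULL))
        {
            /* consume val here; its memory may be recycled on the next fetch */
        }

        tuplesort_end(ts);
    }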
640 :
641 : Tuplesortstate *
642 250550 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
643 : {
644 : Tuplesortstate *state;
645 : MemoryContext maincontext;
646 : MemoryContext sortcontext;
647 : MemoryContext oldcontext;
648 :
649 : /* See leader_takeover_tapes() remarks on random access support */
650 250550 : if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
651 0 : elog(ERROR, "random access disallowed under parallel sort");
652 :
653 : /*
654 : * Memory context surviving tuplesort_reset. This memory context holds
655 : * data which is useful to keep while sorting multiple similar batches.
656 : */
657 250550 : maincontext = AllocSetContextCreate(CurrentMemoryContext,
658 : "TupleSort main",
659 : ALLOCSET_DEFAULT_SIZES);
660 :
661 : /*
662 : * Create a working memory context for one sort operation. The content of
663 : * this context is deleted by tuplesort_reset.
664 : */
665 250550 : sortcontext = AllocSetContextCreate(maincontext,
666 : "TupleSort sort",
667 : ALLOCSET_DEFAULT_SIZES);
668 :
669 : /*
670 : * Additionally, a working memory context for tuples is set up in
671 : * tuplesort_begin_batch.
672 : */
673 :
674 : /*
675 : * Make the Tuplesortstate within the per-sortstate context. This way, we
676 : * don't need a separate pfree() operation for it at shutdown.
677 : */
678 250550 : oldcontext = MemoryContextSwitchTo(maincontext);
679 :
680 250550 : state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
681 :
682 250550 : if (trace_sort)
683 0 : pg_rusage_init(&state->ru_start);
684 :
685 250550 : state->base.sortopt = sortopt;
686 250550 : state->base.tuples = true;
687 250550 : state->abbrevNext = 10;
688 :
689 : /*
690 : * workMem is forced to be at least 64KB, the current minimum valid value
691 : * for the work_mem GUC. This is a defense against parallel sort callers
692 : * that divide out memory among many workers in a way that leaves each
693 : * with very little memory.
694 : */
695 250550 : state->allowedMem = Max(workMem, 64) * (int64) 1024;
696 250550 : state->base.sortcontext = sortcontext;
697 250550 : state->base.maincontext = maincontext;
698 :
699 : /*
700 : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
701 : * see comments in grow_memtuples().
702 : */
703 250550 : state->memtupsize = INITIAL_MEMTUPSIZE;
704 250550 : state->memtuples = NULL;
705 :
706 : /*
707 : * After all of the other non-parallel-related state, we set up all of the
708 : * state needed for each batch.
709 : */
710 250550 : tuplesort_begin_batch(state);
711 :
712 : /*
713 : * Initialize parallel-related state based on coordination information
714 : * from caller
715 : */
716 250550 : if (!coordinate)
717 : {
718 : /* Serial sort */
719 249886 : state->shared = NULL;
720 249886 : state->worker = -1;
721 249886 : state->nParticipants = -1;
722 : }
723 664 : else if (coordinate->isWorker)
724 : {
725 : /* Parallel worker produces exactly one final run from all input */
726 444 : state->shared = coordinate->sharedsort;
727 444 : state->worker = worker_get_identifier(state);
728 444 : state->nParticipants = -1;
729 : }
730 : else
731 : {
732 : /* Parallel leader state only used for final merge */
733 220 : state->shared = coordinate->sharedsort;
734 220 : state->worker = -1;
735 220 : state->nParticipants = coordinate->nParticipants;
736 : Assert(state->nParticipants >= 1);
737 : }
738 :
739 250550 : MemoryContextSwitchTo(oldcontext);
740 :
741 250550 : return state;
742 : }
743 :
744 : /*
745 : * tuplesort_begin_batch
746 : *
747 : * Set up, or reset, all state needed for processing a new set of tuples with this
748 : * sort state. Called both from tuplesort_begin_common (the first time sorting
749 : * with this sort state) and tuplesort_reset (for subsequent usages).
750 : */
751 : static void
752 253316 : tuplesort_begin_batch(Tuplesortstate *state)
753 : {
754 : MemoryContext oldcontext;
755 :
756 253316 : oldcontext = MemoryContextSwitchTo(state->base.maincontext);
757 :
758 : /*
759 : * Caller tuple (e.g. IndexTuple) memory context.
760 : *
761 : * A dedicated child context used exclusively for caller-passed tuples
762 : * eases memory management. Resetting at key points reduces
763 : * fragmentation. Note that the memtuples array of SortTuples is allocated
764 : * in the parent context, not this context, because there is no need to
765 : * free memtuples early. For bounded sorts, tuples may be pfreed in any
766 : * order, so we use a regular aset.c context so that it can make use of
767 : * free'd memory. When the sort is not bounded, we make use of a bump.c
768 : * context as this keeps allocations more compact with less wastage.
769 : * Allocations are also slightly more CPU efficient.
770 : */
771 253316 : if (TupleSortUseBumpTupleCxt(state->base.sortopt))
772 251988 : state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
773 : "Caller tuples",
774 : ALLOCSET_DEFAULT_SIZES);
775 : else
776 1328 : state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
777 : "Caller tuples",
778 : ALLOCSET_DEFAULT_SIZES);
779 :
780 :
781 253316 : state->status = TSS_INITIAL;
782 253316 : state->bounded = false;
783 253316 : state->boundUsed = false;
784 :
785 253316 : state->availMem = state->allowedMem;
786 :
787 253316 : state->tapeset = NULL;
788 :
789 253316 : state->memtupcount = 0;
790 :
791 : /*
792 : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
793 : * see comments in grow_memtuples().
794 : */
795 253316 : state->growmemtuples = true;
796 253316 : state->slabAllocatorUsed = false;
797 253316 : if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
798 : {
799 0 : pfree(state->memtuples);
800 0 : state->memtuples = NULL;
801 0 : state->memtupsize = INITIAL_MEMTUPSIZE;
802 : }
803 253316 : if (state->memtuples == NULL)
804 : {
805 250550 : state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
806 250550 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
807 : }
808 :
809 : /* workMem must be large enough for the minimal memtuples array */
810 253316 : if (LACKMEM(state))
811 0 : elog(ERROR, "insufficient memory allowed for sort");
812 :
813 253316 : state->currentRun = 0;
814 :
815 : /*
816 : * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
817 : * inittapes(), if needed.
818 : */
819 :
820 253316 : state->result_tape = NULL; /* flag that result tape has not been formed */
821 :
822 253316 : MemoryContextSwitchTo(oldcontext);
823 253316 : }
824 :
825 : /*
826 : * tuplesort_set_bound
827 : *
828 : * Advise tuplesort that at most the first N result tuples are required.
829 : *
830 : * Must be called before inserting any tuples. (Actually, we could allow it
831 : * as long as the sort hasn't spilled to disk, but there seems no need for
832 : * delayed calls at the moment.)
833 : *
834 : * This is a hint only. The tuplesort may still return more tuples than
835 : * requested. Parallel leader tuplesorts will always ignore the hint.
836 : */
837 : void
838 1194 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
839 : {
840 : /* Assert we're called before loading any tuples */
841 : Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
842 : /* Assert we allow bounded sorts */
843 : Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
844 : /* Can't set the bound twice, either */
845 : Assert(!state->bounded);
846 : /* Also, this shouldn't be called in a parallel worker */
847 : Assert(!WORKER(state));
848 :
849 : /* Parallel leader allows but ignores hint */
850 1194 : if (LEADER(state))
851 0 : return;
852 :
853 : #ifdef DEBUG_BOUNDED_SORT
854 : /* Honor GUC setting that disables the feature (for easy testing) */
855 : if (!optimize_bounded_sort)
856 : return;
857 : #endif
858 :
859 : /* We want to be able to compute bound * 2, so limit the setting */
860 1194 : if (bound > (int64) (INT_MAX / 2))
861 0 : return;
862 :
863 1194 : state->bounded = true;
864 1194 : state->bound = (int) bound;
865 :
866 : /*
867 : * Bounded sorts are not an effective target for abbreviated key
868 : * optimization. Disable by setting state to be consistent with no
869 : * abbreviation support.
870 : */
871 1194 : state->base.sortKeys->abbrev_converter = NULL;
872 1194 : if (state->base.sortKeys->abbrev_full_comparator)
873 16 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
874 :
875 : /* Not strictly necessary, but be tidy */
876 1194 : state->base.sortKeys->abbrev_abort = NULL;
877 1194 : state->base.sortKeys->abbrev_full_comparator = NULL;
878 : }
879 :
880 : /*
881 : * tuplesort_used_bound
882 : *
883 : * Allow callers to find out if the sort state was able to use a bound.
884 : */
885 : bool
886 110 : tuplesort_used_bound(Tuplesortstate *state)
887 : {
888 110 : return state->boundUsed;
889 : }
890 :
891 : /*
892 : * tuplesort_free
893 : *
894 : * Internal routine for freeing resources of tuplesort.
895 : */
896 : static void
897 253072 : tuplesort_free(Tuplesortstate *state)
898 : {
899 : /* context swap probably not needed, but let's be safe */
900 253072 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
901 : int64 spaceUsed;
902 :
903 253072 : if (state->tapeset)
904 720 : spaceUsed = LogicalTapeSetBlocks(state->tapeset);
905 : else
906 252352 : spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
907 :
908 : /*
909 : * Delete temporary "tape" files, if any.
910 : *
911 : * We don't bother to destroy the individual tapes here. They will go away
912 : * with the sortcontext. (In TSS_FINALMERGE state, we have closed
913 : * finished tapes already.)
914 : */
915 253072 : if (state->tapeset)
916 720 : LogicalTapeSetClose(state->tapeset);
917 :
918 253072 : if (trace_sort)
919 : {
920 0 : if (state->tapeset)
921 0 : elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
922 : SERIAL(state) ? "external sort" : "parallel external sort",
923 : state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
924 : else
925 0 : elog(LOG, "%s of worker %d ended, %lld KB used: %s",
926 : SERIAL(state) ? "internal sort" : "unperformed parallel sort",
927 : state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
928 : }
929 :
930 : TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
931 :
932 253072 : FREESTATE(state);
933 253072 : MemoryContextSwitchTo(oldcontext);
934 :
935 : /*
936 : * Free the per-sort memory context, thereby releasing all working memory.
937 : */
938 253072 : MemoryContextReset(state->base.sortcontext);
939 253072 : }
940 :
941 : /*
942 : * tuplesort_end
943 : *
944 : * Release resources and clean up.
945 : *
946 : * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
947 : * pointing to garbage. Be careful not to attempt to use or free such
948 : * pointers afterwards!
949 : */
950 : void
951 250306 : tuplesort_end(Tuplesortstate *state)
952 : {
953 250306 : tuplesort_free(state);
954 :
955 : /*
956 : * Free the main memory context, including the Tuplesortstate struct
957 : * itself.
958 : */
959 250306 : MemoryContextDelete(state->base.maincontext);
960 250306 : }
961 :
962 : /*
963 : * tuplesort_updatemax
964 : *
965 : * Update maximum resource usage statistics.
966 : */
967 : static void
968 3156 : tuplesort_updatemax(Tuplesortstate *state)
969 : {
970 : int64 spaceUsed;
971 : bool isSpaceDisk;
972 :
973 : /*
974 : * Note: it might seem we should provide both memory and disk usage for a
975 : * disk-based sort. However, the current code doesn't track memory space
976 : * accurately once we have begun to return tuples to the caller (since we
977 : * don't account for pfree's the caller is expected to do), so we cannot
978 : * rely on availMem in a disk sort. This does not seem worth the overhead
979 : * to fix. Is it worth creating an API for the memory context code to
980 : * tell us how much is actually used in sortcontext?
981 : */
982 3156 : if (state->tapeset)
983 : {
984 6 : isSpaceDisk = true;
985 6 : spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
986 : }
987 : else
988 : {
989 3150 : isSpaceDisk = false;
990 3150 : spaceUsed = state->allowedMem - state->availMem;
991 : }
992 :
993 : /*
994 : * A sort evicts data to disk when it cannot fit that data into
995 : * main memory. This is why we assume space used on the disk to be more
996 : * important for tracking resource usage than space used in memory. Note
997 : * that the amount of space occupied by some tupleset on the disk might be
998 : * less than the amount of space occupied by the same tupleset in memory due
999 : * to more compact representation.
1000 : */
1001 3156 : if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1002 3150 : (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1003 : {
1004 412 : state->maxSpace = spaceUsed;
1005 412 : state->isMaxSpaceDisk = isSpaceDisk;
1006 412 : state->maxSpaceStatus = state->status;
1007 : }
1008 3156 : }
1009 :
1010 : /*
1011 : * tuplesort_reset
1012 : *
1013 : * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1014 : * meta-information in. After tuplesort_reset, tuplesort is ready to start
1015 : * a new sort. This avoids recreating tuple sort states (and saves
1016 : * resources) when sorting multiple small batches.
1017 : */
1018 : void
1019 2766 : tuplesort_reset(Tuplesortstate *state)
1020 : {
1021 2766 : tuplesort_updatemax(state);
1022 2766 : tuplesort_free(state);
1023 :
1024 : /*
1025 : * After we've freed up per-batch memory, re-setup all of the state common
1026 : * to both the first batch and any subsequent batch.
1027 : */
1028 2766 : tuplesort_begin_batch(state);
1029 :
1030 2766 : state->lastReturnedTuple = NULL;
1031 2766 : state->slabMemoryBegin = NULL;
1032 2766 : state->slabMemoryEnd = NULL;
1033 2766 : state->slabFreeHead = NULL;
1034 2766 : }
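(Hedged sketch of the batch-reuse pattern this enables, in the same style as the hypothetical Datum-sort example near tuplesort_begin_common: one state is created once, then reset between batches instead of being rebuilt. Signatures are approximate; see tuplesort.h.)

    static void
    example_sort_batches(int nbatches)
    {
        Tuplesortstate *ts;

        ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                                   false, work_mem, NULL, TUPLESORT_NONE);

        for (int batch = 0; batch < nbatches; batch++)
        {
            for (int i = 0; i < 100; i++)
                tuplesort_putdatum(ts, Int32GetDatum(100 - i), false);

            tuplesort_performsort(ts);

            /* ... fetch the sorted batch with tuplesort_getdatum() ... */

            tuplesort_reset(ts);    /* keep the state; ready for the next batch */
        }

        tuplesort_end(ts);
    }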
1035 :
1036 : /*
1037 : * Grow the memtuples[] array, if possible within our memory constraint. We
1038 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
1039 : * limit. Return true if we were able to enlarge the array, false if not.
1040 : *
1041 : * Normally, at each increment we double the size of the array. When doing
1042 : * that would exceed a limit, we attempt one last, smaller increase (and then
1043 : * clear the growmemtuples flag so we don't try any more). That allows us to
1044 : * use memory as fully as permitted; sticking to the pure doubling rule could
1045 : * result in almost half going unused. Because availMem moves around with
1046 : * tuple addition/removal, we need some rule to prevent making repeated small
1047 : * increases in memtupsize, which would just be useless thrashing. The
1048 : * growmemtuples flag accomplishes that and also prevents useless
1049 : * recalculations in this function.
1050 : */
1051 : static bool
1052 7068 : grow_memtuples(Tuplesortstate *state)
1053 : {
1054 : int newmemtupsize;
1055 7068 : int memtupsize = state->memtupsize;
1056 7068 : int64 memNowUsed = state->allowedMem - state->availMem;
1057 :
1058 : /* Forget it if we've already maxed out memtuples, per comment above */
1059 7068 : if (!state->growmemtuples)
1060 120 : return false;
1061 :
1062 : /* Select new value of memtupsize */
1063 6948 : if (memNowUsed <= state->availMem)
1064 : {
1065 : /*
1066 : * We've used no more than half of allowedMem; double our usage,
1067 : * clamping at INT_MAX tuples.
1068 : */
1069 6820 : if (memtupsize < INT_MAX / 2)
1070 6820 : newmemtupsize = memtupsize * 2;
1071 : else
1072 : {
1073 0 : newmemtupsize = INT_MAX;
1074 0 : state->growmemtuples = false;
1075 : }
1076 : }
1077 : else
1078 : {
1079 : /*
1080 : * This will be the last increment of memtupsize. Abandon doubling
1081 : * strategy and instead increase as much as we safely can.
1082 : *
1083 : * To stay within allowedMem, we can't increase memtupsize by more
1084 : * than availMem / sizeof(SortTuple) elements. In practice, we want
1085 : * to increase it by considerably less, because we need to leave some
1086 : * space for the tuples to which the new array slots will refer. We
1087 : * assume the new tuples will be about the same size as the tuples
1088 : * we've already seen, and thus we can extrapolate from the space
1089 : * consumption so far to estimate an appropriate new size for the
1090 : * memtuples array. The optimal value might be higher or lower than
1091 : * this estimate, but it's hard to know that in advance. We again
1092 : * clamp at INT_MAX tuples.
1093 : *
1094 : * This calculation is safe against enlarging the array so much that
1095 : * LACKMEM becomes true, because the memory currently used includes
1096 : * the present array; thus, there would be enough allowedMem for the
1097 : * new array elements even if no other memory were currently used.
1098 : *
1099 : * We do the arithmetic in float8, because otherwise the product of
1100 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
1101 : * result should be insignificant; but even if we computed a
1102 : * completely insane result, the checks below will prevent anything
1103 : * really bad from happening.
1104 : */
1105 : double grow_ratio;
1106 :
1107 128 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1108 128 : if (memtupsize * grow_ratio < INT_MAX)
1109 128 : newmemtupsize = (int) (memtupsize * grow_ratio);
1110 : else
1111 0 : newmemtupsize = INT_MAX;
1112 :
1113 : /* We won't make any further enlargement attempts */
1114 128 : state->growmemtuples = false;
1115 : }
1116 :
1117 : /* Must enlarge array by at least one element, else report failure */
1118 6948 : if (newmemtupsize <= memtupsize)
1119 0 : goto noalloc;
1120 :
1121 : /*
1122 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1123 : * to ensure our request won't be rejected. Note that we can easily
1124 : * exhaust address space before facing this outcome. (This is presently
1125 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1126 : * don't rely on that at this distance.)
1127 : */
1128 6948 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1129 : {
1130 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1131 0 : state->growmemtuples = false; /* can't grow any more */
1132 : }
1133 :
1134 : /*
1135 : * We need to be sure that we do not cause LACKMEM to become true, else
1136 : * the space management algorithm will go nuts. The code above should
1137 : * never generate a dangerous request, but to be safe, check explicitly
1138 : * that the array growth fits within availMem. (We could still cause
1139 : * LACKMEM if the memory chunk overhead associated with the memtuples
1140 : * array were to increase. That shouldn't happen because we chose the
1141 : * initial array size large enough to ensure that palloc will be treating
1142 : * both old and new arrays as separate chunks. But we'll check LACKMEM
1143 : * explicitly below just in case.)
1144 : */
1145 6948 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1146 0 : goto noalloc;
1147 :
1148 : /* OK, do it */
1149 6948 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1150 6948 : state->memtupsize = newmemtupsize;
1151 6948 : state->memtuples = (SortTuple *)
1152 6948 : repalloc_huge(state->memtuples,
1153 6948 : state->memtupsize * sizeof(SortTuple));
1154 6948 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1155 6948 : if (LACKMEM(state))
1156 0 : elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1157 6948 : return true;
1158 :
1159 0 : noalloc:
1160 : /* If for any reason we didn't realloc, shut off future attempts */
1161 0 : state->growmemtuples = false;
1162 0 : return false;
1163 : }
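(A worked example of the final-growth branch above, with illustrative numbers only and assuming a 24-byte SortTuple, which is typical of 64-bit builds.)

    /*
     * Suppose allowedMem = 64 MB, memNowUsed = 40 MB (hence availMem = 24 MB)
     * and memtupsize = 1,000,000 when the array fills up.  Doubling is
     * abandoned because memNowUsed exceeds availMem.  grow_ratio = 64/40 =
     * 1.6, so newmemtupsize is about 1,600,000.  The 600,000 extra slots cost
     * roughly 600,000 * 24 bytes, i.e. about 14 MB, which fits within the
     * 24 MB still available, so the availMem check in grow_memtuples cannot
     * fail for the array growth itself.
     */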
1164 :
1165 : /*
1166 : * Shared code for tuple and datum cases.
1167 : */
1168 : void
1169 28288244 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1170 : bool useAbbrev, Size tuplen)
1171 : {
1172 28288244 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1173 :
1174 : Assert(!LEADER(state));
1175 :
1176 : /* account for the memory used for this tuple */
1177 28288244 : USEMEM(state, tuplen);
1178 28288244 : state->tupleMem += tuplen;
1179 :
1180 28288244 : if (!useAbbrev)
1181 : {
1182 : /*
1183 : * Leave ordinary Datum representation, or NULL value. If there is a
1184 : * converter it won't expect NULL values, and cost model is not
1185 : * required to account for NULL, so in that case we avoid calling
1186 : * converter and just set datum1 to zeroed representation (to be
1187 : * consistent, and to support cheap inequality tests for NULL
1188 : * abbreviated keys).
1189 : */
1190 : }
1191 4425368 : else if (!consider_abort_common(state))
1192 : {
1193 : /* Store abbreviated key representation */
1194 4425272 : tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1195 : state->base.sortKeys);
1196 : }
1197 : else
1198 : {
1199 : /*
1200 : * Set state to be consistent with never trying abbreviation.
1201 : *
1202 : * Alter datum1 representation in already-copied tuples, so as to
1203 : * ensure a consistent representation (current tuple was just
1204 : * handled). It does not matter if some dumped tuples are already
1205 : * sorted on tape, since serialized tuples lack abbreviated keys
1206 : * (TSS_BUILDRUNS state prevents control reaching here in any case).
1207 : */
1208 96 : REMOVEABBREV(state, state->memtuples, state->memtupcount);
1209 : }
1210 :
1211 28288244 : switch (state->status)
1212 : {
1213 23473644 : case TSS_INITIAL:
1214 :
1215 : /*
1216 : * Save the tuple into the unsorted array. First, grow the array
1217 : * as needed. Note that we try to grow the array when there is
1218 : * still one free slot remaining --- if we fail, there'll still be
1219 : * room to store the incoming tuple, and then we'll switch to
1220 : * tape-based operation.
1221 : */
1222 23473644 : if (state->memtupcount >= state->memtupsize - 1)
1223 : {
1224 7068 : (void) grow_memtuples(state);
1225 : Assert(state->memtupcount < state->memtupsize);
1226 : }
1227 23473644 : state->memtuples[state->memtupcount++] = *tuple;
1228 :
1229 : /*
1230 : * Check if it's time to switch over to a bounded heapsort. We do
1231 : * so if the input tuple count exceeds twice the desired tuple
1232 : * count (this is a heuristic for where heapsort becomes cheaper
1233 : * than a quicksort), or if we've just filled workMem and have
1234 : * enough tuples to meet the bound.
1235 : *
1236 : * Note that once we enter TSS_BOUNDED state we will always try to
1237 : * complete the sort that way. In the worst case, if later input
1238 : * tuples are larger than earlier ones, this might cause us to
1239 : * exceed workMem significantly.
1240 : */
1241 23473644 : if (state->bounded &&
1242 45214 : (state->memtupcount > state->bound * 2 ||
1243 44814 : (state->memtupcount > state->bound && LACKMEM(state))))
1244 : {
1245 400 : if (trace_sort)
1246 0 : elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1247 : state->memtupcount,
1248 : pg_rusage_show(&state->ru_start));
1249 400 : make_bounded_heap(state);
1250 400 : MemoryContextSwitchTo(oldcontext);
1251 400 : return;
1252 : }
1253 :
1254 : /*
1255 : * Done if we still fit in available memory and have array slots.
1256 : */
1257 23473244 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1258 : {
1259 23473124 : MemoryContextSwitchTo(oldcontext);
1260 23473124 : return;
1261 : }
1262 :
1263 : /*
1264 : * Nope; time to switch to tape-based operation.
1265 : */
1266 120 : inittapes(state, true);
1267 :
1268 : /*
1269 : * Dump all tuples.
1270 : */
1271 120 : dumptuples(state, false);
1272 120 : break;
1273 :
1274 3750020 : case TSS_BOUNDED:
1275 :
1276 : /*
1277 : * We don't want to grow the array here, so check whether the new
1278 : * tuple can be discarded before putting it in. This should be a
1279 : * good speed optimization, too, since when there are many more
1280 : * input tuples than the bound, most input tuples can be discarded
1281 : * with just this one comparison. Note that because we currently
1282 : * have the sort direction reversed, we must check for <= not >=.
1283 : */
1284 3750020 : if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1285 : {
1286 : /* new tuple <= top of the heap, so we can discard it */
1287 3247044 : free_sort_tuple(state, tuple);
1288 3247044 : CHECK_FOR_INTERRUPTS();
1289 : }
1290 : else
1291 : {
1292 : /* discard top of heap, replacing it with the new tuple */
1293 502976 : free_sort_tuple(state, &state->memtuples[0]);
1294 502976 : tuplesort_heap_replace_top(state, tuple);
1295 : }
1296 3750020 : break;
1297 :
1298 1064580 : case TSS_BUILDRUNS:
1299 :
1300 : /*
1301 : * Save the tuple into the unsorted array (there must be space)
1302 : */
1303 1064580 : state->memtuples[state->memtupcount++] = *tuple;
1304 :
1305 : /*
1306 : * If we are over the memory limit, dump all tuples.
1307 : */
1308 1064580 : dumptuples(state, false);
1309 1064580 : break;
1310 :
1311 0 : default:
1312 0 : elog(ERROR, "invalid tuplesort state");
1313 : break;
1314 : }
1315 4814720 : MemoryContextSwitchTo(oldcontext);
1316 : }
1317 :
1318 : static bool
1319 4425368 : consider_abort_common(Tuplesortstate *state)
1320 : {
1321 : Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1322 : Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1323 : Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1324 :
1325 : /*
1326 : * Check effectiveness of abbreviation optimization. Consider aborting
1327 : * when still within memory limit.
1328 : */
1329 4425368 : if (state->status == TSS_INITIAL &&
1330 3952938 : state->memtupcount >= state->abbrevNext)
1331 : {
1332 4948 : state->abbrevNext *= 2;
1333 :
1334 : /*
1335 : * Check opclass-supplied abbreviation abort routine. It may indicate
1336 : * that abbreviation should not proceed.
1337 : */
1338 4948 : if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1339 : state->base.sortKeys))
1340 4852 : return false;
1341 :
1342 : /*
1343 : * Finally, restore authoritative comparator, and indicate that
1344 : * abbreviation is not in play by setting abbrev_converter to NULL
1345 : */
1346 96 : state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1347 96 : state->base.sortKeys[0].abbrev_converter = NULL;
1348 : /* Not strictly necessary, but be tidy */
1349 96 : state->base.sortKeys[0].abbrev_abort = NULL;
1350 96 : state->base.sortKeys[0].abbrev_full_comparator = NULL;
1351 :
1352 : /* Give up - expect original pass-by-value representation */
1353 96 : return true;
1354 : }
1355 :
1356 4420420 : return false;
1357 : }
1358 :
1359 : /*
1360 : * All tuples have been provided; finish the sort.
1361 : */
1362 : void
1363 214096 : tuplesort_performsort(Tuplesortstate *state)
1364 : {
1365 214096 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1366 :
1367 214096 : if (trace_sort)
1368 0 : elog(LOG, "performsort of worker %d starting: %s",
1369 : state->worker, pg_rusage_show(&state->ru_start));
1370 :
1371 214096 : switch (state->status)
1372 : {
1373 213576 : case TSS_INITIAL:
1374 :
1375 : /*
1376 : * We were able to accumulate all the tuples within the allowed
1377 : * amount of memory, or we are the leader, about to take over the worker tapes.
1378 : */
1379 213576 : if (SERIAL(state))
1380 : {
1381 : /* Just qsort 'em and we're done */
1382 212976 : tuplesort_sort_memtuples(state);
1383 212892 : state->status = TSS_SORTEDINMEM;
1384 : }
1385 600 : else if (WORKER(state))
1386 : {
1387 : /*
1388 : * Parallel workers must still dump out tuples to tape. No
1389 : * merge is required to produce single output run, though.
1390 : */
1391 444 : inittapes(state, false);
1392 444 : dumptuples(state, true);
1393 444 : worker_nomergeruns(state);
1394 444 : state->status = TSS_SORTEDONTAPE;
1395 : }
1396 : else
1397 : {
1398 : /*
1399 : * Leader will take over worker tapes and merge worker runs.
1400 : * Note that mergeruns sets the correct state->status.
1401 : */
1402 156 : leader_takeover_tapes(state);
1403 156 : mergeruns(state);
1404 : }
1405 213492 : state->current = 0;
1406 213492 : state->eof_reached = false;
1407 213492 : state->markpos_block = 0L;
1408 213492 : state->markpos_offset = 0;
1409 213492 : state->markpos_eof = false;
1410 213492 : break;
1411 :
1412 400 : case TSS_BOUNDED:
1413 :
1414 : /*
1415 : * We were able to accumulate all the tuples required for output
1416 : * in memory, using a heap to eliminate excess tuples. Now we
1417 : * have to transform the heap to a properly-sorted array. Note
1418 : * that sort_bounded_heap sets the correct state->status.
1419 : */
1420 400 : sort_bounded_heap(state);
1421 400 : state->current = 0;
1422 400 : state->eof_reached = false;
1423 400 : state->markpos_offset = 0;
1424 400 : state->markpos_eof = false;
1425 400 : break;
1426 :
1427 120 : case TSS_BUILDRUNS:
1428 :
1429 : /*
1430 : * Finish tape-based sort. First, flush all tuples remaining in
1431 : * memory out to tape; then merge until we have a single remaining
1432 : * run (or, if !randomAccess and !WORKER(), one run per tape).
1433 : * Note that mergeruns sets the correct state->status.
1434 : */
1435 120 : dumptuples(state, true);
1436 120 : mergeruns(state);
1437 120 : state->eof_reached = false;
1438 120 : state->markpos_block = 0L;
1439 120 : state->markpos_offset = 0;
1440 120 : state->markpos_eof = false;
1441 120 : break;
1442 :
1443 0 : default:
1444 0 : elog(ERROR, "invalid tuplesort state");
1445 : break;
1446 : }
1447 :
1448 214012 : if (trace_sort)
1449 : {
1450 0 : if (state->status == TSS_FINALMERGE)
1451 0 : elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1452 : state->worker, state->nInputTapes,
1453 : pg_rusage_show(&state->ru_start));
1454 : else
1455 0 : elog(LOG, "performsort of worker %d done: %s",
1456 : state->worker, pg_rusage_show(&state->ru_start));
1457 : }
1458 :
1459 214012 : MemoryContextSwitchTo(oldcontext);
1460 214012 : }
1461 :
1462 : /*
1463 : * Internal routine to fetch the next tuple in either forward or back
1464 : * direction into *stup. Returns false if no more tuples.
1465 : * Returned tuple belongs to tuplesort memory context, and must not be freed
1466 : * by caller. Note that fetched tuple is stored in memory that may be
1467 : * recycled by any future fetch.
1468 : */
1469 : bool
1470 25450436 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1471 : SortTuple *stup)
1472 : {
1473 : unsigned int tuplen;
1474 : size_t nmoved;
1475 :
1476 : Assert(!WORKER(state));
1477 :
1478 25450436 : switch (state->status)
1479 : {
1480 20609862 : case TSS_SORTEDINMEM:
1481 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1482 : Assert(!state->slabAllocatorUsed);
1483 20609862 : if (forward)
1484 : {
1485 20609796 : if (state->current < state->memtupcount)
1486 : {
1487 20398242 : *stup = state->memtuples[state->current++];
1488 20398242 : return true;
1489 : }
1490 211554 : state->eof_reached = true;
1491 :
1492 : /*
1493 : * Complain if caller tries to retrieve more tuples than
1494 : * originally asked for in a bounded sort. This is because
1495 : * returning EOF here might be the wrong thing.
1496 : */
1497 211554 : if (state->bounded && state->current >= state->bound)
1498 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1499 :
1500 211554 : return false;
1501 : }
1502 : else
1503 : {
1504 66 : if (state->current <= 0)
1505 0 : return false;
1506 :
1507 : /*
1508 :                  * If all tuples have been fetched already, return the last
1509 :                  * tuple; otherwise return the tuple before the last one returned.
1510 : */
1511 66 : if (state->eof_reached)
1512 12 : state->eof_reached = false;
1513 : else
1514 : {
1515 54 : state->current--; /* last returned tuple */
1516 54 : if (state->current <= 0)
1517 6 : return false;
1518 : }
1519 60 : *stup = state->memtuples[state->current - 1];
1520 60 : return true;
1521 : }
1522 : break;
1523 :
1524 272994 : case TSS_SORTEDONTAPE:
1525 : Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1526 : Assert(state->slabAllocatorUsed);
1527 :
1528 : /*
1529 :              * The slot that held the tuple returned by the previous
1530 :              * gettuple call can now be reused.
1531 : */
1532 272994 : if (state->lastReturnedTuple)
1533 : {
1534 152850 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1535 152850 : state->lastReturnedTuple = NULL;
1536 : }
1537 :
1538 272994 : if (forward)
1539 : {
1540 272964 : if (state->eof_reached)
1541 0 : return false;
1542 :
1543 272964 : if ((tuplen = getlen(state->result_tape, true)) != 0)
1544 : {
1545 272940 : READTUP(state, stup, state->result_tape, tuplen);
1546 :
1547 : /*
1548 : * Remember the tuple we return, so that we can recycle
1549 : * its memory on next call. (This can be NULL, in the
1550 : * !state->tuples case).
1551 : */
1552 272940 : state->lastReturnedTuple = stup->tuple;
1553 :
1554 272940 : return true;
1555 : }
1556 : else
1557 : {
1558 24 : state->eof_reached = true;
1559 24 : return false;
1560 : }
1561 : }
1562 :
1563 : /*
1564 : * Backward.
1565 : *
1566 :              * If all tuples have been fetched already, return the last tuple;
1567 :              * otherwise return the tuple before the last one returned.
1568 : */
1569 30 : if (state->eof_reached)
1570 : {
1571 : /*
1572 : * Seek position is pointing just past the zero tuplen at the
1573 : * end of file; back up to fetch last tuple's ending length
1574 : * word. If seek fails we must have a completely empty file.
1575 : */
1576 12 : nmoved = LogicalTapeBackspace(state->result_tape,
1577 : 2 * sizeof(unsigned int));
1578 12 : if (nmoved == 0)
1579 0 : return false;
1580 12 : else if (nmoved != 2 * sizeof(unsigned int))
1581 0 : elog(ERROR, "unexpected tape position");
1582 12 : state->eof_reached = false;
1583 : }
1584 : else
1585 : {
1586 : /*
1587 : * Back up and fetch previously-returned tuple's ending length
1588 : * word. If seek fails, assume we are at start of file.
1589 : */
1590 18 : nmoved = LogicalTapeBackspace(state->result_tape,
1591 : sizeof(unsigned int));
1592 18 : if (nmoved == 0)
1593 0 : return false;
1594 18 : else if (nmoved != sizeof(unsigned int))
1595 0 : elog(ERROR, "unexpected tape position");
1596 18 : tuplen = getlen(state->result_tape, false);
1597 :
1598 : /*
1599 : * Back up to get ending length word of tuple before it.
1600 : */
1601 18 : nmoved = LogicalTapeBackspace(state->result_tape,
1602 : tuplen + 2 * sizeof(unsigned int));
1603 18 : if (nmoved == tuplen + sizeof(unsigned int))
1604 : {
1605 : /*
1606 :                      * We backed up over the previous tuple, but there was no
1607 :                      * ending length word before it.  That means the previous
1608 :                      * tuple is the first tuple in the file.  It is now the next
1609 :                      * one to read in the forward direction (not obviously right,
1610 :                      * but that is what the in-memory case does).
1611 : */
1612 6 : return false;
1613 : }
1614 12 : else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1615 0 : elog(ERROR, "bogus tuple length in backward scan");
1616 : }
1617 :
1618 24 : tuplen = getlen(state->result_tape, false);
1619 :
1620 : /*
1621 : * Now we have the length of the prior tuple, back up and read it.
1622 : * Note: READTUP expects we are positioned after the initial
1623 : * length word of the tuple, so back up to that point.
1624 : */
1625 24 : nmoved = LogicalTapeBackspace(state->result_tape,
1626 : tuplen);
1627 24 : if (nmoved != tuplen)
1628 0 : elog(ERROR, "bogus tuple length in backward scan");
1629 24 : READTUP(state, stup, state->result_tape, tuplen);
1630 :
1631 : /*
1632 : * Remember the tuple we return, so that we can recycle its memory
1633 : * on next call. (This can be NULL, in the Datum case).
1634 : */
1635 24 : state->lastReturnedTuple = stup->tuple;
1636 :
1637 24 : return true;
1638 :
1639 4567580 : case TSS_FINALMERGE:
1640 : Assert(forward);
1641 : /* We are managing memory ourselves, with the slab allocator. */
1642 : Assert(state->slabAllocatorUsed);
1643 :
1644 : /*
1645 :              * The slab slot holding the tuple returned by the previous
1646 :              * gettuple call can now be reused.
1647 : */
1648 4567580 : if (state->lastReturnedTuple)
1649 : {
1650 4507292 : RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1651 4507292 : state->lastReturnedTuple = NULL;
1652 : }
1653 :
1654 : /*
1655 : * This code should match the inner loop of mergeonerun().
1656 : */
1657 4567580 : if (state->memtupcount > 0)
1658 : {
1659 4567340 : int srcTapeIndex = state->memtuples[0].srctape;
1660 4567340 : LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1661 : SortTuple newtup;
1662 :
1663 4567340 : *stup = state->memtuples[0];
1664 :
1665 : /*
1666 : * Remember the tuple we return, so that we can recycle its
1667 : * memory on next call. (This can be NULL, in the Datum case).
1668 : */
1669 4567340 : state->lastReturnedTuple = stup->tuple;
1670 :
1671 : /*
1672 : * Pull next tuple from tape, and replace the returned tuple
1673 : * at top of the heap with it.
1674 : */
1675 4567340 : if (!mergereadnext(state, srcTape, &newtup))
1676 : {
1677 : /*
1678 : * If no more data, we've reached end of run on this tape.
1679 : * Remove the top node from the heap.
1680 : */
1681 340 : tuplesort_heap_delete_top(state);
1682 340 : state->nInputRuns--;
1683 :
1684 : /*
1685 : * Close the tape. It'd go away at the end of the sort
1686 : * anyway, but better to release the memory early.
1687 : */
1688 340 : LogicalTapeClose(srcTape);
1689 340 : return true;
1690 : }
1691 4567000 : newtup.srctape = srcTapeIndex;
1692 4567000 : tuplesort_heap_replace_top(state, &newtup);
1693 4567000 : return true;
1694 : }
1695 240 : return false;
1696 :
1697 0 : default:
1698 0 : elog(ERROR, "invalid tuplesort state");
1699 : return false; /* keep compiler quiet */
1700 : }
1701 : }
1702 :
1703 :
1704 : /*
1705 : * Advance over N tuples in either forward or back direction,
1706 : * without returning any data. N==0 is a no-op.
1707 : * Returns true if successful, false if ran out of tuples.
1708 : */
1709 : bool
1710 392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1711 : {
1712 : MemoryContext oldcontext;
1713 :
1714 : /*
1715 : * We don't actually support backwards skip yet, because no callers need
1716 : * it. The API is designed to allow for that later, though.
1717 : */
1718 : Assert(forward);
1719 : Assert(ntuples >= 0);
1720 : Assert(!WORKER(state));
1721 :
1722 392 : switch (state->status)
1723 : {
1724 368 : case TSS_SORTEDINMEM:
1725 368 : if (state->memtupcount - state->current >= ntuples)
1726 : {
1727 368 : state->current += ntuples;
1728 368 : return true;
1729 : }
1730 0 : state->current = state->memtupcount;
1731 0 : state->eof_reached = true;
1732 :
1733 : /*
1734 : * Complain if caller tries to retrieve more tuples than
1735 : * originally asked for in a bounded sort. This is because
1736 : * returning EOF here might be the wrong thing.
1737 : */
1738 0 : if (state->bounded && state->current >= state->bound)
1739 0 : elog(ERROR, "retrieved too many tuples in a bounded sort");
1740 :
1741 0 : return false;
1742 :
1743 24 : case TSS_SORTEDONTAPE:
1744 : case TSS_FINALMERGE:
1745 :
1746 : /*
1747 : * We could probably optimize these cases better, but for now it's
1748 : * not worth the trouble.
1749 : */
1750 24 : oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1751 240132 : while (ntuples-- > 0)
1752 : {
1753 : SortTuple stup;
1754 :
1755 240108 : if (!tuplesort_gettuple_common(state, forward, &stup))
1756 : {
1757 0 : MemoryContextSwitchTo(oldcontext);
1758 0 : return false;
1759 : }
1760 240108 : CHECK_FOR_INTERRUPTS();
1761 : }
1762 24 : MemoryContextSwitchTo(oldcontext);
1763 24 : return true;
1764 :
1765 0 : default:
1766 0 : elog(ERROR, "invalid tuplesort state");
1767 : return false; /* keep compiler quiet */
1768 : }
1769 : }
1770 :
1771 : /*
1772 : * tuplesort_merge_order - report merge order we'll use for given memory
1773 : * (note: "merge order" just means the number of input tapes in the merge).
1774 : *
1775 : * This is exported for use by the planner. allowedMem is in bytes.
1776 : */
1777 : int
1778 17040 : tuplesort_merge_order(int64 allowedMem)
1779 : {
1780 : int mOrder;
1781 :
1782 : /*----------
1783 : * In the merge phase, we need buffer space for each input and output tape.
1784 : * Each pass in the balanced merge algorithm reads from M input tapes, and
1785 : * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1786 : * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1787 : * input tape.
1788 : *
1789 : * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1790 : * N * TAPE_BUFFER_OVERHEAD
1791 : *
1792 : * Except for the last and next-to-last merge passes, where there can be
1793 : * fewer tapes left to process, M = N. We choose M so that we have the
1794 : * desired amount of memory available for the input buffers
1795 : * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1796 : * available for the tape buffers (allowedMem).
1797 : *
1798 : * Note: you might be thinking we need to account for the memtuples[]
1799 : * array in this calculation, but we effectively treat that as part of the
1800 : * MERGE_BUFFER_SIZE workspace.
1801 : *----------
1802 : */
1803 17040 : mOrder = allowedMem /
1804 : (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1805 :
1806 : /*
1807 : * Even in minimum memory, use at least a MINORDER merge. On the other
1808 : * hand, even when we have lots of memory, do not use more than a MAXORDER
1809 : * merge. Tapes are pretty cheap, but they're not entirely free. Each
1810 : * additional tape reduces the amount of memory available to build runs,
1811 : * which in turn can cause the same sort to need more runs, which makes
1812 : * merging slower even if it can still be done in a single pass. Also,
1813 : * high order merges are quite slow due to CPU cache effects; it can be
1814 : * faster to pay the I/O cost of a multi-pass merge than to perform a
1815 : * single merge pass across many hundreds of tapes.
1816 : */
1817 17040 : mOrder = Max(mOrder, MINORDER);
1818 17040 : mOrder = Min(mOrder, MAXORDER);
1819 :
1820 17040 : return mOrder;
1821 : }
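                :
                : /*
                :  * Worked example (editorial sketch; assumes the usual definitions of
                :  * TAPE_BUFFER_OVERHEAD = BLCKSZ = 8kB, MERGE_BUFFER_SIZE = 32 * BLCKSZ
                :  * = 256kB, MINORDER = 6 and MAXORDER = 500):
                :  *
                :  *   allowedMem = 4MB:  4194304 / (2 * 8192 + 262144) = 15, so the
                :  *                      planner is told a 15-way merge would be used.
                :  *   allowedMem = 64kB: the division yields 0, which is clamped up to
                :  *                      MINORDER, i.e. a 6-way merge.
                :  */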
1822 :
1823 : /*
1824 : * Helper function to calculate how much memory to allocate for the read buffer
1825 : * of each input tape in a merge pass.
1826 : *
1827 : * 'avail_mem' is the amount of memory available for the buffers of all the
1828 : * tapes, both input and output.
1829 : * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1830 : * 'maxOutputTapes' is the max. number of output tapes we should produce.
1831 : */
1832 : static int64
1833 306 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1834 : int maxOutputTapes)
1835 : {
1836 : int nOutputRuns;
1837 : int nOutputTapes;
1838 :
1839 : /*
1840 : * How many output tapes will we produce in this pass?
1841 : *
1842 : * This is nInputRuns / nInputTapes, rounded up.
1843 : */
1844 306 : nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1845 :
1846 306 : nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1847 :
1848 : /*
1849 : * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1850 : * remaining memory is divided evenly between the input tapes.
1851 : *
1852 : * This also follows from the formula in tuplesort_merge_order, but here
1853 : * we derive the input buffer size from the amount of memory available,
1854 : * and M and N.
1855 : */
1856 306 : return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1857 : }
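                :
                : /*
                :  * Worked example (editorial sketch; assumes TAPE_BUFFER_OVERHEAD =
                :  * 8kB): with avail_mem = 4MB, nInputTapes = 15, nInputRuns = 40 and
                :  * maxOutputTapes = 15, the pass writes ceil(40/15) = 3 output runs,
                :  * so nOutputTapes = 3 and each input tape gets
                :  * (4194304 - 3 * 8192) / 15 bytes, i.e. roughly 272kB of read buffer.
                :  */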
1858 :
1859 : /*
1860 : * inittapes - initialize for tape sorting.
1861 : *
1862 : * This is called only if we have found we won't sort in memory.
1863 : */
1864 : static void
1865 564 : inittapes(Tuplesortstate *state, bool mergeruns)
1866 : {
1867 : Assert(!LEADER(state));
1868 :
1869 564 : if (mergeruns)
1870 : {
1871 : /* Compute number of input tapes to use when merging */
1872 120 : state->maxTapes = tuplesort_merge_order(state->allowedMem);
1873 : }
1874 : else
1875 : {
1876 : /* Workers can sometimes produce single run, output without merge */
1877 :         /* Workers can sometimes produce a single run and output it without merging */
1878 444 : state->maxTapes = MINORDER;
1879 : }
1880 :
1881 564 : if (trace_sort)
1882 0 : elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1883 : state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1884 :
1885 : /* Create the tape set */
1886 564 : inittapestate(state, state->maxTapes);
1887 564 : state->tapeset =
1888 564 : LogicalTapeSetCreate(false,
1889 564 : state->shared ? &state->shared->fileset : NULL,
1890 : state->worker);
1891 :
1892 564 : state->currentRun = 0;
1893 :
1894 : /*
1895 : * Initialize logical tape arrays.
1896 : */
1897 564 : state->inputTapes = NULL;
1898 564 : state->nInputTapes = 0;
1899 564 : state->nInputRuns = 0;
1900 :
1901 564 : state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1902 564 : state->nOutputTapes = 0;
1903 564 : state->nOutputRuns = 0;
1904 :
1905 564 : state->status = TSS_BUILDRUNS;
1906 :
1907 564 : selectnewtape(state);
1908 564 : }
1909 :
1910 : /*
1911 : * inittapestate - initialize generic tape management state
1912 : */
1913 : static void
1914 720 : inittapestate(Tuplesortstate *state, int maxTapes)
1915 : {
1916 : int64 tapeSpace;
1917 :
1918 : /*
1919 : * Decrease availMem to reflect the space needed for tape buffers; but
1920 : * don't decrease it to the point that we have no room for tuples. (That
1921 : * case is only likely to occur if sorting pass-by-value Datums; in all
1922 : * other scenarios the memtuples[] array is unlikely to occupy more than
1923 : * half of allowedMem. In the pass-by-value case it's not important to
1924 : * account for tuple space, so we don't care if LACKMEM becomes
1925 : * inaccurate.)
1926 : */
1927 720 : tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1928 :
1929 720 : if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1930 618 : USEMEM(state, tapeSpace);
1931 :
1932 : /*
1933 : * Make sure that the temp file(s) underlying the tape set are created in
1934 : * suitable temp tablespaces. For parallel sorts, this should have been
1935 : * called already, but it doesn't matter if it is called a second time.
1936 : */
1937 720 : PrepareTempTablespaces();
1938 720 : }
1939 :
1940 : /*
1941 : * selectnewtape -- select next tape to output to.
1942 : *
1943 : * This is called after finishing a run when we know another run
1944 : * must be started. This is used both when building the initial
1945 : * runs, and during merge passes.
1946 : */
1947 : static void
1948 1638 : selectnewtape(Tuplesortstate *state)
1949 : {
1950 : /*
1951 : * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1952 : * both zero. On each call, we create a new output tape to hold the next
1953 : * run, until maxTapes is reached. After that, we assign new runs to the
1954 : * existing tapes in a round robin fashion.
1955 : */
1956 1638 : if (state->nOutputTapes < state->maxTapes)
1957 : {
1958 : /* Create a new tape to hold the next run */
1959 : Assert(state->outputTapes[state->nOutputRuns] == NULL);
1960 : Assert(state->nOutputRuns == state->nOutputTapes);
1961 1056 : state->destTape = LogicalTapeCreate(state->tapeset);
1962 1056 : state->outputTapes[state->nOutputTapes] = state->destTape;
1963 1056 : state->nOutputTapes++;
1964 1056 : state->nOutputRuns++;
1965 : }
1966 : else
1967 : {
1968 : /*
1969 : * We have reached the max number of tapes. Append to an existing
1970 : * tape.
1971 : */
1972 582 : state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1973 582 : state->nOutputRuns++;
1974 : }
1975 1638 : }
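                :
                : /*
                :  * Illustrative trace (editorial, not part of the original source):
                :  * with maxTapes = 6, the first six calls create tapes 0..5 for runs
                :  * 1..6.  The seventh call finds nOutputTapes == maxTapes and appends
                :  * run 7 to tape 6 % 6 = 0, run 8 goes to tape 1, and so on, so later
                :  * runs are spread round-robin over the six existing tapes.
                :  */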
1976 :
1977 : /*
1978 : * Initialize the slab allocation arena, for the given number of slots.
1979 : */
1980 : static void
1981 276 : init_slab_allocator(Tuplesortstate *state, int numSlots)
1982 : {
1983 276 : if (numSlots > 0)
1984 : {
1985 : char *p;
1986 : int i;
1987 :
1988 264 : state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1989 264 : state->slabMemoryEnd = state->slabMemoryBegin +
1990 264 : numSlots * SLAB_SLOT_SIZE;
1991 264 : state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1992 264 : USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1993 :
1994 264 : p = state->slabMemoryBegin;
1995 1006 : for (i = 0; i < numSlots - 1; i++)
1996 : {
1997 742 : ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1998 742 : p += SLAB_SLOT_SIZE;
1999 : }
2000 264 : ((SlabSlot *) p)->nextfree = NULL;
2001 : }
2002 : else
2003 : {
2004 12 : state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2005 12 : state->slabFreeHead = NULL;
2006 : }
2007 276 : state->slabAllocatorUsed = true;
2008 276 : }
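                :
                : /*
                :  * Illustrative note (editorial, not part of the original source):
                :  * the arena is one allocation carved into fixed-size slots that are
                :  * chained into a singly linked free list through the first bytes of
                :  * each slot:
                :  *
                :  *   slabFreeHead -> slot 0 -> slot 1 -> ... -> slot N-1 -> NULL
                :  *
                :  * tuplesort_readtup_alloc() pops from the head of this list and
                :  * RELEASE_SLAB_SLOT() pushes freed slots back onto it, so recycling
                :  * tuple memory during the merge is O(1) for tuples that fit in a
                :  * slot; larger tuples fall back to a plain allocation.
                :  */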
2009 :
2010 : /*
2011 : * mergeruns -- merge all the completed initial runs.
2012 : *
2013 : * This implements the Balanced k-Way Merge Algorithm. All input data has
2014 : * already been written to initial runs on tape (see dumptuples).
2015 : */
2016 : static void
2017 276 : mergeruns(Tuplesortstate *state)
2018 : {
2019 : int tapenum;
2020 :
2021 : Assert(state->status == TSS_BUILDRUNS);
2022 : Assert(state->memtupcount == 0);
2023 :
2024 276 : if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2025 : {
2026 : /*
2027 : * If there are multiple runs to be merged, when we go to read back
2028 : * tuples from disk, abbreviated keys will not have been stored, and
2029 : * we don't care to regenerate them. Disable abbreviation from this
2030 : * point on.
2031 : */
2032 30 : state->base.sortKeys->abbrev_converter = NULL;
2033 30 : state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2034 :
2035 : /* Not strictly necessary, but be tidy */
2036 30 : state->base.sortKeys->abbrev_abort = NULL;
2037 30 : state->base.sortKeys->abbrev_full_comparator = NULL;
2038 : }
2039 :
2040 : /*
2041 : * Reset tuple memory. We've freed all the tuples that we previously
2042 : * allocated. We will use the slab allocator from now on.
2043 : */
2044 276 : MemoryContextResetOnly(state->base.tuplecontext);
2045 :
2046 : /*
2047 : * We no longer need a large memtuples array. (We will allocate a smaller
2048 : * one for the heap later.)
2049 : */
2050 276 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2051 276 : pfree(state->memtuples);
2052 276 : state->memtuples = NULL;
2053 :
2054 : /*
2055 : * Initialize the slab allocator. We need one slab slot per input tape,
2056 : * for the tuples in the heap, plus one to hold the tuple last returned
2057 : * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2058 :      * however, we don't need to allocate anything.)
2059 : *
2060 : * In a multi-pass merge, we could shrink this allocation for the last
2061 : * merge pass, if it has fewer tapes than previous passes, but we don't
2062 : * bother.
2063 : *
2064 : * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2065 : * to track memory usage of individual tuples.
2066 : */
2067 276 : if (state->base.tuples)
2068 264 : init_slab_allocator(state, state->nOutputTapes + 1);
2069 : else
2070 12 : init_slab_allocator(state, 0);
2071 :
2072 : /*
2073 : * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2074 : * from each input tape.
2075 : *
2076 : * We could shrink this, too, between passes in a multi-pass merge, but we
2077 : * don't bother. (The initial input tapes are still in outputTapes. The
2078 : * number of input tapes will not increase between passes.)
2079 : */
2080 276 : state->memtupsize = state->nOutputTapes;
2081 552 : state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2082 276 : state->nOutputTapes * sizeof(SortTuple));
2083 276 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2084 :
2085 : /*
2086 : * Use all the remaining memory we have available for tape buffers among
2087 : * all the input tapes. At the beginning of each merge pass, we will
2088 : * divide this memory between the input and output tapes in the pass.
2089 : */
2090 276 : state->tape_buffer_mem = state->availMem;
2091 276 : USEMEM(state, state->tape_buffer_mem);
2092 276 : if (trace_sort)
2093 0 : elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2094 : state->worker, state->tape_buffer_mem / 1024);
2095 :
2096 : for (;;)
2097 : {
2098 : /*
2099 : * On the first iteration, or if we have read all the runs from the
2100 : * input tapes in a multi-pass merge, it's time to start a new pass.
2101 : * Rewind all the output tapes, and make them inputs for the next
2102 : * pass.
2103 : */
2104 414 : if (state->nInputRuns == 0)
2105 : {
2106 : int64 input_buffer_size;
2107 :
2108 : /* Close the old, emptied, input tapes */
2109 306 : if (state->nInputTapes > 0)
2110 : {
2111 210 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2112 180 : LogicalTapeClose(state->inputTapes[tapenum]);
2113 30 : pfree(state->inputTapes);
2114 : }
2115 :
2116 : /* Previous pass's outputs become next pass's inputs. */
2117 306 : state->inputTapes = state->outputTapes;
2118 306 : state->nInputTapes = state->nOutputTapes;
2119 306 : state->nInputRuns = state->nOutputRuns;
2120 :
2121 : /*
2122 : * Reset output tape variables. The actual LogicalTapes will be
2123 : * created as needed, here we only allocate the array to hold
2124 : * them.
2125 : */
2126 306 : state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2127 306 : state->nOutputTapes = 0;
2128 306 : state->nOutputRuns = 0;
2129 :
2130 : /*
2131 : * Redistribute the memory allocated for tape buffers, among the
2132 : * new input and output tapes.
2133 : */
2134 306 : input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2135 : state->nInputTapes,
2136 : state->nInputRuns,
2137 : state->maxTapes);
2138 :
2139 306 : if (trace_sort)
2140 0 : elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2141 : state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2142 : pg_rusage_show(&state->ru_start));
2143 :
2144 : /* Prepare the new input tapes for merge pass. */
2145 1216 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2146 910 : LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2147 :
2148 : /*
2149 : * If there's just one run left on each input tape, then only one
2150 : * merge pass remains. If we don't have to produce a materialized
2151 : * sorted tape, we can stop at this point and do the final merge
2152 : * on-the-fly.
2153 : */
2154 306 : if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2155 288 : && state->nInputRuns <= state->nInputTapes
2156 258 : && !WORKER(state))
2157 : {
2158 : /* Tell logtape.c we won't be writing anymore */
2159 258 : LogicalTapeSetForgetFreeSpace(state->tapeset);
2160 : /* Initialize for the final merge pass */
2161 258 : beginmerge(state);
2162 258 : state->status = TSS_FINALMERGE;
2163 258 : return;
2164 : }
2165 : }
2166 :
2167 : /* Select an output tape */
2168 156 : selectnewtape(state);
2169 :
2170 : /* Merge one run from each input tape. */
2171 156 : mergeonerun(state);
2172 :
2173 : /*
2174 : * If the input tapes are empty, and we output only one output run,
2175 : * we're done. The current output tape contains the final result.
2176 : */
2177 156 : if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2178 18 : break;
2179 : }
2180 :
2181 : /*
2182 : * Done. The result is on a single run on a single tape.
2183 : */
2184 18 : state->result_tape = state->outputTapes[0];
2185 18 : if (!WORKER(state))
2186 18 : LogicalTapeFreeze(state->result_tape, NULL);
2187 : else
2188 0 : worker_freeze_result_tape(state);
2189 18 : state->status = TSS_SORTEDONTAPE;
2190 :
2191 : /* Close all the now-empty input tapes, to release their read buffers. */
2192 102 : for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2193 84 : LogicalTapeClose(state->inputTapes[tapenum]);
2194 : }
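                :
                : /*
                :  * Illustrative example (editorial, not part of the original source):
                :  * suppose run generation left 40 runs and tuplesort_merge_order()
                :  * chose a 6-way merge.  The first pass merges them into ceil(40/6) =
                :  * 7 runs, the second pass into 2 runs; at the start of the next pass
                :  * only 2 runs remain, one per input tape, so for a !RANDOMACCESS
                :  * non-worker sort the loop stops with TSS_FINALMERGE and the last
                :  * 2-way merge happens on the fly as tuples are fetched.  With
                :  * TUPLESORT_RANDOMACCESS, merging instead continues until a single
                :  * materialized run remains on one tape (TSS_SORTEDONTAPE).
                :  */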
2195 :
2196 : /*
2197 : * Merge one run from each input tape.
2198 : */
2199 : static void
2200 156 : mergeonerun(Tuplesortstate *state)
2201 : {
2202 : int srcTapeIndex;
2203 : LogicalTape *srcTape;
2204 :
2205 : /*
2206 : * Start the merge by loading one tuple from each active source tape into
2207 : * the heap.
2208 : */
2209 156 : beginmerge(state);
2210 :
2211 : Assert(state->slabAllocatorUsed);
2212 :
2213 : /*
2214 : * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2215 : * out, and replacing it with next tuple from same tape (if there is
2216 : * another one).
2217 : */
2218 855588 : while (state->memtupcount > 0)
2219 : {
2220 : SortTuple stup;
2221 :
2222 : /* write the tuple to destTape */
2223 855432 : srcTapeIndex = state->memtuples[0].srctape;
2224 855432 : srcTape = state->inputTapes[srcTapeIndex];
2225 855432 : WRITETUP(state, state->destTape, &state->memtuples[0]);
2226 :
2227 : /* recycle the slot of the tuple we just wrote out, for the next read */
2228 855432 : if (state->memtuples[0].tuple)
2229 735348 : RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2230 :
2231 : /*
2232 : * pull next tuple from the tape, and replace the written-out tuple in
2233 : * the heap with it.
2234 : */
2235 855432 : if (mergereadnext(state, srcTape, &stup))
2236 : {
2237 854586 : stup.srctape = srcTapeIndex;
2238 854586 : tuplesort_heap_replace_top(state, &stup);
2239 : }
2240 : else
2241 : {
2242 846 : tuplesort_heap_delete_top(state);
2243 846 : state->nInputRuns--;
2244 : }
2245 : }
2246 :
2247 : /*
2248 : * When the heap empties, we're done. Write an end-of-run marker on the
2249 : * output tape.
2250 : */
2251 156 : markrunend(state->destTape);
2252 156 : }
2253 :
2254 : /*
2255 : * beginmerge - initialize for a merge pass
2256 : *
2257 : * Fill the merge heap with the first tuple from each input tape.
2258 : */
2259 : static void
2260 414 : beginmerge(Tuplesortstate *state)
2261 : {
2262 : int activeTapes;
2263 : int srcTapeIndex;
2264 :
2265 : /* Heap should be empty here */
2266 : Assert(state->memtupcount == 0);
2267 :
2268 414 : activeTapes = Min(state->nInputTapes, state->nInputRuns);
2269 :
2270 1906 : for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2271 : {
2272 : SortTuple tup;
2273 :
2274 1492 : if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2275 : {
2276 1234 : tup.srctape = srcTapeIndex;
2277 1234 : tuplesort_heap_insert(state, &tup);
2278 : }
2279 : }
2280 414 : }
2281 :
2282 : /*
2283 : * mergereadnext - read next tuple from one merge input tape
2284 : *
2285 : * Returns false on EOF.
2286 : */
2287 : static bool
2288 5424264 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2289 : {
2290 : unsigned int tuplen;
2291 :
2292 : /* read next tuple, if any */
2293 5424264 : if ((tuplen = getlen(srcTape, true)) == 0)
2294 1444 : return false;
2295 5422820 : READTUP(state, stup, srcTape, tuplen);
2296 :
2297 5422820 : return true;
2298 : }
2299 :
2300 : /*
2301 : * dumptuples - remove tuples from memtuples and write initial run to tape
2302 : *
2303 : * When alltuples = true, dump everything currently in memory. (This case is
2304 : * only used at end of input data.)
2305 : */
2306 : static void
2307 1065264 : dumptuples(Tuplesortstate *state, bool alltuples)
2308 : {
2309 : int memtupwrite;
2310 : int i;
2311 :
2312 : /*
2313 : * Nothing to do if we still fit in available memory and have array slots,
2314 : * unless this is the final call during initial run generation.
2315 : */
2316 1065264 : if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2317 1064346 : !alltuples)
2318 1063782 : return;
2319 :
2320 : /*
2321 : * Final call might require no sorting, in rare cases where we just so
2322 : * happen to have previously LACKMEM()'d at the point where exactly all
2323 : * remaining tuples are loaded into memory, just before input was
2324 : * exhausted. In general, short final runs are quite possible, but avoid
2325 : * creating a completely empty run. In a worker, though, we must produce
2326 : * at least one tape, even if it's empty.
2327 : */
2328 1482 : if (state->memtupcount == 0 && state->currentRun > 0)
2329 0 : return;
2330 :
2331 : Assert(state->status == TSS_BUILDRUNS);
2332 :
2333 : /*
2334 : * It seems unlikely that this limit will ever be exceeded, but take no
2335 : * chances
2336 : */
2337 1482 : if (state->currentRun == INT_MAX)
2338 0 : ereport(ERROR,
2339 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2340 : errmsg("cannot have more than %d runs for an external sort",
2341 : INT_MAX)));
2342 :
2343 1482 : if (state->currentRun > 0)
2344 918 : selectnewtape(state);
2345 :
2346 1482 : state->currentRun++;
2347 :
2348 1482 : if (trace_sort)
2349 0 : elog(LOG, "worker %d starting quicksort of run %d: %s",
2350 : state->worker, state->currentRun,
2351 : pg_rusage_show(&state->ru_start));
2352 :
2353 : /*
2354 : * Sort all tuples accumulated within the allowed amount of memory for
2355 : * this run using quicksort
2356 : */
2357 1482 : tuplesort_sort_memtuples(state);
2358 :
2359 1482 : if (trace_sort)
2360 0 : elog(LOG, "worker %d finished quicksort of run %d: %s",
2361 : state->worker, state->currentRun,
2362 : pg_rusage_show(&state->ru_start));
2363 :
2364 1482 : memtupwrite = state->memtupcount;
2365 5064074 : for (i = 0; i < memtupwrite; i++)
2366 : {
2367 5062592 : SortTuple *stup = &state->memtuples[i];
2368 :
2369 5062592 : WRITETUP(state, state->destTape, stup);
2370 : }
2371 :
2372 1482 : state->memtupcount = 0;
2373 :
2374 : /*
2375 : * Reset tuple memory. We've freed all of the tuples that we previously
2376 : * allocated. It's important to avoid fragmentation when there is a stark
2377 : * change in the sizes of incoming tuples. In bounded sorts,
2378 : * fragmentation due to AllocSetFree's bucketing by size class might be
2379 : * particularly bad if this step wasn't taken.
2380 : */
2381 1482 : MemoryContextReset(state->base.tuplecontext);
2382 :
2383 : /*
2384 : * Now update the memory accounting to subtract the memory used by the
2385 : * tuple.
2386 : */
2387 1482 : FREEMEM(state, state->tupleMem);
2388 1482 : state->tupleMem = 0;
2389 :
2390 1482 : markrunend(state->destTape);
2391 :
2392 1482 : if (trace_sort)
2393 0 : elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2394 : state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2395 : pg_rusage_show(&state->ru_start));
2396 : }
2397 :
2398 : /*
2399 : * tuplesort_rescan - rewind and replay the scan
2400 : */
2401 : void
2402 46 : tuplesort_rescan(Tuplesortstate *state)
2403 : {
2404 46 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2405 :
2406 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2407 :
2408 46 : switch (state->status)
2409 : {
2410 40 : case TSS_SORTEDINMEM:
2411 40 : state->current = 0;
2412 40 : state->eof_reached = false;
2413 40 : state->markpos_offset = 0;
2414 40 : state->markpos_eof = false;
2415 40 : break;
2416 6 : case TSS_SORTEDONTAPE:
2417 6 : LogicalTapeRewindForRead(state->result_tape, 0);
2418 6 : state->eof_reached = false;
2419 6 : state->markpos_block = 0L;
2420 6 : state->markpos_offset = 0;
2421 6 : state->markpos_eof = false;
2422 6 : break;
2423 0 : default:
2424 0 : elog(ERROR, "invalid tuplesort state");
2425 : break;
2426 : }
2427 :
2428 46 : MemoryContextSwitchTo(oldcontext);
2429 46 : }
2430 :
2431 : /*
2432 : * tuplesort_markpos - saves current position in the merged sort file
2433 : */
2434 : void
2435 575444 : tuplesort_markpos(Tuplesortstate *state)
2436 : {
2437 575444 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2438 :
2439 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2440 :
2441 575444 : switch (state->status)
2442 : {
2443 566636 : case TSS_SORTEDINMEM:
2444 566636 : state->markpos_offset = state->current;
2445 566636 : state->markpos_eof = state->eof_reached;
2446 566636 : break;
2447 8808 : case TSS_SORTEDONTAPE:
2448 8808 : LogicalTapeTell(state->result_tape,
2449 : &state->markpos_block,
2450 : &state->markpos_offset);
2451 8808 : state->markpos_eof = state->eof_reached;
2452 8808 : break;
2453 0 : default:
2454 0 : elog(ERROR, "invalid tuplesort state");
2455 : break;
2456 : }
2457 :
2458 575444 : MemoryContextSwitchTo(oldcontext);
2459 575444 : }
2460 :
2461 : /*
2462 : * tuplesort_restorepos - restores current position in merged sort file to
2463 : * last saved position
2464 : */
2465 : void
2466 30328 : tuplesort_restorepos(Tuplesortstate *state)
2467 : {
2468 30328 : MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2469 :
2470 : Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2471 :
2472 30328 : switch (state->status)
2473 : {
2474 24136 : case TSS_SORTEDINMEM:
2475 24136 : state->current = state->markpos_offset;
2476 24136 : state->eof_reached = state->markpos_eof;
2477 24136 : break;
2478 6192 : case TSS_SORTEDONTAPE:
2479 6192 : LogicalTapeSeek(state->result_tape,
2480 : state->markpos_block,
2481 : state->markpos_offset);
2482 6192 : state->eof_reached = state->markpos_eof;
2483 6192 : break;
2484 0 : default:
2485 0 : elog(ERROR, "invalid tuplesort state");
2486 : break;
2487 : }
2488 :
2489 30328 : MemoryContextSwitchTo(oldcontext);
2490 30328 : }
2491 :
2492 : /*
2493 : * tuplesort_get_stats - extract summary statistics
2494 : *
2495 : * This can be called after tuplesort_performsort() finishes to obtain
2496 : * printable summary information about how the sort was performed.
2497 : */
2498 : void
2499 390 : tuplesort_get_stats(Tuplesortstate *state,
2500 : TuplesortInstrumentation *stats)
2501 : {
2502 : /*
2503 : * Note: it might seem we should provide both memory and disk usage for a
2504 : * disk-based sort. However, the current code doesn't track memory space
2505 : * accurately once we have begun to return tuples to the caller (since we
2506 : * don't account for pfree's the caller is expected to do), so we cannot
2507 : * rely on availMem in a disk sort. This does not seem worth the overhead
2508 : * to fix. Is it worth creating an API for the memory context code to
2509 : * tell us how much is actually used in sortcontext?
2510 : */
2511 390 : tuplesort_updatemax(state);
2512 :
2513 390 : if (state->isMaxSpaceDisk)
2514 6 : stats->spaceType = SORT_SPACE_TYPE_DISK;
2515 : else
2516 384 : stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2517 390 : stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2518 :
2519 390 : switch (state->maxSpaceStatus)
2520 : {
2521 384 : case TSS_SORTEDINMEM:
2522 384 : if (state->boundUsed)
2523 42 : stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2524 : else
2525 342 : stats->sortMethod = SORT_TYPE_QUICKSORT;
2526 384 : break;
2527 0 : case TSS_SORTEDONTAPE:
2528 0 : stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2529 0 : break;
2530 6 : case TSS_FINALMERGE:
2531 6 : stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2532 6 : break;
2533 0 : default:
2534 0 : stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2535 0 : break;
2536 : }
2537 390 : }
2538 :
2539 : /*
2540 : * Convert TuplesortMethod to a string.
2541 : */
2542 : const char *
2543 288 : tuplesort_method_name(TuplesortMethod m)
2544 : {
2545 288 : switch (m)
2546 : {
2547 0 : case SORT_TYPE_STILL_IN_PROGRESS:
2548 0 : return "still in progress";
2549 42 : case SORT_TYPE_TOP_N_HEAPSORT:
2550 42 : return "top-N heapsort";
2551 240 : case SORT_TYPE_QUICKSORT:
2552 240 : return "quicksort";
2553 0 : case SORT_TYPE_EXTERNAL_SORT:
2554 0 : return "external sort";
2555 6 : case SORT_TYPE_EXTERNAL_MERGE:
2556 6 : return "external merge";
2557 : }
2558 :
2559 0 : return "unknown";
2560 : }
2561 :
2562 : /*
2563 : * Convert TuplesortSpaceType to a string.
2564 : */
2565 : const char *
2566 252 : tuplesort_space_type_name(TuplesortSpaceType t)
2567 : {
2568 : Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2569 252 : return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2570 : }
2571 :
2572 :
2573 : /*
2574 : * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2575 : */
2576 :
2577 : /*
2578 : * Convert the existing unordered array of SortTuples to a bounded heap,
2579 : * discarding all but the smallest "state->bound" tuples.
2580 : *
2581 : * When working with a bounded heap, we want to keep the largest entry
2582 : * at the root (array entry zero), instead of the smallest as in the normal
2583 : * sort case. This allows us to discard the largest entry cheaply.
2584 : * Therefore, we temporarily reverse the sort direction.
2585 : */
2586 : static void
2587 400 : make_bounded_heap(Tuplesortstate *state)
2588 : {
2589 400 : int tupcount = state->memtupcount;
2590 : int i;
2591 :
2592 : Assert(state->status == TSS_INITIAL);
2593 : Assert(state->bounded);
2594 : Assert(tupcount >= state->bound);
2595 : Assert(SERIAL(state));
2596 :
2597 : /* Reverse sort direction so largest entry will be at root */
2598 400 : reversedirection(state);
2599 :
2600 400 : state->memtupcount = 0; /* make the heap empty */
2601 37912 : for (i = 0; i < tupcount; i++)
2602 : {
2603 37512 : if (state->memtupcount < state->bound)
2604 : {
2605 : /* Insert next tuple into heap */
2606 : /* Must copy source tuple to avoid possible overwrite */
2607 18556 : SortTuple stup = state->memtuples[i];
2608 :
2609 18556 : tuplesort_heap_insert(state, &stup);
2610 : }
2611 : else
2612 : {
2613 : /*
2614 : * The heap is full. Replace the largest entry with the new
2615 : * tuple, or just discard it, if it's larger than anything already
2616 : * in the heap.
2617 : */
2618 18956 : if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2619 : {
2620 9772 : free_sort_tuple(state, &state->memtuples[i]);
2621 9772 : CHECK_FOR_INTERRUPTS();
2622 : }
2623 : else
2624 9184 : tuplesort_heap_replace_top(state, &state->memtuples[i]);
2625 : }
2626 : }
2627 :
2628 : Assert(state->memtupcount == state->bound);
2629 400 : state->status = TSS_BOUNDED;
2630 400 : }
2631 :
2632 : /*
2633 : * Convert the bounded heap to a properly-sorted array
2634 : */
2635 : static void
2636 400 : sort_bounded_heap(Tuplesortstate *state)
2637 : {
2638 400 : int tupcount = state->memtupcount;
2639 :
2640 : Assert(state->status == TSS_BOUNDED);
2641 : Assert(state->bounded);
2642 : Assert(tupcount == state->bound);
2643 : Assert(SERIAL(state));
2644 :
2645 : /*
2646 : * We can unheapify in place because each delete-top call will remove the
2647 : * largest entry, which we can promptly store in the newly freed slot at
2648 : * the end. Once we're down to a single-entry heap, we're done.
2649 : */
2650 18556 : while (state->memtupcount > 1)
2651 : {
2652 18156 : SortTuple stup = state->memtuples[0];
2653 :
2654 : /* this sifts-up the next-largest entry and decreases memtupcount */
2655 18156 : tuplesort_heap_delete_top(state);
2656 18156 : state->memtuples[state->memtupcount] = stup;
2657 : }
2658 400 : state->memtupcount = tupcount;
2659 :
2660 : /*
2661 : * Reverse sort direction back to the original state. This is not
2662 : * actually necessary but seems like a good idea for tidiness.
2663 : */
2664 400 : reversedirection(state);
2665 :
2666 400 : state->status = TSS_SORTEDINMEM;
2667 400 : state->boundUsed = true;
2668 400 : }
2669 :
2670 : /*
2671 : * Sort all memtuples using specialized qsort() routines.
2672 : *
2673 : * Quicksort is used for small in-memory sorts, and external sort runs.
2674 : */
2675 : static void
2676 214458 : tuplesort_sort_memtuples(Tuplesortstate *state)
2677 : {
2678 : Assert(!LEADER(state));
2679 :
2680 214458 : if (state->memtupcount > 1)
2681 : {
2682 : /*
2683 : * Do we have the leading column's value or abbreviation in datum1,
2684 : * and is there a specialization for its comparator?
2685 : */
2686 62924 : if (state->base.haveDatum1 && state->base.sortKeys)
2687 : {
2688 62890 : if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2689 : {
2690 3076 : qsort_tuple_unsigned(state->memtuples,
2691 3076 : state->memtupcount,
2692 : state);
2693 3060 : return;
2694 : }
2695 : #if SIZEOF_DATUM >= 8
2696 59814 : else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2697 : {
2698 1036 : qsort_tuple_signed(state->memtuples,
2699 1036 : state->memtupcount,
2700 : state);
2701 1036 : return;
2702 : }
2703 : #endif
2704 58778 : else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2705 : {
2706 38270 : qsort_tuple_int32(state->memtuples,
2707 38270 : state->memtupcount,
2708 : state);
2709 38210 : return;
2710 : }
2711 : }
2712 :
2713 : /* Can we use the single-key sort function? */
2714 20542 : if (state->base.onlyKey != NULL)
2715 : {
2716 8452 : qsort_ssup(state->memtuples, state->memtupcount,
2717 8452 : state->base.onlyKey);
2718 : }
2719 : else
2720 : {
2721 12090 : qsort_tuple(state->memtuples,
2722 12090 : state->memtupcount,
2723 : state->base.comparetup,
2724 : state);
2725 : }
2726 : }
2727 : }
2728 :
2729 : /*
2730 : * Insert a new tuple into an empty or existing heap, maintaining the
2731 : * heap invariant. Caller is responsible for ensuring there's room.
2732 : *
2733 : * Note: For some callers, tuple points to a memtuples[] entry above the
2734 : * end of the heap. This is safe as long as it's not immediately adjacent
2735 : * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2736 : * is, it might get overwritten before being moved into the heap!
2737 : */
2738 : static void
2739 19790 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2740 : {
2741 : SortTuple *memtuples;
2742 : int j;
2743 :
2744 19790 : memtuples = state->memtuples;
2745 : Assert(state->memtupcount < state->memtupsize);
2746 :
2747 19790 : CHECK_FOR_INTERRUPTS();
2748 :
2749 : /*
2750 : * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2751 : * using 1-based array indexes, not 0-based.
2752 : */
2753 19790 : j = state->memtupcount++;
2754 57620 : while (j > 0)
2755 : {
2756 50864 : int i = (j - 1) >> 1;
2757 :
2758 50864 : if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2759 13034 : break;
2760 37830 : memtuples[j] = memtuples[i];
2761 37830 : j = i;
2762 : }
2763 19790 : memtuples[j] = *tuple;
2764 19790 : }
2765 :
2766 : /*
2767 : * Remove the tuple at state->memtuples[0] from the heap. Decrement
2768 : * memtupcount, and sift up to maintain the heap invariant.
2769 : *
2770 : * The caller has already free'd the tuple the top node points to,
2771 : * if necessary.
2772 : */
2773 : static void
2774 19342 : tuplesort_heap_delete_top(Tuplesortstate *state)
2775 : {
2776 19342 : SortTuple *memtuples = state->memtuples;
2777 : SortTuple *tuple;
2778 :
2779 19342 : if (--state->memtupcount <= 0)
2780 282 : return;
2781 :
2782 : /*
2783 : * Remove the last tuple in the heap, and re-insert it, by replacing the
2784 : * current top node with it.
2785 : */
2786 19060 : tuple = &memtuples[state->memtupcount];
2787 19060 : tuplesort_heap_replace_top(state, tuple);
2788 : }
2789 :
2790 : /*
2791 : * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2792 : * maintain the heap invariant.
2793 : *
2794 : * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2795 : * Heapsort, steps H3-H8).
2796 : */
2797 : static void
2798 5952806 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2799 : {
2800 5952806 : SortTuple *memtuples = state->memtuples;
2801 : unsigned int i,
2802 : n;
2803 :
2804 : Assert(state->memtupcount >= 1);
2805 :
2806 5952806 : CHECK_FOR_INTERRUPTS();
2807 :
2808 : /*
2809 : * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2810 : * This prevents overflow in the "2 * i + 1" calculation, since at the top
2811 : * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2812 : */
2813 5952806 : n = state->memtupcount;
2814 5952806 : i = 0; /* i is where the "hole" is */
2815 : for (;;)
2816 1841674 : {
2817 7794480 : unsigned int j = 2 * i + 1;
2818 :
2819 7794480 : if (j >= n)
2820 1762300 : break;
2821 8465550 : if (j + 1 < n &&
2822 2433370 : COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2823 960618 : j++;
2824 6032180 : if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2825 4190506 : break;
2826 1841674 : memtuples[i] = memtuples[j];
2827 1841674 : i = j;
2828 : }
2829 5952806 : memtuples[i] = *tuple;
2830 5952806 : }
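                :
                : /*
                :  * Illustrative note (editorial, not part of the original source):
                :  * unlike Knuth's 1-based presentation, memtuples[] is 0-based here,
                :  * so the children of entry i are at 2*i + 1 and 2*i + 2 (as used just
                :  * above), and the parent of entry j is (j - 1) / 2 (as used in
                :  * tuplesort_heap_insert).  The invariant is that each entry compares
                :  * <= both of its children, keeping the smallest tuple (in the current
                :  * sort direction) at memtuples[0].
                :  */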
2831 :
2832 : /*
2833 : * Function to reverse the sort direction from its current state
2834 : *
2835 : * It is not safe to call this when performing hash tuplesorts
2836 : */
2837 : static void
2838 800 : reversedirection(Tuplesortstate *state)
2839 : {
2840 800 : SortSupport sortKey = state->base.sortKeys;
2841 : int nkey;
2842 :
2843 1960 : for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2844 : {
2845 1160 : sortKey->ssup_reverse = !sortKey->ssup_reverse;
2846 1160 : sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2847 : }
2848 800 : }
2849 :
2850 :
2851 : /*
2852 : * Tape interface routines
2853 : */
2854 :
2855 : static unsigned int
2856 5697270 : getlen(LogicalTape *tape, bool eofOK)
2857 : {
2858 : unsigned int len;
2859 :
2860 5697270 : if (LogicalTapeRead(tape,
2861 : &len, sizeof(len)) != sizeof(len))
2862 0 : elog(ERROR, "unexpected end of tape");
2863 5697270 : if (len == 0 && !eofOK)
2864 0 : elog(ERROR, "unexpected end of data");
2865 5697270 : return len;
2866 : }
2867 :
2868 : static void
2869 1638 : markrunend(LogicalTape *tape)
2870 : {
2871 1638 : unsigned int len = 0;
2872 :
2873 1638 : LogicalTapeWrite(tape, &len, sizeof(len));
2874 1638 : }
2875 :
2876 : /*
2877 : * Get memory for tuple from within READTUP() routine.
2878 : *
2879 :  * We use the next free slot from the slab allocator, or palloc() if the tuple
2880 : * is too large for that.
2881 : */
2882 : void *
2883 5395556 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2884 : {
2885 : SlabSlot *buf;
2886 :
2887 : /*
2888 : * We pre-allocate enough slots in the slab arena that we should never run
2889 : * out.
2890 : */
2891 : Assert(state->slabFreeHead);
2892 :
2893 5395556 : if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2894 0 : return MemoryContextAlloc(state->base.sortcontext, tuplen);
2895 : else
2896 : {
2897 5395556 : buf = state->slabFreeHead;
2898 : /* Reuse this slot */
2899 5395556 : state->slabFreeHead = buf->nextfree;
2900 :
2901 5395556 : return buf;
2902 : }
2903 : }
2904 :
2905 :
2906 : /*
2907 : * Parallel sort routines
2908 : */
2909 :
2910 : /*
2911 : * tuplesort_estimate_shared - estimate required shared memory allocation
2912 : *
2913 : * nWorkers is an estimate of the number of workers (it's the number that
2914 : * will be requested).
2915 : */
2916 : Size
2917 158 : tuplesort_estimate_shared(int nWorkers)
2918 : {
2919 : Size tapesSize;
2920 :
2921 : Assert(nWorkers > 0);
2922 :
2923 : /* Make sure that BufFile shared state is MAXALIGN'd */
2924 158 : tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2925 158 : tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2926 :
2927 158 : return tapesSize;
2928 : }
2929 :
2930 : /*
2931 : * tuplesort_initialize_shared - initialize shared tuplesort state
2932 : *
2933 : * Must be called from leader process before workers are launched, to
2934 : * establish state needed up-front for worker tuplesortstates. nWorkers
2935 : * should match the argument passed to tuplesort_estimate_shared().
2936 : */
2937 : void
2938 222 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2939 : {
2940 : int i;
2941 :
2942 : Assert(nWorkers > 0);
2943 :
2944 222 : SpinLockInit(&shared->mutex);
2945 222 : shared->currentWorker = 0;
2946 222 : shared->workersFinished = 0;
2947 222 : SharedFileSetInit(&shared->fileset, seg);
2948 222 : shared->nTapes = nWorkers;
2949 674 : for (i = 0; i < nWorkers; i++)
2950 : {
2951 452 : shared->tapes[i].firstblocknumber = 0L;
2952 : }
2953 222 : }
2954 :
2955 : /*
2956 : * tuplesort_attach_shared - attach to shared tuplesort state
2957 : *
2958 : * Must be called by all worker processes.
2959 : */
2960 : void
2961 224 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
2962 : {
2963 : /* Attach to SharedFileSet */
2964 224 : SharedFileSetAttach(&shared->fileset, seg);
2965 224 : }
2966 :
2967 : /*
2968 : * worker_get_identifier - Assign and return ordinal identifier for worker
2969 : *
2970 : * The order in which these are assigned is not well defined, and should not
2971 : * matter; worker numbers across parallel sort participants need only be
2972 : * distinct and gapless. logtape.c requires this.
2973 : *
2974 : * Note that the identifiers assigned from here have no relation to
2975 : * ParallelWorkerNumber number, to avoid making any assumption about
2976 : * caller's requirements. However, we do follow the ParallelWorkerNumber
2977 : * convention of representing a non-worker with worker number -1. This
2978 : * includes the leader, as well as serial Tuplesort processes.
2979 : */
2980 : static int
2981 444 : worker_get_identifier(Tuplesortstate *state)
2982 : {
2983 444 : Sharedsort *shared = state->shared;
2984 : int worker;
2985 :
2986 : Assert(WORKER(state));
2987 :
2988 444 : SpinLockAcquire(&shared->mutex);
2989 444 : worker = shared->currentWorker++;
2990 444 : SpinLockRelease(&shared->mutex);
2991 :
2992 444 : return worker;
2993 : }
2994 :
2995 : /*
2996 : * worker_freeze_result_tape - freeze worker's result tape for leader
2997 : *
2998 : * This is called by workers just after the result tape has been determined,
2999 : * instead of calling LogicalTapeFreeze() directly. They do so because
3000 : * workers require a few additional steps over similar serial
3001 : * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3002 :  * steps are around freeing now-unneeded resources, and representing to
3003 :  * the leader that the worker's input run is available for its merge.
3004 : *
3005 : * There should only be one final output run for each worker, which consists
3006 : * of all tuples that were originally input into worker.
3007 : */
3008 : static void
3009 444 : worker_freeze_result_tape(Tuplesortstate *state)
3010 : {
3011 444 : Sharedsort *shared = state->shared;
3012 : TapeShare output;
3013 :
3014 : Assert(WORKER(state));
3015 : Assert(state->result_tape != NULL);
3016 : Assert(state->memtupcount == 0);
3017 :
3018 : /*
3019 : * Free most remaining memory, in case caller is sensitive to our holding
3020 : * on to it. memtuples may not be a tiny merge heap at this point.
3021 : */
3022 444 : pfree(state->memtuples);
3023 : /* Be tidy */
3024 444 : state->memtuples = NULL;
3025 444 : state->memtupsize = 0;
3026 :
3027 : /*
3028 :      * The parallel worker's result tape metadata must be stored in shared
3029 :      * memory for the leader
3030 : */
3031 444 : LogicalTapeFreeze(state->result_tape, &output);
3032 :
3033 : /* Store properties of output tape, and update finished worker count */
3034 444 : SpinLockAcquire(&shared->mutex);
3035 444 : shared->tapes[state->worker] = output;
3036 444 : shared->workersFinished++;
3037 444 : SpinLockRelease(&shared->mutex);
3038 444 : }
3039 :
3040 : /*
3041 : * worker_nomergeruns - dump memtuples in worker, without merging
3042 : *
3043 :  * This is called as an alternative to mergeruns() with a worker when no
3044 : * merging is required.
3045 : */
3046 : static void
3047 444 : worker_nomergeruns(Tuplesortstate *state)
3048 : {
3049 : Assert(WORKER(state));
3050 : Assert(state->result_tape == NULL);
3051 : Assert(state->nOutputRuns == 1);
3052 :
3053 444 : state->result_tape = state->destTape;
3054 444 : worker_freeze_result_tape(state);
3055 444 : }
3056 :
3057 : /*
3058 : * leader_takeover_tapes - create tapeset for leader from worker tapes
3059 : *
3060 : * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3061 : * sorting has occurred in workers, all of which must have already returned
3062 : * from tuplesort_performsort().
3063 : *
3064 : * When this returns, leader process is left in a state that is virtually
3065 : * indistinguishable from it having generated runs as a serial external sort
3066 : * might have.
3067 : */
3068 : static void
3069 156 : leader_takeover_tapes(Tuplesortstate *state)
3070 : {
3071 156 : Sharedsort *shared = state->shared;
3072 156 : int nParticipants = state->nParticipants;
3073 : int workersFinished;
3074 : int j;
3075 :
3076 : Assert(LEADER(state));
3077 : Assert(nParticipants >= 1);
3078 :
3079 156 : SpinLockAcquire(&shared->mutex);
3080 156 : workersFinished = shared->workersFinished;
3081 156 : SpinLockRelease(&shared->mutex);
3082 :
3083 156 : if (nParticipants != workersFinished)
3084 0 : elog(ERROR, "cannot take over tapes before all workers finish");
3085 :
3086 : /*
3087 : * Create the tapeset from worker tapes, including a leader-owned tape at
3088 : * the end. Parallel workers are far more expensive than logical tapes,
3089 : * so the number of tapes allocated here should never be excessive.
3090 : */
3091 156 : inittapestate(state, nParticipants);
3092 156 : state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3093 :
3094 : /*
3095 : * Set currentRun to reflect the number of runs we will merge (it's not
3096 : * used for anything, this is just pro forma)
3097 : */
3098 156 : state->currentRun = nParticipants;
3099 :
3100 : /*
3101 : * Initialize the state to look the same as after building the initial
3102 : * runs.
3103 : *
3104 : * There will always be exactly 1 run per worker, and exactly one input
3105 : * tape per run, because workers always output exactly 1 run, even when
3106 : * there were no input tuples for workers to sort.
3107 : */
3108 156 : state->inputTapes = NULL;
3109 156 : state->nInputTapes = 0;
3110 156 : state->nInputRuns = 0;
3111 :
3112 156 : state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3113 156 : state->nOutputTapes = nParticipants;
3114 156 : state->nOutputRuns = nParticipants;
3115 :
3116 472 : for (j = 0; j < nParticipants; j++)
3117 : {
3118 316 : state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3119 : }
3120 :
3121 156 : state->status = TSS_BUILDRUNS;
3122 156 : }
3123 :
3124 : /*
3125 : * Convenience routine to free a tuple previously loaded into sort memory
3126 : */
3127 : static void
3128 3759792 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3129 : {
3130 3759792 : if (stup->tuple)
3131 : {
3132 3599594 : FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3133 3599594 : pfree(stup->tuple);
3134 3599594 : stup->tuple = NULL;
3135 : }
3136 3759792 : }
3137 :
3138 : int
3139 0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3140 : {
3141 0 : if (x < y)
3142 0 : return -1;
3143 0 : else if (x > y)
3144 0 : return 1;
3145 : else
3146 0 : return 0;
3147 : }
3148 :
3149 : #if SIZEOF_DATUM >= 8
3150 : int
3151 1045702 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3152 : {
3153 1045702 : int64 xx = DatumGetInt64(x);
3154 1045702 : int64 yy = DatumGetInt64(y);
3155 :
3156 1045702 : if (xx < yy)
3157 371158 : return -1;
3158 674544 : else if (xx > yy)
3159 347124 : return 1;
3160 : else
3161 327420 : return 0;
3162 : }
3163 : #endif
3164 :
3165 : int
3166 160535790 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3167 : {
3168 160535790 : int32 xx = DatumGetInt32(x);
3169 160535790 : int32 yy = DatumGetInt32(y);
3170 :
3171 160535790 : if (xx < yy)
3172 37452670 : return -1;
3173 123083120 : else if (xx > yy)
3174 36336162 : return 1;
3175 : else
3176 86746958 : return 0;
3177 : }
|