Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplestore.c
4 : * Generalized routines for temporary tuple storage.
5 : *
6 : * This module handles temporary storage of tuples for purposes such
7 : * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 : * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 : * but can only store and regurgitate a sequence of tuples. However,
10 : * because no sort is required, it is allowed to start reading the sequence
11 : * before it has all been written. This is particularly useful for cursors,
12 : * because it allows random access within the already-scanned portion of
13 : * a query without having to process the underlying scan to completion.
14 : * Also, it is possible to support multiple independent read pointers.
15 : *
16 : * A temporary file is used to handle the data if it exceeds the
17 : * space limit specified by the caller.
18 : *
19 : * The (approximate) amount of memory allowed to the tuplestore is specified
20 : * in kilobytes by the caller. We absorb tuples and simply store them in an
21 : * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 : * maxKBytes, we dump all the tuples into a temp file and then read from that
23 : * when needed.
24 : *
25 : * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 : * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 : * Mark/restore behavior is supported by copying read pointers.
28 : *
29 : * When the caller requests backward-scan capability, we write the temp file
30 : * in a format that allows either forward or backward scan. Otherwise, only
31 : * forward scan is allowed. A request for backward scan must be made before
32 : * putting any tuples into the tuplestore. Rewind is normally allowed but
33 : * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 : * read pointers enables truncation of the tuplestore at the oldest read point
35 : * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 : * at appropriate times for truncation to actually happen.)
37 : *
38 : * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 : * current write position, and the write-position variables in the tuplestore
40 : * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 : * seek position is the active read pointer's position, and that read pointer
42 : * isn't kept up to date. We update the appropriate variables using ftell()
43 : * before switching to the other state or activating a different read pointer.
44 : *
45 : *
46 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
47 : * Portions Copyright (c) 1994, Regents of the University of California
48 : *
49 : * IDENTIFICATION
50 : * src/backend/utils/sort/tuplestore.c
51 : *
52 : *-------------------------------------------------------------------------
53 : */
54 :
55 : #include "postgres.h"
56 :
57 : #include <limits.h>
58 :
59 : #include "access/htup_details.h"
60 : #include "commands/tablespace.h"
61 : #include "executor/executor.h"
62 : #include "miscadmin.h"
63 : #include "storage/buffile.h"
64 : #include "utils/memutils.h"
65 : #include "utils/resowner.h"
66 : #include "utils/tuplestore.h"
67 :
68 :
69 : /*
70 : * Possible states of a Tuplestore object. These denote the states that
71 : * persist between calls of Tuplestore routines.
 : * (The INMEM -> WRITEFILE transition happens in tuplestore_puttuple_common
 : * once the memory budget or the memtuples array capacity is exhausted.)
72 : */
73 : typedef enum
74 : {
75 : TSS_INMEM, /* Tuples still fit in memory */
76 : TSS_WRITEFILE, /* Writing to temp file */
77 : TSS_READFILE, /* Reading from temp file */
78 : } TupStoreStatus;
79 :
80 : /*
81 : * State for a single read pointer. If we are in state INMEM then all the
82 : * read pointers' "current" fields denote the read positions. In state
83 : * WRITEFILE, the file/offset fields denote the read positions. In state
84 : * READFILE, inactive read pointers have valid file/offset, but the active
85 : * read pointer implicitly has position equal to the temp file's seek position.
 : *
 : * Each pointer carries its own eflags, so capabilities can differ between
 : * readers (see tuplestore_alloc_read_pointer).
86 : *
87 : * Special case: if eof_reached is true, then the pointer's read position is
88 : * implicitly equal to the write position, and current/file/offset aren't
89 : * maintained. This way we need not update all the read pointers each time
90 : * we write.
91 : */
92 : typedef struct
93 : {
94 : int eflags; /* capability flags */
95 : bool eof_reached; /* read has reached EOF */
96 : int current; /* next array index to read */
97 : int file; /* temp file# */
98 : pgoff_t offset; /* byte offset in file */
99 : } TSReadPointer;
100 :
101 : /*
102 : * Private state of a Tuplestore operation.
103 : */
104 : struct Tuplestorestate
105 : {
106 : TupStoreStatus status; /* enumerated value as shown above */
107 : int eflags; /* capability flags (OR of pointers' flags) */
108 : bool backward; /* store extra length words in file? */
109 : bool interXact; /* keep open through transactions? */
110 : bool truncated; /* tuplestore_trim has removed tuples? */
111 : bool usedDisk; /* used by tuplestore_get_stats() */
112 : int64 maxSpace; /* used by tuplestore_get_stats() */
113 : int64 availMem; /* remaining memory available, in bytes;
 : * may go negative — see LACKMEM() */
114 : int64 allowedMem; /* total memory allowed, in bytes */
115 : int64 tuples; /* number of tuples added */
116 : BufFile *myfile; /* underlying file, or NULL if none */
117 : MemoryContext context; /* memory context for holding tuples */
118 : ResourceOwner resowner; /* resowner for holding temp files */
119 :
120 : /*
121 : * These function pointers decouple the routines that must know what kind
122 : * of tuple we are handling from the routines that don't need to know it.
123 : * They are set up by the tuplestore_begin_xxx routines.
124 : *
125 : * (Although tuplestore.c currently only supports heap tuples, I've copied
126 : * this part of tuplesort.c so that extension to other kinds of objects
127 : * will be easy if it's ever needed.)
128 : *
129 : * Function to copy a supplied input tuple into palloc'd space. (NB: we
130 : * assume that a single pfree() is enough to release the tuple later, so
131 : * the representation must be "flat" in one palloc chunk.) state->availMem
132 : * must be decreased by the amount of space used.
133 : */
134 : void *(*copytup) (Tuplestorestate *state, void *tup);
135 :
136 : /*
137 : * Function to write a stored tuple onto tape. The representation of the
138 : * tuple on tape need not be the same as it is in memory; requirements on
139 : * the tape representation are given below. After writing the tuple,
140 : * pfree() it, and increase state->availMem by the amount of memory space
141 : * thereby released.
142 : */
143 : void (*writetup) (Tuplestorestate *state, void *tup);
144 :
145 : /*
146 : * Function to read a stored tuple from tape back into memory. 'len' is
147 : * the already-read length of the stored tuple. Create and return a
148 : * palloc'd copy, and decrease state->availMem by the amount of memory
149 : * space consumed.
150 : */
151 : void *(*readtup) (Tuplestorestate *state, unsigned int len);
152 :
153 : /*
154 : * This array holds pointers to tuples in memory if we are in state INMEM.
155 : * In states WRITEFILE and READFILE it's not used.
156 : *
157 : * When memtupdeleted > 0, the first memtupdeleted pointers are already
158 : * released due to a tuplestore_trim() operation, but we haven't expended
159 : * the effort to slide the remaining pointers down. These unused pointers
160 : * are set to NULL to catch any invalid accesses. Note that memtupcount
161 : * includes the deleted pointers.
162 : */
163 : void **memtuples; /* array of pointers to palloc'd tuples */
164 : int memtupdeleted; /* the first N slots are currently unused */
165 : int memtupcount; /* number of tuples currently present */
166 : int memtupsize; /* allocated length of memtuples array */
167 : bool growmemtuples; /* memtuples' growth still underway? */
168 :
169 : /*
170 : * These variables are used to keep track of the current positions.
171 : *
172 : * In state WRITEFILE, the current file seek position is the write point;
173 : * in state READFILE, the write position is remembered in writepos_xxx.
174 : * (The write position is the same as EOF, but since BufFileSeek doesn't
175 : * currently implement SEEK_END, we have to remember it explicitly.)
176 : */
177 : TSReadPointer *readptrs; /* array of read pointers */
178 : int activeptr; /* index of the active read pointer */
179 : int readptrcount; /* number of pointers currently valid */
180 : int readptrsize; /* allocated length of readptrs array */
181 :
182 : int writepos_file; /* file# (valid if READFILE state) */
183 : pgoff_t writepos_offset; /* offset (valid if READFILE state) */
184 : };
185 :
 : /* Tuple-kind dispatch and memory-accounting helper macros. */
186 : #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
187 : #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
188 : #define READTUP(state,len) ((*(state)->readtup) (state, len))
189 : #define LACKMEM(state) ((state)->availMem < 0)
190 : #define USEMEM(state,amt) ((state)->availMem -= (amt))
191 : #define FREEMEM(state,amt) ((state)->availMem += (amt))
192 :
193 : /*--------------------
194 : *
195 : * NOTES about on-tape representation of tuples:
196 : *
197 : * We require the first "unsigned int" of a stored tuple to be the total size
198 : * on-tape of the tuple, including itself (so it is never zero).
199 : * The remainder of the stored tuple
200 : * may or may not match the in-memory representation of the tuple ---
201 : * any conversion needed is the job of the writetup and readtup routines.
202 : *
203 : * If state->backward is true, then the stored representation of
204 : * the tuple must be followed by another "unsigned int" that is a copy of the
205 : * length --- so the total tape space used is actually sizeof(unsigned int)
206 : * more than the stored length value. This allows read-backwards. When
207 : * state->backward is not set, the write/read routines may omit the extra
208 : * length word.
209 : *
210 : * writetup is expected to write both length words as well as the tuple
211 : * data. When readtup is called, the tape is positioned just after the
212 : * front length word; readtup must read the tuple data and advance past
213 : * the back length word (if present).
214 : *
215 : * The write/read routines can make use of the tuple description data
216 : * stored in the Tuplestorestate record, if needed. They are also expected
217 : * to adjust state->availMem by the amount of memory space (not tape space!)
218 : * released or consumed. There is no error return from either writetup
219 : * or readtup; they should ereport() on failure.
220 : *
221 : *
222 : * NOTES about memory consumption calculations:
223 : *
224 : * We count space allocated for tuples against the maxKBytes limit,
225 : * plus the space used by the variable-size array memtuples.
226 : * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
227 : * We don't worry about the size of the read pointer array, either.
228 : *
229 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
230 : * rather than the originally-requested size. This is important since
231 : * palloc can add substantial overhead. It's not a complete answer since
232 : * we won't count any wasted space in palloc allocation blocks, but it's
233 : * a lot better than what we were doing before 7.3.
234 : *
235 : *--------------------
236 : */
237 :
238 :
239 : static Tuplestorestate *tuplestore_begin_common(int eflags,
240 : bool interXact,
241 : int maxKBytes);
242 : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
243 : static void dumptuples(Tuplestorestate *state);
244 : static void tuplestore_updatemax(Tuplestorestate *state);
245 : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
246 : static void *copytup_heap(Tuplestorestate *state, void *tup);
247 : static void writetup_heap(Tuplestorestate *state, void *tup);
248 : static void *readtup_heap(Tuplestorestate *state, unsigned int len);
249 :
250 :
251 : /*
252 : * tuplestore_begin_xxx
253 : *
254 : * Initialize for a tuple store operation.
 : *
 : * eflags: capability flags for read pointer 0 (see tuplestore_set_eflags).
 : * interXact: whether the temp file may outlive the current transaction.
 : * maxKBytes: in-memory budget (in kilobytes) before spilling to a temp file.
255 : */
256 : static Tuplestorestate *
257 147057 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
258 : {
259 : Tuplestorestate *state;
260 :
261 147057 : state = palloc0_object(Tuplestorestate);
262 :
263 147057 : state->status = TSS_INMEM;
264 147057 : state->eflags = eflags;
265 147057 : state->interXact = interXact;
266 147057 : state->truncated = false;
267 147057 : state->usedDisk = false;
268 147057 : state->maxSpace = 0;
269 147057 : state->allowedMem = maxKBytes * (int64) 1024;
270 147057 : state->availMem = state->allowedMem;
271 147057 : state->myfile = NULL;
272 :
273 : /*
274 : * The palloc/pfree pattern for tuple memory is in a FIFO pattern. A
275 : * generation context is perfectly suited for this.
276 : */
277 147057 : state->context = GenerationContextCreate(CurrentMemoryContext,
278 : "tuplestore tuples",
279 : ALLOCSET_DEFAULT_SIZES);
280 147057 : state->resowner = CurrentResourceOwner;
281 :
282 147057 : state->memtupdeleted = 0;
283 147057 : state->memtupcount = 0;
284 147057 : state->tuples = 0;
285 :
286 : /*
287 : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
288 : * see comments in grow_memtuples().
289 : */
290 147057 : state->memtupsize = Max(16384 / sizeof(void *),
291 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
292 :
293 147057 : state->growmemtuples = true;
294 147057 : state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
295 :
 : /* Charge the pointer-array allocation against the memory budget. */
296 147057 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
297 :
298 147057 : state->activeptr = 0;
299 147057 : state->readptrcount = 1;
300 147057 : state->readptrsize = 8; /* arbitrary */
301 147057 : state->readptrs = (TSReadPointer *)
302 147057 : palloc(state->readptrsize * sizeof(TSReadPointer));
303 :
304 147057 : state->readptrs[0].eflags = eflags;
305 147057 : state->readptrs[0].eof_reached = false;
306 147057 : state->readptrs[0].current = 0;
307 :
308 147057 : return state;
309 : }
310 :
311 : /*
312 : * tuplestore_begin_heap
313 : *
314 : * Create a new tuplestore; other types of tuple stores (other than
315 : * "heap" tuple stores, for heap tuples) are possible, but not presently
316 : * implemented.
317 : *
318 : * randomAccess: if true, both forward and backward accesses to the
319 : * tuple store are allowed.
320 : *
321 : * interXact: if true, the files used for on-disk storage persist beyond the
322 : * end of the current transaction. NOTE: It's the caller's responsibility to
323 : * create such a tuplestore in a memory context and resource owner that will
324 : * also survive transaction boundaries, and to ensure the tuplestore is closed
325 : * when it's no longer wanted.
326 : *
327 : * maxKBytes: how much data to store in memory (any data beyond this
328 : * amount is paged to disk). When in doubt, use work_mem.
 : *
 : * The returned store must eventually be released with tuplestore_end().
329 : */
330 : Tuplestorestate *
331 147057 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
332 : {
333 : Tuplestorestate *state;
334 : int eflags;
335 :
336 : /*
337 : * This interpretation of the meaning of randomAccess is compatible with
338 : * the pre-8.3 behavior of tuplestores.
339 : */
340 147057 : eflags = randomAccess ?
341 147057 : (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
342 : (EXEC_FLAG_REWIND);
343 :
344 147057 : state = tuplestore_begin_common(eflags, interXact, maxKBytes);
345 :
346 147057 : state->copytup = copytup_heap;
347 147057 : state->writetup = writetup_heap;
348 147057 : state->readtup = readtup_heap;
349 :
350 147057 : return state;
351 : }
352 :
353 : /*
354 : * tuplestore_set_eflags
355 : *
356 : * Set the capability flags for read pointer 0 at a finer grain than is
357 : * allowed by tuplestore_begin_xxx. This must be called before inserting
358 : * any data into the tuplestore.
359 : *
360 : * eflags is a bitmask following the meanings used for executor node
361 : * startup flags (see executor.h). tuplestore pays attention to these bits:
362 : * EXEC_FLAG_REWIND need rewind to start
363 : * EXEC_FLAG_BACKWARD need backward fetch
364 : * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
365 : * is set per "randomAccess" in the tuplestore_begin_xxx call.
366 : *
367 : * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
368 : * but not further than the truncation point (the furthest-back read pointer
369 : * position at the time of the last tuplestore_trim call).
370 : */
371 : void
372 5026 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
373 : {
374 : int i;
375 :
376 5026 : if (state->status != TSS_INMEM || state->memtupcount != 0)
377 0 : elog(ERROR, "too late to call tuplestore_set_eflags");
378 :
379 5026 : state->readptrs[0].eflags = eflags;
 : /* Recompute state->eflags as the union of all pointers' capabilities. */
380 5026 : for (i = 1; i < state->readptrcount; i++)
381 0 : eflags |= state->readptrs[i].eflags;
382 5026 : state->eflags = eflags;
383 5026 : }
384 :
385 : /*
386 : * tuplestore_alloc_read_pointer - allocate another read pointer.
387 : *
388 : * Returns the pointer's index.
 : * (Pass this index to tuplestore_select_read_pointer to activate it.)
389 : *
390 : * The new pointer initially copies the position of read pointer 0.
391 : * It can have its own eflags, but if any data has been inserted into
392 : * the tuplestore, these eflags must not represent an increase in
393 : * requirements.
394 : */
395 : int
396 6670 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
397 : {
398 : /* Check for possible increase of requirements */
399 6670 : if (state->status != TSS_INMEM || state->memtupcount != 0)
400 : {
401 498 : if ((state->eflags | eflags) != state->eflags)
402 0 : elog(ERROR, "too late to require new tuplestore eflags");
403 : }
404 :
405 : /* Make room for another read pointer if needed */
406 6670 : if (state->readptrcount >= state->readptrsize)
407 : {
408 20 : int newcnt = state->readptrsize * 2;
409 :
410 20 : state->readptrs = (TSReadPointer *)
411 20 : repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
412 20 : state->readptrsize = newcnt;
413 : }
414 :
415 : /* And set it up */
416 6670 : state->readptrs[state->readptrcount] = state->readptrs[0];
417 6670 : state->readptrs[state->readptrcount].eflags = eflags;
418 :
419 6670 : state->eflags |= eflags;
420 :
421 6670 : return state->readptrcount++;
422 : }
423 :
424 : /*
425 : * tuplestore_clear
426 : *
427 : * Delete all the contents of a tuplestore, and reset its read pointers
428 : * to the start.
429 : */
430 : void
431 6881 : tuplestore_clear(Tuplestorestate *state)
432 : {
433 : int i;
434 : TSReadPointer *readptr;
435 :
436 : /* update the maxSpace before doing any USEMEM/FREEMEM adjustments */
437 6881 : tuplestore_updatemax(state);
438 :
439 6881 : if (state->myfile)
440 8 : BufFileClose(state->myfile);
441 6881 : state->myfile = NULL;
442 :
443 : #ifdef USE_ASSERT_CHECKING
444 : {
445 : int64 availMem = state->availMem;
446 :
447 : /*
448 : * Below, we reset the memory context for storing tuples. To save
449 : * from having to always call GetMemoryChunkSpace() on all stored
450 : * tuples, we adjust the availMem to forget all the tuples and just
451 : * recall USEMEM for the space used by the memtuples array. Here we
452 : * just Assert that's correct and the memory tracking hasn't gone
453 : * wrong anywhere.
454 : */
455 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
456 : availMem += GetMemoryChunkSpace(state->memtuples[i]);
457 :
458 : availMem += GetMemoryChunkSpace(state->memtuples);
459 :
460 : Assert(availMem == state->allowedMem);
461 : }
462 : #endif
463 :
464 : /* clear the memory consumed by the memory tuples */
465 6881 : MemoryContextReset(state->context);
466 :
467 : /*
468 : * Zero the used memory and re-consume the space for the memtuples array.
469 : * This saves having to FREEMEM for each stored tuple.
470 : */
471 6881 : state->availMem = state->allowedMem;
472 6881 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
473 :
474 6881 : state->status = TSS_INMEM;
475 6881 : state->truncated = false;
476 6881 : state->memtupdeleted = 0;
477 6881 : state->memtupcount = 0;
478 6881 : state->tuples = 0;
 : /* Reset every read pointer back to the start of the (now empty) store. */
479 6881 : readptr = state->readptrs;
480 21455 : for (i = 0; i < state->readptrcount; readptr++, i++)
481 : {
482 14574 : readptr->eof_reached = false;
483 14574 : readptr->current = 0;
484 : }
485 6881 : }
486 :
487 : /*
488 : * tuplestore_end
489 : *
490 : * Release resources and clean up.
 : * Frees the temp file (if any), all tuple memory held in state->context,
 : * the bookkeeping arrays, and the state object itself.
491 : */
492 : void
493 146440 : tuplestore_end(Tuplestorestate *state)
494 : {
495 146440 : if (state->myfile)
496 61 : BufFileClose(state->myfile);
497 :
498 146440 : MemoryContextDelete(state->context);
499 146440 : pfree(state->memtuples);
500 146440 : pfree(state->readptrs);
501 146440 : pfree(state);
502 146440 : }
503 :
504 : /*
505 : * tuplestore_select_read_pointer - make the specified read pointer active
 : *
 : * ptr is an index previously returned by tuplestore_alloc_read_pointer
 : * (read pointer 0 always exists).
506 : */
507 : void
508 2925958 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
509 : {
510 : TSReadPointer *readptr;
511 : TSReadPointer *oldptr;
512 :
513 : Assert(ptr >= 0 && ptr < state->readptrcount);
514 :
515 : /* No work if already active */
516 2925958 : if (ptr == state->activeptr)
517 760451 : return;
518 :
519 2165507 : readptr = &state->readptrs[ptr];
520 2165507 : oldptr = &state->readptrs[state->activeptr];
521 :
522 2165507 : switch (state->status)
523 : {
524 2165499 : case TSS_INMEM:
525 : case TSS_WRITEFILE:
526 : /* no work */
527 2165499 : break;
528 8 : case TSS_READFILE:
529 :
530 : /*
531 : * First, save the current read position in the pointer about to
532 : * become inactive.
533 : */
534 8 : if (!oldptr->eof_reached)
535 8 : BufFileTell(state->myfile,
536 : &oldptr->file,
537 : &oldptr->offset);
538 :
539 : /*
540 : * We have to make the temp file's seek position equal to the
541 : * logical position of the new read pointer. In eof_reached
542 : * state, that's the EOF, which we have available from the saved
543 : * write position.
544 : */
545 8 : if (readptr->eof_reached)
546 : {
547 0 : if (BufFileSeek(state->myfile,
548 : state->writepos_file,
549 : state->writepos_offset,
550 : SEEK_SET) != 0)
551 0 : ereport(ERROR,
552 : (errcode_for_file_access(),
553 : errmsg("could not seek in tuplestore temporary file")));
554 : }
555 : else
556 : {
557 8 : if (BufFileSeek(state->myfile,
558 : readptr->file,
559 : readptr->offset,
560 : SEEK_SET) != 0)
561 0 : ereport(ERROR,
562 : (errcode_for_file_access(),
563 : errmsg("could not seek in tuplestore temporary file")));
564 : }
565 8 : break;
566 0 : default:
567 0 : elog(ERROR, "invalid tuplestore state");
568 : break;
569 : }
570 :
571 2165507 : state->activeptr = ptr;
572 : }
573 :
574 : /*
575 : * tuplestore_tuple_count
576 : *
577 : * Returns the number of tuples added since creation or the last
578 : * tuplestore_clear().
 : * (Counts insertions only; reading tuples back does not affect it.)
579 : */
580 : int64
581 4406 : tuplestore_tuple_count(Tuplestorestate *state)
582 : {
583 4406 : return state->tuples;
584 : }
585 :
586 : /*
587 : * tuplestore_ateof
588 : *
589 : * Returns the active read pointer's eof_reached state.
 : * Pure accessor: does not advance or otherwise modify the pointer.
590 : */
591 : bool
592 3073390 : tuplestore_ateof(Tuplestorestate *state)
593 : {
594 3073390 : return state->readptrs[state->activeptr].eof_reached;
595 : }
596 :
597 : /*
598 : * Grow the memtuples[] array, if possible within our memory constraint. We
599 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
600 : * limit. Return true if we were able to enlarge the array, false if not.
601 : *
602 : * Normally, at each increment we double the size of the array. When doing
603 : * that would exceed a limit, we attempt one last, smaller increase (and then
604 : * clear the growmemtuples flag so we don't try any more). That allows us to
605 : * use memory as fully as permitted; sticking to the pure doubling rule could
606 : * result in almost half going unused. Because availMem moves around with
607 : * tuple addition/removal, we need some rule to prevent making repeated small
608 : * increases in memtupsize, which would just be useless thrashing. The
609 : * growmemtuples flag accomplishes that and also prevents useless
610 : * recalculations in this function.
611 : */
612 : static bool
613 1250 : grow_memtuples(Tuplestorestate *state)
614 : {
615 : int newmemtupsize;
616 1250 : int memtupsize = state->memtupsize;
617 1250 : int64 memNowUsed = state->allowedMem - state->availMem;
618 :
619 : /* Forget it if we've already maxed out memtuples, per comment above */
620 1250 : if (!state->growmemtuples)
621 4 : return false;
622 :
623 : /* Select new value of memtupsize */
624 1246 : if (memNowUsed <= state->availMem)
625 : {
626 : /*
627 : * We've used no more than half of allowedMem; double our usage,
628 : * clamping at INT_MAX tuples.
629 : */
630 1201 : if (memtupsize < INT_MAX / 2)
631 1201 : newmemtupsize = memtupsize * 2;
632 : else
633 : {
634 0 : newmemtupsize = INT_MAX;
635 0 : state->growmemtuples = false;
636 : }
637 : }
638 : else
639 : {
640 : /*
641 : * This will be the last increment of memtupsize. Abandon doubling
642 : * strategy and instead increase as much as we safely can.
643 : *
644 : * To stay within allowedMem, we can't increase memtupsize by more
645 : * than availMem / sizeof(void *) elements. In practice, we want to
646 : * increase it by considerably less, because we need to leave some
647 : * space for the tuples to which the new array slots will refer. We
648 : * assume the new tuples will be about the same size as the tuples
649 : * we've already seen, and thus we can extrapolate from the space
650 : * consumption so far to estimate an appropriate new size for the
651 : * memtuples array. The optimal value might be higher or lower than
652 : * this estimate, but it's hard to know that in advance. We again
653 : * clamp at INT_MAX tuples.
654 : *
655 : * This calculation is safe against enlarging the array so much that
656 : * LACKMEM becomes true, because the memory currently used includes
657 : * the present array; thus, there would be enough allowedMem for the
658 : * new array elements even if no other memory were currently used.
659 : *
660 : * We do the arithmetic in float8, because otherwise the product of
661 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
662 : * result should be insignificant; but even if we computed a
663 : * completely insane result, the checks below will prevent anything
664 : * really bad from happening.
665 : */
666 : double grow_ratio;
667 :
668 45 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
669 45 : if (memtupsize * grow_ratio < INT_MAX)
670 45 : newmemtupsize = (int) (memtupsize * grow_ratio);
671 : else
672 0 : newmemtupsize = INT_MAX;
673 :
674 : /* We won't make any further enlargement attempts */
675 45 : state->growmemtuples = false;
676 : }
677 :
678 : /* Must enlarge array by at least one element, else report failure */
679 1246 : if (newmemtupsize <= memtupsize)
680 0 : goto noalloc;
681 :
682 : /*
683 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
684 : * to ensure our request won't be rejected. Note that we can easily
685 : * exhaust address space before facing this outcome. (This is presently
686 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
687 : * don't rely on that at this distance.)
688 : */
689 1246 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
690 : {
691 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
692 0 : state->growmemtuples = false; /* can't grow any more */
693 : }
694 :
695 : /*
696 : * We need to be sure that we do not cause LACKMEM to become true, else
697 : * the space management algorithm will go nuts. The code above should
698 : * never generate a dangerous request, but to be safe, check explicitly
699 : * that the array growth fits within availMem. (We could still cause
700 : * LACKMEM if the memory chunk overhead associated with the memtuples
701 : * array were to increase. That shouldn't happen because we chose the
702 : * initial array size large enough to ensure that palloc will be treating
703 : * both old and new arrays as separate chunks. But we'll check LACKMEM
704 : * explicitly below just in case.)
705 : */
706 1246 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
707 0 : goto noalloc;
708 :
709 : /* OK, do it: swap in the enlarged array and re-charge the accounting */
710 1246 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
711 1246 : state->memtuples = (void **)
712 1246 : repalloc_huge(state->memtuples,
713 : newmemtupsize * sizeof(void *));
714 1246 : state->memtupsize = newmemtupsize;
715 1246 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
716 1246 : if (LACKMEM(state))
717 0 : elog(ERROR, "unexpected out-of-memory situation in tuplestore");
718 1246 : return true;
719 :
720 0 : noalloc:
721 : /* If for any reason we didn't realloc, shut off future attempts */
722 0 : state->growmemtuples = false;
723 0 : return false;
724 : }
725 :
726 : /*
727 : * Accept one tuple and append it to the tuplestore.
728 : *
729 : * Note that the input tuple is always copied; the caller need not save it.
730 : *
731 : * If the active read pointer is currently "at EOF", it remains so (the read
732 : * pointer implicitly advances along with the write pointer); otherwise the
733 : * read pointer is unchanged. Non-active read pointers do not move, which
734 : * means they are certain to not be "at EOF" immediately after puttuple.
735 : * This curious-seeming behavior is for the convenience of nodeMaterial.c and
736 : * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
737 : * steps.
738 : *
739 : * tuplestore_puttupleslot() is a convenience routine to collect data from
740 : * a TupleTableSlot without an extra copy operation.
 : *
 : * The copy is made in state->context, so it lives until tuplestore_clear()
 : * or tuplestore_end() releases it.
741 : */
742 : void
743 1400208 : tuplestore_puttupleslot(Tuplestorestate *state,
744 : TupleTableSlot *slot)
745 : {
746 : MinimalTuple tuple;
747 1400208 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
748 :
749 : /*
750 : * Form a MinimalTuple in working memory
751 : */
752 1400208 : tuple = ExecCopySlotMinimalTuple(slot);
753 1400208 : USEMEM(state, GetMemoryChunkSpace(tuple));
754 :
755 1400208 : tuplestore_puttuple_common(state, tuple);
756 :
757 1400208 : MemoryContextSwitchTo(oldcxt);
758 1400208 : }
759 :
760 : /*
761 : * "Standard" case to copy from a HeapTuple. This is actually now somewhat
762 : * deprecated, but not worth getting rid of in view of the number of callers.
 : * The caller keeps ownership of the passed-in tuple; it is always copied.
763 : */
764 : void
765 1129169 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
766 : {
767 1129169 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
768 :
769 : /*
770 : * Copy the tuple. (Must do this even in WRITEFILE case. Note that
771 : * COPYTUP includes USEMEM, so we needn't do that here.)
772 : */
773 1129169 : tuple = COPYTUP(state, tuple);
774 :
775 1129169 : tuplestore_puttuple_common(state, tuple);
776 :
777 1129169 : MemoryContextSwitchTo(oldcxt);
778 1129169 : }
779 :
780 : /*
781 : * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
782 : * This avoids an extra tuple-construction operation.
 : * The minimal tuple is formed directly in state->context.
783 : */
784 : void
785 11893771 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
786 : const Datum *values, const bool *isnull)
787 : {
788 : MinimalTuple tuple;
789 11893771 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
790 :
791 11893771 : tuple = heap_form_minimal_tuple(tdesc, values, isnull, 0);
792 11893771 : USEMEM(state, GetMemoryChunkSpace(tuple));
793 :
794 11893771 : tuplestore_puttuple_common(state, tuple);
795 :
796 11893771 : MemoryContextSwitchTo(oldcxt);
797 11893771 : }
798 :
/*
 * tuplestore_puttuple_common
 *		Shared guts of the tuplestore_put* entry points.
 *
 * "tuple" must already be a tuplestore-owned copy whose memory has been
 * charged via USEMEM by the caller.  Depending on the current storage state
 * we either stash it in the in-memory array (spilling everything to a temp
 * file if that overflows the memory budget) or append it to the temp file.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;
	int			i;
	ResourceOwner oldowner;
	MemoryContext oldcxt;

	state->tuples++;

	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Update read pointers as needed; see API spec above.  A
			 * non-active pointer that had hit EOF is repositioned so it will
			 * next return the tuple we are about to append.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining --- if we fail,
			 * there'll still be room to store the incoming tuple, and then
			 * we'll switch to tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
				Assert(state->memtupcount < state->memtupsize);
			}

			/* Stash the tuple in the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.  Make sure that
			 * the temp file(s) are created in suitable temp tablespaces.
			 */
			PrepareTempTablespaces();

			/* associate the file with the store's resource owner */
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			/*
			 * We switch out of the state->context as this is a generation
			 * context, which isn't ideal for allocations relating to the
			 * BufFile.
			 */
			oldcxt = MemoryContextSwitchTo(state->context->parent);

			state->myfile = BufFileCreateTemp(state->interXact);

			MemoryContextSwitchTo(oldcxt);

			CurrentResourceOwner = oldowner;

			/*
			 * Freeze the decision about whether trailing length words will be
			 * used.  We can't change this choice once data is on tape, even
			 * though callers might drop the requirement.
			 */
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;

			/*
			 * Update the maximum space used before dumping the tuples.  It's
			 * possible that more space will be used by the tuples in memory
			 * than the space that will be used on disk.
			 */
			tuplestore_updatemax(state);

			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:

			/*
			 * Update read pointers as needed; see API spec above.  Note:
			 * BufFileTell is quite cheap, so not worth trying to avoid
			 * multiple calls.  An EOF-ed non-active pointer is repositioned
			 * to the current write position, i.e. the new tuple.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					BufFileTell(state->myfile,
								&readptr->file,
								&readptr->offset);
				}
			}

			WRITETUP(state, tuple);
			break;
		case TSS_READFILE:

			/*
			 * Switch from reading to writing.  First save the active read
			 * pointer's current seek position (unless it is at EOF), then
			 * seek to the remembered end-of-data position.
			 */
			if (!state->readptrs[state->activeptr].eof_reached)
				BufFileTell(state->myfile,
							&state->readptrs[state->activeptr].file,
							&state->readptrs[state->activeptr].offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in tuplestore temporary file")));
			state->status = TSS_WRITEFILE;

			/*
			 * Update read pointers as needed; see API spec above.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->file = state->writepos_file;
					readptr->offset = state->writepos_offset;
				}
			}

			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}
946 :
947 : /*
948 : * Fetch the next tuple in either forward or back direction.
949 : * Returns NULL if no more tuples. If should_free is set, the
950 : * caller must pfree the returned tuple when done with it.
951 : *
952 : * Backward scan is only allowed if randomAccess was set true or
953 : * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
954 : */
static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
					bool *should_free)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];
	unsigned int tuplen;
	void	   *tup;

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	switch (state->status)
	{
		case TSS_INMEM:
			/* In-memory tuples are owned by the store; caller must not free */
			*should_free = false;
			if (forward)
			{
				if (readptr->eof_reached)
					return NULL;
				if (readptr->current < state->memtupcount)
				{
					/* We have another tuple, so return it */
					return state->memtuples[readptr->current++];
				}
				readptr->eof_reached = true;
				return NULL;
			}
			else
			{
				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else tuple before last returned.
				 */
				if (readptr->eof_reached)
				{
					readptr->current = state->memtupcount;
					readptr->eof_reached = false;
				}
				else
				{
					/* Can't back up past the truncation point */
					if (readptr->current <= state->memtupdeleted)
					{
						Assert(!state->truncated);
						return NULL;
					}
					readptr->current--; /* last returned tuple */
				}
				if (readptr->current <= state->memtupdeleted)
				{
					Assert(!state->truncated);
					return NULL;
				}
				return state->memtuples[readptr->current - 1];
			}
			break;

		case TSS_WRITEFILE:
			/* Skip state change if we'll just return NULL */
			if (readptr->eof_reached && forward)
				return NULL;

			/*
			 * Switch from writing to reading.  Remember the end-of-data
			 * position, then seek to this read pointer's saved position.
			 */
			BufFileTell(state->myfile,
						&state->writepos_file, &state->writepos_offset);
			if (!readptr->eof_reached)
				if (BufFileSeek(state->myfile,
								readptr->file, readptr->offset,
								SEEK_SET) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not seek in tuplestore temporary file")));
			state->status = TSS_READFILE;
			pg_fallthrough;

		case TSS_READFILE:
			/* READTUP palloc's a fresh copy, so the caller must free it */
			*should_free = true;
			if (forward)
			{
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					readptr->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else tuple before last returned.
			 *
			 * Back up to fetch previously-returned tuple's ending length
			 * word.  If seek fails, assume we are at start of file.
			 */
			if (BufFileSeek(state->myfile, 0, -(pgoff_t) sizeof(unsigned int),
							SEEK_CUR) != 0)
			{
				/* even a failed backwards fetch gets you out of eof state */
				readptr->eof_reached = false;
				Assert(!state->truncated);
				return NULL;
			}
			tuplen = getlen(state, false);

			if (readptr->eof_reached)
			{
				readptr->eof_reached = false;
				/* We will return the tuple returned before returning NULL */
			}
			else
			{
				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (BufFileSeek(state->myfile, 0,
								-(pgoff_t) (tuplen + 2 * sizeof(unsigned int)),
								SEEK_CUR) != 0)
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
					 */
					if (BufFileSeek(state->myfile, 0,
									-(pgoff_t) (tuplen + sizeof(unsigned int)),
									SEEK_CUR) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file")));
					Assert(!state->truncated);
					return NULL;
				}
				tuplen = getlen(state, false);
			}

			/*
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (BufFileSeek(state->myfile, 0,
							-(pgoff_t) tuplen,
							SEEK_CUR) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in tuplestore temporary file")));
			tup = READTUP(state, tuplen);
			return tup;

		default:
			elog(ERROR, "invalid tuplestore state");
			return NULL;		/* keep compiler quiet */
	}
}
1116 :
1117 : /*
1118 : * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1119 : *
1120 : * If successful, put tuple in slot and return true; else, clear the slot
1121 : * and return false.
1122 : *
1123 : * If copy is true, the slot receives a copied tuple (allocated in current
1124 : * memory context) that will stay valid regardless of future manipulations of
1125 : * the tuplestore's state. If copy is false, the slot may just receive a
1126 : * pointer to a tuple held within the tuplestore. The latter is more
1127 : * efficient but the slot contents may be corrupted if additional writes to
1128 : * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1129 : */
1130 : bool
1131 15224742 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1132 : bool copy, TupleTableSlot *slot)
1133 : {
1134 : MinimalTuple tuple;
1135 : bool should_free;
1136 :
1137 15224742 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1138 :
1139 15224742 : if (tuple)
1140 : {
1141 14980418 : if (copy && !should_free)
1142 : {
1143 1330470 : tuple = heap_copy_minimal_tuple(tuple, 0);
1144 1330470 : should_free = true;
1145 : }
1146 14980418 : ExecStoreMinimalTuple(tuple, slot, should_free);
1147 14980418 : return true;
1148 : }
1149 : else
1150 : {
1151 244324 : ExecClearTuple(slot);
1152 244324 : return false;
1153 : }
1154 : }
1155 :
1156 : /*
1157 : * tuplestore_gettupleslot_force - exported function to fetch a tuple
1158 : *
1159 : * This is identical to tuplestore_gettupleslot except the given slot can be
1160 : * any kind of slot; it need not be one that will accept a MinimalTuple.
1161 : */
1162 : bool
1163 2163 : tuplestore_gettupleslot_force(Tuplestorestate *state, bool forward,
1164 : bool copy, TupleTableSlot *slot)
1165 : {
1166 : MinimalTuple tuple;
1167 : bool should_free;
1168 :
1169 2163 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1170 :
1171 2163 : if (tuple)
1172 : {
1173 2076 : if (copy && !should_free)
1174 : {
1175 0 : tuple = heap_copy_minimal_tuple(tuple, 0);
1176 0 : should_free = true;
1177 : }
1178 2076 : ExecForceStoreMinimalTuple(tuple, slot, should_free);
1179 2076 : return true;
1180 : }
1181 : else
1182 : {
1183 87 : ExecClearTuple(slot);
1184 87 : return false;
1185 : }
1186 : }
1187 :
1188 : /*
1189 : * tuplestore_advance - exported function to adjust position without fetching
1190 : *
1191 : * We could optimize this case to avoid palloc/pfree overhead, but for the
1192 : * moment it doesn't seem worthwhile.
1193 : */
1194 : bool
1195 115412 : tuplestore_advance(Tuplestorestate *state, bool forward)
1196 : {
1197 : void *tuple;
1198 : bool should_free;
1199 :
1200 115412 : tuple = tuplestore_gettuple(state, forward, &should_free);
1201 :
1202 115412 : if (tuple)
1203 : {
1204 113551 : if (should_free)
1205 0 : pfree(tuple);
1206 113551 : return true;
1207 : }
1208 : else
1209 : {
1210 1861 : return false;
1211 : }
1212 : }
1213 :
1214 : /*
1215 : * Advance over N tuples in either forward or back direction,
1216 : * without returning any data. N<=0 is a no-op.
1217 : * Returns true if successful, false if ran out of tuples.
1218 : */
bool
tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	/* N <= 0 is defined as a no-op */
	if (ntuples <= 0)
		return true;

	switch (state->status)
	{
		case TSS_INMEM:
			if (forward)
			{
				if (readptr->eof_reached)
					return false;
				/* Enough remaining tuples to satisfy the whole skip? */
				if (state->memtupcount - readptr->current >= ntuples)
				{
					readptr->current += ntuples;
					return true;
				}
				/* Ran out partway: leave the pointer at EOF */
				readptr->current = state->memtupcount;
				readptr->eof_reached = true;
				return false;
			}
			else
			{
				/*
				 * Stepping back from EOF consumes one step to reach the
				 * last tuple, matching tuplestore_gettuple's behavior.
				 */
				if (readptr->eof_reached)
				{
					readptr->current = state->memtupcount;
					readptr->eof_reached = false;
					ntuples--;
				}
				/* Can we back up N without hitting the truncation point? */
				if (readptr->current - state->memtupdeleted > ntuples)
				{
					readptr->current -= ntuples;
					return true;
				}
				Assert(!state->truncated);
				readptr->current = state->memtupdeleted;
				return false;
			}
			break;

		default:
			/* We don't currently try hard to optimize other cases */
			while (ntuples-- > 0)
			{
				void	   *tuple;
				bool		should_free;

				tuple = tuplestore_gettuple(state, forward, &should_free);

				if (tuple == NULL)
					return false;
				if (should_free)
					pfree(tuple);
				CHECK_FOR_INTERRUPTS();
			}
			return true;
	}
}
1282 :
1283 : /*
1284 : * dumptuples - remove tuples from memory and write to tape
1285 : *
1286 : * As a side effect, we must convert each read pointer's position from
1287 : * "current" to file/offset format. But eof_reached pointers don't
1288 : * need to change state.
1289 : */
static void
dumptuples(Tuplestorestate *state)
{
	int			i;

	/*
	 * Note: the loop deliberately runs one iteration past the last tuple
	 * (i == memtupcount), so that a read pointer positioned just past the
	 * end still gets its file/offset captured before we break out.
	 */
	for (i = state->memtupdeleted;; i++)
	{
		TSReadPointer *readptr = state->readptrs;
		int			j;

		/* Convert any read pointer sitting at slot i to file/offset form */
		for (j = 0; j < state->readptrcount; readptr++, j++)
		{
			if (i == readptr->current && !readptr->eof_reached)
				BufFileTell(state->myfile,
							&readptr->file, &readptr->offset);
		}
		if (i >= state->memtupcount)
			break;
		WRITETUP(state, state->memtuples[i]);

		/*
		 * Increase memtupdeleted to track the fact that we just deleted that
		 * tuple.  Think not to remove this on the grounds that we'll reset
		 * memtupdeleted to zero below.  We might not reach that if some later
		 * WRITETUP fails (e.g. due to overrunning temp_file_limit).  If so,
		 * we'd error out leaving an effectively-corrupt tuplestore, which
		 * would be quite bad if it's a persistent data structure such as a
		 * Portal's holdStore.
		 */
		state->memtupdeleted++;
	}
	/* Now we can reset memtupdeleted along with memtupcount */
	state->memtupdeleted = 0;
	state->memtupcount = 0;
}
1325 :
1326 : /*
1327 : * tuplestore_rescan - rewind the active read pointer to start
1328 : */
1329 : void
1330 204321 : tuplestore_rescan(Tuplestorestate *state)
1331 : {
1332 204321 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1333 :
1334 : Assert(readptr->eflags & EXEC_FLAG_REWIND);
1335 : Assert(!state->truncated);
1336 :
1337 204321 : switch (state->status)
1338 : {
1339 204267 : case TSS_INMEM:
1340 204267 : readptr->eof_reached = false;
1341 204267 : readptr->current = 0;
1342 204267 : break;
1343 54 : case TSS_WRITEFILE:
1344 54 : readptr->eof_reached = false;
1345 54 : readptr->file = 0;
1346 54 : readptr->offset = 0;
1347 54 : break;
1348 0 : case TSS_READFILE:
1349 0 : readptr->eof_reached = false;
1350 0 : if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
1351 0 : ereport(ERROR,
1352 : (errcode_for_file_access(),
1353 : errmsg("could not seek in tuplestore temporary file")));
1354 0 : break;
1355 0 : default:
1356 0 : elog(ERROR, "invalid tuplestore state");
1357 : break;
1358 : }
1359 204321 : }
1360 :
1361 : /*
1362 : * tuplestore_copy_read_pointer - copy a read pointer's state to another
1363 : */
void
tuplestore_copy_read_pointer(Tuplestorestate *state,
							 int srcptr, int destptr)
{
	TSReadPointer *sptr = &state->readptrs[srcptr];
	TSReadPointer *dptr = &state->readptrs[destptr];

	Assert(srcptr >= 0 && srcptr < state->readptrcount);
	Assert(destptr >= 0 && destptr < state->readptrcount);

	/* Assigning to self is a no-op */
	if (srcptr == destptr)
		return;

	if (dptr->eflags != sptr->eflags)
	{
		/* Possible change of overall eflags, so copy and then recompute */
		int			eflags;
		int			i;

		*dptr = *sptr;
		/* state->eflags must stay the union of all pointers' eflags */
		eflags = state->readptrs[0].eflags;
		for (i = 1; i < state->readptrcount; i++)
			eflags |= state->readptrs[i].eflags;
		state->eflags = eflags;
	}
	else
		*dptr = *sptr;

	switch (state->status)
	{
		case TSS_INMEM:
		case TSS_WRITEFILE:
			/* no work */
			break;
		case TSS_READFILE:

			/*
			 * This case is a bit tricky since the active read pointer's
			 * position corresponds to the seek point, not what is in its
			 * variables.  Assigning to the active requires a seek, and
			 * assigning from the active requires a tell, except when
			 * eof_reached.
			 */
			if (destptr == state->activeptr)
			{
				/* eof_reached means "positioned at end of written data" */
				if (dptr->eof_reached)
				{
					if (BufFileSeek(state->myfile,
									state->writepos_file,
									state->writepos_offset,
									SEEK_SET) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file")));
				}
				else
				{
					if (BufFileSeek(state->myfile,
									dptr->file, dptr->offset,
									SEEK_SET) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file")));
				}
			}
			else if (srcptr == state->activeptr)
			{
				/* Capture the active pointer's true (seek) position */
				if (!dptr->eof_reached)
					BufFileTell(state->myfile,
								&dptr->file,
								&dptr->offset);
			}
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}
1443 :
1444 : /*
1445 : * tuplestore_trim - remove all no-longer-needed tuples
1446 : *
1447 : * Calling this function authorizes the tuplestore to delete all tuples
1448 : * before the oldest read pointer, if no read pointer is marked as requiring
1449 : * REWIND capability.
1450 : *
1451 : * Note: this is obviously safe if no pointer has BACKWARD capability either.
1452 : * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1453 : * the pointer can be moved backward but not before the oldest other read
1454 : * pointer.
1455 : */
void
tuplestore_trim(Tuplestorestate *state)
{
	int			oldest;
	int			nremove;
	int			i;

	/*
	 * Truncation is disallowed if any read pointer requires rewind
	 * capability.
	 */
	if (state->eflags & EXEC_FLAG_REWIND)
		return;

	/*
	 * We don't bother trimming temp files since it usually would mean more
	 * work than just letting them sit in kernel buffers until they age out.
	 */
	if (state->status != TSS_INMEM)
		return;

	/* Find the oldest read pointer (EOF-ed pointers need no tuples kept) */
	oldest = state->memtupcount;
	for (i = 0; i < state->readptrcount; i++)
	{
		if (!state->readptrs[i].eof_reached)
			oldest = Min(oldest, state->readptrs[i].current);
	}

	/*
	 * Note: you might think we could remove all the tuples before the oldest
	 * "current", since that one is the next to be returned.  However, since
	 * tuplestore_gettuple returns a direct pointer to our internal copy of
	 * the tuple, it's likely that the caller has still got the tuple just
	 * before "current" referenced in a slot.  So we keep one extra tuple
	 * before the oldest "current".  (Strictly speaking, we could require such
	 * callers to use the "copy" flag to tuplestore_gettupleslot, but for
	 * efficiency we allow this one case to not use "copy".)
	 */
	nremove = oldest - 1;
	if (nremove <= 0)
		return;					/* nothing to do */

	Assert(nremove >= state->memtupdeleted);
	Assert(nremove <= state->memtupcount);

	/* before freeing any memory, update the statistics */
	tuplestore_updatemax(state);

	/* Release no-longer-needed tuples */
	for (i = state->memtupdeleted; i < nremove; i++)
	{
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
		pfree(state->memtuples[i]);
		state->memtuples[i] = NULL;
		/* As in dumptuples(), increment memtupdeleted synchronously */
		state->memtupdeleted++;
	}
	Assert(state->memtupdeleted == nremove);

	/* mark tuplestore as truncated (used for Assert crosschecks only) */
	state->truncated = true;

	/*
	 * If nremove is less than 1/8th memtupcount, just stop here, leaving the
	 * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
	 * repeatedly memmove-ing a large pointer array.  The worst case space
	 * wastage is pretty small, since it's just pointers and not whole tuples.
	 */
	if (nremove < state->memtupcount / 8)
		return;

	/*
	 * Slide the array down and readjust pointers.
	 *
	 * In mergejoin's current usage, it's demonstrable that there will always
	 * be exactly one non-removed tuple; so optimize that case.
	 */
	if (nremove + 1 == state->memtupcount)
		state->memtuples[0] = state->memtuples[nremove];
	else
		memmove(state->memtuples, state->memtuples + nremove,
				(state->memtupcount - nremove) * sizeof(void *));

	state->memtupdeleted = 0;
	state->memtupcount -= nremove;
	/* Shift every live read pointer's index to match the slide */
	for (i = 0; i < state->readptrcount; i++)
	{
		if (!state->readptrs[i].eof_reached)
			state->readptrs[i].current -= nremove;
	}
}
1548 :
1549 : /*
1550 : * tuplestore_updatemax
1551 : * Update the maximum space used by this tuplestore and the method used
1552 : * for storage.
1553 : */
1554 : static void
1555 591436 : tuplestore_updatemax(Tuplestorestate *state)
1556 : {
1557 591436 : if (state->status == TSS_INMEM)
1558 591428 : state->maxSpace = Max(state->maxSpace,
1559 : state->allowedMem - state->availMem);
1560 : else
1561 : {
1562 8 : state->maxSpace = Max(state->maxSpace,
1563 : BufFileSize(state->myfile));
1564 :
1565 : /*
1566 : * usedDisk never gets set to false again after spilling to disk, even
1567 : * if tuplestore_clear() is called and new tuples go to memory again.
1568 : */
1569 8 : state->usedDisk = true;
1570 : }
1571 591436 : }
1572 :
1573 : /*
1574 : * tuplestore_get_stats
1575 : * Obtain statistics about the maximum space used by the tuplestore.
1576 : * These statistics are the maximums and are not reset by calls to
1577 : * tuplestore_trim() or tuplestore_clear().
1578 : */
1579 : void
1580 20 : tuplestore_get_stats(Tuplestorestate *state, char **max_storage_type,
1581 : int64 *max_space)
1582 : {
1583 20 : tuplestore_updatemax(state);
1584 :
1585 20 : if (state->usedDisk)
1586 8 : *max_storage_type = "Disk";
1587 : else
1588 12 : *max_storage_type = "Memory";
1589 :
1590 20 : *max_space = state->maxSpace;
1591 20 : }
1592 :
1593 : /*
1594 : * tuplestore_in_memory
1595 : *
1596 : * Returns true if the tuplestore has not spilled to disk.
1597 : *
1598 : * XXX exposing this is a violation of modularity ... should get rid of it.
1599 : */
1600 : bool
1601 1159592 : tuplestore_in_memory(Tuplestorestate *state)
1602 : {
1603 1159592 : return (state->status == TSS_INMEM);
1604 : }
1605 :
1606 :
1607 : /*
1608 : * Tape interface routines
1609 : */
1610 :
1611 : static unsigned int
1612 2086761 : getlen(Tuplestorestate *state, bool eofOK)
1613 : {
1614 : unsigned int len;
1615 : size_t nbytes;
1616 :
1617 2086761 : nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
1618 2086761 : if (nbytes == 0)
1619 71 : return 0;
1620 : else
1621 2086690 : return len;
1622 : }
1623 :
1624 :
1625 : /*
1626 : * Routines specialized for HeapTuple case
1627 : *
1628 : * The stored form is actually a MinimalTuple, but for largely historical
1629 : * reasons we allow COPYTUP to work from a HeapTuple.
1630 : *
1631 : * Since MinimalTuple already has length in its first word, we don't need
1632 : * to write that separately.
1633 : */
1634 :
1635 : static void *
1636 1129169 : copytup_heap(Tuplestorestate *state, void *tup)
1637 : {
1638 : MinimalTuple tuple;
1639 :
1640 1129169 : tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup, 0);
1641 1129169 : USEMEM(state, GetMemoryChunkSpace(tuple));
1642 1129169 : return tuple;
1643 : }
1644 :
1645 : static void
1646 4708849 : writetup_heap(Tuplestorestate *state, void *tup)
1647 : {
1648 4708849 : MinimalTuple tuple = (MinimalTuple) tup;
1649 :
1650 : /* the part of the MinimalTuple we'll write: */
1651 4708849 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1652 4708849 : unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1653 :
1654 : /* total on-disk footprint: */
1655 4708849 : unsigned int tuplen = tupbodylen + sizeof(int);
1656 :
1657 4708849 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1658 4708849 : BufFileWrite(state->myfile, tupbody, tupbodylen);
1659 4708849 : if (state->backward) /* need trailing length word? */
1660 0 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1661 :
1662 4708849 : FREEMEM(state, GetMemoryChunkSpace(tuple));
1663 4708849 : heap_free_minimal_tuple(tuple);
1664 4708849 : }
1665 :
1666 : static void *
1667 2086690 : readtup_heap(Tuplestorestate *state, unsigned int len)
1668 : {
1669 2086690 : unsigned int tupbodylen = len - sizeof(int);
1670 2086690 : unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1671 2086690 : MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1672 2086690 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1673 :
1674 : /* read in the tuple proper */
1675 2086690 : tuple->t_len = tuplen;
1676 2086690 : BufFileReadExact(state->myfile, tupbody, tupbodylen);
1677 2086690 : if (state->backward) /* need trailing length word? */
1678 0 : BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
1679 2086690 : return tuple;
1680 : }
|