Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplestore.c
4 : * Generalized routines for temporary tuple storage.
5 : *
6 : * This module handles temporary storage of tuples for purposes such
7 : * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 : * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 : * but can only store and regurgitate a sequence of tuples. However,
10 : * because no sort is required, it is allowed to start reading the sequence
11 : * before it has all been written. This is particularly useful for cursors,
12 : * because it allows random access within the already-scanned portion of
13 : * a query without having to process the underlying scan to completion.
14 : * Also, it is possible to support multiple independent read pointers.
15 : *
16 : * A temporary file is used to handle the data if it exceeds the
17 : * space limit specified by the caller.
18 : *
19 : * The (approximate) amount of memory allowed to the tuplestore is specified
20 : * in kilobytes by the caller. We absorb tuples and simply store them in an
21 : * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 : * maxKBytes, we dump all the tuples into a temp file and then read from that
23 : * when needed.
24 : *
25 : * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 : * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 : * Mark/restore behavior is supported by copying read pointers.
28 : *
29 : * When the caller requests backward-scan capability, we write the temp file
30 : * in a format that allows either forward or backward scan. Otherwise, only
31 : * forward scan is allowed. A request for backward scan must be made before
32 : * putting any tuples into the tuplestore. Rewind is normally allowed but
33 : * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 : * read pointers enables truncation of the tuplestore at the oldest read point
35 : * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 : * at appropriate times for truncation to actually happen.)
37 : *
38 : * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 : * current write position, and the write-position variables in the tuplestore
40 : * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 : * seek position is the active read pointer's position, and that read pointer
42 : * isn't kept up to date. We update the appropriate variables using ftell()
43 : * before switching to the other state or activating a different read pointer.
44 : *
45 : *
46 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
47 : * Portions Copyright (c) 1994, Regents of the University of California
48 : *
49 : * IDENTIFICATION
50 : * src/backend/utils/sort/tuplestore.c
51 : *
52 : *-------------------------------------------------------------------------
53 : */
54 :
55 : #include "postgres.h"
56 :
57 : #include <limits.h>
58 :
59 : #include "access/htup_details.h"
60 : #include "commands/tablespace.h"
61 : #include "executor/executor.h"
62 : #include "miscadmin.h"
63 : #include "storage/buffile.h"
64 : #include "utils/memutils.h"
65 : #include "utils/resowner.h"
66 : #include "utils/tuplestore.h"
67 :
68 :
69 : /*
70 : * Possible states of a Tuplestore object. These denote the states that
71 : * persist between calls of Tuplestore routines.
72 : */
73 : typedef enum
74 : {
75 : TSS_INMEM, /* Tuples still fit in memory */
76 : TSS_WRITEFILE, /* Writing to temp file */
77 : TSS_READFILE, /* Reading from temp file */
78 : } TupStoreStatus;
79 :
80 : /*
81 : * State for a single read pointer. If we are in state INMEM then all the
82 : * read pointers' "current" fields denote the read positions. In state
83 : * WRITEFILE, the file/offset fields denote the read positions. In state
84 : * READFILE, inactive read pointers have valid file/offset, but the active
85 : * read pointer implicitly has position equal to the temp file's seek position.
86 : *
87 : * Special case: if eof_reached is true, then the pointer's read position is
88 : * implicitly equal to the write position, and current/file/offset aren't
89 : * maintained. This way we need not update all the read pointers each time
90 : * we write.
91 : */
92 : typedef struct
93 : {
94 : int eflags; /* capability flags */
95 : bool eof_reached; /* read has reached EOF */
96 : int current; /* next array index to read */
97 : int file; /* temp file# */
98 : pgoff_t offset; /* byte offset in file */
99 : } TSReadPointer;
100 :
101 : /*
102 : * Private state of a Tuplestore operation.
103 : */
104 : struct Tuplestorestate
105 : {
106 : TupStoreStatus status; /* enumerated value as shown above */
107 : int eflags; /* capability flags (OR of pointers' flags) */
108 : bool backward; /* store extra length words in file? */
109 : bool interXact; /* keep open through transactions? */
110 : bool truncated; /* tuplestore_trim has removed tuples? */
111 : bool usedDisk; /* used by tuplestore_get_stats() */
112 : int64 maxSpace; /* used by tuplestore_get_stats() */
113 : int64 availMem; /* remaining memory available, in bytes */
114 : int64 allowedMem; /* total memory allowed, in bytes */
115 : int64 tuples; /* number of tuples added */
116 : BufFile *myfile; /* underlying file, or NULL if none */
117 : MemoryContext context; /* memory context for holding tuples */
118 : ResourceOwner resowner; /* resowner for holding temp files */
119 :
120 : /*
121 : * These function pointers decouple the routines that must know what kind
122 : * of tuple we are handling from the routines that don't need to know it.
123 : * They are set up by the tuplestore_begin_xxx routines.
124 : *
125 : * (Although tuplestore.c currently only supports heap tuples, I've copied
126 : * this part of tuplesort.c so that extension to other kinds of objects
127 : * will be easy if it's ever needed.)
128 : *
129 : * Function to copy a supplied input tuple into palloc'd space. (NB: we
130 : * assume that a single pfree() is enough to release the tuple later, so
131 : * the representation must be "flat" in one palloc chunk.) state->availMem
132 : * must be decreased by the amount of space used.
133 : */
134 : void *(*copytup) (Tuplestorestate *state, void *tup);
135 :
136 : /*
137 : * Function to write a stored tuple onto tape. The representation of the
138 : * tuple on tape need not be the same as it is in memory; requirements on
139 : * the tape representation are given below. After writing the tuple,
140 : * pfree() it, and increase state->availMem by the amount of memory space
141 : * thereby released.
142 : */
143 : void (*writetup) (Tuplestorestate *state, void *tup);
144 :
145 : /*
146 : * Function to read a stored tuple from tape back into memory. 'len' is
147 : * the already-read length of the stored tuple. Create and return a
148 : * palloc'd copy, and decrease state->availMem by the amount of memory
149 : * space consumed.
150 : */
151 : void *(*readtup) (Tuplestorestate *state, unsigned int len);
152 :
153 : /*
154 : * This array holds pointers to tuples in memory if we are in state INMEM.
155 : * In states WRITEFILE and READFILE it's not used.
156 : *
157 : * When memtupdeleted > 0, the first memtupdeleted pointers are already
158 : * released due to a tuplestore_trim() operation, but we haven't expended
159 : * the effort to slide the remaining pointers down. These unused pointers
160 : * are set to NULL to catch any invalid accesses. Note that memtupcount
161 : * includes the deleted pointers.
162 : */
163 : void **memtuples; /* array of pointers to palloc'd tuples */
164 : int memtupdeleted; /* the first N slots are currently unused */
165 : int memtupcount; /* number of tuples currently present */
166 : int memtupsize; /* allocated length of memtuples array */
167 : bool growmemtuples; /* memtuples' growth still underway? */
168 :
169 : /*
170 : * These variables are used to keep track of the current positions.
171 : *
172 : * In state WRITEFILE, the current file seek position is the write point;
173 : * in state READFILE, the write position is remembered in writepos_xxx.
174 : * (The write position is the same as EOF, but since BufFileSeek doesn't
175 : * currently implement SEEK_END, we have to remember it explicitly.)
176 : */
177 : TSReadPointer *readptrs; /* array of read pointers */
178 : int activeptr; /* index of the active read pointer */
179 : int readptrcount; /* number of pointers currently valid */
180 : int readptrsize; /* allocated length of readptrs array */
181 :
182 : int writepos_file; /* file# (valid if READFILE state) */
183 : pgoff_t writepos_offset; /* offset (valid if READFILE state) */
184 : };
185 :
/* Dispatch through the per-tuple-kind function pointers */
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)	((*(state)->readtup) (state, len))
/* Memory accounting: LACKMEM is true once we've overrun allowedMem */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
192 :
193 : /*--------------------
194 : *
195 : * NOTES about on-tape representation of tuples:
196 : *
197 : * We require the first "unsigned int" of a stored tuple to be the total size
198 : * on-tape of the tuple, including itself (so it is never zero).
199 : * The remainder of the stored tuple
200 : * may or may not match the in-memory representation of the tuple ---
201 : * any conversion needed is the job of the writetup and readtup routines.
202 : *
203 : * If state->backward is true, then the stored representation of
204 : * the tuple must be followed by another "unsigned int" that is a copy of the
205 : * length --- so the total tape space used is actually sizeof(unsigned int)
206 : * more than the stored length value. This allows read-backwards. When
207 : * state->backward is not set, the write/read routines may omit the extra
208 : * length word.
209 : *
210 : * writetup is expected to write both length words as well as the tuple
211 : * data. When readtup is called, the tape is positioned just after the
212 : * front length word; readtup must read the tuple data and advance past
213 : * the back length word (if present).
214 : *
215 : * The write/read routines can make use of the tuple description data
216 : * stored in the Tuplestorestate record, if needed. They are also expected
217 : * to adjust state->availMem by the amount of memory space (not tape space!)
218 : * released or consumed. There is no error return from either writetup
219 : * or readtup; they should ereport() on failure.
220 : *
221 : *
222 : * NOTES about memory consumption calculations:
223 : *
224 : * We count space allocated for tuples against the maxKBytes limit,
225 : * plus the space used by the variable-size array memtuples.
226 : * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
227 : * We don't worry about the size of the read pointer array, either.
228 : *
229 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
230 : * rather than the originally-requested size. This is important since
231 : * palloc can add substantial overhead. It's not a complete answer since
232 : * we won't count any wasted space in palloc allocation blocks, but it's
233 : * a lot better than what we were doing before 7.3.
234 : *
235 : *--------------------
236 : */
237 :
238 :
/* Forward declarations of local routines */
static Tuplestorestate *tuplestore_begin_common(int eflags,
												bool interXact,
												int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
static void tuplestore_updatemax(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);
249 :
250 :
251 : /*
252 : * tuplestore_begin_xxx
253 : *
254 : * Initialize for a tuple store operation.
255 : */
256 : static Tuplestorestate *
257 146133 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
258 : {
259 : Tuplestorestate *state;
260 :
261 146133 : state = palloc0_object(Tuplestorestate);
262 :
263 146133 : state->status = TSS_INMEM;
264 146133 : state->eflags = eflags;
265 146133 : state->interXact = interXact;
266 146133 : state->truncated = false;
267 146133 : state->usedDisk = false;
268 146133 : state->maxSpace = 0;
269 146133 : state->allowedMem = maxKBytes * (int64) 1024;
270 146133 : state->availMem = state->allowedMem;
271 146133 : state->myfile = NULL;
272 :
273 : /*
274 : * The palloc/pfree pattern for tuple memory is in a FIFO pattern. A
275 : * generation context is perfectly suited for this.
276 : */
277 146133 : state->context = GenerationContextCreate(CurrentMemoryContext,
278 : "tuplestore tuples",
279 : ALLOCSET_DEFAULT_SIZES);
280 146133 : state->resowner = CurrentResourceOwner;
281 :
282 146133 : state->memtupdeleted = 0;
283 146133 : state->memtupcount = 0;
284 146133 : state->tuples = 0;
285 :
286 : /*
287 : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
288 : * see comments in grow_memtuples().
289 : */
290 146133 : state->memtupsize = Max(16384 / sizeof(void *),
291 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
292 :
293 146133 : state->growmemtuples = true;
294 146133 : state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
295 :
296 146133 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
297 :
298 146133 : state->activeptr = 0;
299 146133 : state->readptrcount = 1;
300 146133 : state->readptrsize = 8; /* arbitrary */
301 146133 : state->readptrs = (TSReadPointer *)
302 146133 : palloc(state->readptrsize * sizeof(TSReadPointer));
303 :
304 146133 : state->readptrs[0].eflags = eflags;
305 146133 : state->readptrs[0].eof_reached = false;
306 146133 : state->readptrs[0].current = 0;
307 :
308 146133 : return state;
309 : }
310 :
311 : /*
312 : * tuplestore_begin_heap
313 : *
314 : * Create a new tuplestore; other types of tuple stores (other than
315 : * "heap" tuple stores, for heap tuples) are possible, but not presently
316 : * implemented.
317 : *
318 : * randomAccess: if true, both forward and backward accesses to the
319 : * tuple store are allowed.
320 : *
321 : * interXact: if true, the files used for on-disk storage persist beyond the
322 : * end of the current transaction. NOTE: It's the caller's responsibility to
323 : * create such a tuplestore in a memory context and resource owner that will
324 : * also survive transaction boundaries, and to ensure the tuplestore is closed
325 : * when it's no longer wanted.
326 : *
327 : * maxKBytes: how much data to store in memory (any data beyond this
328 : * amount is paged to disk). When in doubt, use work_mem.
329 : */
330 : Tuplestorestate *
331 146133 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
332 : {
333 : Tuplestorestate *state;
334 : int eflags;
335 :
336 : /*
337 : * This interpretation of the meaning of randomAccess is compatible with
338 : * the pre-8.3 behavior of tuplestores.
339 : */
340 146133 : eflags = randomAccess ?
341 146133 : (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
342 : (EXEC_FLAG_REWIND);
343 :
344 146133 : state = tuplestore_begin_common(eflags, interXact, maxKBytes);
345 :
346 146133 : state->copytup = copytup_heap;
347 146133 : state->writetup = writetup_heap;
348 146133 : state->readtup = readtup_heap;
349 :
350 146133 : return state;
351 : }
352 :
353 : /*
354 : * tuplestore_set_eflags
355 : *
356 : * Set the capability flags for read pointer 0 at a finer grain than is
357 : * allowed by tuplestore_begin_xxx. This must be called before inserting
358 : * any data into the tuplestore.
359 : *
360 : * eflags is a bitmask following the meanings used for executor node
361 : * startup flags (see executor.h). tuplestore pays attention to these bits:
362 : * EXEC_FLAG_REWIND need rewind to start
363 : * EXEC_FLAG_BACKWARD need backward fetch
364 : * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
365 : * is set per "randomAccess" in the tuplestore_begin_xxx call.
366 : *
367 : * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
368 : * but not further than the truncation point (the furthest-back read pointer
369 : * position at the time of the last tuplestore_trim call).
370 : */
371 : void
372 4999 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
373 : {
374 : int i;
375 :
376 4999 : if (state->status != TSS_INMEM || state->memtupcount != 0)
377 0 : elog(ERROR, "too late to call tuplestore_set_eflags");
378 :
379 4999 : state->readptrs[0].eflags = eflags;
380 4999 : for (i = 1; i < state->readptrcount; i++)
381 0 : eflags |= state->readptrs[i].eflags;
382 4999 : state->eflags = eflags;
383 4999 : }
384 :
385 : /*
386 : * tuplestore_alloc_read_pointer - allocate another read pointer.
387 : *
388 : * Returns the pointer's index.
389 : *
390 : * The new pointer initially copies the position of read pointer 0.
391 : * It can have its own eflags, but if any data has been inserted into
392 : * the tuplestore, these eflags must not represent an increase in
393 : * requirements.
394 : */
395 : int
396 6594 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
397 : {
398 : /* Check for possible increase of requirements */
399 6594 : if (state->status != TSS_INMEM || state->memtupcount != 0)
400 : {
401 426 : if ((state->eflags | eflags) != state->eflags)
402 0 : elog(ERROR, "too late to require new tuplestore eflags");
403 : }
404 :
405 : /* Make room for another read pointer if needed */
406 6594 : if (state->readptrcount >= state->readptrsize)
407 : {
408 20 : int newcnt = state->readptrsize * 2;
409 :
410 20 : state->readptrs = (TSReadPointer *)
411 20 : repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
412 20 : state->readptrsize = newcnt;
413 : }
414 :
415 : /* And set it up */
416 6594 : state->readptrs[state->readptrcount] = state->readptrs[0];
417 6594 : state->readptrs[state->readptrcount].eflags = eflags;
418 :
419 6594 : state->eflags |= eflags;
420 :
421 6594 : return state->readptrcount++;
422 : }
423 :
424 : /*
425 : * tuplestore_clear
426 : *
427 : * Delete all the contents of a tuplestore, and reset its read pointers
428 : * to the start.
429 : */
430 : void
431 6879 : tuplestore_clear(Tuplestorestate *state)
432 : {
433 : int i;
434 : TSReadPointer *readptr;
435 :
436 : /* update the maxSpace before doing any USEMEM/FREEMEM adjustments */
437 6879 : tuplestore_updatemax(state);
438 :
439 6879 : if (state->myfile)
440 8 : BufFileClose(state->myfile);
441 6879 : state->myfile = NULL;
442 :
443 : #ifdef USE_ASSERT_CHECKING
444 : {
445 : int64 availMem = state->availMem;
446 :
447 : /*
448 : * Below, we reset the memory context for storing tuples. To save
449 : * from having to always call GetMemoryChunkSpace() on all stored
450 : * tuples, we adjust the availMem to forget all the tuples and just
451 : * recall USEMEM for the space used by the memtuples array. Here we
452 : * just Assert that's correct and the memory tracking hasn't gone
453 : * wrong anywhere.
454 : */
455 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
456 : availMem += GetMemoryChunkSpace(state->memtuples[i]);
457 :
458 : availMem += GetMemoryChunkSpace(state->memtuples);
459 :
460 : Assert(availMem == state->allowedMem);
461 : }
462 : #endif
463 :
464 : /* clear the memory consumed by the memory tuples */
465 6879 : MemoryContextReset(state->context);
466 :
467 : /*
468 : * Zero the used memory and re-consume the space for the memtuples array.
469 : * This saves having to FREEMEM for each stored tuple.
470 : */
471 6879 : state->availMem = state->allowedMem;
472 6879 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
473 :
474 6879 : state->status = TSS_INMEM;
475 6879 : state->truncated = false;
476 6879 : state->memtupdeleted = 0;
477 6879 : state->memtupcount = 0;
478 6879 : state->tuples = 0;
479 6879 : readptr = state->readptrs;
480 21447 : for (i = 0; i < state->readptrcount; readptr++, i++)
481 : {
482 14568 : readptr->eof_reached = false;
483 14568 : readptr->current = 0;
484 : }
485 6879 : }
486 :
487 : /*
488 : * tuplestore_end
489 : *
490 : * Release resources and clean up.
491 : */
492 : void
493 145524 : tuplestore_end(Tuplestorestate *state)
494 : {
495 145524 : if (state->myfile)
496 53 : BufFileClose(state->myfile);
497 :
498 145524 : MemoryContextDelete(state->context);
499 145524 : pfree(state->memtuples);
500 145524 : pfree(state->readptrs);
501 145524 : pfree(state);
502 145524 : }
503 :
504 : /*
505 : * tuplestore_select_read_pointer - make the specified read pointer active
506 : */
507 : void
508 2912989 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
509 : {
510 : TSReadPointer *readptr;
511 : TSReadPointer *oldptr;
512 :
513 : Assert(ptr >= 0 && ptr < state->readptrcount);
514 :
515 : /* No work if already active */
516 2912989 : if (ptr == state->activeptr)
517 753552 : return;
518 :
519 2159437 : readptr = &state->readptrs[ptr];
520 2159437 : oldptr = &state->readptrs[state->activeptr];
521 :
522 2159437 : switch (state->status)
523 : {
524 2159429 : case TSS_INMEM:
525 : case TSS_WRITEFILE:
526 : /* no work */
527 2159429 : break;
528 8 : case TSS_READFILE:
529 :
530 : /*
531 : * First, save the current read position in the pointer about to
532 : * become inactive.
533 : */
534 8 : if (!oldptr->eof_reached)
535 8 : BufFileTell(state->myfile,
536 : &oldptr->file,
537 : &oldptr->offset);
538 :
539 : /*
540 : * We have to make the temp file's seek position equal to the
541 : * logical position of the new read pointer. In eof_reached
542 : * state, that's the EOF, which we have available from the saved
543 : * write position.
544 : */
545 8 : if (readptr->eof_reached)
546 : {
547 0 : if (BufFileSeek(state->myfile,
548 : state->writepos_file,
549 : state->writepos_offset,
550 : SEEK_SET) != 0)
551 0 : ereport(ERROR,
552 : (errcode_for_file_access(),
553 : errmsg("could not seek in tuplestore temporary file")));
554 : }
555 : else
556 : {
557 8 : if (BufFileSeek(state->myfile,
558 : readptr->file,
559 : readptr->offset,
560 : SEEK_SET) != 0)
561 0 : ereport(ERROR,
562 : (errcode_for_file_access(),
563 : errmsg("could not seek in tuplestore temporary file")));
564 : }
565 8 : break;
566 0 : default:
567 0 : elog(ERROR, "invalid tuplestore state");
568 : break;
569 : }
570 :
571 2159437 : state->activeptr = ptr;
572 : }
573 :
574 : /*
575 : * tuplestore_tuple_count
576 : *
577 : * Returns the number of tuples added since creation or the last
578 : * tuplestore_clear().
579 : */
580 : int64
581 4264 : tuplestore_tuple_count(Tuplestorestate *state)
582 : {
583 4264 : return state->tuples;
584 : }
585 :
586 : /*
587 : * tuplestore_ateof
588 : *
589 : * Returns the active read pointer's eof_reached state.
590 : */
591 : bool
592 2951963 : tuplestore_ateof(Tuplestorestate *state)
593 : {
594 2951963 : return state->readptrs[state->activeptr].eof_reached;
595 : }
596 :
597 : /*
598 : * Grow the memtuples[] array, if possible within our memory constraint. We
599 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
600 : * limit. Return true if we were able to enlarge the array, false if not.
601 : *
602 : * Normally, at each increment we double the size of the array. When doing
603 : * that would exceed a limit, we attempt one last, smaller increase (and then
604 : * clear the growmemtuples flag so we don't try any more). That allows us to
605 : * use memory as fully as permitted; sticking to the pure doubling rule could
606 : * result in almost half going unused. Because availMem moves around with
607 : * tuple addition/removal, we need some rule to prevent making repeated small
608 : * increases in memtupsize, which would just be useless thrashing. The
609 : * growmemtuples flag accomplishes that and also prevents useless
610 : * recalculations in this function.
611 : */
612 : static bool
613 1250 : grow_memtuples(Tuplestorestate *state)
614 : {
615 : int newmemtupsize;
616 1250 : int memtupsize = state->memtupsize;
617 1250 : int64 memNowUsed = state->allowedMem - state->availMem;
618 :
619 : /* Forget it if we've already maxed out memtuples, per comment above */
620 1250 : if (!state->growmemtuples)
621 4 : return false;
622 :
623 : /* Select new value of memtupsize */
624 1246 : if (memNowUsed <= state->availMem)
625 : {
626 : /*
627 : * We've used no more than half of allowedMem; double our usage,
628 : * clamping at INT_MAX tuples.
629 : */
630 1201 : if (memtupsize < INT_MAX / 2)
631 1201 : newmemtupsize = memtupsize * 2;
632 : else
633 : {
634 0 : newmemtupsize = INT_MAX;
635 0 : state->growmemtuples = false;
636 : }
637 : }
638 : else
639 : {
640 : /*
641 : * This will be the last increment of memtupsize. Abandon doubling
642 : * strategy and instead increase as much as we safely can.
643 : *
644 : * To stay within allowedMem, we can't increase memtupsize by more
645 : * than availMem / sizeof(void *) elements. In practice, we want to
646 : * increase it by considerably less, because we need to leave some
647 : * space for the tuples to which the new array slots will refer. We
648 : * assume the new tuples will be about the same size as the tuples
649 : * we've already seen, and thus we can extrapolate from the space
650 : * consumption so far to estimate an appropriate new size for the
651 : * memtuples array. The optimal value might be higher or lower than
652 : * this estimate, but it's hard to know that in advance. We again
653 : * clamp at INT_MAX tuples.
654 : *
655 : * This calculation is safe against enlarging the array so much that
656 : * LACKMEM becomes true, because the memory currently used includes
657 : * the present array; thus, there would be enough allowedMem for the
658 : * new array elements even if no other memory were currently used.
659 : *
660 : * We do the arithmetic in float8, because otherwise the product of
661 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
662 : * result should be insignificant; but even if we computed a
663 : * completely insane result, the checks below will prevent anything
664 : * really bad from happening.
665 : */
666 : double grow_ratio;
667 :
668 45 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
669 45 : if (memtupsize * grow_ratio < INT_MAX)
670 45 : newmemtupsize = (int) (memtupsize * grow_ratio);
671 : else
672 0 : newmemtupsize = INT_MAX;
673 :
674 : /* We won't make any further enlargement attempts */
675 45 : state->growmemtuples = false;
676 : }
677 :
678 : /* Must enlarge array by at least one element, else report failure */
679 1246 : if (newmemtupsize <= memtupsize)
680 0 : goto noalloc;
681 :
682 : /*
683 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
684 : * to ensure our request won't be rejected. Note that we can easily
685 : * exhaust address space before facing this outcome. (This is presently
686 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
687 : * don't rely on that at this distance.)
688 : */
689 1246 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
690 : {
691 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
692 0 : state->growmemtuples = false; /* can't grow any more */
693 : }
694 :
695 : /*
696 : * We need to be sure that we do not cause LACKMEM to become true, else
697 : * the space management algorithm will go nuts. The code above should
698 : * never generate a dangerous request, but to be safe, check explicitly
699 : * that the array growth fits within availMem. (We could still cause
700 : * LACKMEM if the memory chunk overhead associated with the memtuples
701 : * array were to increase. That shouldn't happen because we chose the
702 : * initial array size large enough to ensure that palloc will be treating
703 : * both old and new arrays as separate chunks. But we'll check LACKMEM
704 : * explicitly below just in case.)
705 : */
706 1246 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
707 0 : goto noalloc;
708 :
709 : /* OK, do it */
710 1246 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
711 1246 : state->memtupsize = newmemtupsize;
712 1246 : state->memtuples = (void **)
713 1246 : repalloc_huge(state->memtuples,
714 1246 : state->memtupsize * sizeof(void *));
715 1246 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
716 1246 : if (LACKMEM(state))
717 0 : elog(ERROR, "unexpected out-of-memory situation in tuplestore");
718 1246 : return true;
719 :
720 0 : noalloc:
721 : /* If for any reason we didn't realloc, shut off future attempts */
722 0 : state->growmemtuples = false;
723 0 : return false;
724 : }
725 :
726 : /*
727 : * Accept one tuple and append it to the tuplestore.
728 : *
729 : * Note that the input tuple is always copied; the caller need not save it.
730 : *
731 : * If the active read pointer is currently "at EOF", it remains so (the read
732 : * pointer implicitly advances along with the write pointer); otherwise the
733 : * read pointer is unchanged. Non-active read pointers do not move, which
734 : * means they are certain to not be "at EOF" immediately after puttuple.
735 : * This curious-seeming behavior is for the convenience of nodeMaterial.c and
736 : * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
737 : * steps.
738 : *
739 : * tuplestore_puttupleslot() is a convenience routine to collect data from
740 : * a TupleTableSlot without an extra copy operation.
741 : */
742 : void
743 1392044 : tuplestore_puttupleslot(Tuplestorestate *state,
744 : TupleTableSlot *slot)
745 : {
746 : MinimalTuple tuple;
747 1392044 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
748 :
749 : /*
750 : * Form a MinimalTuple in working memory
751 : */
752 1392044 : tuple = ExecCopySlotMinimalTuple(slot);
753 1392044 : USEMEM(state, GetMemoryChunkSpace(tuple));
754 :
755 1392044 : tuplestore_puttuple_common(state, tuple);
756 :
757 1392044 : MemoryContextSwitchTo(oldcxt);
758 1392044 : }
759 :
760 : /*
761 : * "Standard" case to copy from a HeapTuple. This is actually now somewhat
762 : * deprecated, but not worth getting rid of in view of the number of callers.
763 : */
764 : void
765 1073391 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
766 : {
767 1073391 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
768 :
769 : /*
770 : * Copy the tuple. (Must do this even in WRITEFILE case. Note that
771 : * COPYTUP includes USEMEM, so we needn't do that here.)
772 : */
773 1073391 : tuple = COPYTUP(state, tuple);
774 :
775 1073391 : tuplestore_puttuple_common(state, tuple);
776 :
777 1073391 : MemoryContextSwitchTo(oldcxt);
778 1073391 : }
779 :
780 : /*
781 : * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
782 : * This avoids an extra tuple-construction operation.
783 : */
784 : void
785 10371928 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
786 : const Datum *values, const bool *isnull)
787 : {
788 : MinimalTuple tuple;
789 10371928 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
790 :
791 10371928 : tuple = heap_form_minimal_tuple(tdesc, values, isnull, 0);
792 10371928 : USEMEM(state, GetMemoryChunkSpace(tuple));
793 :
794 10371928 : tuplestore_puttuple_common(state, tuple);
795 :
796 10371928 : MemoryContextSwitchTo(oldcxt);
797 10371928 : }
798 :
799 : static void
800 12837363 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
801 : {
802 : TSReadPointer *readptr;
803 : int i;
804 : ResourceOwner oldowner;
805 : MemoryContext oldcxt;
806 :
807 12837363 : state->tuples++;
808 :
809 12837363 : switch (state->status)
810 : {
811 10072570 : case TSS_INMEM:
812 :
813 : /*
814 : * Update read pointers as needed; see API spec above.
815 : */
816 10072570 : readptr = state->readptrs;
817 21815484 : for (i = 0; i < state->readptrcount; readptr++, i++)
818 : {
819 11742914 : if (readptr->eof_reached && i != state->activeptr)
820 : {
821 314 : readptr->eof_reached = false;
822 314 : readptr->current = state->memtupcount;
823 : }
824 : }
825 :
826 : /*
827 : * Grow the array as needed. Note that we try to grow the array
828 : * when there is still one free slot remaining --- if we fail,
829 : * there'll still be room to store the incoming tuple, and then
830 : * we'll switch to tape-based operation.
831 : */
832 10072570 : if (state->memtupcount >= state->memtupsize - 1)
833 : {
834 1250 : (void) grow_memtuples(state);
835 : Assert(state->memtupcount < state->memtupsize);
836 : }
837 :
838 : /* Stash the tuple in the in-memory array */
839 10072570 : state->memtuples[state->memtupcount++] = tuple;
840 :
841 : /*
842 : * Done if we still fit in available memory and have array slots.
843 : */
844 10072570 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
845 10072507 : return;
846 :
847 : /*
848 : * Nope; time to switch to tape-based operation. Make sure that
849 : * the temp file(s) are created in suitable temp tablespaces.
850 : */
851 63 : PrepareTempTablespaces();
852 :
853 : /* associate the file with the store's resource owner */
854 63 : oldowner = CurrentResourceOwner;
855 63 : CurrentResourceOwner = state->resowner;
856 :
857 : /*
858 : * We switch out of the state->context as this is a generation
859 : * context, which isn't ideal for allocations relating to the
860 : * BufFile.
861 : */
862 63 : oldcxt = MemoryContextSwitchTo(state->context->parent);
863 :
864 63 : state->myfile = BufFileCreateTemp(state->interXact);
865 :
866 63 : MemoryContextSwitchTo(oldcxt);
867 :
868 63 : CurrentResourceOwner = oldowner;
869 :
870 : /*
871 : * Freeze the decision about whether trailing length words will be
872 : * used. We can't change this choice once data is on tape, even
873 : * though callers might drop the requirement.
874 : */
875 63 : state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
876 :
877 : /*
878 : * Update the maximum space used before dumping the tuples. It's
879 : * possible that more space will be used by the tuples in memory
880 : * than the space that will be used on disk.
881 : */
882 63 : tuplestore_updatemax(state);
883 :
884 63 : state->status = TSS_WRITEFILE;
885 63 : dumptuples(state);
886 63 : break;
887 2764785 : case TSS_WRITEFILE:
888 :
889 : /*
890 : * Update read pointers as needed; see API spec above. Note:
891 : * BufFileTell is quite cheap, so not worth trying to avoid
892 : * multiple calls.
893 : */
894 2764785 : readptr = state->readptrs;
895 5537266 : for (i = 0; i < state->readptrcount; readptr++, i++)
896 : {
897 2772481 : if (readptr->eof_reached && i != state->activeptr)
898 : {
899 0 : readptr->eof_reached = false;
900 0 : BufFileTell(state->myfile,
901 : &readptr->file,
902 : &readptr->offset);
903 : }
904 : }
905 :
906 2764785 : WRITETUP(state, tuple);
907 2764785 : break;
908 8 : case TSS_READFILE:
909 :
910 : /*
911 : * Switch from reading to writing.
912 : */
913 8 : if (!state->readptrs[state->activeptr].eof_reached)
914 8 : BufFileTell(state->myfile,
915 8 : &state->readptrs[state->activeptr].file,
916 8 : &state->readptrs[state->activeptr].offset);
917 8 : if (BufFileSeek(state->myfile,
918 : state->writepos_file, state->writepos_offset,
919 : SEEK_SET) != 0)
920 0 : ereport(ERROR,
921 : (errcode_for_file_access(),
922 : errmsg("could not seek in tuplestore temporary file")));
923 8 : state->status = TSS_WRITEFILE;
924 :
925 : /*
926 : * Update read pointers as needed; see API spec above.
927 : */
928 8 : readptr = state->readptrs;
929 24 : for (i = 0; i < state->readptrcount; readptr++, i++)
930 : {
931 16 : if (readptr->eof_reached && i != state->activeptr)
932 : {
933 0 : readptr->eof_reached = false;
934 0 : readptr->file = state->writepos_file;
935 0 : readptr->offset = state->writepos_offset;
936 : }
937 : }
938 :
939 8 : WRITETUP(state, tuple);
940 8 : break;
941 0 : default:
942 0 : elog(ERROR, "invalid tuplestore state");
943 : break;
944 : }
945 : }
946 :
947 : /*
948 : * Fetch the next tuple in either forward or back direction.
949 : * Returns NULL if no more tuples. If should_free is set, the
950 : * caller must pfree the returned tuple when done with it.
951 : *
952 : * Backward scan is only allowed if randomAccess was set true or
953 : * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
954 : */
955 : static void *
956 15139729 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
957 : bool *should_free)
958 : {
959 15139729 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
960 : unsigned int tuplen;
961 : void *tup;
962 :
963 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
964 :
965 15139729 : switch (state->status)
966 : {
967 13053388 : case TSS_INMEM:
968 13053388 : *should_free = false;
969 13053388 : if (forward)
970 : {
971 12935830 : if (readptr->eof_reached)
972 121 : return NULL;
973 12935709 : if (readptr->current < state->memtupcount)
974 : {
975 : /* We have another tuple, so return it */
976 12693764 : return state->memtuples[readptr->current++];
977 : }
978 241945 : readptr->eof_reached = true;
979 241945 : return NULL;
980 : }
981 : else
982 : {
983 : /*
984 : * if all tuples are fetched already then we return last
985 : * tuple, else tuple before last returned.
986 : */
987 117558 : if (readptr->eof_reached)
988 : {
989 1901 : readptr->current = state->memtupcount;
990 1901 : readptr->eof_reached = false;
991 : }
992 : else
993 : {
994 115657 : if (readptr->current <= state->memtupdeleted)
995 : {
996 : Assert(!state->truncated);
997 0 : return NULL;
998 : }
999 115657 : readptr->current--; /* last returned tuple */
1000 : }
1001 117558 : if (readptr->current <= state->memtupdeleted)
1002 : {
1003 : Assert(!state->truncated);
1004 21 : return NULL;
1005 : }
1006 117537 : return state->memtuples[readptr->current - 1];
1007 : }
1008 : break;
1009 :
1010 70 : case TSS_WRITEFILE:
1011 : /* Skip state change if we'll just return NULL */
1012 70 : if (readptr->eof_reached && forward)
1013 0 : return NULL;
1014 :
1015 : /*
1016 : * Switch from writing to reading.
1017 : */
1018 70 : BufFileTell(state->myfile,
1019 : &state->writepos_file, &state->writepos_offset);
1020 70 : if (!readptr->eof_reached)
1021 70 : if (BufFileSeek(state->myfile,
1022 : readptr->file, readptr->offset,
1023 : SEEK_SET) != 0)
1024 0 : ereport(ERROR,
1025 : (errcode_for_file_access(),
1026 : errmsg("could not seek in tuplestore temporary file")));
1027 70 : state->status = TSS_READFILE;
1028 : pg_fallthrough;
1029 :
1030 2086341 : case TSS_READFILE:
1031 2086341 : *should_free = true;
1032 2086341 : if (forward)
1033 : {
1034 2086341 : if ((tuplen = getlen(state, true)) != 0)
1035 : {
1036 2086291 : tup = READTUP(state, tuplen);
1037 2086291 : return tup;
1038 : }
1039 : else
1040 : {
1041 50 : readptr->eof_reached = true;
1042 50 : return NULL;
1043 : }
1044 : }
1045 :
1046 : /*
1047 : * Backward.
1048 : *
1049 : * if all tuples are fetched already then we return last tuple,
1050 : * else tuple before last returned.
1051 : *
1052 : * Back up to fetch previously-returned tuple's ending length
1053 : * word. If seek fails, assume we are at start of file.
1054 : */
1055 0 : if (BufFileSeek(state->myfile, 0, -(pgoff_t) sizeof(unsigned int),
1056 : SEEK_CUR) != 0)
1057 : {
1058 : /* even a failed backwards fetch gets you out of eof state */
1059 0 : readptr->eof_reached = false;
1060 : Assert(!state->truncated);
1061 0 : return NULL;
1062 : }
1063 0 : tuplen = getlen(state, false);
1064 :
1065 0 : if (readptr->eof_reached)
1066 : {
1067 0 : readptr->eof_reached = false;
1068 : /* We will return the tuple returned before returning NULL */
1069 : }
1070 : else
1071 : {
1072 : /*
1073 : * Back up to get ending length word of tuple before it.
1074 : */
1075 0 : if (BufFileSeek(state->myfile, 0,
1076 0 : -(pgoff_t) (tuplen + 2 * sizeof(unsigned int)),
1077 : SEEK_CUR) != 0)
1078 : {
1079 : /*
1080 : * If that fails, presumably the prev tuple is the first
1081 : * in the file. Back up so that it becomes next to read
1082 : * in forward direction (not obviously right, but that is
1083 : * what in-memory case does).
1084 : */
1085 0 : if (BufFileSeek(state->myfile, 0,
1086 0 : -(pgoff_t) (tuplen + sizeof(unsigned int)),
1087 : SEEK_CUR) != 0)
1088 0 : ereport(ERROR,
1089 : (errcode_for_file_access(),
1090 : errmsg("could not seek in tuplestore temporary file")));
1091 : Assert(!state->truncated);
1092 0 : return NULL;
1093 : }
1094 0 : tuplen = getlen(state, false);
1095 : }
1096 :
1097 : /*
1098 : * Now we have the length of the prior tuple, back up and read it.
1099 : * Note: READTUP expects we are positioned after the initial
1100 : * length word of the tuple, so back up to that point.
1101 : */
1102 0 : if (BufFileSeek(state->myfile, 0,
1103 0 : -(pgoff_t) tuplen,
1104 : SEEK_CUR) != 0)
1105 0 : ereport(ERROR,
1106 : (errcode_for_file_access(),
1107 : errmsg("could not seek in tuplestore temporary file")));
1108 0 : tup = READTUP(state, tuplen);
1109 0 : return tup;
1110 :
1111 0 : default:
1112 0 : elog(ERROR, "invalid tuplestore state");
1113 : return NULL; /* keep compiler quiet */
1114 : }
1115 : }
1116 :
1117 : /*
1118 : * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1119 : *
1120 : * If successful, put tuple in slot and return true; else, clear the slot
1121 : * and return false.
1122 : *
1123 : * If copy is true, the slot receives a copied tuple (allocated in current
1124 : * memory context) that will stay valid regardless of future manipulations of
1125 : * the tuplestore's state. If copy is false, the slot may just receive a
1126 : * pointer to a tuple held within the tuplestore. The latter is more
1127 : * efficient but the slot contents may be corrupted if additional writes to
1128 : * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1129 : */
1130 : bool
1131 15024317 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1132 : bool copy, TupleTableSlot *slot)
1133 : {
1134 : MinimalTuple tuple;
1135 : bool should_free;
1136 :
1137 15024317 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1138 :
1139 15024317 : if (tuple)
1140 : {
1141 14784041 : if (copy && !should_free)
1142 : {
1143 1326769 : tuple = heap_copy_minimal_tuple(tuple, 0);
1144 1326769 : should_free = true;
1145 : }
1146 14784041 : ExecStoreMinimalTuple(tuple, slot, should_free);
1147 14784041 : return true;
1148 : }
1149 : else
1150 : {
1151 240276 : ExecClearTuple(slot);
1152 240276 : return false;
1153 : }
1154 : }
1155 :
1156 : /*
1157 : * tuplestore_advance - exported function to adjust position without fetching
1158 : *
1159 : * We could optimize this case to avoid palloc/pfree overhead, but for the
1160 : * moment it doesn't seem worthwhile.
1161 : */
1162 : bool
1163 115412 : tuplestore_advance(Tuplestorestate *state, bool forward)
1164 : {
1165 : void *tuple;
1166 : bool should_free;
1167 :
1168 115412 : tuple = tuplestore_gettuple(state, forward, &should_free);
1169 :
1170 115412 : if (tuple)
1171 : {
1172 113551 : if (should_free)
1173 0 : pfree(tuple);
1174 113551 : return true;
1175 : }
1176 : else
1177 : {
1178 1861 : return false;
1179 : }
1180 : }
1181 :
1182 : /*
1183 : * Advance over N tuples in either forward or back direction,
1184 : * without returning any data. N<=0 is a no-op.
1185 : * Returns true if successful, false if ran out of tuples.
1186 : */
1187 : bool
1188 891102 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1189 : {
1190 891102 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1191 :
1192 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1193 :
1194 891102 : if (ntuples <= 0)
1195 12 : return true;
1196 :
1197 891090 : switch (state->status)
1198 : {
1199 891090 : case TSS_INMEM:
1200 891090 : if (forward)
1201 : {
1202 889260 : if (readptr->eof_reached)
1203 0 : return false;
1204 889260 : if (state->memtupcount - readptr->current >= ntuples)
1205 : {
1206 889199 : readptr->current += ntuples;
1207 889199 : return true;
1208 : }
1209 61 : readptr->current = state->memtupcount;
1210 61 : readptr->eof_reached = true;
1211 61 : return false;
1212 : }
1213 : else
1214 : {
1215 1830 : if (readptr->eof_reached)
1216 : {
1217 0 : readptr->current = state->memtupcount;
1218 0 : readptr->eof_reached = false;
1219 0 : ntuples--;
1220 : }
1221 1830 : if (readptr->current - state->memtupdeleted > ntuples)
1222 : {
1223 1830 : readptr->current -= ntuples;
1224 1830 : return true;
1225 : }
1226 : Assert(!state->truncated);
1227 0 : readptr->current = state->memtupdeleted;
1228 0 : return false;
1229 : }
1230 : break;
1231 :
1232 0 : default:
1233 : /* We don't currently try hard to optimize other cases */
1234 0 : while (ntuples-- > 0)
1235 : {
1236 : void *tuple;
1237 : bool should_free;
1238 :
1239 0 : tuple = tuplestore_gettuple(state, forward, &should_free);
1240 :
1241 0 : if (tuple == NULL)
1242 0 : return false;
1243 0 : if (should_free)
1244 0 : pfree(tuple);
1245 0 : CHECK_FOR_INTERRUPTS();
1246 : }
1247 0 : return true;
1248 : }
1249 : }
1250 :
1251 : /*
1252 : * dumptuples - remove tuples from memory and write to tape
1253 : *
1254 : * As a side effect, we must convert each read pointer's position from
1255 : * "current" to file/offset format. But eof_reached pointers don't
1256 : * need to change state.
1257 : */
1258 : static void
1259 63 : dumptuples(Tuplestorestate *state)
1260 : {
1261 : int i;
1262 :
1263 63 : for (i = state->memtupdeleted;; i++)
1264 435928 : {
1265 435991 : TSReadPointer *readptr = state->readptrs;
1266 : int j;
1267 :
1268 884278 : for (j = 0; j < state->readptrcount; readptr++, j++)
1269 : {
1270 448287 : if (i == readptr->current && !readptr->eof_reached)
1271 71 : BufFileTell(state->myfile,
1272 : &readptr->file, &readptr->offset);
1273 : }
1274 435991 : if (i >= state->memtupcount)
1275 63 : break;
1276 435928 : WRITETUP(state, state->memtuples[i]);
1277 : }
1278 63 : state->memtupdeleted = 0;
1279 63 : state->memtupcount = 0;
1280 63 : }
1281 :
1282 : /*
1283 : * tuplestore_rescan - rewind the active read pointer to start
1284 : */
1285 : void
1286 200589 : tuplestore_rescan(Tuplestorestate *state)
1287 : {
1288 200589 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1289 :
1290 : Assert(readptr->eflags & EXEC_FLAG_REWIND);
1291 : Assert(!state->truncated);
1292 :
1293 200589 : switch (state->status)
1294 : {
1295 200535 : case TSS_INMEM:
1296 200535 : readptr->eof_reached = false;
1297 200535 : readptr->current = 0;
1298 200535 : break;
1299 54 : case TSS_WRITEFILE:
1300 54 : readptr->eof_reached = false;
1301 54 : readptr->file = 0;
1302 54 : readptr->offset = 0;
1303 54 : break;
1304 0 : case TSS_READFILE:
1305 0 : readptr->eof_reached = false;
1306 0 : if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
1307 0 : ereport(ERROR,
1308 : (errcode_for_file_access(),
1309 : errmsg("could not seek in tuplestore temporary file")));
1310 0 : break;
1311 0 : default:
1312 0 : elog(ERROR, "invalid tuplestore state");
1313 : break;
1314 : }
1315 200589 : }
1316 :
1317 : /*
1318 : * tuplestore_copy_read_pointer - copy a read pointer's state to another
1319 : */
1320 : void
1321 40344 : tuplestore_copy_read_pointer(Tuplestorestate *state,
1322 : int srcptr, int destptr)
1323 : {
1324 40344 : TSReadPointer *sptr = &state->readptrs[srcptr];
1325 40344 : TSReadPointer *dptr = &state->readptrs[destptr];
1326 :
1327 : Assert(srcptr >= 0 && srcptr < state->readptrcount);
1328 : Assert(destptr >= 0 && destptr < state->readptrcount);
1329 :
1330 : /* Assigning to self is a no-op */
1331 40344 : if (srcptr == destptr)
1332 0 : return;
1333 :
1334 40344 : if (dptr->eflags != sptr->eflags)
1335 : {
1336 : /* Possible change of overall eflags, so copy and then recompute */
1337 : int eflags;
1338 : int i;
1339 :
1340 0 : *dptr = *sptr;
1341 0 : eflags = state->readptrs[0].eflags;
1342 0 : for (i = 1; i < state->readptrcount; i++)
1343 0 : eflags |= state->readptrs[i].eflags;
1344 0 : state->eflags = eflags;
1345 : }
1346 : else
1347 40344 : *dptr = *sptr;
1348 :
1349 40344 : switch (state->status)
1350 : {
1351 40344 : case TSS_INMEM:
1352 : case TSS_WRITEFILE:
1353 : /* no work */
1354 40344 : break;
1355 0 : case TSS_READFILE:
1356 :
1357 : /*
1358 : * This case is a bit tricky since the active read pointer's
1359 : * position corresponds to the seek point, not what is in its
1360 : * variables. Assigning to the active requires a seek, and
1361 : * assigning from the active requires a tell, except when
1362 : * eof_reached.
1363 : */
1364 0 : if (destptr == state->activeptr)
1365 : {
1366 0 : if (dptr->eof_reached)
1367 : {
1368 0 : if (BufFileSeek(state->myfile,
1369 : state->writepos_file,
1370 : state->writepos_offset,
1371 : SEEK_SET) != 0)
1372 0 : ereport(ERROR,
1373 : (errcode_for_file_access(),
1374 : errmsg("could not seek in tuplestore temporary file")));
1375 : }
1376 : else
1377 : {
1378 0 : if (BufFileSeek(state->myfile,
1379 : dptr->file, dptr->offset,
1380 : SEEK_SET) != 0)
1381 0 : ereport(ERROR,
1382 : (errcode_for_file_access(),
1383 : errmsg("could not seek in tuplestore temporary file")));
1384 : }
1385 : }
1386 0 : else if (srcptr == state->activeptr)
1387 : {
1388 0 : if (!dptr->eof_reached)
1389 0 : BufFileTell(state->myfile,
1390 : &dptr->file,
1391 : &dptr->offset);
1392 : }
1393 0 : break;
1394 0 : default:
1395 0 : elog(ERROR, "invalid tuplestore state");
1396 : break;
1397 : }
1398 : }
1399 :
1400 : /*
1401 : * tuplestore_trim - remove all no-longer-needed tuples
1402 : *
1403 : * Calling this function authorizes the tuplestore to delete all tuples
1404 : * before the oldest read pointer, if no read pointer is marked as requiring
1405 : * REWIND capability.
1406 : *
1407 : * Note: this is obviously safe if no pointer has BACKWARD capability either.
1408 : * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1409 : * the pointer can be moved backward but not before the oldest other read
1410 : * pointer.
1411 : */
1412 : void
1413 607696 : tuplestore_trim(Tuplestorestate *state)
1414 : {
1415 : int oldest;
1416 : int nremove;
1417 : int i;
1418 :
1419 : /*
1420 : * Truncation is disallowed if any read pointer requires rewind
1421 : * capability.
1422 : */
1423 607696 : if (state->eflags & EXEC_FLAG_REWIND)
1424 0 : return;
1425 :
1426 : /*
1427 : * We don't bother trimming temp files since it usually would mean more
1428 : * work than just letting them sit in kernel buffers until they age out.
1429 : */
1430 607696 : if (state->status != TSS_INMEM)
1431 19992 : return;
1432 :
1433 : /* Find the oldest read pointer */
1434 587704 : oldest = state->memtupcount;
1435 2559829 : for (i = 0; i < state->readptrcount; i++)
1436 : {
1437 1972125 : if (!state->readptrs[i].eof_reached)
1438 1952615 : oldest = Min(oldest, state->readptrs[i].current);
1439 : }
1440 :
1441 : /*
1442 : * Note: you might think we could remove all the tuples before the oldest
1443 : * "current", since that one is the next to be returned. However, since
1444 : * tuplestore_gettuple returns a direct pointer to our internal copy of
1445 : * the tuple, it's likely that the caller has still got the tuple just
1446 : * before "current" referenced in a slot. So we keep one extra tuple
1447 : * before the oldest "current". (Strictly speaking, we could require such
1448 : * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1449 : * efficiency we allow this one case to not use "copy".)
1450 : */
1451 587704 : nremove = oldest - 1;
1452 587704 : if (nremove <= 0)
1453 5238 : return; /* nothing to do */
1454 :
1455 : Assert(nremove >= state->memtupdeleted);
1456 : Assert(nremove <= state->memtupcount);
1457 :
1458 : /* before freeing any memory, update the statistics */
1459 582466 : tuplestore_updatemax(state);
1460 :
1461 : /* Release no-longer-needed tuples */
1462 1165608 : for (i = state->memtupdeleted; i < nremove; i++)
1463 : {
1464 583142 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1465 583142 : pfree(state->memtuples[i]);
1466 583142 : state->memtuples[i] = NULL;
1467 : }
1468 582466 : state->memtupdeleted = nremove;
1469 :
1470 : /* mark tuplestore as truncated (used for Assert crosschecks only) */
1471 582466 : state->truncated = true;
1472 :
1473 : /*
1474 : * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1475 : * "deleted" slots as NULL. This prevents us from expending O(N^2) time
1476 : * repeatedly memmove-ing a large pointer array. The worst case space
1477 : * wastage is pretty small, since it's just pointers and not whole tuples.
1478 : */
1479 582466 : if (nremove < state->memtupcount / 8)
1480 75120 : return;
1481 :
1482 : /*
1483 : * Slide the array down and readjust pointers.
1484 : *
1485 : * In mergejoin's current usage, it's demonstrable that there will always
1486 : * be exactly one non-removed tuple; so optimize that case.
1487 : */
1488 507346 : if (nremove + 1 == state->memtupcount)
1489 420218 : state->memtuples[0] = state->memtuples[nremove];
1490 : else
1491 87128 : memmove(state->memtuples, state->memtuples + nremove,
1492 87128 : (state->memtupcount - nremove) * sizeof(void *));
1493 :
1494 507346 : state->memtupdeleted = 0;
1495 507346 : state->memtupcount -= nremove;
1496 2227733 : for (i = 0; i < state->readptrcount; i++)
1497 : {
1498 1720387 : if (!state->readptrs[i].eof_reached)
1499 1717527 : state->readptrs[i].current -= nremove;
1500 : }
1501 : }
1502 :
1503 : /*
1504 : * tuplestore_updatemax
1505 : * Update the maximum space used by this tuplestore and the method used
1506 : * for storage.
1507 : */
1508 : static void
1509 589428 : tuplestore_updatemax(Tuplestorestate *state)
1510 : {
1511 589428 : if (state->status == TSS_INMEM)
1512 589420 : state->maxSpace = Max(state->maxSpace,
1513 : state->allowedMem - state->availMem);
1514 : else
1515 : {
1516 8 : state->maxSpace = Max(state->maxSpace,
1517 : BufFileSize(state->myfile));
1518 :
1519 : /*
1520 : * usedDisk never gets set to false again after spilling to disk, even
1521 : * if tuplestore_clear() is called and new tuples go to memory again.
1522 : */
1523 8 : state->usedDisk = true;
1524 : }
1525 589428 : }
1526 :
1527 : /*
1528 : * tuplestore_get_stats
1529 : * Obtain statistics about the maximum space used by the tuplestore.
1530 : * These statistics are the maximums and are not reset by calls to
1531 : * tuplestore_trim() or tuplestore_clear().
1532 : */
1533 : void
1534 20 : tuplestore_get_stats(Tuplestorestate *state, char **max_storage_type,
1535 : int64 *max_space)
1536 : {
1537 20 : tuplestore_updatemax(state);
1538 :
1539 20 : if (state->usedDisk)
1540 8 : *max_storage_type = "Disk";
1541 : else
1542 12 : *max_storage_type = "Memory";
1543 :
1544 20 : *max_space = state->maxSpace;
1545 20 : }
1546 :
1547 : /*
1548 : * tuplestore_in_memory
1549 : *
1550 : * Returns true if the tuplestore has not spilled to disk.
1551 : *
1552 : * XXX exposing this is a violation of modularity ... should get rid of it.
1553 : */
1554 : bool
1555 1157590 : tuplestore_in_memory(Tuplestorestate *state)
1556 : {
1557 1157590 : return (state->status == TSS_INMEM);
1558 : }
1559 :
1560 :
1561 : /*
1562 : * Tape interface routines
1563 : */
1564 :
1565 : static unsigned int
1566 2086341 : getlen(Tuplestorestate *state, bool eofOK)
1567 : {
1568 : unsigned int len;
1569 : size_t nbytes;
1570 :
1571 2086341 : nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
1572 2086341 : if (nbytes == 0)
1573 50 : return 0;
1574 : else
1575 2086291 : return len;
1576 : }
1577 :
1578 :
1579 : /*
1580 : * Routines specialized for HeapTuple case
1581 : *
1582 : * The stored form is actually a MinimalTuple, but for largely historical
1583 : * reasons we allow COPYTUP to work from a HeapTuple.
1584 : *
1585 : * Since MinimalTuple already has length in its first word, we don't need
1586 : * to write that separately.
1587 : */
1588 :
1589 : static void *
1590 1073391 : copytup_heap(Tuplestorestate *state, void *tup)
1591 : {
1592 : MinimalTuple tuple;
1593 :
1594 1073391 : tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup, 0);
1595 1073391 : USEMEM(state, GetMemoryChunkSpace(tuple));
1596 1073391 : return tuple;
1597 : }
1598 :
1599 : static void
1600 3200721 : writetup_heap(Tuplestorestate *state, void *tup)
1601 : {
1602 3200721 : MinimalTuple tuple = (MinimalTuple) tup;
1603 :
1604 : /* the part of the MinimalTuple we'll write: */
1605 3200721 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1606 3200721 : unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1607 :
1608 : /* total on-disk footprint: */
1609 3200721 : unsigned int tuplen = tupbodylen + sizeof(int);
1610 :
1611 3200721 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1612 3200721 : BufFileWrite(state->myfile, tupbody, tupbodylen);
1613 3200721 : if (state->backward) /* need trailing length word? */
1614 0 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1615 :
1616 3200721 : FREEMEM(state, GetMemoryChunkSpace(tuple));
1617 3200721 : heap_free_minimal_tuple(tuple);
1618 3200721 : }
1619 :
1620 : static void *
1621 2086291 : readtup_heap(Tuplestorestate *state, unsigned int len)
1622 : {
1623 2086291 : unsigned int tupbodylen = len - sizeof(int);
1624 2086291 : unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1625 2086291 : MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1626 2086291 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1627 :
1628 : /* read in the tuple proper */
1629 2086291 : tuple->t_len = tuplen;
1630 2086291 : BufFileReadExact(state->myfile, tupbody, tupbodylen);
1631 2086291 : if (state->backward) /* need trailing length word? */
1632 0 : BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
1633 2086291 : return tuple;
1634 : }
|