Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tuplestore.c
4 : * Generalized routines for temporary tuple storage.
5 : *
6 : * This module handles temporary storage of tuples for purposes such
7 : * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 : * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 : * but can only store and regurgitate a sequence of tuples. However,
10 : * because no sort is required, it is allowed to start reading the sequence
11 : * before it has all been written. This is particularly useful for cursors,
12 : * because it allows random access within the already-scanned portion of
13 : * a query without having to process the underlying scan to completion.
14 : * Also, it is possible to support multiple independent read pointers.
15 : *
16 : * A temporary file is used to handle the data if it exceeds the
17 : * space limit specified by the caller.
18 : *
19 : * The (approximate) amount of memory allowed to the tuplestore is specified
20 : * in kilobytes by the caller. We absorb tuples and simply store them in an
21 : * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 : * maxKBytes, we dump all the tuples into a temp file and then read from that
23 : * when needed.
24 : *
25 : * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 : * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 : * Mark/restore behavior is supported by copying read pointers.
28 : *
29 : * When the caller requests backward-scan capability, we write the temp file
30 : * in a format that allows either forward or backward scan. Otherwise, only
31 : * forward scan is allowed. A request for backward scan must be made before
32 : * putting any tuples into the tuplestore. Rewind is normally allowed but
33 : * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 : * read pointers enables truncation of the tuplestore at the oldest read point
35 : * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 : * at appropriate times for truncation to actually happen.)
37 : *
38 : * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 : * current write position, and the write-position variables in the tuplestore
40 : * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 : * seek position is the active read pointer's position, and that read pointer
42 : * isn't kept up to date. We update the appropriate variables using ftell()
43 : * before switching to the other state or activating a different read pointer.
44 : *
45 : *
46 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
47 : * Portions Copyright (c) 1994, Regents of the University of California
48 : *
49 : * IDENTIFICATION
50 : * src/backend/utils/sort/tuplestore.c
51 : *
52 : *-------------------------------------------------------------------------
53 : */
54 :
55 : #include "postgres.h"
56 :
57 : #include <limits.h>
58 :
59 : #include "access/htup_details.h"
60 : #include "commands/tablespace.h"
61 : #include "executor/executor.h"
62 : #include "miscadmin.h"
63 : #include "storage/buffile.h"
64 : #include "utils/memutils.h"
65 : #include "utils/resowner.h"
66 :
67 :
/*
 * Lifecycle states of a Tuplestore object.  The state recorded here is the
 * one that persists between calls of the Tuplestore routines.
 */
typedef enum
{
    TSS_INMEM,                  /* every stored tuple is still held in memory */
    TSS_WRITEFILE,              /* spilled to a temp file; file is positioned
                                 * for writing */
    TSS_READFILE,               /* spilled to a temp file; file is positioned
                                 * for reading */
} TupStoreStatus;
78 :
/*
 * Per-read-pointer state.
 *
 * Which fields describe the read position depends on the store's status:
 *
 *  INMEM:      "current" is the position, for every read pointer.
 *  WRITEFILE:  file/offset is the position, for every read pointer.
 *  READFILE:   file/offset is valid for the inactive read pointers only; the
 *              active pointer's position is implicitly the temp file's
 *              current seek position.
 *
 * Independently of the above, a pointer with eof_reached set is implicitly
 * positioned at the write position, and its current/file/offset fields are
 * not maintained.  That convention spares us from touching every read
 * pointer on each write.
 */
typedef struct
{
    int         eflags;         /* capability flags */
    bool        eof_reached;    /* has a read hit EOF? */
    int         current;        /* array index of the next tuple to read */
    int         file;           /* temp file number */
    off_t       offset;         /* byte offset within that file */
} TSReadPointer;
99 :
100 : /*
101 : * Private state of a Tuplestore operation.
102 : */
103 : struct Tuplestorestate
104 : {
105 : TupStoreStatus status; /* enumerated value as shown above */
106 : int eflags; /* capability flags (OR of pointers' flags) */
107 : bool backward; /* store extra length words in file? */
108 : bool interXact; /* keep open through transactions? */
109 : bool truncated; /* tuplestore_trim has removed tuples? */
110 : bool usedDisk; /* used by tuplestore_get_stats() */
111 : int64 maxSpace; /* used by tuplestore_get_stats() */
112 : int64 availMem; /* remaining memory available, in bytes */
113 : int64 allowedMem; /* total memory allowed, in bytes */
114 : int64 tuples; /* number of tuples added */
115 : BufFile *myfile; /* underlying file, or NULL if none */
116 : MemoryContext context; /* memory context for holding tuples */
117 : ResourceOwner resowner; /* resowner for holding temp files */
118 :
119 : /*
120 : * These function pointers decouple the routines that must know what kind
121 : * of tuple we are handling from the routines that don't need to know it.
122 : * They are set up by the tuplestore_begin_xxx routines.
123 : *
124 : * (Although tuplestore.c currently only supports heap tuples, I've copied
125 : * this part of tuplesort.c so that extension to other kinds of objects
126 : * will be easy if it's ever needed.)
127 : *
128 : * Function to copy a supplied input tuple into palloc'd space. (NB: we
129 : * assume that a single pfree() is enough to release the tuple later, so
130 : * the representation must be "flat" in one palloc chunk.) state->availMem
131 : * must be decreased by the amount of space used.
132 : */
133 : void *(*copytup) (Tuplestorestate *state, void *tup);
134 :
135 : /*
136 : * Function to write a stored tuple onto tape. The representation of the
137 : * tuple on tape need not be the same as it is in memory; requirements on
138 : * the tape representation are given below. After writing the tuple,
139 : * pfree() it, and increase state->availMem by the amount of memory space
140 : * thereby released.
141 : */
142 : void (*writetup) (Tuplestorestate *state, void *tup);
143 :
144 : /*
145 : * Function to read a stored tuple from tape back into memory. 'len' is
146 : * the already-read length of the stored tuple. Create and return a
147 : * palloc'd copy, and decrease state->availMem by the amount of memory
148 : * space consumed.
149 : */
150 : void *(*readtup) (Tuplestorestate *state, unsigned int len);
151 :
152 : /*
153 : * This array holds pointers to tuples in memory if we are in state INMEM.
154 : * In states WRITEFILE and READFILE it's not used.
155 : *
156 : * When memtupdeleted > 0, the first memtupdeleted pointers are already
157 : * released due to a tuplestore_trim() operation, but we haven't expended
158 : * the effort to slide the remaining pointers down. These unused pointers
159 : * are set to NULL to catch any invalid accesses. Note that memtupcount
160 : * includes the deleted pointers.
161 : */
162 : void **memtuples; /* array of pointers to palloc'd tuples */
163 : int memtupdeleted; /* the first N slots are currently unused */
164 : int memtupcount; /* number of tuples currently present */
165 : int memtupsize; /* allocated length of memtuples array */
166 : bool growmemtuples; /* memtuples' growth still underway? */
167 :
168 : /*
169 : * These variables are used to keep track of the current positions.
170 : *
171 : * In state WRITEFILE, the current file seek position is the write point;
172 : * in state READFILE, the write position is remembered in writepos_xxx.
173 : * (The write position is the same as EOF, but since BufFileSeek doesn't
174 : * currently implement SEEK_END, we have to remember it explicitly.)
175 : */
176 : TSReadPointer *readptrs; /* array of read pointers */
177 : int activeptr; /* index of the active read pointer */
178 : int readptrcount; /* number of pointers currently valid */
179 : int readptrsize; /* allocated length of readptrs array */
180 :
181 : int writepos_file; /* file# (valid if READFILE state) */
182 : off_t writepos_offset; /* offset (valid if READFILE state) */
183 : };
184 :
/* Dispatch through the representation-specific callbacks stored in state */
#define COPYTUP(state, tup)     ((state)->copytup((state), (tup)))
#define WRITETUP(state, tup)    ((state)->writetup((state), (tup)))
#define READTUP(state, len)     ((state)->readtup((state), (len)))

/* Memory-budget accounting against state->availMem */
#define LACKMEM(state)          (0 > (state)->availMem)
#define USEMEM(state, amt)      ((state)->availMem -= (amt))
#define FREEMEM(state, amt)     ((state)->availMem += (amt))
191 :
192 : /*--------------------
193 : *
194 : * NOTES about on-tape representation of tuples:
195 : *
196 : * We require the first "unsigned int" of a stored tuple to be the total size
197 : * on-tape of the tuple, including itself (so it is never zero).
198 : * The remainder of the stored tuple
199 : * may or may not match the in-memory representation of the tuple ---
200 : * any conversion needed is the job of the writetup and readtup routines.
201 : *
202 : * If state->backward is true, then the stored representation of
203 : * the tuple must be followed by another "unsigned int" that is a copy of the
204 : * length --- so the total tape space used is actually sizeof(unsigned int)
205 : * more than the stored length value. This allows read-backwards. When
206 : * state->backward is not set, the write/read routines may omit the extra
207 : * length word.
208 : *
209 : * writetup is expected to write both length words as well as the tuple
210 : * data. When readtup is called, the tape is positioned just after the
211 : * front length word; readtup must read the tuple data and advance past
212 : * the back length word (if present).
213 : *
214 : * The write/read routines can make use of the tuple description data
215 : * stored in the Tuplestorestate record, if needed. They are also expected
216 : * to adjust state->availMem by the amount of memory space (not tape space!)
217 : * released or consumed. There is no error return from either writetup
218 : * or readtup; they should ereport() on failure.
219 : *
220 : *
221 : * NOTES about memory consumption calculations:
222 : *
223 : * We count space allocated for tuples against the maxKBytes limit,
224 : * plus the space used by the variable-size array memtuples.
225 : * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
226 : * We don't worry about the size of the read pointer array, either.
227 : *
228 : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
229 : * rather than the originally-requested size. This is important since
230 : * palloc can add substantial overhead. It's not a complete answer since
231 : * we won't count any wasted space in palloc allocation blocks, but it's
232 : * a lot better than what we were doing before 7.3.
233 : *
234 : *--------------------
235 : */
236 :
237 :
238 : static Tuplestorestate *tuplestore_begin_common(int eflags,
239 : bool interXact,
240 : int maxKBytes);
241 : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
242 : static void dumptuples(Tuplestorestate *state);
243 : static void tuplestore_updatemax(Tuplestorestate *state);
244 : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
245 : static void *copytup_heap(Tuplestorestate *state, void *tup);
246 : static void writetup_heap(Tuplestorestate *state, void *tup);
247 : static void *readtup_heap(Tuplestorestate *state, unsigned int len);
248 :
249 :
250 : /*
251 : * tuplestore_begin_xxx
252 : *
253 : * Initialize for a tuple store operation.
254 : */
255 : static Tuplestorestate *
256 254890 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
257 : {
258 : Tuplestorestate *state;
259 :
260 254890 : state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
261 :
262 254890 : state->status = TSS_INMEM;
263 254890 : state->eflags = eflags;
264 254890 : state->interXact = interXact;
265 254890 : state->truncated = false;
266 254890 : state->usedDisk = false;
267 254890 : state->maxSpace = 0;
268 254890 : state->allowedMem = maxKBytes * 1024L;
269 254890 : state->availMem = state->allowedMem;
270 254890 : state->myfile = NULL;
271 :
272 : /*
273 : * The palloc/pfree pattern for tuple memory is in a FIFO pattern. A
274 : * generation context is perfectly suited for this.
275 : */
276 254890 : state->context = GenerationContextCreate(CurrentMemoryContext,
277 : "tuplestore tuples",
278 : ALLOCSET_DEFAULT_SIZES);
279 254890 : state->resowner = CurrentResourceOwner;
280 :
281 254890 : state->memtupdeleted = 0;
282 254890 : state->memtupcount = 0;
283 254890 : state->tuples = 0;
284 :
285 : /*
286 : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
287 : * see comments in grow_memtuples().
288 : */
289 254890 : state->memtupsize = Max(16384 / sizeof(void *),
290 : ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
291 :
292 254890 : state->growmemtuples = true;
293 254890 : state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
294 :
295 254890 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
296 :
297 254890 : state->activeptr = 0;
298 254890 : state->readptrcount = 1;
299 254890 : state->readptrsize = 8; /* arbitrary */
300 254890 : state->readptrs = (TSReadPointer *)
301 254890 : palloc(state->readptrsize * sizeof(TSReadPointer));
302 :
303 254890 : state->readptrs[0].eflags = eflags;
304 254890 : state->readptrs[0].eof_reached = false;
305 254890 : state->readptrs[0].current = 0;
306 :
307 254890 : return state;
308 : }
309 :
310 : /*
311 : * tuplestore_begin_heap
312 : *
313 : * Create a new tuplestore; other types of tuple stores (other than
314 : * "heap" tuple stores, for heap tuples) are possible, but not presently
315 : * implemented.
316 : *
317 : * randomAccess: if true, both forward and backward accesses to the
318 : * tuple store are allowed.
319 : *
320 : * interXact: if true, the files used for on-disk storage persist beyond the
321 : * end of the current transaction. NOTE: It's the caller's responsibility to
322 : * create such a tuplestore in a memory context and resource owner that will
323 : * also survive transaction boundaries, and to ensure the tuplestore is closed
324 : * when it's no longer wanted.
325 : *
326 : * maxKBytes: how much data to store in memory (any data beyond this
327 : * amount is paged to disk). When in doubt, use work_mem.
328 : */
329 : Tuplestorestate *
330 254890 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
331 : {
332 : Tuplestorestate *state;
333 : int eflags;
334 :
335 : /*
336 : * This interpretation of the meaning of randomAccess is compatible with
337 : * the pre-8.3 behavior of tuplestores.
338 : */
339 254890 : eflags = randomAccess ?
340 254890 : (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
341 : (EXEC_FLAG_REWIND);
342 :
343 254890 : state = tuplestore_begin_common(eflags, interXact, maxKBytes);
344 :
345 254890 : state->copytup = copytup_heap;
346 254890 : state->writetup = writetup_heap;
347 254890 : state->readtup = readtup_heap;
348 :
349 254890 : return state;
350 : }
351 :
352 : /*
353 : * tuplestore_set_eflags
354 : *
355 : * Set the capability flags for read pointer 0 at a finer grain than is
356 : * allowed by tuplestore_begin_xxx. This must be called before inserting
357 : * any data into the tuplestore.
358 : *
359 : * eflags is a bitmask following the meanings used for executor node
360 : * startup flags (see executor.h). tuplestore pays attention to these bits:
361 : * EXEC_FLAG_REWIND need rewind to start
362 : * EXEC_FLAG_BACKWARD need backward fetch
363 : * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
364 : * is set per "randomAccess" in the tuplestore_begin_xxx call.
365 : *
366 : * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
367 : * but not further than the truncation point (the furthest-back read pointer
368 : * position at the time of the last tuplestore_trim call).
369 : */
370 : void
371 6942 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
372 : {
373 : int i;
374 :
375 6942 : if (state->status != TSS_INMEM || state->memtupcount != 0)
376 0 : elog(ERROR, "too late to call tuplestore_set_eflags");
377 :
378 6942 : state->readptrs[0].eflags = eflags;
379 6942 : for (i = 1; i < state->readptrcount; i++)
380 0 : eflags |= state->readptrs[i].eflags;
381 6942 : state->eflags = eflags;
382 6942 : }
383 :
384 : /*
385 : * tuplestore_alloc_read_pointer - allocate another read pointer.
386 : *
387 : * Returns the pointer's index.
388 : *
389 : * The new pointer initially copies the position of read pointer 0.
390 : * It can have its own eflags, but if any data has been inserted into
391 : * the tuplestore, these eflags must not represent an increase in
392 : * requirements.
393 : */
394 : int
395 8778 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
396 : {
397 : /* Check for possible increase of requirements */
398 8778 : if (state->status != TSS_INMEM || state->memtupcount != 0)
399 : {
400 614 : if ((state->eflags | eflags) != state->eflags)
401 0 : elog(ERROR, "too late to require new tuplestore eflags");
402 : }
403 :
404 : /* Make room for another read pointer if needed */
405 8778 : if (state->readptrcount >= state->readptrsize)
406 : {
407 12 : int newcnt = state->readptrsize * 2;
408 :
409 12 : state->readptrs = (TSReadPointer *)
410 12 : repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
411 12 : state->readptrsize = newcnt;
412 : }
413 :
414 : /* And set it up */
415 8778 : state->readptrs[state->readptrcount] = state->readptrs[0];
416 8778 : state->readptrs[state->readptrcount].eflags = eflags;
417 :
418 8778 : state->eflags |= eflags;
419 :
420 8778 : return state->readptrcount++;
421 : }
422 :
423 : /*
424 : * tuplestore_clear
425 : *
426 : * Delete all the contents of a tuplestore, and reset its read pointers
427 : * to the start.
428 : */
429 : void
430 137182 : tuplestore_clear(Tuplestorestate *state)
431 : {
432 : int i;
433 : TSReadPointer *readptr;
434 :
435 : /* update the maxSpace before doing any USEMEM/FREEMEM adjustments */
436 137182 : tuplestore_updatemax(state);
437 :
438 137182 : if (state->myfile)
439 12 : BufFileClose(state->myfile);
440 137182 : state->myfile = NULL;
441 :
442 : #ifdef USE_ASSERT_CHECKING
443 : {
444 : int64 availMem = state->availMem;
445 :
446 : /*
447 : * Below, we reset the memory context for storing tuples. To save
448 : * from having to always call GetMemoryChunkSpace() on all stored
449 : * tuples, we adjust the availMem to forget all the tuples and just
450 : * recall USEMEM for the space used by the memtuples array. Here we
451 : * just Assert that's correct and the memory tracking hasn't gone
452 : * wrong anywhere.
453 : */
454 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
455 : availMem += GetMemoryChunkSpace(state->memtuples[i]);
456 :
457 : availMem += GetMemoryChunkSpace(state->memtuples);
458 :
459 : Assert(availMem == state->allowedMem);
460 : }
461 : #endif
462 :
463 : /* clear the memory consumed by the memory tuples */
464 137182 : MemoryContextReset(state->context);
465 :
466 : /*
467 : * Zero the used memory and re-consume the space for the memtuples array.
468 : * This saves having to FREEMEM for each stored tuple.
469 : */
470 137182 : state->availMem = state->allowedMem;
471 137182 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
472 :
473 137182 : state->status = TSS_INMEM;
474 137182 : state->truncated = false;
475 137182 : state->memtupdeleted = 0;
476 137182 : state->memtupcount = 0;
477 137182 : state->tuples = 0;
478 137182 : readptr = state->readptrs;
479 284856 : for (i = 0; i < state->readptrcount; readptr++, i++)
480 : {
481 147674 : readptr->eof_reached = false;
482 147674 : readptr->current = 0;
483 : }
484 137182 : }
485 :
486 : /*
487 : * tuplestore_end
488 : *
489 : * Release resources and clean up.
490 : */
491 : void
492 215334 : tuplestore_end(Tuplestorestate *state)
493 : {
494 215334 : if (state->myfile)
495 80 : BufFileClose(state->myfile);
496 :
497 215334 : MemoryContextDelete(state->context);
498 215334 : pfree(state->memtuples);
499 215334 : pfree(state->readptrs);
500 215334 : pfree(state);
501 215334 : }
502 :
503 : /*
504 : * tuplestore_select_read_pointer - make the specified read pointer active
505 : */
506 : void
507 3855978 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
508 : {
509 : TSReadPointer *readptr;
510 : TSReadPointer *oldptr;
511 :
512 : Assert(ptr >= 0 && ptr < state->readptrcount);
513 :
514 : /* No work if already active */
515 3855978 : if (ptr == state->activeptr)
516 893392 : return;
517 :
518 2962586 : readptr = &state->readptrs[ptr];
519 2962586 : oldptr = &state->readptrs[state->activeptr];
520 :
521 2962586 : switch (state->status)
522 : {
523 2962574 : case TSS_INMEM:
524 : case TSS_WRITEFILE:
525 : /* no work */
526 2962574 : break;
527 12 : case TSS_READFILE:
528 :
529 : /*
530 : * First, save the current read position in the pointer about to
531 : * become inactive.
532 : */
533 12 : if (!oldptr->eof_reached)
534 12 : BufFileTell(state->myfile,
535 : &oldptr->file,
536 : &oldptr->offset);
537 :
538 : /*
539 : * We have to make the temp file's seek position equal to the
540 : * logical position of the new read pointer. In eof_reached
541 : * state, that's the EOF, which we have available from the saved
542 : * write position.
543 : */
544 12 : if (readptr->eof_reached)
545 : {
546 0 : if (BufFileSeek(state->myfile,
547 : state->writepos_file,
548 : state->writepos_offset,
549 : SEEK_SET) != 0)
550 0 : ereport(ERROR,
551 : (errcode_for_file_access(),
552 : errmsg("could not seek in tuplestore temporary file")));
553 : }
554 : else
555 : {
556 12 : if (BufFileSeek(state->myfile,
557 : readptr->file,
558 : readptr->offset,
559 : SEEK_SET) != 0)
560 0 : ereport(ERROR,
561 : (errcode_for_file_access(),
562 : errmsg("could not seek in tuplestore temporary file")));
563 : }
564 12 : break;
565 0 : default:
566 0 : elog(ERROR, "invalid tuplestore state");
567 : break;
568 : }
569 :
570 2962586 : state->activeptr = ptr;
571 : }
572 :
573 : /*
574 : * tuplestore_tuple_count
575 : *
576 : * Returns the number of tuples added since creation or the last
577 : * tuplestore_clear().
578 : */
579 : int64
580 6380 : tuplestore_tuple_count(Tuplestorestate *state)
581 : {
582 6380 : return state->tuples;
583 : }
584 :
585 : /*
586 : * tuplestore_ateof
587 : *
588 : * Returns the active read pointer's eof_reached state.
589 : */
590 : bool
591 2470724 : tuplestore_ateof(Tuplestorestate *state)
592 : {
593 2470724 : return state->readptrs[state->activeptr].eof_reached;
594 : }
595 :
596 : /*
597 : * Grow the memtuples[] array, if possible within our memory constraint. We
598 : * must not exceed INT_MAX tuples in memory or the caller-provided memory
599 : * limit. Return true if we were able to enlarge the array, false if not.
600 : *
601 : * Normally, at each increment we double the size of the array. When doing
602 : * that would exceed a limit, we attempt one last, smaller increase (and then
603 : * clear the growmemtuples flag so we don't try any more). That allows us to
604 : * use memory as fully as permitted; sticking to the pure doubling rule could
605 : * result in almost half going unused. Because availMem moves around with
606 : * tuple addition/removal, we need some rule to prevent making repeated small
607 : * increases in memtupsize, which would just be useless thrashing. The
608 : * growmemtuples flag accomplishes that and also prevents useless
609 : * recalculations in this function.
610 : */
611 : static bool
612 1896 : grow_memtuples(Tuplestorestate *state)
613 : {
614 : int newmemtupsize;
615 1896 : int memtupsize = state->memtupsize;
616 1896 : int64 memNowUsed = state->allowedMem - state->availMem;
617 :
618 : /* Forget it if we've already maxed out memtuples, per comment above */
619 1896 : if (!state->growmemtuples)
620 8 : return false;
621 :
622 : /* Select new value of memtupsize */
623 1888 : if (memNowUsed <= state->availMem)
624 : {
625 : /*
626 : * We've used no more than half of allowedMem; double our usage,
627 : * clamping at INT_MAX tuples.
628 : */
629 1826 : if (memtupsize < INT_MAX / 2)
630 1826 : newmemtupsize = memtupsize * 2;
631 : else
632 : {
633 0 : newmemtupsize = INT_MAX;
634 0 : state->growmemtuples = false;
635 : }
636 : }
637 : else
638 : {
639 : /*
640 : * This will be the last increment of memtupsize. Abandon doubling
641 : * strategy and instead increase as much as we safely can.
642 : *
643 : * To stay within allowedMem, we can't increase memtupsize by more
644 : * than availMem / sizeof(void *) elements. In practice, we want to
645 : * increase it by considerably less, because we need to leave some
646 : * space for the tuples to which the new array slots will refer. We
647 : * assume the new tuples will be about the same size as the tuples
648 : * we've already seen, and thus we can extrapolate from the space
649 : * consumption so far to estimate an appropriate new size for the
650 : * memtuples array. The optimal value might be higher or lower than
651 : * this estimate, but it's hard to know that in advance. We again
652 : * clamp at INT_MAX tuples.
653 : *
654 : * This calculation is safe against enlarging the array so much that
655 : * LACKMEM becomes true, because the memory currently used includes
656 : * the present array; thus, there would be enough allowedMem for the
657 : * new array elements even if no other memory were currently used.
658 : *
659 : * We do the arithmetic in float8, because otherwise the product of
660 : * memtupsize and allowedMem could overflow. Any inaccuracy in the
661 : * result should be insignificant; but even if we computed a
662 : * completely insane result, the checks below will prevent anything
663 : * really bad from happening.
664 : */
665 : double grow_ratio;
666 :
667 62 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
668 62 : if (memtupsize * grow_ratio < INT_MAX)
669 62 : newmemtupsize = (int) (memtupsize * grow_ratio);
670 : else
671 0 : newmemtupsize = INT_MAX;
672 :
673 : /* We won't make any further enlargement attempts */
674 62 : state->growmemtuples = false;
675 : }
676 :
677 : /* Must enlarge array by at least one element, else report failure */
678 1888 : if (newmemtupsize <= memtupsize)
679 0 : goto noalloc;
680 :
681 : /*
682 : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
683 : * to ensure our request won't be rejected. Note that we can easily
684 : * exhaust address space before facing this outcome. (This is presently
685 : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
686 : * don't rely on that at this distance.)
687 : */
688 1888 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
689 : {
690 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
691 0 : state->growmemtuples = false; /* can't grow any more */
692 : }
693 :
694 : /*
695 : * We need to be sure that we do not cause LACKMEM to become true, else
696 : * the space management algorithm will go nuts. The code above should
697 : * never generate a dangerous request, but to be safe, check explicitly
698 : * that the array growth fits within availMem. (We could still cause
699 : * LACKMEM if the memory chunk overhead associated with the memtuples
700 : * array were to increase. That shouldn't happen because we chose the
701 : * initial array size large enough to ensure that palloc will be treating
702 : * both old and new arrays as separate chunks. But we'll check LACKMEM
703 : * explicitly below just in case.)
704 : */
705 1888 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
706 0 : goto noalloc;
707 :
708 : /* OK, do it */
709 1888 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
710 1888 : state->memtupsize = newmemtupsize;
711 1888 : state->memtuples = (void **)
712 1888 : repalloc_huge(state->memtuples,
713 1888 : state->memtupsize * sizeof(void *));
714 1888 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
715 1888 : if (LACKMEM(state))
716 0 : elog(ERROR, "unexpected out-of-memory situation in tuplestore");
717 1888 : return true;
718 :
719 0 : noalloc:
720 : /* If for any reason we didn't realloc, shut off future attempts */
721 0 : state->growmemtuples = false;
722 0 : return false;
723 : }
724 :
725 : /*
726 : * Accept one tuple and append it to the tuplestore.
727 : *
728 : * Note that the input tuple is always copied; the caller need not save it.
729 : *
730 : * If the active read pointer is currently "at EOF", it remains so (the read
731 : * pointer implicitly advances along with the write pointer); otherwise the
732 : * read pointer is unchanged. Non-active read pointers do not move, which
733 : * means they are certain to not be "at EOF" immediately after puttuple.
734 : * This curious-seeming behavior is for the convenience of nodeMaterial.c and
735 : * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
736 : * steps.
737 : *
738 : * tuplestore_puttupleslot() is a convenience routine to collect data from
739 : * a TupleTableSlot without an extra copy operation.
740 : */
741 : void
742 2011616 : tuplestore_puttupleslot(Tuplestorestate *state,
743 : TupleTableSlot *slot)
744 : {
745 : MinimalTuple tuple;
746 2011616 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
747 :
748 : /*
749 : * Form a MinimalTuple in working memory
750 : */
751 2011616 : tuple = ExecCopySlotMinimalTuple(slot);
752 2011616 : USEMEM(state, GetMemoryChunkSpace(tuple));
753 :
754 2011616 : tuplestore_puttuple_common(state, tuple);
755 :
756 2011616 : MemoryContextSwitchTo(oldcxt);
757 2011616 : }
758 :
759 : /*
760 : * "Standard" case to copy from a HeapTuple. This is actually now somewhat
761 : * deprecated, but not worth getting rid of in view of the number of callers.
762 : */
763 : void
764 1466850 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
765 : {
766 1466850 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
767 :
768 : /*
769 : * Copy the tuple. (Must do this even in WRITEFILE case. Note that
770 : * COPYTUP includes USEMEM, so we needn't do that here.)
771 : */
772 1466850 : tuple = COPYTUP(state, tuple);
773 :
774 1466850 : tuplestore_puttuple_common(state, tuple);
775 :
776 1466850 : MemoryContextSwitchTo(oldcxt);
777 1466850 : }
778 :
779 : /*
780 : * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
781 : * This avoids an extra tuple-construction operation.
782 : */
783 : void
784 16354532 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
785 : const Datum *values, const bool *isnull)
786 : {
787 : MinimalTuple tuple;
788 16354532 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
789 :
790 16354532 : tuple = heap_form_minimal_tuple(tdesc, values, isnull);
791 16354532 : USEMEM(state, GetMemoryChunkSpace(tuple));
792 :
793 16354532 : tuplestore_puttuple_common(state, tuple);
794 :
795 16354532 : MemoryContextSwitchTo(oldcxt);
796 16354532 : }
797 :
798 : static void
799 19832998 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
800 : {
801 : TSReadPointer *readptr;
802 : int i;
803 : ResourceOwner oldowner;
804 : MemoryContext oldcxt;
805 :
806 19832998 : state->tuples++;
807 :
808 19832998 : switch (state->status)
809 : {
810 14503990 : case TSS_INMEM:
811 :
812 : /*
813 : * Update read pointers as needed; see API spec above.
814 : */
815 14503990 : readptr = state->readptrs;
816 31289810 : for (i = 0; i < state->readptrcount; readptr++, i++)
817 : {
818 16785820 : if (readptr->eof_reached && i != state->activeptr)
819 : {
820 414 : readptr->eof_reached = false;
821 414 : readptr->current = state->memtupcount;
822 : }
823 : }
824 :
825 : /*
826 : * Grow the array as needed. Note that we try to grow the array
827 : * when there is still one free slot remaining --- if we fail,
828 : * there'll still be room to store the incoming tuple, and then
829 : * we'll switch to tape-based operation.
830 : */
831 14503990 : if (state->memtupcount >= state->memtupsize - 1)
832 : {
833 1896 : (void) grow_memtuples(state);
834 : Assert(state->memtupcount < state->memtupsize);
835 : }
836 :
837 : /* Stash the tuple in the in-memory array */
838 14503990 : state->memtuples[state->memtupcount++] = tuple;
839 :
840 : /*
841 : * Done if we still fit in available memory and have array slots.
842 : */
843 14503990 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
844 14503894 : return;
845 :
846 : /*
847 : * Nope; time to switch to tape-based operation. Make sure that
848 : * the temp file(s) are created in suitable temp tablespaces.
849 : */
850 96 : PrepareTempTablespaces();
851 :
852 : /* associate the file with the store's resource owner */
853 96 : oldowner = CurrentResourceOwner;
854 96 : CurrentResourceOwner = state->resowner;
855 :
856 : /*
857 : * We switch out of the state->context as this is a generation
858 : * context, which isn't ideal for allocations relating to the
859 : * BufFile.
860 : */
861 96 : oldcxt = MemoryContextSwitchTo(state->context->parent);
862 :
863 96 : state->myfile = BufFileCreateTemp(state->interXact);
864 :
865 96 : MemoryContextSwitchTo(oldcxt);
866 :
867 96 : CurrentResourceOwner = oldowner;
868 :
869 : /*
870 : * Freeze the decision about whether trailing length words will be
871 : * used. We can't change this choice once data is on tape, even
872 : * though callers might drop the requirement.
873 : */
874 96 : state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
875 :
876 : /*
877 : * Update the maximum space used before dumping the tuples. It's
878 : * possible that more space will be used by the tuples in memory
879 : * than the space that will be used on disk.
880 : */
881 96 : tuplestore_updatemax(state);
882 :
883 96 : state->status = TSS_WRITEFILE;
884 96 : dumptuples(state);
885 96 : break;
886 5328996 : case TSS_WRITEFILE:
887 :
888 : /*
889 : * Update read pointers as needed; see API spec above. Note:
890 : * BufFileTell is quite cheap, so not worth trying to avoid
891 : * multiple calls.
892 : */
893 5328996 : readptr = state->readptrs;
894 10663536 : for (i = 0; i < state->readptrcount; readptr++, i++)
895 : {
896 5334540 : if (readptr->eof_reached && i != state->activeptr)
897 : {
898 0 : readptr->eof_reached = false;
899 0 : BufFileTell(state->myfile,
900 : &readptr->file,
901 : &readptr->offset);
902 : }
903 : }
904 :
905 5328996 : WRITETUP(state, tuple);
906 5328996 : break;
907 12 : case TSS_READFILE:
908 :
909 : /*
910 : * Switch from reading to writing.
911 : */
912 12 : if (!state->readptrs[state->activeptr].eof_reached)
913 12 : BufFileTell(state->myfile,
914 12 : &state->readptrs[state->activeptr].file,
915 12 : &state->readptrs[state->activeptr].offset);
916 12 : if (BufFileSeek(state->myfile,
917 : state->writepos_file, state->writepos_offset,
918 : SEEK_SET) != 0)
919 0 : ereport(ERROR,
920 : (errcode_for_file_access(),
921 : errmsg("could not seek in tuplestore temporary file")));
922 12 : state->status = TSS_WRITEFILE;
923 :
924 : /*
925 : * Update read pointers as needed; see API spec above.
926 : */
927 12 : readptr = state->readptrs;
928 36 : for (i = 0; i < state->readptrcount; readptr++, i++)
929 : {
930 24 : if (readptr->eof_reached && i != state->activeptr)
931 : {
932 0 : readptr->eof_reached = false;
933 0 : readptr->file = state->writepos_file;
934 0 : readptr->offset = state->writepos_offset;
935 : }
936 : }
937 :
938 12 : WRITETUP(state, tuple);
939 12 : break;
940 0 : default:
941 0 : elog(ERROR, "invalid tuplestore state");
942 : break;
943 : }
944 : }
945 :
946 : /*
947 : * Fetch the next tuple in either forward or back direction.
948 : * Returns NULL if no more tuples. If should_free is set, the
949 : * caller must pfree the returned tuple when done with it.
950 : *
951 : * Backward scan is only allowed if randomAccess was set true or
952 : * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
953 : */
954 : static void *
955 20797578 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
956 : bool *should_free)
957 : {
958 20797578 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
959 : unsigned int tuplen;
960 : void *tup;
961 :
962 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
963 :
964 20797578 : switch (state->status)
965 : {
966 17029430 : case TSS_INMEM:
967 17029430 : *should_free = false;
968 17029430 : if (forward)
969 : {
970 16854160 : if (readptr->eof_reached)
971 180 : return NULL;
972 16853980 : if (readptr->current < state->memtupcount)
973 : {
974 : /* We have another tuple, so return it */
975 16486948 : return state->memtuples[readptr->current++];
976 : }
977 367032 : readptr->eof_reached = true;
978 367032 : return NULL;
979 : }
980 : else
981 : {
982 : /*
983 : * if all tuples are fetched already then we return last
984 : * tuple, else tuple before last returned.
985 : */
986 175270 : if (readptr->eof_reached)
987 : {
988 2600 : readptr->current = state->memtupcount;
989 2600 : readptr->eof_reached = false;
990 : }
991 : else
992 : {
993 172670 : if (readptr->current <= state->memtupdeleted)
994 : {
995 : Assert(!state->truncated);
996 0 : return NULL;
997 : }
998 172670 : readptr->current--; /* last returned tuple */
999 : }
1000 175270 : if (readptr->current <= state->memtupdeleted)
1001 : {
1002 : Assert(!state->truncated);
1003 30 : return NULL;
1004 : }
1005 175240 : return state->memtuples[readptr->current - 1];
1006 : }
1007 : break;
1008 :
1009 106 : case TSS_WRITEFILE:
1010 : /* Skip state change if we'll just return NULL */
1011 106 : if (readptr->eof_reached && forward)
1012 0 : return NULL;
1013 :
1014 : /*
1015 : * Switch from writing to reading.
1016 : */
1017 106 : BufFileTell(state->myfile,
1018 : &state->writepos_file, &state->writepos_offset);
1019 106 : if (!readptr->eof_reached)
1020 106 : if (BufFileSeek(state->myfile,
1021 : readptr->file, readptr->offset,
1022 : SEEK_SET) != 0)
1023 0 : ereport(ERROR,
1024 : (errcode_for_file_access(),
1025 : errmsg("could not seek in tuplestore temporary file")));
1026 106 : state->status = TSS_READFILE;
1027 : /* FALLTHROUGH */
1028 :
1029 3768148 : case TSS_READFILE:
1030 3768148 : *should_free = true;
1031 3768148 : if (forward)
1032 : {
1033 3768148 : if ((tuplen = getlen(state, true)) != 0)
1034 : {
1035 3768072 : tup = READTUP(state, tuplen);
1036 3768072 : return tup;
1037 : }
1038 : else
1039 : {
1040 76 : readptr->eof_reached = true;
1041 76 : return NULL;
1042 : }
1043 : }
1044 :
1045 : /*
1046 : * Backward.
1047 : *
1048 : * if all tuples are fetched already then we return last tuple,
1049 : * else tuple before last returned.
1050 : *
1051 : * Back up to fetch previously-returned tuple's ending length
1052 : * word. If seek fails, assume we are at start of file.
1053 : */
1054 0 : if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
1055 : SEEK_CUR) != 0)
1056 : {
1057 : /* even a failed backwards fetch gets you out of eof state */
1058 0 : readptr->eof_reached = false;
1059 : Assert(!state->truncated);
1060 0 : return NULL;
1061 : }
1062 0 : tuplen = getlen(state, false);
1063 :
1064 0 : if (readptr->eof_reached)
1065 : {
1066 0 : readptr->eof_reached = false;
1067 : /* We will return the tuple returned before returning NULL */
1068 : }
1069 : else
1070 : {
1071 : /*
1072 : * Back up to get ending length word of tuple before it.
1073 : */
1074 0 : if (BufFileSeek(state->myfile, 0,
1075 0 : -(long) (tuplen + 2 * sizeof(unsigned int)),
1076 : SEEK_CUR) != 0)
1077 : {
1078 : /*
1079 : * If that fails, presumably the prev tuple is the first
1080 : * in the file. Back up so that it becomes next to read
1081 : * in forward direction (not obviously right, but that is
1082 : * what in-memory case does).
1083 : */
1084 0 : if (BufFileSeek(state->myfile, 0,
1085 0 : -(long) (tuplen + sizeof(unsigned int)),
1086 : SEEK_CUR) != 0)
1087 0 : ereport(ERROR,
1088 : (errcode_for_file_access(),
1089 : errmsg("could not seek in tuplestore temporary file")));
1090 : Assert(!state->truncated);
1091 0 : return NULL;
1092 : }
1093 0 : tuplen = getlen(state, false);
1094 : }
1095 :
1096 : /*
1097 : * Now we have the length of the prior tuple, back up and read it.
1098 : * Note: READTUP expects we are positioned after the initial
1099 : * length word of the tuple, so back up to that point.
1100 : */
1101 0 : if (BufFileSeek(state->myfile, 0,
1102 0 : -(long) tuplen,
1103 : SEEK_CUR) != 0)
1104 0 : ereport(ERROR,
1105 : (errcode_for_file_access(),
1106 : errmsg("could not seek in tuplestore temporary file")));
1107 0 : tup = READTUP(state, tuplen);
1108 0 : return tup;
1109 :
1110 0 : default:
1111 0 : elog(ERROR, "invalid tuplestore state");
1112 : return NULL; /* keep compiler quiet */
1113 : }
1114 : }
1115 :
1116 : /*
1117 : * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1118 : *
1119 : * If successful, put tuple in slot and return true; else, clear the slot
1120 : * and return false.
1121 : *
1122 : * If copy is true, the slot receives a copied tuple (allocated in current
1123 : * memory context) that will stay valid regardless of future manipulations of
1124 : * the tuplestore's state. If copy is false, the slot may just receive a
1125 : * pointer to a tuple held within the tuplestore. The latter is more
1126 : * efficient but the slot contents may be corrupted if additional writes to
1127 : * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1128 : */
1129 : bool
1130 20625328 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1131 : bool copy, TupleTableSlot *slot)
1132 : {
1133 : MinimalTuple tuple;
1134 : bool should_free;
1135 :
1136 20625328 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1137 :
1138 20625328 : if (tuple)
1139 : {
1140 20260556 : if (copy && !should_free)
1141 : {
1142 1765616 : tuple = heap_copy_minimal_tuple(tuple);
1143 1765616 : should_free = true;
1144 : }
1145 20260556 : ExecStoreMinimalTuple(tuple, slot, should_free);
1146 20260556 : return true;
1147 : }
1148 : else
1149 : {
1150 364772 : ExecClearTuple(slot);
1151 364772 : return false;
1152 : }
1153 : }
1154 :
1155 : /*
1156 : * tuplestore_advance - exported function to adjust position without fetching
1157 : *
1158 : * We could optimize this case to avoid palloc/pfree overhead, but for the
1159 : * moment it doesn't seem worthwhile.
1160 : */
1161 : bool
1162 172250 : tuplestore_advance(Tuplestorestate *state, bool forward)
1163 : {
1164 : void *tuple;
1165 : bool should_free;
1166 :
1167 172250 : tuple = tuplestore_gettuple(state, forward, &should_free);
1168 :
1169 172250 : if (tuple)
1170 : {
1171 169704 : if (should_free)
1172 0 : pfree(tuple);
1173 169704 : return true;
1174 : }
1175 : else
1176 : {
1177 2546 : return false;
1178 : }
1179 : }
1180 :
1181 : /*
1182 : * Advance over N tuples in either forward or back direction,
1183 : * without returning any data. N<=0 is a no-op.
1184 : * Returns true if successful, false if ran out of tuples.
1185 : */
1186 : bool
1187 1281360 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1188 : {
1189 1281360 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1190 :
1191 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1192 :
1193 1281360 : if (ntuples <= 0)
1194 18 : return true;
1195 :
1196 1281342 : switch (state->status)
1197 : {
1198 1281342 : case TSS_INMEM:
1199 1281342 : if (forward)
1200 : {
1201 1278628 : if (readptr->eof_reached)
1202 0 : return false;
1203 1278628 : if (state->memtupcount - readptr->current >= ntuples)
1204 : {
1205 1278538 : readptr->current += ntuples;
1206 1278538 : return true;
1207 : }
1208 90 : readptr->current = state->memtupcount;
1209 90 : readptr->eof_reached = true;
1210 90 : return false;
1211 : }
1212 : else
1213 : {
1214 2714 : if (readptr->eof_reached)
1215 : {
1216 0 : readptr->current = state->memtupcount;
1217 0 : readptr->eof_reached = false;
1218 0 : ntuples--;
1219 : }
1220 2714 : if (readptr->current - state->memtupdeleted > ntuples)
1221 : {
1222 2714 : readptr->current -= ntuples;
1223 2714 : return true;
1224 : }
1225 : Assert(!state->truncated);
1226 0 : readptr->current = state->memtupdeleted;
1227 0 : return false;
1228 : }
1229 : break;
1230 :
1231 0 : default:
1232 : /* We don't currently try hard to optimize other cases */
1233 0 : while (ntuples-- > 0)
1234 : {
1235 : void *tuple;
1236 : bool should_free;
1237 :
1238 0 : tuple = tuplestore_gettuple(state, forward, &should_free);
1239 :
1240 0 : if (tuple == NULL)
1241 0 : return false;
1242 0 : if (should_free)
1243 0 : pfree(tuple);
1244 0 : CHECK_FOR_INTERRUPTS();
1245 : }
1246 0 : return true;
1247 : }
1248 : }
1249 :
1250 : /*
1251 : * dumptuples - remove tuples from memory and write to tape
1252 : *
1253 : * As a side effect, we must convert each read pointer's position from
1254 : * "current" to file/offset format. But eof_reached pointers don't
1255 : * need to change state.
1256 : */
1257 : static void
1258 96 : dumptuples(Tuplestorestate *state)
1259 : {
1260 : int i;
1261 :
1262 96 : for (i = state->memtupdeleted;; i++)
1263 827184 : {
1264 827280 : TSReadPointer *readptr = state->readptrs;
1265 : int j;
1266 :
1267 1673004 : for (j = 0; j < state->readptrcount; readptr++, j++)
1268 : {
1269 845724 : if (i == readptr->current && !readptr->eof_reached)
1270 108 : BufFileTell(state->myfile,
1271 : &readptr->file, &readptr->offset);
1272 : }
1273 827280 : if (i >= state->memtupcount)
1274 96 : break;
1275 827184 : WRITETUP(state, state->memtuples[i]);
1276 : }
1277 96 : state->memtupdeleted = 0;
1278 96 : state->memtupcount = 0;
1279 96 : }
1280 :
1281 : /*
1282 : * tuplestore_rescan - rewind the active read pointer to start
1283 : */
1284 : void
1285 299140 : tuplestore_rescan(Tuplestorestate *state)
1286 : {
1287 299140 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1288 :
1289 : Assert(readptr->eflags & EXEC_FLAG_REWIND);
1290 : Assert(!state->truncated);
1291 :
1292 299140 : switch (state->status)
1293 : {
1294 299058 : case TSS_INMEM:
1295 299058 : readptr->eof_reached = false;
1296 299058 : readptr->current = 0;
1297 299058 : break;
1298 82 : case TSS_WRITEFILE:
1299 82 : readptr->eof_reached = false;
1300 82 : readptr->file = 0;
1301 82 : readptr->offset = 0;
1302 82 : break;
1303 0 : case TSS_READFILE:
1304 0 : readptr->eof_reached = false;
1305 0 : if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
1306 0 : ereport(ERROR,
1307 : (errcode_for_file_access(),
1308 : errmsg("could not seek in tuplestore temporary file")));
1309 0 : break;
1310 0 : default:
1311 0 : elog(ERROR, "invalid tuplestore state");
1312 : break;
1313 : }
1314 299140 : }
1315 :
1316 : /*
1317 : * tuplestore_copy_read_pointer - copy a read pointer's state to another
1318 : */
1319 : void
1320 60460 : tuplestore_copy_read_pointer(Tuplestorestate *state,
1321 : int srcptr, int destptr)
1322 : {
1323 60460 : TSReadPointer *sptr = &state->readptrs[srcptr];
1324 60460 : TSReadPointer *dptr = &state->readptrs[destptr];
1325 :
1326 : Assert(srcptr >= 0 && srcptr < state->readptrcount);
1327 : Assert(destptr >= 0 && destptr < state->readptrcount);
1328 :
1329 : /* Assigning to self is a no-op */
1330 60460 : if (srcptr == destptr)
1331 0 : return;
1332 :
1333 60460 : if (dptr->eflags != sptr->eflags)
1334 : {
1335 : /* Possible change of overall eflags, so copy and then recompute */
1336 : int eflags;
1337 : int i;
1338 :
1339 0 : *dptr = *sptr;
1340 0 : eflags = state->readptrs[0].eflags;
1341 0 : for (i = 1; i < state->readptrcount; i++)
1342 0 : eflags |= state->readptrs[i].eflags;
1343 0 : state->eflags = eflags;
1344 : }
1345 : else
1346 60460 : *dptr = *sptr;
1347 :
1348 60460 : switch (state->status)
1349 : {
1350 60460 : case TSS_INMEM:
1351 : case TSS_WRITEFILE:
1352 : /* no work */
1353 60460 : break;
1354 0 : case TSS_READFILE:
1355 :
1356 : /*
1357 : * This case is a bit tricky since the active read pointer's
1358 : * position corresponds to the seek point, not what is in its
1359 : * variables. Assigning to the active requires a seek, and
1360 : * assigning from the active requires a tell, except when
1361 : * eof_reached.
1362 : */
1363 0 : if (destptr == state->activeptr)
1364 : {
1365 0 : if (dptr->eof_reached)
1366 : {
1367 0 : if (BufFileSeek(state->myfile,
1368 : state->writepos_file,
1369 : state->writepos_offset,
1370 : SEEK_SET) != 0)
1371 0 : ereport(ERROR,
1372 : (errcode_for_file_access(),
1373 : errmsg("could not seek in tuplestore temporary file")));
1374 : }
1375 : else
1376 : {
1377 0 : if (BufFileSeek(state->myfile,
1378 : dptr->file, dptr->offset,
1379 : SEEK_SET) != 0)
1380 0 : ereport(ERROR,
1381 : (errcode_for_file_access(),
1382 : errmsg("could not seek in tuplestore temporary file")));
1383 : }
1384 : }
1385 0 : else if (srcptr == state->activeptr)
1386 : {
1387 0 : if (!dptr->eof_reached)
1388 0 : BufFileTell(state->myfile,
1389 : &dptr->file,
1390 : &dptr->offset);
1391 : }
1392 0 : break;
1393 0 : default:
1394 0 : elog(ERROR, "invalid tuplestore state");
1395 : break;
1396 : }
1397 : }
1398 :
1399 : /*
1400 : * tuplestore_trim - remove all no-longer-needed tuples
1401 : *
1402 : * Calling this function authorizes the tuplestore to delete all tuples
1403 : * before the oldest read pointer, if no read pointer is marked as requiring
1404 : * REWIND capability.
1405 : *
1406 : * Note: this is obviously safe if no pointer has BACKWARD capability either.
1407 : * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1408 : * the pointer can be moved backward but not before the oldest other read
1409 : * pointer.
1410 : */
1411 : void
1412 850746 : tuplestore_trim(Tuplestorestate *state)
1413 : {
1414 : int oldest;
1415 : int nremove;
1416 : int i;
1417 :
1418 : /*
1419 : * Truncation is disallowed if any read pointer requires rewind
1420 : * capability.
1421 : */
1422 850746 : if (state->eflags & EXEC_FLAG_REWIND)
1423 0 : return;
1424 :
1425 : /*
1426 : * We don't bother trimming temp files since it usually would mean more
1427 : * work than just letting them sit in kernel buffers until they age out.
1428 : */
1429 850746 : if (state->status != TSS_INMEM)
1430 23988 : return;
1431 :
1432 : /* Find the oldest read pointer */
1433 826758 : oldest = state->memtupcount;
1434 3564420 : for (i = 0; i < state->readptrcount; i++)
1435 : {
1436 2737662 : if (!state->readptrs[i].eof_reached)
1437 2708542 : oldest = Min(oldest, state->readptrs[i].current);
1438 : }
1439 :
1440 : /*
1441 : * Note: you might think we could remove all the tuples before the oldest
1442 : * "current", since that one is the next to be returned. However, since
1443 : * tuplestore_gettuple returns a direct pointer to our internal copy of
1444 : * the tuple, it's likely that the caller has still got the tuple just
1445 : * before "current" referenced in a slot. So we keep one extra tuple
1446 : * before the oldest "current". (Strictly speaking, we could require such
1447 : * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1448 : * efficiency we allow this one case to not use "copy".)
1449 : */
1450 826758 : nremove = oldest - 1;
1451 826758 : if (nremove <= 0)
1452 7178 : return; /* nothing to do */
1453 :
1454 : Assert(nremove >= state->memtupdeleted);
1455 : Assert(nremove <= state->memtupcount);
1456 :
1457 : /* before freeing any memory, update the statistics */
1458 819580 : tuplestore_updatemax(state);
1459 :
1460 : /* Release no-longer-needed tuples */
1461 1640174 : for (i = state->memtupdeleted; i < nremove; i++)
1462 : {
1463 820594 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1464 820594 : pfree(state->memtuples[i]);
1465 820594 : state->memtuples[i] = NULL;
1466 : }
1467 819580 : state->memtupdeleted = nremove;
1468 :
1469 : /* mark tuplestore as truncated (used for Assert crosschecks only) */
1470 819580 : state->truncated = true;
1471 :
1472 : /*
1473 : * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1474 : * "deleted" slots as NULL. This prevents us from expending O(N^2) time
1475 : * repeatedly memmove-ing a large pointer array. The worst case space
1476 : * wastage is pretty small, since it's just pointers and not whole tuples.
1477 : */
1478 819580 : if (nremove < state->memtupcount / 8)
1479 112680 : return;
1480 :
1481 : /*
1482 : * Slide the array down and readjust pointers.
1483 : *
1484 : * In mergejoin's current usage, it's demonstrable that there will always
1485 : * be exactly one non-removed tuple; so optimize that case.
1486 : */
1487 706900 : if (nremove + 1 == state->memtupcount)
1488 630088 : state->memtuples[0] = state->memtuples[nremove];
1489 : else
1490 76812 : memmove(state->memtuples, state->memtuples + nremove,
1491 76812 : (state->memtupcount - nremove) * sizeof(void *));
1492 :
1493 706900 : state->memtupdeleted = 0;
1494 706900 : state->memtupcount -= nremove;
1495 3071376 : for (i = 0; i < state->readptrcount; i++)
1496 : {
1497 2364476 : if (!state->readptrs[i].eof_reached)
1498 2360318 : state->readptrs[i].current -= nremove;
1499 : }
1500 : }
1501 :
1502 : /*
1503 : * tuplestore_updatemax
1504 : * Update the maximum space used by this tuplestore and the method used
1505 : * for storage.
1506 : */
1507 : static void
1508 956888 : tuplestore_updatemax(Tuplestorestate *state)
1509 : {
1510 956888 : if (state->status == TSS_INMEM)
1511 956876 : state->maxSpace = Max(state->maxSpace,
1512 : state->allowedMem - state->availMem);
1513 : else
1514 : {
1515 12 : state->maxSpace = Max(state->maxSpace,
1516 : BufFileSize(state->myfile));
1517 :
1518 : /*
1519 : * usedDisk never gets set to false again after spilling to disk, even
1520 : * if tuplestore_clear() is called and new tuples go to memory again.
1521 : */
1522 12 : state->usedDisk = true;
1523 : }
1524 956888 : }
1525 :
1526 : /*
1527 : * tuplestore_get_stats
1528 : * Obtain statistics about the maximum space used by the tuplestore.
1529 : * These statistics are the maximums and are not reset by calls to
1530 : * tuplestore_trim() or tuplestore_clear().
1531 : */
1532 : void
1533 30 : tuplestore_get_stats(Tuplestorestate *state, char **max_storage_type,
1534 : int64 *max_space)
1535 : {
1536 30 : tuplestore_updatemax(state);
1537 :
1538 30 : if (state->usedDisk)
1539 12 : *max_storage_type = "Disk";
1540 : else
1541 18 : *max_storage_type = "Memory";
1542 :
1543 30 : *max_space = state->maxSpace;
1544 30 : }
1545 :
1546 : /*
1547 : * tuplestore_in_memory
1548 : *
1549 : * Returns true if the tuplestore has not spilled to disk.
1550 : *
1551 : * XXX exposing this is a violation of modularity ... should get rid of it.
1552 : */
1553 : bool
1554 1572012 : tuplestore_in_memory(Tuplestorestate *state)
1555 : {
1556 1572012 : return (state->status == TSS_INMEM);
1557 : }
1558 :
1559 :
1560 : /*
1561 : * Tape interface routines
1562 : */
1563 :
1564 : static unsigned int
1565 3768148 : getlen(Tuplestorestate *state, bool eofOK)
1566 : {
1567 : unsigned int len;
1568 : size_t nbytes;
1569 :
1570 3768148 : nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
1571 3768148 : if (nbytes == 0)
1572 76 : return 0;
1573 : else
1574 3768072 : return len;
1575 : }
1576 :
1577 :
1578 : /*
1579 : * Routines specialized for HeapTuple case
1580 : *
1581 : * The stored form is actually a MinimalTuple, but for largely historical
1582 : * reasons we allow COPYTUP to work from a HeapTuple.
1583 : *
1584 : * Since MinimalTuple already has length in its first word, we don't need
1585 : * to write that separately.
1586 : */
1587 :
1588 : static void *
1589 1466850 : copytup_heap(Tuplestorestate *state, void *tup)
1590 : {
1591 : MinimalTuple tuple;
1592 :
1593 1466850 : tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1594 1466850 : USEMEM(state, GetMemoryChunkSpace(tuple));
1595 1466850 : return tuple;
1596 : }
1597 :
1598 : static void
1599 6156192 : writetup_heap(Tuplestorestate *state, void *tup)
1600 : {
1601 6156192 : MinimalTuple tuple = (MinimalTuple) tup;
1602 :
1603 : /* the part of the MinimalTuple we'll write: */
1604 6156192 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1605 6156192 : unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1606 :
1607 : /* total on-disk footprint: */
1608 6156192 : unsigned int tuplen = tupbodylen + sizeof(int);
1609 :
1610 6156192 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1611 6156192 : BufFileWrite(state->myfile, tupbody, tupbodylen);
1612 6156192 : if (state->backward) /* need trailing length word? */
1613 0 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1614 :
1615 6156192 : FREEMEM(state, GetMemoryChunkSpace(tuple));
1616 6156192 : heap_free_minimal_tuple(tuple);
1617 6156192 : }
1618 :
1619 : static void *
1620 3768072 : readtup_heap(Tuplestorestate *state, unsigned int len)
1621 : {
1622 3768072 : unsigned int tupbodylen = len - sizeof(int);
1623 3768072 : unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1624 3768072 : MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1625 3768072 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1626 :
1627 : /* read in the tuple proper */
1628 3768072 : tuple->t_len = tuplen;
1629 3768072 : BufFileReadExact(state->myfile, tupbody, tupbodylen);
1630 3768072 : if (state->backward) /* need trailing length word? */
1631 0 : BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
1632 3768072 : return tuple;
1633 : }
|