/*-------------------------------------------------------------------------
 *
 * sharedtuplestore.c
 *    Simple mechanism for sharing tuples between backends.
 *
 * This module contains a shared temporary tuple storage mechanism providing
 * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
 * can write to a SharedTuplestore, and then multiple backends can later scan
 * the stored tuples.  Currently, the only scan type supported is a parallel
 * scan where each backend reads an arbitrary subset of the tuples that were
 * written.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/utils/sort/sharedtuplestore.c
 *
 *-------------------------------------------------------------------------
 */
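
/*
 * A minimal write-side usage sketch.  The variable names and the uint32
 * hash-value meta-data below are illustrative assumptions, not part of this
 * module's API.  One backend calls sts_initialize() on shared memory, other
 * backends call sts_attach(), and every writer finishes with sts_end_write()
 * before anyone scans:
 *
 *     SharedTuplestoreAccessor *acc;
 *     uint32      hashvalue = 0;
 *
 *     acc = sts_initialize(sts, nparticipants, my_participant_number,
 *                          sizeof(uint32), 0, fileset, "sketch");
 *     sts_puttuple(acc, &hashvalue, tuple);   (repeat for each tuple)
 *     sts_end_write(acc);                     (required before any reads)
 */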

#include "postgres.h"

#include "access/htup.h"
#include "access/htup_details.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
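
/*
 * For illustration (assuming a typical build with the default BLCKSZ of 8192
 * and 4-byte ints): a chunk is 4 * 8192 = 32768 bytes, the header occupies
 * offsetof(SharedTuplestoreChunk, data) = 8 bytes, and STS_CHUNK_DATA_SIZE
 * therefore leaves 32760 usable bytes per chunk.
 */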

/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
    int         ntuples;        /* Number of tuples in this chunk. */
    int         overflow;       /* If overflow, how many including this one? */
    char        data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;
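
/*
 * Within a chunk's data area, each entry is the fixed-size meta-data (if
 * any) immediately followed by the MinimalTuple, whose leading uint32 t_len
 * gives the tuple's total size.  This is the layout that sts_puttuple()
 * writes and sts_read_tuple() parses:
 *
 *     [meta-data][t_len | rest of tuple][meta-data][t_len | rest of tuple]...
 */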

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
    LWLock      lock;
    BlockNumber read_page;      /* Page number for next read. */
    BlockNumber npages;         /* Number of pages written. */
    bool        writing;        /* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
    int         nparticipants;  /* Number of participants that can write. */
    int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
    size_t      meta_data_size; /* Size of per-tuple header. */
    char        name[NAMEDATALEN];  /* A name for this tuplestore. */

    /* Followed by per-participant shared state. */
    SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
    int         participant;    /* My participant number. */
    SharedTuplestore *sts;      /* The shared state. */
    SharedFileSet *fileset;     /* The SharedFileSet holding files. */
    MemoryContext context;      /* Memory context for buffers. */

    /* State for reading. */
    int         read_participant;   /* The current participant to read from. */
    BufFile    *read_file;      /* The current file to read from. */
    int         read_ntuples_available; /* The number of tuples in chunk. */
    int         read_ntuples;   /* How many tuples have we read from chunk? */
    size_t      read_bytes;     /* How many bytes have we read from chunk? */
    char       *read_buffer;    /* A buffer for loading tuples. */
    size_t      read_buffer_size;
    BlockNumber read_next_page; /* Lowest block we'll consider reading. */

    /* State for writing. */
    SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
    BufFile    *write_file;     /* The current file to write to. */
    BlockNumber write_page;     /* The next page to write to. */
    char       *write_pointer;  /* Current write pointer within chunk. */
    char       *write_end;      /* One past the end of the current chunk. */
};

static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
                         int participant);

/*
 * Return the amount of shared memory required to hold SharedTuplestore for a
 * given number of participants.
 */
size_t
sts_estimate(int participants)
{
    return offsetof(SharedTuplestore, participants) +
        sizeof(SharedTuplestoreParticipant) * participants;
}

/*
 * Initialize a SharedTuplestore in existing shared memory.  There must be
 * space for sts_estimate(participants) bytes.  If flags includes the value
 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
 * eagerly (but this isn't yet implemented).
 *
 * Tuples that are stored may optionally carry a piece of fixed-size
 * meta-data which will be retrieved along with the tuple.  This is useful
 * for the hash values used in multi-batch hash joins, but could have other
 * applications.
 *
 * The caller must supply a SharedFileSet, which is essentially a directory
 * that will be cleaned up automatically, and a name which must be unique
 * across all SharedTuplestores created in the same SharedFileSet.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
               int my_participant_number,
               size_t meta_data_size,
               int flags,
               SharedFileSet *fileset,
               const char *name)
{
    SharedTuplestoreAccessor *accessor;
    int         i;

    Assert(my_participant_number < participants);

    sts->nparticipants = participants;
    sts->meta_data_size = meta_data_size;
    sts->flags = flags;

    if (strlen(name) > sizeof(sts->name) - 1)
        elog(ERROR, "SharedTuplestore name too long");
    strcpy(sts->name, name);

    /*
     * Limit meta-data so it + tuple size always fits into a single chunk.
     * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     * where that's not the case, but it's not currently required.  If so,
     * meta-data size probably should be made variable, too.
     */
    if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
        elog(ERROR, "meta-data too long");

    for (i = 0; i < participants; ++i)
    {
        LWLockInitialize(&sts->participants[i].lock,
                         LWTRANCHE_SHARED_TUPLESTORE);
        sts->participants[i].read_page = 0;
        sts->participants[i].npages = 0;
        sts->participants[i].writing = false;
    }

    accessor = palloc0(sizeof(SharedTuplestoreAccessor));
    accessor->participant = my_participant_number;
    accessor->sts = sts;
    accessor->fileset = fileset;
    accessor->context = CurrentMemoryContext;

    return accessor;
}
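
/*
 * Leader-side setup sketch.  The shm_toc key, the "demo" name and the
 * variable names are illustrative assumptions; real callers such as
 * Parallel Hash have their own conventions:
 *
 *     size_t      size = sts_estimate(nparticipants);
 *     SharedTuplestore *sts = shm_toc_allocate(toc, size);
 *
 *     accessor = sts_initialize(sts, nparticipants, 0, sizeof(uint32),
 *                               0, fileset, "demo");
 *     shm_toc_insert(toc, MY_STS_KEY, sts);
 */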

/*
 * Attach to a SharedTuplestore that has been initialized by another backend,
 * so that this backend can read and write tuples.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
           int my_participant_number,
           SharedFileSet *fileset)
{
    SharedTuplestoreAccessor *accessor;

    Assert(my_participant_number < sts->nparticipants);

    accessor = palloc0(sizeof(SharedTuplestoreAccessor));
    accessor->participant = my_participant_number;
    accessor->sts = sts;
    accessor->fileset = fileset;
    accessor->context = CurrentMemoryContext;

    return accessor;
}
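
/*
 * Worker-side counterpart to the sketch above, with the same illustrative
 * names.  Each worker must pick a distinct participant number; Parallel
 * Hash uses ParallelWorkerNumber + 1, reserving 0 for the leader:
 *
 *     SharedTuplestore *sts = shm_toc_lookup(toc, MY_STS_KEY, false);
 *
 *     accessor = sts_attach(sts, ParallelWorkerNumber + 1, fileset);
 */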

static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
    size_t      size;

    size = STS_CHUNK_PAGES * BLCKSZ;
    BufFileWrite(accessor->write_file, accessor->write_chunk, size);
    memset(accessor->write_chunk, 0, size);
    accessor->write_pointer = &accessor->write_chunk->data[0];
    accessor->sts->participants[accessor->participant].npages +=
        STS_CHUNK_PAGES;
}

/*
 * Finish writing tuples.  This must be called by all backends that have
 * written data before any backend begins reading it.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
    if (accessor->write_file != NULL)
    {
        sts_flush_chunk(accessor);
        BufFileClose(accessor->write_file);
        pfree(accessor->write_chunk);
        accessor->write_chunk = NULL;
        accessor->write_file = NULL;
        accessor->sts->participants[accessor->participant].writing = false;
    }
}

/*
 * Prepare to rescan.  Exactly one participant must call this.  After it
 * returns, all participants may call sts_begin_parallel_scan() and then
 * loop over sts_parallel_scan_next().  This function must not be called
 * concurrently with a scan, and synchronization to avoid that is the
 * caller's responsibility.
 */
232 : */
233 : void
234 0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
235 : {
236 : int i;

    /*
     * Reset the shared read head for all participants' files, so that the
     * next scan starts from the beginning.
     */
    for (i = 0; i < accessor->sts->nparticipants; ++i)
    {
        accessor->sts->participants[i].read_page = 0;
    }
}

/*
 * Begin scanning the contents in parallel.
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
    int         i PG_USED_FOR_ASSERTS_ONLY;

    /* End any existing scan that was in progress. */
    sts_end_parallel_scan(accessor);

    /*
     * Any backend that might have written into this shared tuplestore must
     * have called sts_end_write(), so that all buffers are flushed and the
     * files have stopped growing.
     */
    for (i = 0; i < accessor->sts->nparticipants; ++i)
        Assert(!accessor->sts->participants[i].writing);

    /*
     * We will start out reading the file that THIS backend wrote.  There may
     * be some caching locality advantage to that.
     */
    accessor->read_participant = accessor->participant;
    accessor->read_file = NULL;
    accessor->read_next_page = 0;
}

/*
 * Finish a parallel scan, freeing associated backend-local resources.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
    /*
     * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     * we'd probably need a reference count of current parallel scanners so we
     * could safely do it only when the reference count reaches zero.
     */
    if (accessor->read_file != NULL)
    {
        BufFileClose(accessor->read_file);
        accessor->read_file = NULL;
    }
}

/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta data of that size must be provided.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
             MinimalTuple tuple)
{
    size_t      size;

    /* Do we have our own file yet? */
    if (accessor->write_file == NULL)
    {
        SharedTuplestoreParticipant *participant;
        char        name[MAXPGPATH];
        MemoryContext oldcxt;

        /* Create one.  Only this backend will write into it. */
        sts_filename(name, accessor, accessor->participant);

        oldcxt = MemoryContextSwitchTo(accessor->context);
        accessor->write_file =
            BufFileCreateFileSet(&accessor->fileset->fs, name);
        MemoryContextSwitchTo(oldcxt);

        /* Set up the shared state for this backend's file. */
        participant = &accessor->sts->participants[accessor->participant];
        participant->writing = true;    /* for assertions only */
    }

    /* Do we have space? */
    size = accessor->sts->meta_data_size + tuple->t_len;
    if (accessor->write_pointer + size > accessor->write_end)
    {
        if (accessor->write_chunk == NULL)
        {
            /* First time through.  Allocate chunk. */
            accessor->write_chunk = (SharedTuplestoreChunk *)
                MemoryContextAllocZero(accessor->context,
                                       STS_CHUNK_PAGES * BLCKSZ);
            accessor->write_chunk->ntuples = 0;
            accessor->write_pointer = &accessor->write_chunk->data[0];
            accessor->write_end = (char *)
                accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
        }
        else
        {
            /* See if flushing helps. */
            sts_flush_chunk(accessor);
        }

        /* It may still not be enough in the case of a gigantic tuple. */
        if (accessor->write_pointer + size > accessor->write_end)
        {
            size_t      written;

            /*
             * We'll write the beginning of the oversized tuple, and then
             * write the rest in some number of 'overflow' chunks.
             *
             * sts_initialize() verifies that the meta-data plus the tuple's
             * size word always fit into a chunk.  Because the chunk has been
             * flushed above, we can be sure to have all of a chunk's usable
             * space available.
             */
            Assert(accessor->write_pointer + accessor->sts->meta_data_size +
                   sizeof(uint32) < accessor->write_end);

            /* Write the meta-data; it fits in this chunk in its entirety. */
            if (accessor->sts->meta_data_size > 0)
                memcpy(accessor->write_pointer, meta_data,
                       accessor->sts->meta_data_size);

            /*
             * Write as much of the tuple as we can fit.  This includes the
             * tuple's size at the start.
             */
            written = accessor->write_end - accessor->write_pointer -
                accessor->sts->meta_data_size;
            memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
                   tuple, written);
            ++accessor->write_chunk->ntuples;
            size -= accessor->sts->meta_data_size;
            size -= written;
            /* Now write as many overflow chunks as we need for the rest. */
            while (size > 0)
            {
                size_t      written_this_chunk;

                sts_flush_chunk(accessor);

                /*
                 * How many overflow chunks to go?  This will allow readers to
                 * skip all of them at once instead of reading each one.
                 */
                accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
                    STS_CHUNK_DATA_SIZE;
                written_this_chunk =
                    Min(accessor->write_end - accessor->write_pointer, size);
                memcpy(accessor->write_pointer, (char *) tuple + written,
                       written_this_chunk);
                accessor->write_pointer += written_this_chunk;
                size -= written_this_chunk;
                written += written_this_chunk;
            }
            return;
        }
    }

    /* Copy meta-data and tuple into buffer. */
    if (accessor->sts->meta_data_size > 0)
        memcpy(accessor->write_pointer, meta_data,
               accessor->sts->meta_data_size);
    memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
           tuple->t_len);
    accessor->write_pointer += size;
    ++accessor->write_chunk->ntuples;
}
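
/*
 * A worked example of the overflow path above, assuming the default BLCKSZ
 * of 8192 (32760 usable bytes per chunk) and 4 bytes of meta-data: a
 * 100000-byte tuple needs 100004 bytes in total, of which the first chunk
 * takes 32760 (the meta-data plus the start of the tuple), leaving 67244
 * bytes.  Those spill into ceil(67244 / 32760) = 3 overflow chunks whose
 * headers carry overflow = 3, 2 and 1 respectively, so a concurrent reader
 * that lands on any of them can skip to the end of the run in one step.
 */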

static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    MinimalTuple tuple;
    uint32      size;
    size_t      remaining_size;
    size_t      this_chunk_size;
    char       *destination;

    /*
     * We'll keep track of bytes read from this chunk so that we can detect an
     * overflowing tuple and switch to reading overflow pages.
     */
    if (accessor->sts->meta_data_size > 0)
    {
        BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
        accessor->read_bytes += accessor->sts->meta_data_size;
    }
    BufFileReadExact(accessor->read_file, &size, sizeof(size));
    accessor->read_bytes += sizeof(size);
    if (size > accessor->read_buffer_size)
    {
        size_t      new_read_buffer_size;

        if (accessor->read_buffer != NULL)
            pfree(accessor->read_buffer);
        new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
        accessor->read_buffer =
            MemoryContextAlloc(accessor->context, new_read_buffer_size);
        accessor->read_buffer_size = new_read_buffer_size;
    }
    remaining_size = size - sizeof(uint32);
    this_chunk_size = Min(remaining_size,
                          BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
    destination = accessor->read_buffer + sizeof(uint32);
    BufFileReadExact(accessor->read_file, destination, this_chunk_size);
    accessor->read_bytes += this_chunk_size;
    remaining_size -= this_chunk_size;
    destination += this_chunk_size;
    ++accessor->read_ntuples;

    /* Check if we need to read any overflow chunks. */
    while (remaining_size > 0)
    {
        /* We are now positioned at the start of an overflow chunk. */
        SharedTuplestoreChunk chunk_header;

        BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
        accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
        if (chunk_header.overflow == 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("unexpected chunk in shared tuplestore temporary file"),
                     errdetail_internal("Expected overflow chunk.")));
        accessor->read_next_page += STS_CHUNK_PAGES;
        this_chunk_size = Min(remaining_size,
                              BLCKSZ * STS_CHUNK_PAGES -
                              STS_CHUNK_HEADER_SIZE);
        BufFileReadExact(accessor->read_file, destination, this_chunk_size);
        accessor->read_bytes += this_chunk_size;
        remaining_size -= this_chunk_size;
        destination += this_chunk_size;

        /*
         * These will be used to count regular tuples following the oversized
         * tuple that spilled into this overflow chunk.
         */
        accessor->read_ntuples = 0;
        accessor->read_ntuples_available = chunk_header.ntuples;
    }

    tuple = (MinimalTuple) accessor->read_buffer;
    tuple->t_len = size;

    return tuple;
}

/*
 * Get the next tuple in the current parallel scan.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    SharedTuplestoreParticipant *p;
    BlockNumber read_page;
    bool        eof;

    for (;;)
    {
        /* Can we read more tuples from the current chunk? */
        if (accessor->read_ntuples < accessor->read_ntuples_available)
            return sts_read_tuple(accessor, meta_data);

        /* Find the location of a new chunk to read. */
        p = &accessor->sts->participants[accessor->read_participant];

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        /* We can skip directly past overflow pages we know about. */
        if (p->read_page < accessor->read_next_page)
            p->read_page = accessor->read_next_page;
        eof = p->read_page >= p->npages;
        if (!eof)
        {
            /* Claim the next chunk. */
            read_page = p->read_page;
            /* Advance the read head for the next reader. */
            p->read_page += STS_CHUNK_PAGES;
            accessor->read_next_page = p->read_page;
        }
        LWLockRelease(&p->lock);

        if (!eof)
        {
            SharedTuplestoreChunk chunk_header;

            /* Make sure we have the file open. */
            if (accessor->read_file == NULL)
            {
                char        name[MAXPGPATH];
                MemoryContext oldcxt;

                sts_filename(name, accessor, accessor->read_participant);

                oldcxt = MemoryContextSwitchTo(accessor->context);
                accessor->read_file =
                    BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
                                       false);
                MemoryContextSwitchTo(oldcxt);
            }

            /* Seek and load the chunk header. */
            if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not seek to block %u in shared tuplestore temporary file",
                                read_page)));
            BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);

            /*
             * If this is an overflow chunk, we skip it and any following
             * overflow chunks all at once.
             */
            if (chunk_header.overflow > 0)
            {
                accessor->read_next_page = read_page +
                    chunk_header.overflow * STS_CHUNK_PAGES;
                continue;
            }

            accessor->read_ntuples = 0;
            accessor->read_ntuples_available = chunk_header.ntuples;
            accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

            /* Go around again, so we can get a tuple from this chunk. */
        }
        else
        {
            if (accessor->read_file != NULL)
            {
                BufFileClose(accessor->read_file);
                accessor->read_file = NULL;
            }

            /*
             * Try the next participant's file.  If we've gone full circle,
             * we're done.
             */
            accessor->read_participant = (accessor->read_participant + 1) %
                accessor->sts->nparticipants;
            if (accessor->read_participant == accessor->participant)
                break;
            accessor->read_next_page = 0;

            /* Go around again, so we can get a chunk from this file. */
        }
    }

    return NULL;
}
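
/*
 * A minimal read-side usage sketch, complementing the write-side sketch in
 * the file header (illustrative, assuming the store was created with
 * sizeof(uint32) meta-data holding a hash value, as in Parallel Hash):
 *
 *     MinimalTuple tuple;
 *     uint32      hashvalue;
 *
 *     sts_begin_parallel_scan(acc);
 *     while ((tuple = sts_parallel_scan_next(acc, &hashvalue)) != NULL)
 *     {
 *         ... process tuple, whose size is tuple->t_len ...
 *     }
 *     sts_end_parallel_scan(acc);
 */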

/*
 * Create the name used for the BufFile that a given participant will write.
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
    snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}