Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sharedtuplestore.c
4 : * Simple mechanism for sharing tuples between backends.
5 : *
6 : * This module contains a shared temporary tuple storage mechanism providing
7 : * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8 : * can write to a SharedTuplestore, and then multiple backends can later scan
9 : * the stored tuples. Currently, the only scan type supported is a parallel
10 : * scan where each backend reads an arbitrary subset of the tuples that were
11 : * written.
12 : *
13 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
14 : * Portions Copyright (c) 1994, Regents of the University of California
15 : *
16 : * IDENTIFICATION
17 : * src/backend/utils/sort/sharedtuplestore.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 :
22 : #include "postgres.h"
23 :
24 : #include "access/htup.h"
25 : #include "access/htup_details.h"
26 : #include "storage/buffile.h"
27 : #include "storage/lwlock.h"
28 : #include "storage/sharedfileset.h"
29 : #include "utils/sharedtuplestore.h"
30 :
/*
 * Chunk geometry.
 *
 * STS_CHUNK_PAGES is the number of BLCKSZ-sized pages making up one chunk.
 * The value is somewhat arbitrary; it was picked to match the size of
 * HASH_CHUNK so that Parallel Hash pulls in new chunks of tuples at roughly
 * the same pace as it allocates new chunks of memory to hold them.
 *
 * STS_CHUNK_HEADER_SIZE is the number of bytes preceding the payload in a
 * chunk, and STS_CHUNK_DATA_SIZE is the payload capacity of one chunk.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
40 :
41 : /* Chunk written to disk. */
42 : typedef struct SharedTuplestoreChunk
43 : {
44 : int ntuples; /* Number of tuples in this chunk. */
45 : int overflow; /* If overflow, how many including this one? */
46 : char data[FLEXIBLE_ARRAY_MEMBER];
47 : } SharedTuplestoreChunk;
48 :
49 : /* Per-participant shared state. */
50 : typedef struct SharedTuplestoreParticipant
51 : {
52 : LWLock lock;
53 : BlockNumber read_page; /* Page number for next read. */
54 : BlockNumber npages; /* Number of pages written. */
55 : bool writing; /* Used only for assertions. */
56 : } SharedTuplestoreParticipant;
57 :
58 : /* The control object that lives in shared memory. */
59 : struct SharedTuplestore
60 : {
61 : int nparticipants; /* Number of participants that can write. */
62 : int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
63 : size_t meta_data_size; /* Size of per-tuple header. */
64 : char name[NAMEDATALEN]; /* A name for this tuplestore. */
65 :
66 : /* Followed by per-participant shared state. */
67 : SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
68 : };
69 :
70 : /* Per-participant state that lives in backend-local memory. */
71 : struct SharedTuplestoreAccessor
72 : {
73 : int participant; /* My participant number. */
74 : SharedTuplestore *sts; /* The shared state. */
75 : SharedFileSet *fileset; /* The SharedFileSet holding files. */
76 : MemoryContext context; /* Memory context for buffers. */
77 :
78 : /* State for reading. */
79 : int read_participant; /* The current participant to read from. */
80 : BufFile *read_file; /* The current file to read from. */
81 : int read_ntuples_available; /* The number of tuples in chunk. */
82 : int read_ntuples; /* How many tuples have we read from chunk? */
83 : size_t read_bytes; /* How many bytes have we read from chunk? */
84 : char *read_buffer; /* A buffer for loading tuples. */
85 : size_t read_buffer_size;
86 : BlockNumber read_next_page; /* Lowest block we'll consider reading. */
87 :
88 : /* State for writing. */
89 : SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
90 : BufFile *write_file; /* The current file to write to. */
91 : char *write_pointer; /* Current write pointer within chunk. */
92 : char *write_end; /* One past the end of the current chunk. */
93 : };
94 :
95 : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
96 : int participant);
97 :
98 : /*
99 : * Return the amount of shared memory required to hold SharedTuplestore for a
100 : * given number of participants.
101 : */
102 : size_t
103 3752 : sts_estimate(int participants)
104 : {
105 7504 : return offsetof(SharedTuplestore, participants) +
106 3752 : sizeof(SharedTuplestoreParticipant) * participants;
107 : }
108 :
109 : /*
110 : * Initialize a SharedTuplestore in existing shared memory. There must be
111 : * space for sts_estimate(participants) bytes. If flags includes the value
112 : * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
113 : * eagerly (but this isn't yet implemented).
114 : *
115 : * Tuples that are stored may optionally carry a piece of fixed sized
116 : * meta-data which will be retrieved along with the tuple. This is useful for
117 : * the hash values used in multi-batch hash joins, but could have other
118 : * applications.
119 : *
120 : * The caller must supply a SharedFileSet, which is essentially a directory
121 : * that will be cleaned up automatically, and a name which must be unique
122 : * across all SharedTuplestores created in the same SharedFileSet.
123 : */
124 : SharedTuplestoreAccessor *
125 1392 : sts_initialize(SharedTuplestore *sts, int participants,
126 : int my_participant_number,
127 : size_t meta_data_size,
128 : int flags,
129 : SharedFileSet *fileset,
130 : const char *name)
131 : {
132 : SharedTuplestoreAccessor *accessor;
133 : int i;
134 :
135 : Assert(my_participant_number < participants);
136 :
137 1392 : sts->nparticipants = participants;
138 1392 : sts->meta_data_size = meta_data_size;
139 1392 : sts->flags = flags;
140 :
141 1392 : if (strlen(name) > sizeof(sts->name) - 1)
142 0 : elog(ERROR, "SharedTuplestore name too long");
143 1392 : strcpy(sts->name, name);
144 :
145 : /*
146 : * Limit meta-data so it + tuple size always fits into a single chunk.
147 : * sts_puttuple() and sts_read_tuple() could be made to support scenarios
148 : * where that's not the case, but it's not currently required. If so,
149 : * meta-data size probably should be made variable, too.
150 : */
151 1392 : if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
152 0 : elog(ERROR, "meta-data too long");
153 :
154 5124 : for (i = 0; i < participants; ++i)
155 : {
156 3732 : LWLockInitialize(&sts->participants[i].lock,
157 : LWTRANCHE_SHARED_TUPLESTORE);
158 3732 : sts->participants[i].read_page = 0;
159 3732 : sts->participants[i].npages = 0;
160 3732 : sts->participants[i].writing = false;
161 : }
162 :
163 1392 : accessor = palloc0(sizeof(SharedTuplestoreAccessor));
164 1392 : accessor->participant = my_participant_number;
165 1392 : accessor->sts = sts;
166 1392 : accessor->fileset = fileset;
167 1392 : accessor->context = CurrentMemoryContext;
168 :
169 1392 : return accessor;
170 : }
171 :
172 : /*
173 : * Attach to a SharedTuplestore that has been initialized by another backend,
174 : * so that this backend can read and write tuples.
175 : */
176 : SharedTuplestoreAccessor *
177 1850 : sts_attach(SharedTuplestore *sts,
178 : int my_participant_number,
179 : SharedFileSet *fileset)
180 : {
181 : SharedTuplestoreAccessor *accessor;
182 :
183 : Assert(my_participant_number < sts->nparticipants);
184 :
185 1850 : accessor = palloc0(sizeof(SharedTuplestoreAccessor));
186 1850 : accessor->participant = my_participant_number;
187 1850 : accessor->sts = sts;
188 1850 : accessor->fileset = fileset;
189 1850 : accessor->context = CurrentMemoryContext;
190 :
191 1850 : return accessor;
192 : }
193 :
194 : static void
195 3248 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
196 : {
197 : size_t size;
198 :
199 3248 : size = STS_CHUNK_PAGES * BLCKSZ;
200 3248 : BufFileWrite(accessor->write_file, accessor->write_chunk, size);
201 3248 : memset(accessor->write_chunk, 0, size);
202 3248 : accessor->write_pointer = &accessor->write_chunk->data[0];
203 3248 : accessor->sts->participants[accessor->participant].npages +=
204 : STS_CHUNK_PAGES;
205 3248 : }
206 :
207 : /*
208 : * Finish writing tuples. This must be called by all backends that have
209 : * written data before any backend begins reading it.
210 : */
211 : void
212 5192 : sts_end_write(SharedTuplestoreAccessor *accessor)
213 : {
214 5192 : if (accessor->write_file != NULL)
215 : {
216 1544 : sts_flush_chunk(accessor);
217 1544 : BufFileClose(accessor->write_file);
218 1544 : pfree(accessor->write_chunk);
219 1544 : accessor->write_chunk = NULL;
220 1544 : accessor->write_file = NULL;
221 1544 : accessor->sts->participants[accessor->participant].writing = false;
222 : }
223 5192 : }
224 :
225 : /*
226 : * Prepare to rescan. Only one participant must call this. After it returns,
227 : * all participants may call sts_begin_parallel_scan() and then loop over
228 : * sts_parallel_scan_next(). This function must not be called concurrently
229 : * with a scan, and synchronization to avoid that is the caller's
230 : * responsibility.
231 : */
232 : void
233 0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
234 : {
235 : int i;
236 :
237 : /*
238 : * Reset the shared read head for all participants' files. Also set the
239 : * initial chunk size to the minimum (any increases from that size will be
240 : * recorded in chunk_expansion_log).
241 : */
242 0 : for (i = 0; i < accessor->sts->nparticipants; ++i)
243 : {
244 0 : accessor->sts->participants[i].read_page = 0;
245 : }
246 0 : }
247 :
248 : /*
249 : * Begin scanning the contents in parallel.
250 : */
251 : void
252 1312 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
253 : {
254 : int i PG_USED_FOR_ASSERTS_ONLY;
255 :
256 : /* End any existing scan that was in progress. */
257 1312 : sts_end_parallel_scan(accessor);
258 :
259 : /*
260 : * Any backend that might have written into this shared tuplestore must
261 : * have called sts_end_write(), so that all buffers are flushed and the
262 : * files have stopped growing.
263 : */
264 4902 : for (i = 0; i < accessor->sts->nparticipants; ++i)
265 : Assert(!accessor->sts->participants[i].writing);
266 :
267 : /*
268 : * We will start out reading the file that THIS backend wrote. There may
269 : * be some caching locality advantage to that.
270 : */
271 1312 : accessor->read_participant = accessor->participant;
272 1312 : accessor->read_file = NULL;
273 1312 : accessor->read_next_page = 0;
274 1312 : }
275 :
276 : /*
277 : * Finish a parallel scan, freeing associated backend-local resources.
278 : */
279 : void
280 6642 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
281 : {
282 : /*
283 : * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
284 : * we'd probably need a reference count of current parallel scanners so we
285 : * could safely do it only when the reference count reaches zero.
286 : */
287 6642 : if (accessor->read_file != NULL)
288 : {
289 0 : BufFileClose(accessor->read_file);
290 0 : accessor->read_file = NULL;
291 : }
292 6642 : }
293 :
294 : /*
295 : * Write a tuple. If a meta-data size was provided to sts_initialize, then a
296 : * pointer to meta data of that size must be provided.
297 : */
298 : void
299 2400798 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
300 : MinimalTuple tuple)
301 : {
302 : size_t size;
303 :
304 : /* Do we have our own file yet? */
305 2400798 : if (accessor->write_file == NULL)
306 : {
307 : SharedTuplestoreParticipant *participant;
308 : char name[MAXPGPATH];
309 : MemoryContext oldcxt;
310 :
311 : /* Create one. Only this backend will write into it. */
312 1544 : sts_filename(name, accessor, accessor->participant);
313 :
314 1544 : oldcxt = MemoryContextSwitchTo(accessor->context);
315 1544 : accessor->write_file =
316 1544 : BufFileCreateFileSet(&accessor->fileset->fs, name);
317 1544 : MemoryContextSwitchTo(oldcxt);
318 :
319 : /* Set up the shared state for this backend's file. */
320 1544 : participant = &accessor->sts->participants[accessor->participant];
321 1544 : participant->writing = true; /* for assertions only */
322 : }
323 :
324 : /* Do we have space? */
325 2400798 : size = accessor->sts->meta_data_size + tuple->t_len;
326 2400798 : if (accessor->write_pointer + size > accessor->write_end)
327 : {
328 3032 : if (accessor->write_chunk == NULL)
329 : {
330 : /* First time through. Allocate chunk. */
331 1544 : accessor->write_chunk = (SharedTuplestoreChunk *)
332 1544 : MemoryContextAllocZero(accessor->context,
333 : STS_CHUNK_PAGES * BLCKSZ);
334 1544 : accessor->write_chunk->ntuples = 0;
335 1544 : accessor->write_pointer = &accessor->write_chunk->data[0];
336 1544 : accessor->write_end = (char *)
337 1544 : accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
338 : }
339 : else
340 : {
341 : /* See if flushing helps. */
342 1488 : sts_flush_chunk(accessor);
343 : }
344 :
345 : /* It may still not be enough in the case of a gigantic tuple. */
346 3032 : if (accessor->write_pointer + size > accessor->write_end)
347 : {
348 : size_t written;
349 :
350 : /*
351 : * We'll write the beginning of the oversized tuple, and then
352 : * write the rest in some number of 'overflow' chunks.
353 : *
354 : * sts_initialize() verifies that the size of the tuple +
355 : * meta-data always fits into a chunk. Because the chunk has been
356 : * flushed above, we can be sure to have all of a chunk's usable
357 : * space available.
358 : */
359 : Assert(accessor->write_pointer + accessor->sts->meta_data_size +
360 : sizeof(uint32) < accessor->write_end);
361 :
362 : /* Write the meta-data as one chunk. */
363 24 : if (accessor->sts->meta_data_size > 0)
364 24 : memcpy(accessor->write_pointer, meta_data,
365 24 : accessor->sts->meta_data_size);
366 :
367 : /*
368 : * Write as much of the tuple as we can fit. This includes the
369 : * tuple's size at the start.
370 : */
371 24 : written = accessor->write_end - accessor->write_pointer -
372 24 : accessor->sts->meta_data_size;
373 24 : memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
374 : tuple, written);
375 24 : ++accessor->write_chunk->ntuples;
376 24 : size -= accessor->sts->meta_data_size;
377 24 : size -= written;
378 : /* Now write as many overflow chunks as we need for the rest. */
379 240 : while (size > 0)
380 : {
381 : size_t written_this_chunk;
382 :
383 216 : sts_flush_chunk(accessor);
384 :
385 : /*
386 : * How many overflow chunks to go? This will allow readers to
387 : * skip all of them at once instead of reading each one.
388 : */
389 216 : accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
390 : STS_CHUNK_DATA_SIZE;
391 216 : written_this_chunk =
392 216 : Min(accessor->write_end - accessor->write_pointer, size);
393 216 : memcpy(accessor->write_pointer, (char *) tuple + written,
394 : written_this_chunk);
395 216 : accessor->write_pointer += written_this_chunk;
396 216 : size -= written_this_chunk;
397 216 : written += written_this_chunk;
398 : }
399 24 : return;
400 : }
401 : }
402 :
403 : /* Copy meta-data and tuple into buffer. */
404 2400774 : if (accessor->sts->meta_data_size > 0)
405 2400774 : memcpy(accessor->write_pointer, meta_data,
406 2400774 : accessor->sts->meta_data_size);
407 2400774 : memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
408 2400774 : tuple->t_len);
409 2400774 : accessor->write_pointer += size;
410 2400774 : ++accessor->write_chunk->ntuples;
411 : }
412 :
413 : static MinimalTuple
414 2400798 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
415 : {
416 : MinimalTuple tuple;
417 : uint32 size;
418 : size_t remaining_size;
419 : size_t this_chunk_size;
420 : char *destination;
421 :
422 : /*
423 : * We'll keep track of bytes read from this chunk so that we can detect an
424 : * overflowing tuple and switch to reading overflow pages.
425 : */
426 2400798 : if (accessor->sts->meta_data_size > 0)
427 : {
428 2400798 : BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
429 2400798 : accessor->read_bytes += accessor->sts->meta_data_size;
430 : }
431 2400798 : BufFileReadExact(accessor->read_file, &size, sizeof(size));
432 2400798 : accessor->read_bytes += sizeof(size);
433 2400798 : if (size > accessor->read_buffer_size)
434 : {
435 : size_t new_read_buffer_size;
436 :
437 946 : if (accessor->read_buffer != NULL)
438 0 : pfree(accessor->read_buffer);
439 946 : new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
440 946 : accessor->read_buffer =
441 946 : MemoryContextAlloc(accessor->context, new_read_buffer_size);
442 946 : accessor->read_buffer_size = new_read_buffer_size;
443 : }
444 2400798 : remaining_size = size - sizeof(uint32);
445 2400798 : this_chunk_size = Min(remaining_size,
446 : BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
447 2400798 : destination = accessor->read_buffer + sizeof(uint32);
448 2400798 : BufFileReadExact(accessor->read_file, destination, this_chunk_size);
449 2400798 : accessor->read_bytes += this_chunk_size;
450 2400798 : remaining_size -= this_chunk_size;
451 2400798 : destination += this_chunk_size;
452 2400798 : ++accessor->read_ntuples;
453 :
454 : /* Check if we need to read any overflow chunks. */
455 2401014 : while (remaining_size > 0)
456 : {
457 : /* We are now positioned at the start of an overflow chunk. */
458 : SharedTuplestoreChunk chunk_header;
459 :
460 216 : BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
461 216 : accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
462 216 : if (chunk_header.overflow == 0)
463 0 : ereport(ERROR,
464 : (errcode_for_file_access(),
465 : errmsg("unexpected chunk in shared tuplestore temporary file"),
466 : errdetail_internal("Expected overflow chunk.")));
467 216 : accessor->read_next_page += STS_CHUNK_PAGES;
468 216 : this_chunk_size = Min(remaining_size,
469 : BLCKSZ * STS_CHUNK_PAGES -
470 : STS_CHUNK_HEADER_SIZE);
471 216 : BufFileReadExact(accessor->read_file, destination, this_chunk_size);
472 216 : accessor->read_bytes += this_chunk_size;
473 216 : remaining_size -= this_chunk_size;
474 216 : destination += this_chunk_size;
475 :
476 : /*
477 : * These will be used to count regular tuples following the oversized
478 : * tuple that spilled into this overflow chunk.
479 : */
480 216 : accessor->read_ntuples = 0;
481 216 : accessor->read_ntuples_available = chunk_header.ntuples;
482 : }
483 :
484 2400798 : tuple = (MinimalTuple) accessor->read_buffer;
485 2400798 : tuple->t_len = size;
486 :
487 2400798 : return tuple;
488 : }
489 :
490 : /*
491 : * Get the next tuple in the current parallel scan.
492 : */
493 : MinimalTuple
494 2401944 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
495 : {
496 : SharedTuplestoreParticipant *p;
497 : BlockNumber read_page;
498 : bool eof;
499 :
500 : for (;;)
501 : {
502 : /* Can we read more tuples from the current chunk? */
503 2406928 : if (accessor->read_ntuples < accessor->read_ntuples_available)
504 2400798 : return sts_read_tuple(accessor, meta_data);
505 :
506 : /* Find the location of a new chunk to read. */
507 6130 : p = &accessor->sts->participants[accessor->read_participant];
508 :
509 6130 : LWLockAcquire(&p->lock, LW_EXCLUSIVE);
510 : /* We can skip directly past overflow pages we know about. */
511 6130 : if (p->read_page < accessor->read_next_page)
512 24 : p->read_page = accessor->read_next_page;
513 6130 : eof = p->read_page >= p->npages;
514 6130 : if (!eof)
515 : {
516 : /* Claim the next chunk. */
517 3032 : read_page = p->read_page;
518 : /* Advance the read head for the next reader. */
519 3032 : p->read_page += STS_CHUNK_PAGES;
520 3032 : accessor->read_next_page = p->read_page;
521 : }
522 6130 : LWLockRelease(&p->lock);
523 :
524 6130 : if (!eof)
525 : {
526 : SharedTuplestoreChunk chunk_header;
527 :
528 : /* Make sure we have the file open. */
529 3032 : if (accessor->read_file == NULL)
530 : {
531 : char name[MAXPGPATH];
532 : MemoryContext oldcxt;
533 :
534 1620 : sts_filename(name, accessor, accessor->read_participant);
535 :
536 1620 : oldcxt = MemoryContextSwitchTo(accessor->context);
537 1620 : accessor->read_file =
538 1620 : BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
539 : false);
540 1620 : MemoryContextSwitchTo(oldcxt);
541 : }
542 :
543 : /* Seek and load the chunk header. */
544 3032 : if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
545 0 : ereport(ERROR,
546 : (errcode_for_file_access(),
547 : errmsg("could not seek to block %u in shared tuplestore temporary file",
548 : read_page)));
549 3032 : BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
550 :
551 : /*
552 : * If this is an overflow chunk, we skip it and any following
553 : * overflow chunks all at once.
554 : */
555 3032 : if (chunk_header.overflow > 0)
556 : {
557 0 : accessor->read_next_page = read_page +
558 0 : chunk_header.overflow * STS_CHUNK_PAGES;
559 0 : continue;
560 : }
561 :
562 3032 : accessor->read_ntuples = 0;
563 3032 : accessor->read_ntuples_available = chunk_header.ntuples;
564 3032 : accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
565 :
566 : /* Go around again, so we can get a tuple from this chunk. */
567 : }
568 : else
569 : {
570 3098 : if (accessor->read_file != NULL)
571 : {
572 1620 : BufFileClose(accessor->read_file);
573 1620 : accessor->read_file = NULL;
574 : }
575 :
576 : /*
577 : * Try the next participant's file. If we've gone full circle,
578 : * we're done.
579 : */
580 3098 : accessor->read_participant = (accessor->read_participant + 1) %
581 3098 : accessor->sts->nparticipants;
582 3098 : if (accessor->read_participant == accessor->participant)
583 1146 : break;
584 1952 : accessor->read_next_page = 0;
585 :
586 : /* Go around again, so we can get a chunk from this file. */
587 : }
588 : }
589 :
590 1146 : return NULL;
591 : }
592 :
593 : /*
594 : * Create the name used for the BufFile that a given participant will write.
595 : */
596 : static void
597 3164 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
598 : {
599 3164 : snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
600 3164 : }
|