Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeHashjoin.c
4 : * Routines to handle hash join nodes
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeHashjoin.c
12 : *
13 : * HASH JOIN
14 : *
15 : * This is based on the "hybrid hash join" algorithm described shortly in the
16 : * following page
17 : *
18 : * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
19 : *
20 : * and in detail in the referenced paper:
21 : *
22 : * "An Adaptive Hash Join Algorithm for Multiuser Environments"
23 : * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference.
24 : * Brisbane: 186–197.
25 : *
26 : * If the inner side tuples of a hash join do not fit in memory, the hash join
27 : * can be executed in multiple batches.
28 : *
29 : * If the statistics on the inner side relation are accurate, planner chooses a
30 : * multi-batch strategy and estimates the number of batches.
31 : *
32 : * The query executor measures the real size of the hashtable and increases the
33 : * number of batches if the hashtable grows too large.
34 : *
35 : * The number of batches is always a power of two, so an increase in the number
36 : * of batches doubles it.
37 : *
38 : * Serial hash join measures batch size lazily -- waiting until it is loading a
39 : * batch to determine if it will fit in memory. While inserting tuples into the
40 : * hashtable, serial hash join will, if that tuple were to exceed work_mem,
41 : * dump out the hashtable and reassign them either to other batch files or the
42 : * current batch resident in the hashtable.
43 : *
44 : * Parallel hash join, on the other hand, completes all changes to the number
45 : * of batches during the build phase. If it increases the number of batches, it
46 : * dumps out all the tuples from all batches and reassigns them to entirely new
47 : * batch files. Then it checks every batch to ensure it will fit in the space
48 : * budget for the query.
49 : *
50 : * In both parallel and serial hash join, the executor currently makes a best
51 : * effort. If a particular batch will not fit in memory, it tries doubling the
52 : * number of batches. If after a batch increase, there is a batch which
53 : * retained all or none of its tuples, the executor disables growth in the
54 : * number of batches globally. After growth is disabled, all batches that would
55 : * have previously triggered an increase in the number of batches instead
56 : * exceed the space allowed.
57 : *
58 : * PARALLELISM
59 : *
60 : * Hash joins can participate in parallel query execution in several ways. A
61 : * parallel-oblivious hash join is one where the node is unaware that it is
62 : * part of a parallel plan. In this case, a copy of the inner plan is used to
63 : * build a copy of the hash table in every backend, and the outer plan could
64 : * either be built from a partial or complete path, so that the results of the
65 : * hash join are correspondingly either partial or complete. A parallel-aware
66 : * hash join is one that behaves differently, coordinating work between
67 : * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
68 : * Hash Join always appears with a Parallel Hash node.
69 : *
70 : * Parallel-aware hash joins use the same per-backend state machine to track
71 : * progress through the hash join algorithm as parallel-oblivious hash joins.
72 : * In a parallel-aware hash join, there is also a shared state machine that
73 : * co-operating backends use to synchronize their local state machines and
74 : * program counters. The shared state machine is managed with a Barrier IPC
75 : * primitive. When all attached participants arrive at a barrier, the phase
76 : * advances and all waiting participants are released.
77 : *
78 : * When a participant begins working on a parallel hash join, it must first
79 : * figure out how much progress has already been made, because participants
80 : * don't wait for each other to begin. For this reason there are switch
81 : * statements at key points in the code where we have to synchronize our local
82 : * state machine with the phase, and then jump to the correct part of the
83 : * algorithm so that we can get started.
84 : *
85 : * One barrier called build_barrier is used to coordinate the hashing phases.
86 : * The phase is represented by an integer which begins at zero and increments
87 : * one by one, but in the code it is referred to by symbolic names as follows.
88 : * An asterisk indicates a phase that is performed by a single arbitrarily
89 : * chosen process.
90 : *
91 : * PHJ_BUILD_ELECT -- initial state
92 : * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
93 : * PHJ_BUILD_HASH_INNER -- all hash the inner rel
94 : * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
95 : * PHJ_BUILD_RUN -- building done, probing can begin
96 : * PHJ_BUILD_FREE* -- all work complete, one frees batches
97 : *
98 : * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
99 : * be used repeatedly as required to coordinate expansions in the number of
100 : * batches or buckets. Their phases are as follows:
101 : *
102 : * PHJ_GROW_BATCHES_ELECT -- initial state
103 : * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
104 : * PHJ_GROW_BATCHES_REPARTITION -- all repartition
105 : * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
106 : * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
107 : *
108 : * PHJ_GROW_BUCKETS_ELECT -- initial state
109 : * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
110 : * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
111 : *
112 : * If the planner got the number of batches and buckets right, those won't be
113 : * necessary, but on the other hand we might finish up needing to expand the
114 : * buckets or batches multiple times while hashing the inner relation to stay
115 : * within our memory budget and load factor target. For that reason it's a
116 : * separate pair of barriers using circular phases.
117 : *
118 : * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
119 : * because we need to divide the outer relation into batches up front in order
120 : * to be able to process batches entirely independently. In contrast, the
121 : * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
122 : * batches whenever it encounters them while scanning and probing, which it
123 : * can do because it processes batches in serial order.
124 : *
125 : * Once PHJ_BUILD_RUN is reached, backends then split up and process
126 : * different batches, or gang up and work together on probing batches if there
127 : * aren't enough to go around. For each batch there is a separate barrier
128 : * with the following phases:
129 : *
130 : * PHJ_BATCH_ELECT -- initial state
131 : * PHJ_BATCH_ALLOCATE* -- one allocates buckets
132 : * PHJ_BATCH_LOAD -- all load the hash table from disk
133 : * PHJ_BATCH_PROBE -- all probe
134 : * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan
135 : * PHJ_BATCH_FREE* -- one frees memory
136 : *
137 : * Batch 0 is a special case, because it starts out in phase
138 : * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
139 : * PHJ_BUILD_HASH_INNER so we can skip loading.
140 : *
141 : * Initially we try to plan for a single-batch hash join using the combined
142 : * hash_mem of all participants to create a large shared hash table. If that
143 : * turns out either at planning or execution time to be impossible then we
144 : * fall back to regular hash_mem sized hash tables.
145 : *
146 : * To avoid deadlocks, we never wait for any barrier unless it is known that
147 : * all other backends attached to it are actively executing the node or have
148 : * finished. Practically, that means that we never emit a tuple while attached
149 : * to a barrier, unless the barrier has reached a phase that means that no
150 : * process will wait on it again. We emit tuples while attached to the build
151 : * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
152 : * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
153 : * respectively without waiting, using BarrierArriveAndDetach() and
154 : * BarrierArriveAndDetachExceptLast() respectively. The last to detach
155 : * receives a different return value so that it knows that it's safe to
156 : * clean up. Any straggler process that attaches after that phase is reached
157 : * will see that it's too late to participate or access the relevant shared
158 : * memory objects.
159 : *
160 : *-------------------------------------------------------------------------
161 : */
162 :
163 : #include "postgres.h"
164 :
165 : #include "access/htup_details.h"
166 : #include "access/parallel.h"
167 : #include "executor/executor.h"
168 : #include "executor/hashjoin.h"
169 : #include "executor/nodeHash.h"
170 : #include "executor/nodeHashjoin.h"
171 : #include "miscadmin.h"
172 : #include "pgstat.h"
173 : #include "utils/memutils.h"
174 : #include "utils/sharedtuplestore.h"
175 :
176 :
177 : /*
178 : * States of the ExecHashJoin state machine
179 : */
180 : #define HJ_BUILD_HASHTABLE 1
181 : #define HJ_NEED_NEW_OUTER 2
182 : #define HJ_SCAN_BUCKET 3
183 : #define HJ_FILL_OUTER_TUPLE 4
184 : #define HJ_FILL_INNER_TUPLES 5
185 : #define HJ_NEED_NEW_BATCH 6
186 :
187 : /* Returns true if doing null-fill on outer relation */
188 : #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
189 : /* Returns true if doing null-fill on inner relation */
190 : #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
191 :
192 : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
193 : HashJoinState *hjstate,
194 : uint32 *hashvalue);
195 : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
196 : HashJoinState *hjstate,
197 : uint32 *hashvalue);
198 : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
199 : BufFile *file,
200 : uint32 *hashvalue,
201 : TupleTableSlot *tupleSlot);
202 : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
203 : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
204 : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
205 :
206 :
/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
 *
 *		Returns the next joined tuple, or NULL when the join is complete
 *		(or, for an empty-outer/empty-inner shortcut, provably empty).
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
	HashJoinState *node = castNode(HashJoinState, pstate);
	PlanState  *outerNode;
	HashState  *hashNode;
	ExprState  *joinqual;
	ExprState  *otherqual;
	ExprContext *econtext;
	HashJoinTable hashtable;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;
	ParallelHashJoinState *parallel_state;

	/*
	 * get information from HashJoin node
	 */
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
	parallel_state = hashNode->parallel_state;

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	/*
	 * run the hash join state machine
	 */
	for (;;)
	{
		/*
		 * It's possible to iterate this loop many times before returning a
		 * tuple, in some pathological cases such as needing to move much of
		 * the current batch to a later batch.  So let's check for interrupts
		 * each time through.
		 */
		CHECK_FOR_INTERRUPTS();

		switch (node->hj_JoinState)
		{
			case HJ_BUILD_HASHTABLE:

				/*
				 * First time through: build hash table for inner relation.
				 */
				Assert(hashtable == NULL);

				/*
				 * If the outer relation is completely empty, and it's not
				 * right/right-anti/full join, we can quit without building
				 * the hash table.  However, for an inner join it is only a
				 * win to check this when the outer relation's startup cost is
				 * less than the projected cost of building the hash table.
				 * Otherwise it's best to build the hash table first and see
				 * if the inner relation is empty.  (When it's a left join, we
				 * should always make this check, since we aren't going to be
				 * able to skip the join on the strength of an empty inner
				 * relation anyway.)
				 *
				 * If we are rescanning the join, we make use of information
				 * gained on the previous scan: don't bother to try the
				 * prefetch if the previous scan found the outer relation
				 * nonempty.  This is not 100% reliable since with new
				 * parameters the outer relation might yield different
				 * results, but it's a good heuristic.
				 *
				 * The only way to make the check is to try to fetch a tuple
				 * from the outer plan node.  If we succeed, we have to stash
				 * it away for later consumption by ExecHashJoinOuterGetTuple.
				 */
				if (HJ_FILL_INNER(node))
				{
					/* no chance to not build the hash table */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (parallel)
				{
					/*
					 * The empty-outer optimization is not implemented for
					 * shared hash tables, because no one participant can
					 * determine that there are no outer tuples, and it's not
					 * yet clear that it's worth the synchronization overhead
					 * of reaching consensus to figure that out.  So we have
					 * to build the hash table.
					 */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (HJ_FILL_OUTER(node) ||
						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						  !node->hj_OuterNotEmpty))
				{
					node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
					if (TupIsNull(node->hj_FirstOuterTupleSlot))
					{
						node->hj_OuterNotEmpty = false;
						return NULL;
					}
					else
						node->hj_OuterNotEmpty = true;
				}
				else
					node->hj_FirstOuterTupleSlot = NULL;

				/*
				 * Create the hash table.  If using Parallel Hash, then
				 * whoever gets here first will create the hash table and any
				 * later arrivals will merely attach to it.
				 */
				hashtable = ExecHashTableCreate(hashNode,
												node->hj_HashOperators,
												node->hj_Collations,
												HJ_FILL_INNER(node));
				node->hj_HashTable = hashtable;

				/*
				 * Execute the Hash node, to build the hash table.  If using
				 * Parallel Hash, then we'll try to help hashing unless we
				 * arrived too late.
				 */
				hashNode->hashtable = hashtable;
				(void) MultiExecProcNode((PlanState *) hashNode);

				/*
				 * If the inner relation is completely empty, and we're not
				 * doing a left outer join, we can quit without scanning the
				 * outer relation.
				 */
				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
				{
					if (parallel)
					{
						/*
						 * Advance the build barrier to PHJ_BUILD_RUN before
						 * proceeding so we can negotiate resource cleanup.
						 */
						Barrier    *build_barrier = &parallel_state->build_barrier;

						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
							BarrierArriveAndWait(build_barrier, 0);
					}
					return NULL;
				}

				/*
				 * need to remember whether nbatch has increased since we
				 * began scanning the outer relation
				 */
				hashtable->nbatch_outstart = hashtable->nbatch;

				/*
				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
				 * tuple above, because ExecHashJoinOuterGetTuple will
				 * immediately set it again.)
				 */
				node->hj_OuterNotEmpty = false;

				if (parallel)
				{
					Barrier    *build_barrier;

					build_barrier = &parallel_state->build_barrier;
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
					{
						/*
						 * If multi-batch, we need to hash the outer relation
						 * up front.
						 */
						if (hashtable->nbatch > 1)
							ExecParallelHashJoinPartitionOuter(node);
						BarrierArriveAndWait(build_barrier,
											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
					}
					else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
					{
						/*
						 * If we attached so late that the job is finished and
						 * the batch state has been freed, we can return
						 * immediately.
						 */
						return NULL;
					}

					/* Each backend should now select a batch to work on. */
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
					hashtable->curbatch = -1;
					node->hj_JoinState = HJ_NEED_NEW_BATCH;

					continue;
				}
				else
					node->hj_JoinState = HJ_NEED_NEW_OUTER;

				/* FALL THRU */

			case HJ_NEED_NEW_OUTER:

				/*
				 * We don't have an outer tuple, try to get the next one
				 */
				if (parallel)
					outerTupleSlot =
						ExecParallelHashJoinOuterGetTuple(outerNode, node,
														  &hashvalue);
				else
					outerTupleSlot =
						ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

				if (TupIsNull(outerTupleSlot))
				{
					/* end of batch, or maybe whole join */
					if (HJ_FILL_INNER(node))
					{
						/* set up to scan for unmatched inner tuples */
						if (parallel)
						{
							/*
							 * Only one process is currently allowed to handle
							 * each batch's unmatched tuples, in a parallel
							 * join.
							 */
							if (ExecParallelPrepHashTableForUnmatched(node))
								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
							else
								node->hj_JoinState = HJ_NEED_NEW_BATCH;
						}
						else
						{
							ExecPrepHashTableForUnmatched(node);
							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
						}
					}
					else
						node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				econtext->ecxt_outertuple = outerTupleSlot;
				node->hj_MatchedOuter = false;

				/*
				 * Find the corresponding bucket for this tuple in the main
				 * hash table or skew hash table.
				 */
				node->hj_CurHashValue = hashvalue;
				ExecHashGetBucketAndBatch(hashtable, hashvalue,
										  &node->hj_CurBucketNo, &batchno);
				node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
																 hashvalue);
				node->hj_CurTuple = NULL;

				/*
				 * The tuple might not belong to the current batch (where
				 * "current batch" includes the skew buckets if any).
				 */
				if (batchno != hashtable->curbatch &&
					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
				{
					bool		shouldFree;
					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
																	  &shouldFree);

					/*
					 * Need to postpone this outer tuple to a later batch.
					 * Save it in the corresponding outer-batch file.
					 */
					Assert(parallel_state == NULL);
					Assert(batchno > hashtable->curbatch);
					ExecHashJoinSaveTuple(mintuple, hashvalue,
										  &hashtable->outerBatchFile[batchno],
										  hashtable);

					if (shouldFree)
						heap_free_minimal_tuple(mintuple);

					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
					continue;
				}

				/* OK, let's scan the bucket for matches */
				node->hj_JoinState = HJ_SCAN_BUCKET;

				/* FALL THRU */

			case HJ_SCAN_BUCKET:

				/*
				 * Scan the selected hash bucket for matches to current outer
				 */
				if (parallel)
				{
					if (!ExecParallelScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}
				else
				{
					if (!ExecScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}

				/*
				 * We've got a match, but still need to test non-hashed quals.
				 * ExecScanHashBucket already set up all the state needed to
				 * call ExecQual.
				 *
				 * If we pass the qual, then save state for next call and have
				 * ExecProject form the projection, store it in the tuple
				 * table, and return the slot.
				 *
				 * Only the joinquals determine tuple match status, but all
				 * quals must pass to actually return the tuple.
				 */
				if (joinqual == NULL || ExecQual(joinqual, econtext))
				{
					node->hj_MatchedOuter = true;


					/*
					 * This is really only needed if HJ_FILL_INNER(node), but
					 * we'll avoid the branch and just set it always.
					 */
					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

					/* In an antijoin, we never return a matched tuple */
					if (node->js.jointype == JOIN_ANTI)
					{
						node->hj_JoinState = HJ_NEED_NEW_OUTER;
						continue;
					}

					/*
					 * In a right-antijoin, we never return a matched tuple.
					 * And we need to stay on the current outer tuple to
					 * continue scanning the inner side for matches.
					 */
					if (node->js.jointype == JOIN_RIGHT_ANTI)
						continue;

					/*
					 * If we only need to join to the first matching inner
					 * tuple, then consider returning this one, but after that
					 * continue with next outer tuple.
					 */
					if (node->js.single_match)
						node->hj_JoinState = HJ_NEED_NEW_OUTER;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				else
					InstrCountFiltered1(node, 1);
				break;

			case HJ_FILL_OUTER_TUPLE:

				/*
				 * The current outer tuple has run out of matches, so check
				 * whether to emit a dummy outer-join tuple.  Whether we emit
				 * one or not, the next state is NEED_NEW_OUTER.
				 */
				node->hj_JoinState = HJ_NEED_NEW_OUTER;

				if (!node->hj_MatchedOuter &&
					HJ_FILL_OUTER(node))
				{
					/*
					 * Generate a fake join tuple with nulls for the inner
					 * tuple, and return it if it passes the non-join quals.
					 */
					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				break;

			case HJ_FILL_INNER_TUPLES:

				/*
				 * We have finished a batch, but we are doing
				 * right/right-anti/full join, so any unmatched inner tuples
				 * in the hashtable have to be emitted before we continue to
				 * the next batch.
				 */
				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
					  : ExecScanHashTableForUnmatched(node, econtext)))
				{
					/* no more unmatched tuples */
					node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/*
				 * Generate a fake join tuple with nulls for the outer tuple,
				 * and return it if it passes the non-join quals.
				 */
				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

				if (otherqual == NULL || ExecQual(otherqual, econtext))
					return ExecProject(node->js.ps.ps_ProjInfo);
				else
					InstrCountFiltered2(node, 1);
				break;

			case HJ_NEED_NEW_BATCH:

				/*
				 * Try to advance to next batch.  Done if there are no more.
				 */
				if (parallel)
				{
					if (!ExecParallelHashJoinNewBatch(node))
						return NULL;	/* end of parallel-aware join */
				}
				else
				{
					if (!ExecHashJoinNewBatch(node))
						return NULL;	/* end of parallel-oblivious join */
				}
				node->hj_JoinState = HJ_NEED_NEW_OUTER;
				break;

			default:
				elog(ERROR, "unrecognized hashjoin state: %d",
					 (int) node->hj_JoinState);
		}
	}
}
670 :
671 : /* ----------------------------------------------------------------
672 : * ExecHashJoin
673 : *
674 : * Parallel-oblivious version.
675 : * ----------------------------------------------------------------
676 : */
677 : static TupleTableSlot * /* return: a tuple or NULL */
678 7183250 : ExecHashJoin(PlanState *pstate)
679 : {
680 : /*
681 : * On sufficiently smart compilers this should be inlined with the
682 : * parallel-aware branches removed.
683 : */
684 7183250 : return ExecHashJoinImpl(pstate, false);
685 : }
686 :
687 : /* ----------------------------------------------------------------
688 : * ExecParallelHashJoin
689 : *
690 : * Parallel-aware version.
691 : * ----------------------------------------------------------------
692 : */
693 : static TupleTableSlot * /* return: a tuple or NULL */
694 2280434 : ExecParallelHashJoin(PlanState *pstate)
695 : {
696 : /*
697 : * On sufficiently smart compilers this should be inlined with the
698 : * parallel-oblivious branches removed.
699 : */
700 2280434 : return ExecHashJoinImpl(pstate, true);
701 : }
702 :
/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 *
 *		Builds and returns the run-time state (HashJoinState) for a HashJoin
 *		plan node: initializes both child subplans, the result/null tuple
 *		slots, the qual expressions, and the hash-specific bookkeeping.
 *		The hash table itself is not built here; that happens lazily on the
 *		first call into the state machine (HJ_BUILD_HASHTABLE).
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	TupleDesc	outerDesc,
				innerDesc;
	const TupleTableSlotOps *ops;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;

	/*
	 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
	 * where this function may be replaced with a parallel version, if we
	 * managed to launch a parallel query.
	 */
	hjstate->js.ps.ExecProcNode = ExecHashJoin;
	hjstate->js.jointype = node->join.jointype;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	outerNode = outerPlan(node);
	hashNode = (Hash *) innerPlan(node);

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
	outerDesc = ExecGetResultType(outerPlanState(hjstate));
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	innerDesc = ExecGetResultType(innerPlanState(hjstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	/*
	 * tuple table initialization
	 */
	ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
														ops);

	/*
	 * detect whether we need only consider the first matching inner tuple
	 */
	hjstate->js.single_match = (node->join.inner_unique ||
								node->join.jointype == JOIN_SEMI);

	/* set up null tuples for outer joins, if needed */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_SEMI:
			break;
		case JOIN_LEFT:
		case JOIN_ANTI:
			/* null-fill the inner side for unmatched outer tuples */
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		case JOIN_RIGHT:
		case JOIN_RIGHT_ANTI:
			/* null-fill the outer side for unmatched inner tuples */
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			break;
		case JOIN_FULL:
			/* full join may null-fill either side */
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we can do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.  -cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

		hjstate->hj_HashTupleSlot = slot;
	}

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.qual =
		ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
	hjstate->js.joinqual =
		ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
	hjstate->hashclauses =
		ExecInitQual(node->hashclauses, (PlanState *) hjstate);

	/*
	 * initialize hash-specific info
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	hjstate->hj_CurTuple = NULL;

	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
												 (PlanState *) hjstate);
	hjstate->hj_HashOperators = node->hashoperators;
	hjstate->hj_Collations = node->hashcollations;

	/* start the state machine at the hash-table build step */
	hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	return hjstate;
}
851 :
852 : /* ----------------------------------------------------------------
853 : * ExecEndHashJoin
854 : *
855 : * clean up routine for HashJoin node
856 : * ----------------------------------------------------------------
857 : */
858 : void
859 27714 : ExecEndHashJoin(HashJoinState *node)
860 : {
861 : /*
862 : * Free hash table
863 : */
864 27714 : if (node->hj_HashTable)
865 : {
866 18482 : ExecHashTableDestroy(node->hj_HashTable);
867 18482 : node->hj_HashTable = NULL;
868 : }
869 :
870 : /*
871 : * Free the exprcontext
872 : */
873 27714 : ExecFreeExprContext(&node->js.ps);
874 :
875 : /*
876 : * clean out the tuple table
877 : */
878 27714 : ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
879 27714 : ExecClearTuple(node->hj_OuterTupleSlot);
880 27714 : ExecClearTuple(node->hj_HashTupleSlot);
881 :
882 : /*
883 : * clean up subtrees
884 : */
885 27714 : ExecEndNode(outerPlanState(node));
886 27714 : ExecEndNode(innerPlanState(node));
887 27714 : }
888 :
/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for a parallel oblivious hashjoin: either by
 *		executing the outer plan node in the first pass, or from the temp
 *		files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	if (curbatch == 0)			/* if it is the first pass */
	{
		/*
		 * Check to see if first outer tuple was already fetched by
		 * ExecHashJoin() and not used yet.
		 */
		slot = hjstate->hj_FirstOuterTupleSlot;
		if (!TupIsNull(slot))
			hjstate->hj_FirstOuterTupleSlot = NULL;	/* consume the stashed tuple */
		else
			slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			/*
			 * We have to compute the tuple's hash value.
			 */
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		/* Later batches read back the tuples spilled during earlier passes. */
		BufFile    *file = hashtable->outerBatchFile[curbatch];

		/*
		 * In outer-join cases, we could get here even though the batch file
		 * is empty.
		 */
		if (file == NULL)
			return NULL;

		/* The saved hash value is re-read along with the tuple itself. */
		slot = ExecHashJoinGetSavedTuple(hjstate,
										 file,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
	}

	/* End of this batch */
	return NULL;
}
971 :
/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 *
 * Returns a null slot when the current batch's outer side is exhausted,
 * after flagging the batch's outer_eof so other code can see we reached
 * the end.  On success, the tuple's hash value is stored at *hashvalue.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
								  HashJoinState *hjstate,
								  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	/*
	 * In the Parallel Hash case we only run the outer plan directly for
	 * single-batch hash joins.  Otherwise we have to go to batch files, even
	 * for batch 0.
	 */
	if (curbatch == 0 && hashtable->nbatch == 1)
	{
		slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
				return slot;

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		MinimalTuple tuple;

		/* Pull the next tuple from this batch's shared outer tuplestore. */
		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
									   hashvalue);
		if (tuple != NULL)
		{
			/* tuple lives in the tuplestore, so the slot must not free it */
			ExecForceStoreMinimalTuple(tuple,
									   hjstate->hj_OuterTupleSlot,
									   false);
			slot = hjstate->hj_OuterTupleSlot;
			return slot;
		}
		else
			ExecClearTuple(hjstate->hj_OuterTupleSlot);
	}

	/* End of this batch */
	hashtable->batches[curbatch].outer_eof = true;

	return NULL;
}
1035 :
/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 *
 * On success, the new batch's inner file has been loaded into the hash
 * table (and that file closed), and its outer file, if any, has been
 * rewound ready for reading.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			nbatch;
	int			curbatch;
	BufFile    *innerFile;
	TupleTableSlot *slot;
	uint32		hashvalue;

	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
	}
	else						/* we just finished the first batch */
	{
		/*
		 * Reset some of the skew optimization state variables, since we no
		 * longer need to consider skew tuples after the first batch. The
		 * memory context reset we are about to do will release the skew
		 * hashtable itself.
		 */
		hashtable->skewEnabled = false;
		hashtable->skewBucket = NULL;
		hashtable->skewBucketNums = NULL;
		hashtable->nSkewBuckets = 0;
		hashtable->spaceUsedSkew = 0;
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a left/full outer join, we have to process outer batches even if
	 * the inner batch is empty.  Similarly, in a right/right-anti/full outer
	 * join, we have to process inner batches even if the outer batch is
	 * empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->outerBatchFile[curbatch] == NULL ||
			hashtable->innerBatchFile[curbatch] == NULL))
	{
		if (hashtable->outerBatchFile[curbatch] &&
			HJ_FILL_OUTER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			HJ_FILL_INNER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (hashtable->outerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (hashtable->innerBatchFile[curbatch])
			BufFileClose(hashtable->innerBatchFile[curbatch]);
		hashtable->innerBatchFile[curbatch] = NULL;
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
		curbatch++;
	}

	if (curbatch >= nbatch)
		return false;			/* no more batches */

	hashtable->curbatch = curbatch;

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashtable);

	innerFile = hashtable->innerBatchFile[curbatch];

	if (innerFile != NULL)
	{
		if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));

		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
												 innerFile,
												 &hashvalue,
												 hjstate->hj_HashTupleSlot)))
		{
			/*
			 * NOTE: some tuples may be sent to future batches.  Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			ExecHashTableInsert(hashtable, slot, hashvalue);
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		BufFileClose(innerFile);
		hashtable->innerBatchFile[curbatch] = NULL;
	}

	/*
	 * Rewind outer batch file (if present), so that we can start reading it.
	 */
	if (hashtable->outerBatchFile[curbatch] != NULL)
	{
		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));
	}

	return true;
}
1178 :
/*
 * Choose a batch to work on, and attach to it.  Returns true if successful,
 * false if there are no more batches.
 *
 * On returning true, this participant is attached to the chosen batch's
 * barrier (at phase PHJ_BATCH_PROBE) with hashtable->curbatch set and a
 * parallel scan of the batch's outer tuples begun.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			start_batchno;
	int			batchno;

	/*
	 * If we were already attached to a batch, remember not to bother checking
	 * it again, and detach from it (possibly freeing the hash table if we are
	 * last to detach).
	 */
	if (hashtable->curbatch >= 0)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
	}

	/*
	 * Search for a batch that isn't done.  We use an atomic counter to start
	 * our search at a different batch in every participant when there are
	 * more batches than participants.
	 */
	batchno = start_batchno =
		pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
		hashtable->nbatch;
	do
	{
		uint32		hashvalue;
		MinimalTuple tuple;
		TupleTableSlot *slot;

		if (!hashtable->batches[batchno].done)
		{
			SharedTuplestoreAccessor *inner_tuples;
			Barrier    *batch_barrier =
				&hashtable->batches[batchno].shared->batch_barrier;

			/*
			 * The phase we attach at tells us how far this batch has already
			 * progressed; each case falls through to pick up the remaining
			 * work.
			 */
			switch (BarrierAttach(batch_barrier))
			{
				case PHJ_BATCH_ELECT:

					/* One backend allocates the hash table. */
					if (BarrierArriveAndWait(batch_barrier,
											 WAIT_EVENT_HASH_BATCH_ELECT))
						ExecParallelHashTableAlloc(hashtable, batchno);
					/* Fall through. */

				case PHJ_BATCH_ALLOCATE:
					/* Wait for allocation to complete. */
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
					/* Fall through. */

				case PHJ_BATCH_LOAD:
					/* Start (or join in) loading tuples. */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					inner_tuples = hashtable->batches[batchno].inner_tuples;
					sts_begin_parallel_scan(inner_tuples);
					while ((tuple = sts_parallel_scan_next(inner_tuples,
														   &hashvalue)))
					{
						ExecForceStoreMinimalTuple(tuple,
												   hjstate->hj_HashTupleSlot,
												   false);
						slot = hjstate->hj_HashTupleSlot;
						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
																hashvalue);
					}
					sts_end_parallel_scan(inner_tuples);
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_LOAD);
					/* Fall through. */

				case PHJ_BATCH_PROBE:

					/*
					 * This batch is ready to probe.  Return control to
					 * caller.  We stay attached to batch_barrier so that the
					 * hash table stays alive until everyone's finished
					 * probing it, but no participant is allowed to wait at
					 * this barrier again (or else a deadlock could occur).
					 * All attached participants must eventually detach from
					 * the barrier and one worker must advance the phase so
					 * that the final phase is reached.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);

					return true;
				case PHJ_BATCH_SCAN:

					/*
					 * In principle, we could help scan for unmatched tuples,
					 * since that phase is already underway (the thing we
					 * can't do under current deadlock-avoidance rules is wait
					 * for others to arrive at PHJ_BATCH_SCAN, because
					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
					 * got here without waiting).  That is not yet done.  For
					 * now, we just detach and go around again.  We have to
					 * use ExecHashTableDetachBatch() because there's a small
					 * chance we'll be the last to detach, and then we're
					 * responsible for freeing memory.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					hashtable->batches[batchno].done = true;
					ExecHashTableDetachBatch(hashtable);
					break;

				case PHJ_BATCH_FREE:

					/*
					 * Already done.  Detach and go around again (if any
					 * remain).
					 */
					BarrierDetach(batch_barrier);
					hashtable->batches[batchno].done = true;
					hashtable->curbatch = -1;
					break;

				default:
					elog(ERROR, "unexpected batch phase %d",
						 BarrierPhase(batch_barrier));
			}
		}
		batchno = (batchno + 1) % hashtable->nbatch;
	} while (batchno != start_batchno);

	return false;
}
1313 :
/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
 *
 * fileptr points to a batch file in one of the hashtable arrays.
 *
 * The batch files (and their buffers) are allocated in the spill context
 * created for the hashtable.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
					  BufFile **fileptr, HashJoinTable hashtable)
{
	BufFile    *file = *fileptr;

	/*
	 * The batch file is lazily created. If this is the first tuple written to
	 * this batch, the batch file is created and its buffer is allocated in
	 * the spillCxt context, NOT in the batchCxt.
	 *
	 * During the build phase, buffered files are created for inner batches.
	 * Each batch's buffered file is closed (and its buffer freed) after the
	 * batch is loaded into memory during the outer side scan. Therefore, it
	 * is necessary to allocate the batch file buffer in a memory context
	 * which outlives the batch itself.
	 *
	 * Also, we use spillCxt instead of hashCxt for a better accounting of the
	 * spilling memory consumption.
	 */
	if (file == NULL)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);

		file = BufFileCreateTemp(false);
		*fileptr = file;

		MemoryContextSwitchTo(oldctx);
	}

	/* Write the hash value, then the whole MinimalTuple (t_len included). */
	BufFileWrite(file, &hashvalue, sizeof(uint32));
	BufFileWrite(file, tuple, tuple->t_len);
}
1359 :
/*
 * ExecHashJoinGetSavedTuple
 *		read the next tuple from a batch file.  Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.  The tuple is palloc'd here and
 * ownership is handed to the slot (shouldFree = true), so it is released
 * when the slot is cleared or reused.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  BufFile *file,
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
	uint32		header[2];
	size_t		nread;
	MinimalTuple tuple;

	/*
	 * We check for interrupts here because this is typically taken as an
	 * alternative code path to an ExecProcNode() call, which would include
	 * such a check.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
	 */
	nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
	if (nread == 0)				/* end of file */
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
	*hashvalue = header[0];
	/* header[1] is the tuple's t_len; allocate and fill in the rest of it */
	tuple = (MinimalTuple) palloc(header[1]);
	tuple->t_len = header[1];
	BufFileReadExact(file,
					 (char *) tuple + sizeof(uint32),
					 header[1] - sizeof(uint32));
	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
	return tupleSlot;
}
1404 :
1405 :
void
ExecReScanHashJoin(HashJoinState *node)
{
	PlanState  *outerPlan = outerPlanState(node);
	PlanState  *innerPlan = innerPlanState(node);

	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		if (node->hj_HashTable->nbatch == 1 &&
			innerPlan->chgParam == NULL)
		{
			/*
			 * Okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * However, if it's a right/right-anti/full join, we'd better
			 * reset the inner-tuple match flags contained in the table.
			 */
			if (HJ_FILL_INNER(node))
				ExecHashTableResetMatchFlags(node->hj_HashTable);

			/*
			 * Also, we need to reset our state about the emptiness of the
			 * outer relation, so that the new scan of the outer will update
			 * it correctly if it turns out to be empty this time. (There's no
			 * harm in clearing it now because ExecHashJoin won't need the
			 * info.  In the other cases, where the hash table doesn't exist
			 * or we are destroying it, we leave this state alone because
			 * ExecHashJoin will need it the first time through.)
			 */
			node->hj_OuterNotEmpty = false;

			/* ExecHashJoin can skip the BUILD_HASHTABLE step */
			node->hj_JoinState = HJ_NEED_NEW_OUTER;
		}
		else
		{
			/* must destroy and rebuild hash table */
			HashState  *hashNode = castNode(HashState, innerPlan);

			Assert(hashNode->hashtable == node->hj_HashTable);
			/* accumulate stats from old hash table, if wanted */
			/* (this should match ExecShutdownHash) */
			if (hashNode->ps.instrument && !hashNode->hinstrument)
				hashNode->hinstrument = (HashInstrumentation *)
					palloc0(sizeof(HashInstrumentation));
			if (hashNode->hinstrument)
				ExecHashAccumInstrumentation(hashNode->hinstrument,
											 hashNode->hashtable);
			/* for safety, be sure to clear child plan node's pointer too */
			hashNode->hashtable = NULL;

			ExecHashTableDestroy(node->hj_HashTable);
			node->hj_HashTable = NULL;
			node->hj_JoinState = HJ_BUILD_HASHTABLE;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (innerPlan->chgParam == NULL)
				ExecReScan(innerPlan);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	node->hj_CurTuple = NULL;

	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
1493 :
1494 : void
1495 24818 : ExecShutdownHashJoin(HashJoinState *node)
1496 : {
1497 24818 : if (node->hj_HashTable)
1498 : {
1499 : /*
1500 : * Detach from shared state before DSM memory goes away. This makes
1501 : * sure that we don't have any pointers into DSM memory by the time
1502 : * ExecEndHashJoin runs.
1503 : */
1504 18464 : ExecHashTableDetachBatch(node->hj_HashTable);
1505 18464 : ExecHashTableDetach(node->hj_HashTable);
1506 : }
1507 24818 : }
1508 :
/*
 * ExecParallelHashJoinPartitionOuter
 *		Run the outer plan to completion, writing every tuple (with its hash
 *		value) to the shared tuplestore of the batch it hashes to.  Tuples
 *		for which ExecHashGetHashValue reports no usable hash value are
 *		simply skipped.  Afterwards, all outer partitions are made readable
 *		by any backend.
 */
static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
	PlanState  *outerState = outerPlanState(hjstate);
	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	uint32		hashvalue;
	int			i;

	Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

	/* Execute outer plan, writing all tuples to shared tuplestores. */
	for (;;)
	{
		slot = ExecProcNode(outerState);
		if (TupIsNull(slot))
			break;
		econtext->ecxt_outertuple = slot;
		if (ExecHashGetHashValue(hashtable, econtext,
								 hjstate->hj_OuterHashKeys,
								 true,	/* outer tuple */
								 HJ_FILL_OUTER(hjstate),
								 &hashvalue))
		{
			int			batchno;
			int			bucketno;
			bool		shouldFree;
			MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
									  &batchno);
			sts_puttuple(hashtable->batches[batchno].outer_tuples,
						 &hashvalue, mintup);

			/* sts_puttuple copied the data, so release our copy if owned */
			if (shouldFree)
				heap_free_minimal_tuple(mintup);
		}
		CHECK_FOR_INTERRUPTS();
	}

	/* Make sure all outer partitions are readable by any backend. */
	for (i = 0; i < hashtable->nbatch; ++i)
		sts_end_write(hashtable->batches[i].outer_tuples);
}
1554 :
/*
 * ExecHashJoinEstimate
 *		Estimate the dynamic shared memory needed for Parallel Hash Join:
 *		one ParallelHashJoinState chunk, stored under a single toc key.
 */
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
1561 :
/*
 * ExecHashJoinInitializeDSM
 *		Initialize the shared-memory state for Parallel Hash Join in the
 *		leader: allocate and zero-initialize the ParallelHashJoinState in
 *		the DSM toc (keyed by plan node ID), set up its barriers, lock and
 *		shared fileset, and point the inner Hash node at it.
 */
void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	HashState  *hashNode;
	ParallelHashJoinState *pstate;

	/*
	 * Disable shared hash table mode if we failed to create a real DSM
	 * segment, because that means that we don't have a DSA area to work with.
	 */
	if (pcxt->seg == NULL)
		return;

	/* From now on this node runs the parallel-aware entry point. */
	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

	/*
	 * Set up the state needed to coordinate access to the shared hash
	 * table(s), using the plan node ID as the toc key.
	 */
	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
	shm_toc_insert(pcxt->toc, plan_node_id, pstate);

	/*
	 * Set up the shared hash join state with no batches initially.
	 * ExecHashTableCreate() will prepare at least one later and set nbatch
	 * and space_allowed.
	 */
	pstate->nbatch = 0;
	pstate->space_allowed = 0;
	pstate->batches = InvalidDsaPointer;
	pstate->old_batches = InvalidDsaPointer;
	pstate->nbuckets = 0;
	pstate->growth = PHJ_GROWTH_OK;
	pstate->chunk_work_queue = InvalidDsaPointer;
	pg_atomic_init_u32(&pstate->distributor, 0);
	pstate->nparticipants = pcxt->nworkers + 1;
	pstate->total_tuples = 0;
	LWLockInitialize(&pstate->lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN);
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);

	/* Set up the space we'll use for shared temporary files. */
	SharedFileSetInit(&pstate->fileset, pcxt->seg);

	/* Initialize the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;
}
1613 :
/* ----------------------------------------------------------------
 *		ExecHashJoinReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
		shm_toc_lookup(pcxt->toc, plan_node_id, false);

	/*
	 * It would be possible to reuse the shared hash table in single-batch
	 * cases by resetting and then fast-forwarding build_barrier to
	 * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
	 * currently shared hash tables are already freed by now (by the last
	 * participant to detach from the batch).  We could consider keeping it
	 * around for single-batch joins.  We'd also need to adjust
	 * finalize_plan() so that it doesn't record a dummy dependency for
	 * Parallel Hash nodes, preventing the rescan optimization.  For now we
	 * don't try.
	 */

	/* Detach, freeing any remaining shared memory. */
	if (state->hj_HashTable != NULL)
	{
		ExecHashTableDetachBatch(state->hj_HashTable);
		ExecHashTableDetach(state->hj_HashTable);
	}

	/* Clear any shared batch files. */
	SharedFileSetDeleteAll(&pstate->fileset);

	/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
	BarrierInit(&pstate->build_barrier, 0);
}
1652 :
/*
 * ExecHashJoinInitializeWorker
 *		Attach a parallel worker to the shared Parallel Hash Join state set
 *		up by ExecHashJoinInitializeDSM, and switch the node to the
 *		parallel-aware entry point.
 */
void
ExecHashJoinInitializeWorker(HashJoinState *state,
							 ParallelWorkerContext *pwcxt)
{
	HashState  *hashNode;
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
		shm_toc_lookup(pwcxt->toc, plan_node_id, false);

	/* Attach to the space for shared temporary files. */
	SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

	/* Attach to the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;

	/* From now on this node runs the parallel-aware entry point. */
	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
|