Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeHashjoin.c
4 : * Routines to handle hash join nodes
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeHashjoin.c
12 : *
13 : * HASH JOIN
14 : *
15 : * This is based on the "hybrid hash join" algorithm described shortly in the
16 : * following page
17 : *
18 : * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
19 : *
20 : * and in detail in the referenced paper:
21 : *
22 : * "An Adaptive Hash Join Algorithm for Multiuser Environments"
23 : * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference.
24 : * Brisbane: 186–197.
25 : *
26 : * If the inner side tuples of a hash join do not fit in memory, the hash join
27 : * can be executed in multiple batches.
28 : *
29 : * If the statistics on the inner side relation are accurate, planner chooses a
30 : * multi-batch strategy and estimates the number of batches.
31 : *
32 : * The query executor measures the real size of the hashtable and increases the
33 : * number of batches if the hashtable grows too large.
34 : *
35 : * The number of batches is always a power of two, so an increase in the number
36 : * of batches doubles it.
37 : *
38 : * Serial hash join measures batch size lazily -- waiting until it is loading a
39 : * batch to determine if it will fit in memory. While inserting tuples into the
40 : * hashtable, serial hash join will, if that tuple were to exceed work_mem,
41 : * dump out the hashtable and reassign them either to other batch files or the
42 : * current batch resident in the hashtable.
43 : *
44 : * Parallel hash join, on the other hand, completes all changes to the number
45 : * of batches during the build phase. If it increases the number of batches, it
46 : * dumps out all the tuples from all batches and reassigns them to entirely new
47 : * batch files. Then it checks every batch to ensure it will fit in the space
48 : * budget for the query.
49 : *
50 : * In both parallel and serial hash join, the executor currently makes a best
51 : * effort. If a particular batch will not fit in memory, it tries doubling the
52 : * number of batches. If after a batch increase, there is a batch which
53 : * retained all or none of its tuples, the executor disables growth in the
54 : * number of batches globally. After growth is disabled, all batches that would
55 : * have previously triggered an increase in the number of batches instead
56 : * exceed the space allowed.
57 : *
58 : * PARALLELISM
59 : *
60 : * Hash joins can participate in parallel query execution in several ways. A
61 : * parallel-oblivious hash join is one where the node is unaware that it is
62 : * part of a parallel plan. In this case, a copy of the inner plan is used to
63 : * build a copy of the hash table in every backend, and the outer plan could
64 : * either be built from a partial or complete path, so that the results of the
65 : * hash join are correspondingly either partial or complete. A parallel-aware
66 : * hash join is one that behaves differently, coordinating work between
67 : * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
68 : * Hash Join always appears with a Parallel Hash node.
69 : *
70 : * Parallel-aware hash joins use the same per-backend state machine to track
71 : * progress through the hash join algorithm as parallel-oblivious hash joins.
72 : * In a parallel-aware hash join, there is also a shared state machine that
73 : * co-operating backends use to synchronize their local state machines and
74 : * program counters. The shared state machine is managed with a Barrier IPC
75 : * primitive. When all attached participants arrive at a barrier, the phase
76 : * advances and all waiting participants are released.
77 : *
78 : * When a participant begins working on a parallel hash join, it must first
79 : * figure out how much progress has already been made, because participants
80 : * don't wait for each other to begin. For this reason there are switch
81 : * statements at key points in the code where we have to synchronize our local
82 : * state machine with the phase, and then jump to the correct part of the
83 : * algorithm so that we can get started.
84 : *
85 : * One barrier called build_barrier is used to coordinate the hashing phases.
86 : * The phase is represented by an integer which begins at zero and increments
87 : * one by one, but in the code it is referred to by symbolic names as follows.
88 : * An asterisk indicates a phase that is performed by a single arbitrarily
89 : * chosen process.
90 : *
91 : * PHJ_BUILD_ELECT -- initial state
92 : * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
93 : * PHJ_BUILD_HASH_INNER -- all hash the inner rel
94 : * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
95 : * PHJ_BUILD_RUN -- building done, probing can begin
96 : * PHJ_BUILD_FREE* -- all work complete, one frees batches
97 : *
98 : * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
99 : * be used repeatedly as required to coordinate expansions in the number of
100 : * batches or buckets. Their phases are as follows:
101 : *
102 : * PHJ_GROW_BATCHES_ELECT -- initial state
103 : * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
104 : * PHJ_GROW_BATCHES_REPARTITION -- all repartition
105 : * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
106 : * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
107 : *
108 : * PHJ_GROW_BUCKETS_ELECT -- initial state
109 : * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
110 : * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
111 : *
112 : * If the planner got the number of batches and buckets right, those won't be
113 : * necessary, but on the other hand we might finish up needing to expand the
114 : * buckets or batches multiple times while hashing the inner relation to stay
115 : * within our memory budget and load factor target. For that reason it's a
116 : * separate pair of barriers using circular phases.
117 : *
118 : * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
119 : * because we need to divide the outer relation into batches up front in order
120 : * to be able to process batches entirely independently. In contrast, the
121 : * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
122 : * batches whenever it encounters them while scanning and probing, which it
123 : * can do because it processes batches in serial order.
124 : *
125 : * Once PHJ_BUILD_RUN is reached, backends then split up and process
126 : * different batches, or gang up and work together on probing batches if there
127 : * aren't enough to go around. For each batch there is a separate barrier
128 : * with the following phases:
129 : *
130 : * PHJ_BATCH_ELECT -- initial state
131 : * PHJ_BATCH_ALLOCATE* -- one allocates buckets
132 : * PHJ_BATCH_LOAD -- all load the hash table from disk
133 : * PHJ_BATCH_PROBE -- all probe
134 : * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan
135 : * PHJ_BATCH_FREE* -- one frees memory
136 : *
137 : * Batch 0 is a special case, because it starts out in phase
138 : * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
139 : * PHJ_BUILD_HASH_INNER so we can skip loading.
140 : *
141 : * Initially we try to plan for a single-batch hash join using the combined
142 : * hash_mem of all participants to create a large shared hash table. If that
143 : * turns out either at planning or execution time to be impossible then we
144 : * fall back to regular hash_mem sized hash tables.
145 : *
146 : * To avoid deadlocks, we never wait for any barrier unless it is known that
147 : * all other backends attached to it are actively executing the node or have
148 : * finished. Practically, that means that we never emit a tuple while attached
149 : * to a barrier, unless the barrier has reached a phase that means that no
150 : * process will wait on it again. We emit tuples while attached to the build
151 : * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
152 : * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
153 : * respectively without waiting, using BarrierArriveAndDetach() and
154 : * BarrierArriveAndDetachExceptLast() respectively. The last to detach
155 : * receives a different return value so that it knows that it's safe to
156 : * clean up. Any straggler process that attaches after that phase is reached
157 : * will see that it's too late to participate or access the relevant shared
158 : * memory objects.
159 : *
160 : *-------------------------------------------------------------------------
161 : */
162 :
163 : #include "postgres.h"
164 :
165 : #include "access/htup_details.h"
166 : #include "access/parallel.h"
167 : #include "executor/executor.h"
168 : #include "executor/hashjoin.h"
169 : #include "executor/instrument.h"
170 : #include "executor/nodeHash.h"
171 : #include "executor/nodeHashjoin.h"
172 : #include "miscadmin.h"
173 : #include "utils/lsyscache.h"
174 : #include "utils/sharedtuplestore.h"
175 : #include "utils/tuplestore.h"
176 : #include "utils/wait_event.h"
177 :
178 :
179 : /*
180 : * States of the ExecHashJoin state machine
181 : */
182 : #define HJ_BUILD_HASHTABLE 1
183 : #define HJ_NEED_NEW_OUTER 2
184 : #define HJ_SCAN_BUCKET 3
185 : #define HJ_FILL_OUTER_TUPLE 4
186 : #define HJ_FILL_INNER_TUPLES 5
187 : #define HJ_FILL_OUTER_NULL_TUPLES 6
188 : #define HJ_FILL_INNER_NULL_TUPLES 7
189 : #define HJ_NEED_NEW_BATCH 8
190 :
191 : /* Returns true if doing null-fill on outer relation */
192 : #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
193 : /* Returns true if doing null-fill on inner relation */
194 : #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
195 :
196 : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
197 : HashJoinState *hjstate,
198 : uint32 *hashvalue);
199 : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
200 : HashJoinState *hjstate,
201 : uint32 *hashvalue);
202 : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
203 : BufFile *file,
204 : uint32 *hashvalue,
205 : TupleTableSlot *tupleSlot);
206 : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
207 : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
208 : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
209 :
210 :
211 : /* ----------------------------------------------------------------
212 : * ExecHashJoinImpl
213 : *
214 : * This function implements the Hybrid Hashjoin algorithm. It is marked
215 : * with an always-inline attribute so that ExecHashJoin() and
216 : * ExecParallelHashJoin() can inline it. Compilers that respect the
217 : * attribute should create versions specialized for parallel == true and
218 : * parallel == false with unnecessary branches removed.
219 : *
220 : * Note: the relation we build hash table on is the "inner"
221 : * the other one is "outer".
222 : * ----------------------------------------------------------------
223 : */
224 : static pg_attribute_always_inline TupleTableSlot *
225 8401836 : ExecHashJoinImpl(PlanState *pstate, bool parallel)
226 : {
227 8401836 : HashJoinState *node = castNode(HashJoinState, pstate);
228 : PlanState *outerNode;
229 : HashState *hashNode;
230 : ExprState *joinqual;
231 : ExprState *otherqual;
232 : ExprContext *econtext;
233 : HashJoinTable hashtable;
234 : TupleTableSlot *outerTupleSlot;
235 : uint32 hashvalue;
236 : int batchno;
237 : ParallelHashJoinState *parallel_state;
238 :
239 : /*
240 : * get information from HashJoin node
241 : */
242 8401836 : joinqual = node->js.joinqual;
243 8401836 : otherqual = node->js.ps.qual;
244 8401836 : hashNode = (HashState *) innerPlanState(node);
245 8401836 : outerNode = outerPlanState(node);
246 8401836 : hashtable = node->hj_HashTable;
247 8401836 : econtext = node->js.ps.ps_ExprContext;
248 8401836 : parallel_state = hashNode->parallel_state;
249 :
250 : /*
251 : * Reset per-tuple memory context to free any expression evaluation
252 : * storage allocated in the previous tuple cycle.
253 : */
254 8401836 : ResetExprContext(econtext);
255 :
256 : /*
257 : * run the hash join state machine
258 : */
259 : for (;;)
260 : {
261 : /*
262 : * It's possible to iterate this loop many times before returning a
263 : * tuple, in some pathological cases such as needing to move much of
264 : * the current batch to a later batch. So let's check for interrupts
265 : * each time through.
266 : */
267 28375571 : CHECK_FOR_INTERRUPTS();
268 :
269 28375571 : switch (node->hj_JoinState)
270 : {
271 22213 : case HJ_BUILD_HASHTABLE:
272 :
273 : /*
274 : * First time through: build hash table for inner relation.
275 : */
276 : Assert(hashtable == NULL);
277 :
278 : /*
279 : * If the outer relation is completely empty, and it's not
280 : * right/right-anti/full join, we can quit without building
281 : * the hash table. However, for an inner join it is only a
282 : * win to check this when the outer relation's startup cost is
283 : * less than the projected cost of building the hash table.
284 : * Otherwise it's best to build the hash table first and see
285 : * if the inner relation is empty. (When it's a left join, we
286 : * should always make this check, since we aren't going to be
287 : * able to skip the join on the strength of an empty inner
288 : * relation anyway.)
289 : *
290 : * If we are rescanning the join, we make use of information
291 : * gained on the previous scan: don't bother to try the
292 : * prefetch if the previous scan found the outer relation
293 : * nonempty. This is not 100% reliable since with new
294 : * parameters the outer relation might yield different
295 : * results, but it's a good heuristic.
296 : *
297 : * The only way to make the check is to try to fetch a tuple
298 : * from the outer plan node. If we succeed, we have to stash
299 : * it away for later consumption by ExecHashJoinOuterGetTuple.
300 : */
301 22213 : if (HJ_FILL_INNER(node))
302 : {
303 : /* no chance to not build the hash table */
304 3334 : node->hj_FirstOuterTupleSlot = NULL;
305 : }
306 18879 : else if (parallel)
307 : {
308 : /*
309 : * The empty-outer optimization is not implemented for
310 : * shared hash tables, because no one participant can
311 : * determine that there are no outer tuples, and it's not
312 : * yet clear that it's worth the synchronization overhead
313 : * of reaching consensus to figure that out. So we have
314 : * to build the hash table.
315 : */
316 228 : node->hj_FirstOuterTupleSlot = NULL;
317 : }
318 18651 : else if (HJ_FILL_OUTER(node) ||
319 14607 : (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
320 12893 : !node->hj_OuterNotEmpty))
321 : {
322 16175 : node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
323 16175 : if (TupIsNull(node->hj_FirstOuterTupleSlot))
324 : {
325 3056 : node->hj_OuterNotEmpty = false;
326 3056 : return NULL;
327 : }
328 : else
329 13119 : node->hj_OuterNotEmpty = true;
330 : }
331 : else
332 2476 : node->hj_FirstOuterTupleSlot = NULL;
333 :
334 : /*
335 : * Create the hash table. If using Parallel Hash, then
336 : * whoever gets here first will create the hash table and any
337 : * later arrivals will merely attach to it.
338 : */
339 19157 : hashtable = ExecHashTableCreate(hashNode);
340 19157 : node->hj_HashTable = hashtable;
341 :
342 : /*
343 : * Execute the Hash node, to build the hash table. If using
344 : * Parallel Hash, then we'll try to help hashing unless we
345 : * arrived too late.
346 : */
347 19157 : hashNode->hashtable = hashtable;
348 19157 : (void) MultiExecProcNode((PlanState *) hashNode);
349 :
350 : /*
351 : * If the inner relation is completely empty, and we're not
352 : * doing a left outer join, we can quit without scanning the
353 : * outer relation. (If the inner relation contains only
354 : * null-keyed tuples that we need to emit, we'll fall through
355 : * and do the outer-relation scan. In principle we could go
356 : * emit those tuples then quit, but it would complicate the
357 : * state machine logic. The case seems rare enough to not be
358 : * worth optimizing.)
359 : */
360 19157 : if (hashtable->totalTuples == 0 &&
361 4056 : hashNode->null_tuple_store == NULL &&
362 4028 : !HJ_FILL_OUTER(node))
363 : {
364 3763 : if (parallel)
365 : {
366 : /*
367 : * Advance the build barrier to PHJ_BUILD_RUN before
368 : * proceeding so we can negotiate resource cleanup.
369 : */
370 0 : Barrier *build_barrier = ¶llel_state->build_barrier;
371 :
372 0 : while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
373 0 : BarrierArriveAndWait(build_barrier, 0);
374 : }
375 3763 : return NULL;
376 : }
377 :
378 : /*
379 : * need to remember whether nbatch has increased since we
380 : * began scanning the outer relation
381 : */
382 15394 : hashtable->nbatch_outstart = hashtable->nbatch;
383 :
384 : /*
385 : * Reset OuterNotEmpty for scan. (It's OK if we fetched a
386 : * tuple above, because ExecHashJoinOuterGetTuple will
387 : * immediately set it again.)
388 : */
389 15394 : node->hj_OuterNotEmpty = false;
390 :
391 15394 : if (parallel)
392 276 : {
393 : Barrier *build_barrier;
394 :
395 276 : build_barrier = ¶llel_state->build_barrier;
396 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
397 : BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
398 : BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
399 276 : if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
400 : {
401 : /*
402 : * If multi-batch, we need to hash the outer relation
403 : * up front.
404 : */
405 224 : if (hashtable->nbatch > 1)
406 121 : ExecParallelHashJoinPartitionOuter(node);
407 224 : BarrierArriveAndWait(build_barrier,
408 : WAIT_EVENT_HASH_BUILD_HASH_OUTER);
409 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
410 : }
411 :
412 : /*
413 : * Each backend should now select a batch to work on.
414 : * However, if we've already collected some null-keyed
415 : * tuples, dump them first. (That is critical when we
416 : * arrive late enough that no more batches are available;
417 : * otherwise we'd fail to dump those tuples at all.)
418 : */
419 276 : hashtable->curbatch = -1;
420 :
421 276 : if (node->hj_NullOuterTupleStore)
422 4 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
423 272 : else if (hashNode->null_tuple_store)
424 11 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
425 : else
426 261 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
427 :
428 276 : continue;
429 : }
430 : else
431 15118 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
432 :
433 : pg_fallthrough;
434 :
435 13771945 : case HJ_NEED_NEW_OUTER:
436 :
437 : /*
438 : * We don't have an outer tuple, try to get the next one
439 : */
440 13771945 : if (parallel)
441 : outerTupleSlot =
442 1444668 : ExecParallelHashJoinOuterGetTuple(outerNode, node,
443 : &hashvalue);
444 : else
445 : outerTupleSlot =
446 12327277 : ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
447 :
448 13771945 : if (TupIsNull(outerTupleSlot))
449 : {
450 : /* end of batch, or maybe whole join */
451 17365 : if (HJ_FILL_INNER(node))
452 : {
453 : /* set up to scan for unmatched inner tuples */
454 2553 : if (parallel)
455 : {
456 : /*
457 : * Only one process is currently allowed to handle
458 : * each batch's unmatched tuples, in a parallel
459 : * join. However, each process must deal with any
460 : * null-keyed tuples it found.
461 : */
462 72 : if (ExecParallelPrepHashTableForUnmatched(node))
463 44 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
464 28 : else if (node->hj_NullOuterTupleStore)
465 6 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
466 22 : else if (hashNode->null_tuple_store)
467 10 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
468 : else
469 12 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
470 : }
471 : else
472 : {
473 2481 : ExecPrepHashTableForUnmatched(node);
474 2481 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
475 : }
476 : }
477 : else
478 : {
479 : /* might have outer null-keyed tuples to fill */
480 : Assert(hashNode->null_tuple_store == NULL);
481 14812 : if (node->hj_NullOuterTupleStore)
482 34 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
483 : else
484 14778 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
485 : }
486 17365 : continue;
487 : }
488 :
489 13754580 : econtext->ecxt_outertuple = outerTupleSlot;
490 13754580 : node->hj_MatchedOuter = false;
491 :
492 : /*
493 : * Find the corresponding bucket for this tuple in the main
494 : * hash table or skew hash table.
495 : */
496 13754580 : node->hj_CurHashValue = hashvalue;
497 13754580 : ExecHashGetBucketAndBatch(hashtable, hashvalue,
498 : &node->hj_CurBucketNo, &batchno);
499 13754580 : node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
500 : hashvalue);
501 13754580 : node->hj_CurTuple = NULL;
502 :
503 : /*
504 : * The tuple might not belong to the current batch (where
505 : * "current batch" includes the skew buckets if any).
506 : */
507 13754580 : if (batchno != hashtable->curbatch &&
508 959656 : node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
509 958856 : {
510 : bool shouldFree;
511 958856 : MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
512 : &shouldFree);
513 :
514 : /*
515 : * Need to postpone this outer tuple to a later batch.
516 : * Save it in the corresponding outer-batch file.
517 : */
518 : Assert(parallel_state == NULL);
519 : Assert(batchno > hashtable->curbatch);
520 958856 : ExecHashJoinSaveTuple(mintuple, hashvalue,
521 958856 : &hashtable->outerBatchFile[batchno],
522 : hashtable);
523 :
524 958856 : if (shouldFree)
525 958856 : heap_free_minimal_tuple(mintuple);
526 :
527 : /* Loop around, staying in HJ_NEED_NEW_OUTER state */
528 958856 : continue;
529 : }
530 :
531 : /* OK, let's scan the bucket for matches */
532 12795724 : node->hj_JoinState = HJ_SCAN_BUCKET;
533 :
534 : pg_fallthrough;
535 :
536 17759711 : case HJ_SCAN_BUCKET:
537 :
538 : /*
539 : * Scan the selected hash bucket for matches to current outer
540 : */
541 17759711 : if (parallel)
542 : {
543 2804080 : if (!ExecParallelScanHashBucket(node, econtext))
544 : {
545 : /* out of matches; check for possible outer-join fill */
546 1444028 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
547 1444028 : continue;
548 : }
549 : }
550 : else
551 : {
552 14955631 : if (!ExecScanHashBucket(node, econtext))
553 : {
554 : /* out of matches; check for possible outer-join fill */
555 7836556 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
556 7836556 : continue;
557 : }
558 : }
559 :
560 : /*
561 : * In a right-semijoin, we only need the first match for each
562 : * inner tuple.
563 : */
564 8479431 : if (node->js.jointype == JOIN_RIGHT_SEMI &&
565 304 : HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
566 48 : continue;
567 :
568 : /*
569 : * We've got a match, but still need to test non-hashed quals.
570 : * ExecScanHashBucket already set up all the state needed to
571 : * call ExecQual.
572 : *
573 : * If we pass the qual, then save state for next call and have
574 : * ExecProject form the projection, store it in the tuple
575 : * table, and return the slot.
576 : *
577 : * Only the joinquals determine tuple match status, but all
578 : * quals must pass to actually return the tuple.
579 : */
580 8479079 : if (joinqual == NULL || ExecQual(joinqual, econtext))
581 : {
582 8368457 : node->hj_MatchedOuter = true;
583 :
584 : /*
585 : * This is really only needed if HJ_FILL_INNER(node) or if
586 : * we are in a right-semijoin, but we'll avoid the branch
587 : * and just set it always.
588 : */
589 8368457 : if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
590 4031018 : HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
591 :
592 : /* In an antijoin, we never return a matched tuple */
593 8368457 : if (node->js.jointype == JOIN_ANTI)
594 : {
595 856042 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
596 856042 : continue;
597 : }
598 :
599 : /*
600 : * If we only need to consider the first matching inner
601 : * tuple, then advance to next outer tuple after we've
602 : * processed this one.
603 : */
604 7512415 : if (node->js.single_match)
605 2659020 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
606 :
607 : /*
608 : * In a right-antijoin, we never return a matched tuple.
609 : * If it's not an inner_unique join, we need to stay on
610 : * the current outer tuple to continue scanning the inner
611 : * side for matches.
612 : */
613 7512415 : if (node->js.jointype == JOIN_RIGHT_ANTI)
614 20307 : continue;
615 :
616 7492108 : if (otherqual == NULL || ExecQual(otherqual, econtext))
617 7365218 : return ExecProject(node->js.ps.ps_ProjInfo);
618 : else
619 126890 : InstrCountFiltered2(node, 1);
620 : }
621 : else
622 110622 : InstrCountFiltered1(node, 1);
623 237512 : break;
624 :
625 9280584 : case HJ_FILL_OUTER_TUPLE:
626 :
627 : /*
628 : * The current outer tuple has run out of matches, so check
629 : * whether to emit a dummy outer-join tuple. Whether we emit
630 : * one or not, the next state is NEED_NEW_OUTER.
631 : */
632 9280584 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
633 :
634 9280584 : if (!node->hj_MatchedOuter &&
635 5838365 : HJ_FILL_OUTER(node))
636 : {
637 : /*
638 : * Generate a fake join tuple with nulls for the inner
639 : * tuple, and return it if it passes the non-join quals.
640 : */
641 1606171 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
642 :
643 1606171 : if (otherqual == NULL || ExecQual(otherqual, econtext))
644 687016 : return ExecProject(node->js.ps.ps_ProjInfo);
645 : else
646 919155 : InstrCountFiltered2(node, 1);
647 : }
648 8593568 : break;
649 :
650 332030 : case HJ_FILL_INNER_TUPLES:
651 :
652 : /*
653 : * We have finished a batch, but we are doing
654 : * right/right-anti/full join, so any unmatched inner tuples
655 : * in the hashtable have to be emitted before we continue to
656 : * the next batch.
657 : */
658 584012 : if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
659 251982 : : ExecScanHashTableForUnmatched(node, econtext)))
660 : {
661 : /* no more unmatched tuples, but maybe there are nulls */
662 2521 : if (node->hj_NullOuterTupleStore)
663 43 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
664 2478 : else if (hashNode->null_tuple_store)
665 52 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
666 : else
667 2426 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
668 2521 : continue;
669 : }
670 :
671 : /*
672 : * Generate a fake join tuple with nulls for the outer tuple,
673 : * and return it if it passes the non-join quals.
674 : */
675 329509 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
676 :
677 329509 : if (otherqual == NULL || ExecQual(otherqual, econtext))
678 324482 : return ExecProject(node->js.ps.ps_ProjInfo);
679 : else
680 5027 : InstrCountFiltered2(node, 1);
681 5027 : break;
682 :
683 2138 : case HJ_FILL_OUTER_NULL_TUPLES:
684 :
685 : /*
686 : * We have finished a batch, but we are doing left/full join,
687 : * so any null-keyed outer tuples have to be emitted before we
688 : * continue to the next batch.
689 : *
690 : * (We could delay this till the end of the join, but there
691 : * seems little percentage in that.)
692 : *
693 : * We have to use tuplestore_gettupleslot_force because
694 : * hj_OuterTupleSlot may not be able to store a MinimalTuple.
695 : */
696 4288 : while (tuplestore_gettupleslot_force(node->hj_NullOuterTupleStore,
697 : true, false,
698 : node->hj_OuterTupleSlot))
699 : {
700 : /*
701 : * Generate a fake join tuple with nulls for the inner
702 : * tuple, and return it if it passes the non-join quals.
703 : */
704 2063 : econtext->ecxt_outertuple = node->hj_OuterTupleSlot;
705 2063 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
706 :
707 2063 : if (otherqual == NULL || ExecQual(otherqual, econtext))
708 2051 : return ExecProject(node->js.ps.ps_ProjInfo);
709 : else
710 12 : InstrCountFiltered2(node, 1);
711 :
712 12 : ResetExprContext(econtext);
713 :
714 : /* allow this loop to be cancellable */
715 12 : CHECK_FOR_INTERRUPTS();
716 : }
717 :
718 : /* We don't need the tuplestore any more, so discard it. */
719 87 : tuplestore_end(node->hj_NullOuterTupleStore);
720 87 : node->hj_NullOuterTupleStore = NULL;
721 :
722 : /* Fill inner tuples too if it's a full join, else advance. */
723 87 : if (hashNode->null_tuple_store)
724 22 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
725 : else
726 65 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
727 87 : break;
728 :
729 159 : case HJ_FILL_INNER_NULL_TUPLES:
730 :
731 : /*
732 : * We have finished a batch, but we are doing
733 : * right/right-anti/full join, so any null-keyed inner tuples
734 : * have to be emitted before we continue to the next batch.
735 : *
736 : * (We could delay this till the end of the join, but there
737 : * seems little percentage in that.)
738 : */
739 322 : while (tuplestore_gettupleslot(hashNode->null_tuple_store,
740 : true, false,
741 : node->hj_HashTupleSlot))
742 : {
743 : /*
744 : * Generate a fake join tuple with nulls for the outer
745 : * tuple, and return it if it passes the non-join quals.
746 : */
747 72 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
748 72 : econtext->ecxt_innertuple = node->hj_HashTupleSlot;
749 :
750 72 : if (otherqual == NULL || ExecQual(otherqual, econtext))
751 68 : return ExecProject(node->js.ps.ps_ProjInfo);
752 : else
753 4 : InstrCountFiltered2(node, 1);
754 :
755 4 : ResetExprContext(econtext);
756 :
757 : /* allow this loop to be cancellable */
758 4 : CHECK_FOR_INTERRUPTS();
759 : }
760 :
761 : /*
762 : * Ideally we'd discard the tuplestore now, but we can't
763 : * because we might need it for rescans.
764 : */
765 :
766 : /* Now we can advance to the next batch. */
767 91 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
768 91 : break;
769 :
770 17633 : case HJ_NEED_NEW_BATCH:
771 :
772 : /*
773 : * Try to advance to next batch. Done if there are no more.
774 : */
775 17633 : if (parallel)
776 : {
777 916 : if (!ExecParallelHashJoinNewBatch(node))
778 276 : return NULL; /* end of parallel-aware join */
779 : }
780 : else
781 : {
782 16717 : if (!ExecHashJoinNewBatch(node))
783 15906 : return NULL; /* end of parallel-oblivious join */
784 : }
785 1451 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
786 1451 : break;
787 :
788 0 : default:
789 0 : elog(ERROR, "unrecognized hashjoin state: %d",
790 : (int) node->hj_JoinState);
791 : }
792 : }
793 : }
794 :
795 : /* ----------------------------------------------------------------
796 : * ExecHashJoin
797 : *
798 : * Parallel-oblivious version.
799 : * ----------------------------------------------------------------
800 : */
801 : static TupleTableSlot * /* return: a tuple or NULL */
802 6881468 : ExecHashJoin(PlanState *pstate)
803 : {
804 : /*
805 : * On sufficiently smart compilers this should be inlined with the
806 : * parallel-aware branches removed.
807 : */
808 6881468 : return ExecHashJoinImpl(pstate, false);
809 : }
810 :
811 : /* ----------------------------------------------------------------
812 : * ExecParallelHashJoin
813 : *
814 : * Parallel-aware version.
815 : * ----------------------------------------------------------------
816 : */
817 : static TupleTableSlot * /* return: a tuple or NULL */
818 1520368 : ExecParallelHashJoin(PlanState *pstate)
819 : {
820 : /*
821 : * On sufficiently smart compilers this should be inlined with the
822 : * parallel-oblivious branches removed.
823 : */
824 1520368 : return ExecHashJoinImpl(pstate, true);
825 : }
826 :
827 : /* ----------------------------------------------------------------
828 : * ExecInitHashJoin
829 : *
830 : * Init routine for HashJoin node.
831 : * ----------------------------------------------------------------
832 : */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	TupleDesc	outerDesc,
				innerDesc;
	const TupleTableSlotOps *ops;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;

	/*
	 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
	 * where this function may be replaced with a parallel version, if we
	 * managed to launch a parallel query.
	 */
	hjstate->js.ps.ExecProcNode = ExecHashJoin;
	hjstate->js.jointype = node->join.jointype;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	outerNode = outerPlan(node);
	hashNode = (Hash *) innerPlan(node);

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
	outerDesc = ExecGetResultType(outerPlanState(hjstate));
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	innerDesc = ExecGetResultType(innerPlanState(hjstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	/*
	 * tuple table initialization: the outer-tuple slot uses the same slot
	 * ops as the outer child's result slot, so tuples can be stored without
	 * conversion.
	 */
	ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
														ops);

	/*
	 * detect whether we need only consider the first matching inner tuple
	 */
	hjstate->js.single_match = (node->join.inner_unique ||
								node->join.jointype == JOIN_SEMI);

	/* set up null tuples for outer joins, if needed */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_SEMI:
		case JOIN_RIGHT_SEMI:
			break;
		case JOIN_LEFT:
		case JOIN_ANTI:
			/* outer-fill joins need an all-nulls inner tuple */
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		case JOIN_RIGHT:
		case JOIN_RIGHT_ANTI:
			/* inner-fill joins need an all-nulls outer tuple */
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			break;
		case JOIN_FULL:
			/* full joins can fill from either side */
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we can do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.  -cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		Hash	   *hash = (Hash *) hashstate->ps.plan;
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
		Oid		   *outer_hashfuncid;
		Oid		   *inner_hashfuncid;
		bool	   *hash_strict;
		ListCell   *lc;
		int			nkeys;


		hjstate->hj_HashTupleSlot = slot;

		/*
		 * Build ExprStates to obtain hash values for either side of the join.
		 * Note: must build the ExprStates before ExecHashTableCreate() so we
		 * properly attribute any SubPlans that exist in the hash expressions
		 * to the correct PlanState.
		 */
		nkeys = list_length(node->hashoperators);

		outer_hashfuncid = palloc_array(Oid, nkeys);
		inner_hashfuncid = palloc_array(Oid, nkeys);
		hash_strict = palloc_array(bool, nkeys);

		/*
		 * Determine the hash function for each side of the join for the given
		 * join operator, and detect whether the join operator is strict.
		 */
		foreach(lc, node->hashoperators)
		{
			Oid			hashop = lfirst_oid(lc);
			int			i = foreach_current_index(lc);

			if (!get_op_hash_functions(hashop,
									   &outer_hashfuncid[i],
									   &inner_hashfuncid[i]))
				elog(ERROR,
					 "could not find hash function for hash operator %u",
					 hashop);
			hash_strict[i] = op_strict(hashop);
		}

		/*
		 * Build an ExprState to generate the hash value for the expressions
		 * on the outer side of the join.
		 */
		hjstate->hj_OuterHash =
			ExecBuildHash32Expr(hjstate->js.ps.ps_ResultTupleDesc,
								hjstate->js.ps.resultops,
								outer_hashfuncid,
								node->hashcollations,
								node->hashkeys,
								hash_strict,
								&hjstate->js.ps,
								0);

		/* As above, but for the inner side of the join */
		hashstate->hash_expr =
			ExecBuildHash32Expr(hashstate->ps.ps_ResultTupleDesc,
								hashstate->ps.resultops,
								inner_hashfuncid,
								node->hashcollations,
								hash->hashkeys,
								hash_strict,
								&hashstate->ps,
								0);

		/*
		 * Remember whether we need to save tuples with null join keys: such
		 * tuples cannot match anything, but must still be emitted (null-
		 * extended) on the side(s) a fill join fills.
		 */
		hjstate->hj_KeepNullTuples = HJ_FILL_OUTER(hjstate);
		hashstate->keep_null_tuples = HJ_FILL_INNER(hjstate);

		/*
		 * Set up the skew table hash function while we have a record of the
		 * first key's hash function Oid.
		 */
		if (OidIsValid(hash->skewTable))
		{
			hashstate->skew_hashfunction = palloc0_object(FmgrInfo);
			hashstate->skew_collation = linitial_oid(node->hashcollations);
			fmgr_info(outer_hashfuncid[0], hashstate->skew_hashfunction);
		}

		/* no need to keep these */
		pfree(outer_hashfuncid);
		pfree(inner_hashfuncid);
		pfree(hash_strict);
	}

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.qual =
		ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
	hjstate->js.joinqual =
		ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
	hjstate->hashclauses =
		ExecInitQual(node->hashclauses, (PlanState *) hjstate);

	/*
	 * initialize hash-specific info; the hash table itself is built lazily
	 * on first execution (hj_JoinState starts at HJ_BUILD_HASHTABLE).
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_NullOuterTupleStore = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	hjstate->hj_CurTuple = NULL;

	hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	return hjstate;
}
1054 :
1055 : /* ----------------------------------------------------------------
1056 : * ExecEndHashJoin
1057 : *
1058 : * clean up routine for HashJoin node
1059 : * ----------------------------------------------------------------
1060 : */
1061 : void
1062 27190 : ExecEndHashJoin(HashJoinState *node)
1063 : {
1064 27190 : HashState *hashNode = castNode(HashState, innerPlanState(node));
1065 :
1066 : /*
1067 : * Free tuple stores if we made them (must do this before
1068 : * ExecHashTableDestroy deletes hashCxt)
1069 : */
1070 27190 : if (node->hj_NullOuterTupleStore)
1071 : {
1072 0 : tuplestore_end(node->hj_NullOuterTupleStore);
1073 0 : node->hj_NullOuterTupleStore = NULL;
1074 : }
1075 27190 : if (hashNode->null_tuple_store)
1076 : {
1077 64 : tuplestore_end(hashNode->null_tuple_store);
1078 64 : hashNode->null_tuple_store = NULL;
1079 : }
1080 :
1081 : /*
1082 : * Free hash table
1083 : */
1084 27190 : if (node->hj_HashTable)
1085 : {
1086 18190 : ExecHashTableDestroy(node->hj_HashTable);
1087 18190 : node->hj_HashTable = NULL;
1088 : }
1089 :
1090 : /*
1091 : * clean up subtrees
1092 : */
1093 27190 : ExecEndNode(outerPlanState(node));
1094 27190 : ExecEndNode(innerPlanState(node));
1095 27190 : }
1096 :
1097 : /*
1098 : * ExecHashJoinOuterGetTuple
1099 : *
1100 : * get the next outer tuple for a parallel oblivious hashjoin: either by
1101 : * executing the outer plan node in the first pass, or from the temp
1102 : * files for the hashjoin batches.
1103 : *
1104 : * Returns a null slot if no more outer tuples (within the current batch).
1105 : *
1106 : * On success, the tuple's hash value is stored at *hashvalue --- this is
1107 : * either originally computed, or re-read from the temp file.
1108 : */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	if (curbatch == 0)			/* if it is the first pass */
	{
		/*
		 * Check to see if first outer tuple was already fetched by
		 * ExecHashJoin() and not used yet.
		 */
		slot = hjstate->hj_FirstOuterTupleSlot;
		if (!TupIsNull(slot))
			hjstate->hj_FirstOuterTupleSlot = NULL;
		else
			slot = ExecProcNode(outerNode);

		/* Loop until we find a tuple with a non-null join key, or run out */
		while (!TupIsNull(slot))
		{
			bool		isnull;

			/*
			 * We have to compute the tuple's hash value.
			 */
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;

			ResetExprContext(econtext);

			*hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
																  econtext,
																  &isnull));

			if (!isnull)
			{
				/* normal case with a non-null join key */
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}
			else if (hjstate->hj_KeepNullTuples)
			{
				/* null join key, but we must save tuple to be emitted later */
				/* (tuple store is created lazily on first such tuple) */
				if (hjstate->hj_NullOuterTupleStore == NULL)
					hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
				tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		/* Later batches read outer tuples back from the batch temp file */
		BufFile    *file = hashtable->outerBatchFile[curbatch];

		/*
		 * In outer-join cases, we could get here even though the batch file
		 * is empty.
		 */
		if (file == NULL)
			return NULL;

		/* *hashvalue is filled from the value saved alongside the tuple */
		slot = ExecHashJoinGetSavedTuple(hjstate,
										 file,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
	}

	/* End of this batch */
	return NULL;
}
1192 :
1193 : /*
1194 : * ExecHashJoinOuterGetTuple variant for the parallel case.
1195 : */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
								  HashJoinState *hjstate,
								  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	/*
	 * In the Parallel Hash case we only run the outer plan directly for
	 * single-batch hash joins.  Otherwise we have to go to batch files, even
	 * for batch 0.
	 */
	if (curbatch == 0 && hashtable->nbatch == 1)
	{
		slot = ExecProcNode(outerNode);

		/* Loop until we find a tuple with a non-null join key, or run out */
		while (!TupIsNull(slot))
		{
			bool		isnull;

			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;

			ResetExprContext(econtext);

			*hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
																  econtext,
																  &isnull));

			if (!isnull)
			{
				/* normal case with a non-null join key */
				return slot;
			}
			else if (hjstate->hj_KeepNullTuples)
			{
				/* null join key, but we must save tuple to be emitted later */
				/* (tuple store is created lazily on first such tuple) */
				if (hjstate->hj_NullOuterTupleStore == NULL)
					hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
				tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		MinimalTuple tuple;

		/* multi-batch: consume this batch's shared outer tuplestore */
		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
									   hashvalue);
		if (tuple != NULL)
		{
			ExecForceStoreMinimalTuple(tuple,
									   hjstate->hj_OuterTupleSlot,
									   false);
			slot = hjstate->hj_OuterTupleSlot;
			return slot;
		}
		else
			ExecClearTuple(hjstate->hj_OuterTupleSlot);
	}

	/* End of this batch: note it so this worker won't rescan the batch */
	hashtable->batches[curbatch].outer_eof = true;

	return NULL;
}
1271 :
1272 : /*
1273 : * ExecHashJoinNewBatch
1274 : * switch to a new hashjoin batch
1275 : *
1276 : * Returns true if successful, false if there are no more batches.
1277 : */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			nbatch;
	int			curbatch;
	BufFile    *innerFile;
	TupleTableSlot *slot;
	uint32		hashvalue;

	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
	}
	else						/* we just finished the first batch */
	{
		/*
		 * Reset some of the skew optimization state variables, since we no
		 * longer need to consider skew tuples after the first batch.  The
		 * memory context reset we are about to do will release the skew
		 * hashtable itself.
		 */
		hashtable->skewEnabled = false;
		hashtable->skewBucket = NULL;
		hashtable->skewBucketNums = NULL;
		hashtable->nSkewBuckets = 0;
		hashtable->spaceUsedSkew = 0;
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a left/full outer join, we have to process outer batches even if
	 * the inner batch is empty.  Similarly, in a right/right-anti/full outer
	 * join, we have to process inner batches even if the outer batch is
	 * empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->outerBatchFile[curbatch] == NULL ||
			hashtable->innerBatchFile[curbatch] == NULL))
	{
		if (hashtable->outerBatchFile[curbatch] &&
			HJ_FILL_OUTER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			HJ_FILL_INNER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (hashtable->outerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (hashtable->innerBatchFile[curbatch])
			BufFileClose(hashtable->innerBatchFile[curbatch]);
		hashtable->innerBatchFile[curbatch] = NULL;
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
		curbatch++;
	}

	if (curbatch >= nbatch)
		return false;			/* no more batches */

	hashtable->curbatch = curbatch;

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashtable);

	innerFile = hashtable->innerBatchFile[curbatch];

	if (innerFile != NULL)
	{
		if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));

		/* re-insert each saved inner tuple, using its saved hash value */
		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
												 innerFile,
												 &hashvalue,
												 hjstate->hj_HashTupleSlot)))
		{
			/*
			 * NOTE: some tuples may be sent to future batches.  Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			ExecHashTableInsert(hashtable, slot, hashvalue);
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		BufFileClose(innerFile);
		hashtable->innerBatchFile[curbatch] = NULL;
	}

	/*
	 * Rewind outer batch file (if present), so that we can start reading it.
	 */
	if (hashtable->outerBatchFile[curbatch] != NULL)
	{
		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));
	}

	return true;
}
1414 :
1415 : /*
1416 : * Choose a batch to work on, and attach to it. Returns true if successful,
1417 : * false if there are no more batches.
1418 : */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			start_batchno;
	int			batchno;

	/*
	 * If we are a very slow worker, MultiExecParallelHash could have observed
	 * build_barrier phase PHJ_BUILD_FREE and not bothered to set up batch
	 * accessors.  In that case we must be done.
	 */
	if (hashtable->batches == NULL)
		return false;

	/*
	 * If we were already attached to a batch, remember not to bother checking
	 * it again, and detach from it (possibly freeing the hash table if we are
	 * last to detach).
	 */
	if (hashtable->curbatch >= 0)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
	}

	/*
	 * Search for a batch that isn't done.  We use an atomic counter to start
	 * our search at a different batch in every participant when there are
	 * more batches than participants.
	 */
	batchno = start_batchno =
		pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
		hashtable->nbatch;
	do
	{
		uint32		hashvalue;
		MinimalTuple tuple;
		TupleTableSlot *slot;

		if (!hashtable->batches[batchno].done)
		{
			SharedTuplestoreAccessor *inner_tuples;
			Barrier    *batch_barrier =
				&hashtable->batches[batchno].shared->batch_barrier;

			/*
			 * The phase we attach at tells us how far this batch has
			 * progressed; each non-terminal case falls through to the next
			 * phase after doing (or helping with) that phase's work.
			 */
			switch (BarrierAttach(batch_barrier))
			{
				case PHJ_BATCH_ELECT:

					/* One backend allocates the hash table. */
					if (BarrierArriveAndWait(batch_barrier,
											 WAIT_EVENT_HASH_BATCH_ELECT))
						ExecParallelHashTableAlloc(hashtable, batchno);
					pg_fallthrough;

				case PHJ_BATCH_ALLOCATE:
					/* Wait for allocation to complete. */
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
					pg_fallthrough;

				case PHJ_BATCH_LOAD:
					/* Start (or join in) loading tuples. */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					inner_tuples = hashtable->batches[batchno].inner_tuples;
					sts_begin_parallel_scan(inner_tuples);
					while ((tuple = sts_parallel_scan_next(inner_tuples,
														   &hashvalue)))
					{
						ExecForceStoreMinimalTuple(tuple,
												   hjstate->hj_HashTupleSlot,
												   false);
						slot = hjstate->hj_HashTupleSlot;
						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
																hashvalue);
					}
					sts_end_parallel_scan(inner_tuples);
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_LOAD);
					pg_fallthrough;

				case PHJ_BATCH_PROBE:

					/*
					 * This batch is ready to probe.  Return control to
					 * caller.  We stay attached to batch_barrier so that the
					 * hash table stays alive until everyone's finished
					 * probing it, but no participant is allowed to wait at
					 * this barrier again (or else a deadlock could occur).
					 * All attached participants must eventually detach from
					 * the barrier and one worker must advance the phase so
					 * that the final phase is reached.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);

					return true;
				case PHJ_BATCH_SCAN:

					/*
					 * In principle, we could help scan for unmatched tuples,
					 * since that phase is already underway (the thing we
					 * can't do under current deadlock-avoidance rules is wait
					 * for others to arrive at PHJ_BATCH_SCAN, because
					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
					 * got here without waiting).  That is not yet done.  For
					 * now, we just detach and go around again.  We have to
					 * use ExecHashTableDetachBatch() because there's a small
					 * chance we'll be the last to detach, and then we're
					 * responsible for freeing memory.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					hashtable->batches[batchno].done = true;
					ExecHashTableDetachBatch(hashtable);
					break;

				case PHJ_BATCH_FREE:

					/*
					 * Already done.  Detach and go around again (if any
					 * remain).
					 */
					BarrierDetach(batch_barrier);
					hashtable->batches[batchno].done = true;
					hashtable->curbatch = -1;
					break;

				default:
					elog(ERROR, "unexpected batch phase %d",
						 BarrierPhase(batch_barrier));
			}
		}
		batchno = (batchno + 1) % hashtable->nbatch;
	} while (batchno != start_batchno);

	/* visited every batch without finding work: all done */
	return false;
}
1557 :
1558 : /*
1559 : * ExecHashJoinSaveTuple
1560 : * save a tuple to a batch file.
1561 : *
1562 : * The data recorded in the file for each tuple is its hash value,
1563 : * then the tuple in MinimalTuple format.
1564 : *
1565 : * fileptr points to a batch file in one of the hashtable arrays.
1566 : *
1567 : * The batch files (and their buffers) are allocated in the spill context
1568 : * created for the hashtable.
1569 : */
1570 : void
1571 3234058 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1572 : BufFile **fileptr, HashJoinTable hashtable)
1573 : {
1574 3234058 : BufFile *file = *fileptr;
1575 :
1576 : /*
1577 : * The batch file is lazily created. If this is the first tuple written to
1578 : * this batch, the batch file is created and its buffer is allocated in
1579 : * the spillCxt context, NOT in the batchCxt.
1580 : *
1581 : * During the build phase, buffered files are created for inner batches.
1582 : * Each batch's buffered file is closed (and its buffer freed) after the
1583 : * batch is loaded into memory during the outer side scan. Therefore, it
1584 : * is necessary to allocate the batch file buffer in a memory context
1585 : * which outlives the batch itself.
1586 : *
1587 : * Also, we use spillCxt instead of hashCxt for a better accounting of the
1588 : * spilling memory consumption.
1589 : */
1590 3234058 : if (file == NULL)
1591 : {
1592 1622 : MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
1593 :
1594 1622 : file = BufFileCreateTemp(false);
1595 1622 : *fileptr = file;
1596 :
1597 1622 : MemoryContextSwitchTo(oldctx);
1598 : }
1599 :
1600 3234058 : BufFileWrite(file, &hashvalue, sizeof(uint32));
1601 3234058 : BufFileWrite(file, tuple, tuple->t_len);
1602 3234058 : }
1603 :
1604 : /*
1605 : * ExecHashJoinGetSavedTuple
1606 : * read the next tuple from a batch file. Return NULL if no more.
1607 : *
1608 : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1609 : * itself is stored in the given slot.
1610 : */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  BufFile *file,
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
	uint32		header[2];
	size_t		nread;
	MinimalTuple tuple;

	/*
	 * We check for interrupts here because this is typically taken as an
	 * alternative code path to an ExecProcNode() call, which would include
	 * such a check.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
	 */
	nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
	if (nread == 0)				/* end of file */
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
	/* header[0] is the hash value, header[1] the tuple's t_len */
	*hashvalue = header[0];
	tuple = (MinimalTuple) palloc(header[1]);
	tuple->t_len = header[1];
	/* t_len is already filled in; read the rest of the tuple body */
	BufFileReadExact(file,
					 (char *) tuple + sizeof(uint32),
					 header[1] - sizeof(uint32));
	/* shouldFree = true: the slot takes ownership of the palloc'd tuple */
	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
	return tupleSlot;
}
1648 :
1649 :
void
ExecReScanHashJoin(HashJoinState *node)
{
	PlanState  *outerPlan = outerPlanState(node);
	PlanState  *innerPlan = innerPlanState(node);

	/*
	 * We're always going to rescan the outer rel, so drop the associated
	 * null-keys tuplestore; we'll rebuild it during the rescan.  (Must do
	 * this before ExecHashTableDestroy deletes hashCxt.)
	 */
	if (node->hj_NullOuterTupleStore)
	{
		tuplestore_end(node->hj_NullOuterTupleStore);
		node->hj_NullOuterTupleStore = NULL;
	}

	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released.  But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		HashState  *hashNode = castNode(HashState, innerPlan);

		Assert(hashNode->hashtable == node->hj_HashTable);

		if (node->hj_HashTable->nbatch == 1 &&
			innerPlan->chgParam == NULL)
		{
			/*
			 * Okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * However, if it's a right/right-anti/right-semi/full join, we'd
			 * better reset the inner-tuple match flags contained in the
			 * table.
			 */
			if (HJ_FILL_INNER(node) || node->js.jointype == JOIN_RIGHT_SEMI)
				ExecHashTableResetMatchFlags(node->hj_HashTable);

			/*
			 * Also, we need to reset our state about the emptiness of the
			 * outer relation, so that the new scan of the outer will update
			 * it correctly if it turns out to be empty this time. (There's no
			 * harm in clearing it now because ExecHashJoin won't need the
			 * info.  In the other cases, where the hash table doesn't exist
			 * or we are destroying it, we leave this state alone because
			 * ExecHashJoin will need it the first time through.)
			 */
			node->hj_OuterNotEmpty = false;

			/*
			 * Also, rewind inner null-key tuplestore so that we can return
			 * those tuples again.
			 */
			if (hashNode->null_tuple_store)
				tuplestore_rescan(hashNode->null_tuple_store);

			/* ExecHashJoin can skip the BUILD_HASHTABLE step */
			node->hj_JoinState = HJ_NEED_NEW_OUTER;
		}
		else
		{
			/* must destroy and rebuild hash table */

			/* accumulate stats from old hash table, if wanted */
			/* (this should match ExecShutdownHash) */
			if (hashNode->ps.instrument && !hashNode->hinstrument)
				hashNode->hinstrument = palloc0_object(HashInstrumentation);
			if (hashNode->hinstrument)
				ExecHashAccumInstrumentation(hashNode->hinstrument,
											 hashNode->hashtable);

			/* free inner null-key tuplestore before ExecHashTableDestroy */
			if (hashNode->null_tuple_store)
			{
				tuplestore_end(hashNode->null_tuple_store);
				hashNode->null_tuple_store = NULL;
			}

			/* for safety, be sure to clear child plan node's pointer too */
			hashNode->hashtable = NULL;

			ExecHashTableDestroy(node->hj_HashTable);
			node->hj_HashTable = NULL;
			node->hj_JoinState = HJ_BUILD_HASHTABLE;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (innerPlan->chgParam == NULL)
				ExecReScan(innerPlan);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	node->hj_CurTuple = NULL;

	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
1765 :
1766 : void
1767 24434 : ExecShutdownHashJoin(HashJoinState *node)
1768 : {
1769 24434 : if (node->hj_HashTable)
1770 : {
1771 : /*
1772 : * Detach from shared state before DSM memory goes away. This makes
1773 : * sure that we don't have any pointers into DSM memory by the time
1774 : * ExecEndHashJoin runs.
1775 : */
1776 18178 : ExecHashTableDetachBatch(node->hj_HashTable);
1777 18178 : ExecHashTableDetach(node->hj_HashTable);
1778 : }
1779 24434 : }
1780 :
/*
 * ExecParallelHashJoinPartitionOuter
 *		Pull the entire outer relation and scatter its tuples into the
 *		per-batch shared tuplestores.
 *
 * Each outer tuple is hashed and appended to the shared tuplestore of the
 * batch it belongs to, so that any participant can later probe any batch.
 * Tuples whose join key hashes to NULL are either discarded, or (when
 * hj_KeepNullTuples is set) stashed in a backend-local null-key tuplestore
 * to be emitted later -- presumably for join types that must still output
 * unmatched outer tuples; confirm against the callers of hj_KeepNullTuples.
 */
static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
	PlanState  *outerState = outerPlanState(hjstate);
	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	int			i;

	/* We should only get here before any outer tuple has been buffered. */
	Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

	/* Execute outer plan, writing all tuples to shared tuplestores. */
	for (;;)
	{
		bool		isnull;
		uint32		hashvalue;

		slot = ExecProcNode(outerState);
		if (TupIsNull(slot))
			break;
		econtext->ecxt_outertuple = slot;

		/* Reset per-tuple memory before evaluating the hash expression. */
		ResetExprContext(econtext);

		hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
															 econtext,
															 &isnull));

		if (!isnull)
		{
			/* normal case with a non-null join key */
			int			batchno;
			int			bucketno;
			bool		shouldFree;
			MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

			/* Route the tuple to its batch's shared tuplestore. */
			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
									  &batchno);
			sts_puttuple(hashtable->batches[batchno].outer_tuples,
						 &hashvalue, mintup);

			/* sts_puttuple copied the tuple; free our copy if we own it. */
			if (shouldFree)
				heap_free_minimal_tuple(mintup);
		}
		else if (hjstate->hj_KeepNullTuples)
		{
			/* null join key, but we must save tuple to be emitted later */
			if (hjstate->hj_NullOuterTupleStore == NULL)
				hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
			tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
		}
		/* else we can just discard the tuple immediately */

		CHECK_FOR_INTERRUPTS();
	}

	/* Make sure all outer partitions are readable by any backend. */
	for (i = 0; i < hashtable->nbatch; ++i)
		sts_end_write(hashtable->batches[i].outer_tuples);
}
1841 :
/*
 * ExecHashJoinEstimate
 *		Reserve space in the DSM segment's estimator for the shared
 *		ParallelHashJoinState, plus one toc key to find it by.
 */
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
1848 :
/*
 * ExecHashJoinInitializeDSM
 *		Allocate and initialize the shared ParallelHashJoinState in the DSM
 *		segment, keyed by this node's plan_node_id, and hand it to the inner
 *		Hash node.  Also switches this node to the parallel-aware ExecProcNode
 *		implementation.
 */
void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	HashState  *hashNode;
	ParallelHashJoinState *pstate;

	/*
	 * Disable shared hash table mode if we failed to create a real DSM
	 * segment, because that means that we don't have a DSA area to work with.
	 */
	if (pcxt->seg == NULL)
		return;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

	/*
	 * Set up the state needed to coordinate access to the shared hash
	 * table(s), using the plan node ID as the toc key.
	 */
	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
	shm_toc_insert(pcxt->toc, plan_node_id, pstate);

	/*
	 * Set up the shared hash join state with no batches initially.
	 * ExecHashTableCreate() will prepare at least one later and set nbatch
	 * and space_allowed.
	 */
	pstate->nbatch = 0;
	pstate->space_allowed = 0;
	pstate->batches = InvalidDsaPointer;
	pstate->old_batches = InvalidDsaPointer;
	pstate->nbuckets = 0;
	pstate->growth = PHJ_GROWTH_OK;
	pstate->chunk_work_queue = InvalidDsaPointer;
	pg_atomic_init_u32(&pstate->distributor, 0);
	/* Leader process participates too, hence nworkers + 1. */
	pstate->nparticipants = pcxt->nworkers + 1;
	pstate->total_tuples = 0;
	LWLockInitialize(&pstate->lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN);
	/* All barriers start at phase 0 with no participants attached yet. */
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);

	/* Set up the space we'll use for shared temporary files. */
	SharedFileSetInit(&pstate->fileset, pcxt->seg);

	/* Initialize the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;
}
1900 :
1901 : /* ----------------------------------------------------------------
1902 : * ExecHashJoinReInitializeDSM
1903 : *
1904 : * Reset shared state before beginning a fresh scan.
1905 : * ----------------------------------------------------------------
1906 : */
1907 : void
1908 32 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1909 : {
1910 32 : int plan_node_id = state->js.ps.plan->plan_node_id;
1911 : ParallelHashJoinState *pstate;
1912 : HashState *hashNode;
1913 :
1914 : /* Nothing to do if we failed to create a DSM segment. */
1915 32 : if (pcxt->seg == NULL)
1916 0 : return;
1917 :
1918 32 : pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
1919 :
1920 : /*
1921 : * It would be possible to reuse the shared hash table in single-batch
1922 : * cases by resetting and then fast-forwarding build_barrier to
1923 : * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
1924 : * currently shared hash tables are already freed by now (by the last
1925 : * participant to detach from the batch). We could consider keeping it
1926 : * around for single-batch joins. We'd also need to adjust
1927 : * finalize_plan() so that it doesn't record a dummy dependency for
1928 : * Parallel Hash nodes, preventing the rescan optimization. For now we
1929 : * don't try.
1930 : */
1931 :
1932 : /* Detach, freeing any remaining shared memory. */
1933 32 : if (state->hj_HashTable != NULL)
1934 : {
1935 0 : ExecHashTableDetachBatch(state->hj_HashTable);
1936 0 : ExecHashTableDetach(state->hj_HashTable);
1937 : }
1938 :
1939 : /* Clear any shared batch files. */
1940 32 : SharedFileSetDeleteAll(&pstate->fileset);
1941 :
1942 : /* We'd better clear our local null-key tuplestores, too. */
1943 32 : if (state->hj_NullOuterTupleStore)
1944 : {
1945 0 : tuplestore_end(state->hj_NullOuterTupleStore);
1946 0 : state->hj_NullOuterTupleStore = NULL;
1947 : }
1948 32 : hashNode = (HashState *) innerPlanState(state);
1949 32 : if (hashNode->null_tuple_store)
1950 : {
1951 0 : tuplestore_end(hashNode->null_tuple_store);
1952 0 : hashNode->null_tuple_store = NULL;
1953 : }
1954 :
1955 :
1956 : /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
1957 32 : BarrierInit(&pstate->build_barrier, 0);
1958 : }
1959 :
1960 : void
1961 212 : ExecHashJoinInitializeWorker(HashJoinState *state,
1962 : ParallelWorkerContext *pwcxt)
1963 : {
1964 : HashState *hashNode;
1965 212 : int plan_node_id = state->js.ps.plan->plan_node_id;
1966 : ParallelHashJoinState *pstate =
1967 212 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1968 :
1969 : /* Attach to the space for shared temporary files. */
1970 212 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1971 :
1972 : /* Attach to the shared state in the hash node. */
1973 212 : hashNode = (HashState *) innerPlanState(state);
1974 212 : hashNode->parallel_state = pstate;
1975 :
1976 212 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1977 212 : }
|