Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeHashjoin.c
4 : * Routines to handle hash join nodes
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeHashjoin.c
12 : *
13 : * HASH JOIN
14 : *
15 : * This is based on the "hybrid hash join" algorithm described shortly in the
16 : * following page
17 : *
18 : * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
19 : *
20 : * and in detail in the referenced paper:
21 : *
22 : * "An Adaptive Hash Join Algorithm for Multiuser Environments"
23 : * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference.
24 : * Brisbane: 186–197.
25 : *
26 : * If the inner side tuples of a hash join do not fit in memory, the hash join
27 : * can be executed in multiple batches.
28 : *
29 : * If the statistics on the inner side relation are accurate, planner chooses a
30 : * multi-batch strategy and estimates the number of batches.
31 : *
32 : * The query executor measures the real size of the hashtable and increases the
33 : * number of batches if the hashtable grows too large.
34 : *
35 : * The number of batches is always a power of two, so an increase in the number
36 : * of batches doubles it.
37 : *
38 : * Serial hash join measures batch size lazily -- waiting until it is loading a
39 : * batch to determine if it will fit in memory. While inserting tuples into the
40 : * hashtable, serial hash join will, if that tuple were to exceed work_mem,
41 : * dump out the hashtable and reassign them either to other batch files or the
42 : * current batch resident in the hashtable.
43 : *
44 : * Parallel hash join, on the other hand, completes all changes to the number
45 : * of batches during the build phase. If it increases the number of batches, it
46 : * dumps out all the tuples from all batches and reassigns them to entirely new
47 : * batch files. Then it checks every batch to ensure it will fit in the space
48 : * budget for the query.
49 : *
50 : * In both parallel and serial hash join, the executor currently makes a best
51 : * effort. If a particular batch will not fit in memory, it tries doubling the
52 : * number of batches. If after a batch increase, there is a batch which
53 : * retained all or none of its tuples, the executor disables growth in the
54 : * number of batches globally. After growth is disabled, all batches that would
55 : * have previously triggered an increase in the number of batches instead
56 : * exceed the space allowed.
57 : *
58 : * PARALLELISM
59 : *
60 : * Hash joins can participate in parallel query execution in several ways. A
61 : * parallel-oblivious hash join is one where the node is unaware that it is
62 : * part of a parallel plan. In this case, a copy of the inner plan is used to
63 : * build a copy of the hash table in every backend, and the outer plan could
64 : * either be built from a partial or complete path, so that the results of the
65 : * hash join are correspondingly either partial or complete. A parallel-aware
66 : * hash join is one that behaves differently, coordinating work between
67 : * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
68 : * Hash Join always appears with a Parallel Hash node.
69 : *
70 : * Parallel-aware hash joins use the same per-backend state machine to track
71 : * progress through the hash join algorithm as parallel-oblivious hash joins.
72 : * In a parallel-aware hash join, there is also a shared state machine that
73 : * co-operating backends use to synchronize their local state machines and
74 : * program counters. The shared state machine is managed with a Barrier IPC
75 : * primitive. When all attached participants arrive at a barrier, the phase
76 : * advances and all waiting participants are released.
77 : *
78 : * When a participant begins working on a parallel hash join, it must first
79 : * figure out how much progress has already been made, because participants
80 : * don't wait for each other to begin. For this reason there are switch
81 : * statements at key points in the code where we have to synchronize our local
82 : * state machine with the phase, and then jump to the correct part of the
83 : * algorithm so that we can get started.
84 : *
85 : * One barrier called build_barrier is used to coordinate the hashing phases.
86 : * The phase is represented by an integer which begins at zero and increments
87 : * one by one, but in the code it is referred to by symbolic names as follows.
88 : * An asterisk indicates a phase that is performed by a single arbitrarily
89 : * chosen process.
90 : *
91 : * PHJ_BUILD_ELECT -- initial state
92 : * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
93 : * PHJ_BUILD_HASH_INNER -- all hash the inner rel
94 : * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
95 : * PHJ_BUILD_RUN -- building done, probing can begin
96 : * PHJ_BUILD_FREE* -- all work complete, one frees batches
97 : *
98 : * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
99 : * be used repeatedly as required to coordinate expansions in the number of
100 : * batches or buckets. Their phases are as follows:
101 : *
102 : * PHJ_GROW_BATCHES_ELECT -- initial state
103 : * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
104 : * PHJ_GROW_BATCHES_REPARTITION -- all repartition
105 : * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
106 : * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
107 : *
108 : * PHJ_GROW_BUCKETS_ELECT -- initial state
109 : * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
110 : * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
111 : *
112 : * If the planner got the number of batches and buckets right, those won't be
113 : * necessary, but on the other hand we might finish up needing to expand the
114 : * buckets or batches multiple times while hashing the inner relation to stay
115 : * within our memory budget and load factor target. For that reason it's a
116 : * separate pair of barriers using circular phases.
117 : *
118 : * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
119 : * because we need to divide the outer relation into batches up front in order
120 : * to be able to process batches entirely independently. In contrast, the
121 : * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
122 : * batches whenever it encounters them while scanning and probing, which it
123 : * can do because it processes batches in serial order.
124 : *
125 : * Once PHJ_BUILD_RUN is reached, backends then split up and process
126 : * different batches, or gang up and work together on probing batches if there
127 : * aren't enough to go around. For each batch there is a separate barrier
128 : * with the following phases:
129 : *
130 : * PHJ_BATCH_ELECT -- initial state
131 : * PHJ_BATCH_ALLOCATE* -- one allocates buckets
132 : * PHJ_BATCH_LOAD -- all load the hash table from disk
133 : * PHJ_BATCH_PROBE -- all probe
134 : * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan
135 : * PHJ_BATCH_FREE* -- one frees memory
136 : *
137 : * Batch 0 is a special case, because it starts out in phase
138 : * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
139 : * PHJ_BUILD_HASH_INNER so we can skip loading.
140 : *
141 : * Initially we try to plan for a single-batch hash join using the combined
142 : * hash_mem of all participants to create a large shared hash table. If that
143 : * turns out either at planning or execution time to be impossible then we
144 : * fall back to regular hash_mem sized hash tables.
145 : *
146 : * To avoid deadlocks, we never wait for any barrier unless it is known that
147 : * all other backends attached to it are actively executing the node or have
148 : * finished. Practically, that means that we never emit a tuple while attached
149 : * to a barrier, unless the barrier has reached a phase that means that no
150 : * process will wait on it again. We emit tuples while attached to the build
151 : * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
152 : * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
153 : * respectively without waiting, using BarrierArriveAndDetach() and
154 : * BarrierArriveAndDetachExceptLast() respectively. The last to detach
155 : * receives a different return value so that it knows that it's safe to
156 : * clean up. Any straggler process that attaches after that phase is reached
157 : * will see that it's too late to participate or access the relevant shared
158 : * memory objects.
159 : *
160 : *-------------------------------------------------------------------------
161 : */
162 :
163 : #include "postgres.h"
164 :
165 : #include "access/htup_details.h"
166 : #include "access/parallel.h"
167 : #include "executor/executor.h"
168 : #include "executor/hashjoin.h"
169 : #include "executor/instrument.h"
170 : #include "executor/nodeHash.h"
171 : #include "executor/nodeHashjoin.h"
172 : #include "miscadmin.h"
173 : #include "utils/lsyscache.h"
174 : #include "utils/sharedtuplestore.h"
175 : #include "utils/tuplestore.h"
176 : #include "utils/wait_event.h"
177 :
178 :
179 : /*
180 : * States of the ExecHashJoin state machine
181 : */
182 : #define HJ_BUILD_HASHTABLE 1
183 : #define HJ_NEED_NEW_OUTER 2
184 : #define HJ_SCAN_BUCKET 3
185 : #define HJ_FILL_OUTER_TUPLE 4
186 : #define HJ_FILL_INNER_TUPLES 5
187 : #define HJ_FILL_OUTER_NULL_TUPLES 6
188 : #define HJ_FILL_INNER_NULL_TUPLES 7
189 : #define HJ_NEED_NEW_BATCH 8
190 :
191 : /* Returns true if doing null-fill on outer relation */
192 : #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
193 : /* Returns true if doing null-fill on inner relation */
194 : #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
195 :
196 : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
197 : HashJoinState *hjstate,
198 : uint32 *hashvalue);
199 : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
200 : HashJoinState *hjstate,
201 : uint32 *hashvalue);
202 : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
203 : BufFile *file,
204 : uint32 *hashvalue,
205 : TupleTableSlot *tupleSlot);
206 : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
207 : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
208 : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
209 :
210 :
211 : /* ----------------------------------------------------------------
212 : * ExecHashJoinImpl
213 : *
214 : * This function implements the Hybrid Hashjoin algorithm. It is marked
215 : * with an always-inline attribute so that ExecHashJoin() and
216 : * ExecParallelHashJoin() can inline it. Compilers that respect the
217 : * attribute should create versions specialized for parallel == true and
218 : * parallel == false with unnecessary branches removed.
219 : *
220 : * Note: the relation we build hash table on is the "inner"
221 : * the other one is "outer".
222 : * ----------------------------------------------------------------
223 : */
224 : static pg_attribute_always_inline TupleTableSlot *
225 8296124 : ExecHashJoinImpl(PlanState *pstate, bool parallel)
226 : {
227 8296124 : HashJoinState *node = castNode(HashJoinState, pstate);
228 : PlanState *outerNode;
229 : HashState *hashNode;
230 : ExprState *joinqual;
231 : ExprState *otherqual;
232 : ExprContext *econtext;
233 : HashJoinTable hashtable;
234 : TupleTableSlot *outerTupleSlot;
235 : uint32 hashvalue;
236 : int batchno;
237 : ParallelHashJoinState *parallel_state;
238 :
239 : /*
240 : * get information from HashJoin node
241 : */
242 8296124 : joinqual = node->js.joinqual;
243 8296124 : otherqual = node->js.ps.qual;
244 8296124 : hashNode = (HashState *) innerPlanState(node);
245 8296124 : outerNode = outerPlanState(node);
246 8296124 : hashtable = node->hj_HashTable;
247 8296124 : econtext = node->js.ps.ps_ExprContext;
248 8296124 : parallel_state = hashNode->parallel_state;
249 :
250 : /*
251 : * Reset per-tuple memory context to free any expression evaluation
252 : * storage allocated in the previous tuple cycle.
253 : */
254 8296124 : ResetExprContext(econtext);
255 :
256 : /*
257 : * run the hash join state machine
258 : */
259 : for (;;)
260 : {
261 : /*
262 : * It's possible to iterate this loop many times before returning a
263 : * tuple, in some pathological cases such as needing to move much of
264 : * the current batch to a later batch. So let's check for interrupts
265 : * each time through.
266 : */
267 27968802 : CHECK_FOR_INTERRUPTS();
268 :
269 27968802 : switch (node->hj_JoinState)
270 : {
271 21940 : case HJ_BUILD_HASHTABLE:
272 :
273 : /*
274 : * First time through: build hash table for inner relation.
275 : */
276 : Assert(hashtable == NULL);
277 :
278 : /*
279 : * If the outer relation is completely empty, and it's not
280 : * right/right-anti/full join, we can quit without building
281 : * the hash table. However, for an inner join it is only a
282 : * win to check this when the outer relation's startup cost is
283 : * less than the projected cost of building the hash table.
284 : * Otherwise it's best to build the hash table first and see
285 : * if the inner relation is empty. (When it's a left join, we
286 : * should always make this check, since we aren't going to be
287 : * able to skip the join on the strength of an empty inner
288 : * relation anyway.)
289 : *
290 : * If we are rescanning the join, we make use of information
291 : * gained on the previous scan: don't bother to try the
292 : * prefetch if the previous scan found the outer relation
293 : * nonempty. This is not 100% reliable since with new
294 : * parameters the outer relation might yield different
295 : * results, but it's a good heuristic.
296 : *
297 : * The only way to make the check is to try to fetch a tuple
298 : * from the outer plan node. If we succeed, we have to stash
299 : * it away for later consumption by ExecHashJoinOuterGetTuple.
300 : */
301 21940 : if (HJ_FILL_INNER(node))
302 : {
303 : /* no chance to not build the hash table */
304 3262 : node->hj_FirstOuterTupleSlot = NULL;
305 : }
306 18678 : else if (parallel)
307 : {
308 : /*
309 : * The empty-outer optimization is not implemented for
310 : * shared hash tables, because no one participant can
311 : * determine that there are no outer tuples, and it's not
312 : * yet clear that it's worth the synchronization overhead
313 : * of reaching consensus to figure that out. So we have
314 : * to build the hash table.
315 : */
316 228 : node->hj_FirstOuterTupleSlot = NULL;
317 : }
318 18450 : else if (HJ_FILL_OUTER(node) ||
319 14442 : (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
320 12728 : !node->hj_OuterNotEmpty))
321 : {
322 15969 : node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
323 15969 : if (TupIsNull(node->hj_FirstOuterTupleSlot))
324 : {
325 3011 : node->hj_OuterNotEmpty = false;
326 3011 : return NULL;
327 : }
328 : else
329 12958 : node->hj_OuterNotEmpty = true;
330 : }
331 : else
332 2481 : node->hj_FirstOuterTupleSlot = NULL;
333 :
334 : /*
335 : * Create the hash table. If using Parallel Hash, then
336 : * whoever gets here first will create the hash table and any
337 : * later arrivals will merely attach to it.
338 : */
339 18929 : hashtable = ExecHashTableCreate(hashNode);
340 18929 : node->hj_HashTable = hashtable;
341 :
342 : /*
343 : * Execute the Hash node, to build the hash table. If using
344 : * Parallel Hash, then we'll try to help hashing unless we
345 : * arrived too late.
346 : */
347 18929 : hashNode->hashtable = hashtable;
348 18929 : (void) MultiExecProcNode((PlanState *) hashNode);
349 :
350 : /*
351 : * If the inner relation is completely empty, and we're not
352 : * doing a left outer join, we can quit without scanning the
353 : * outer relation. (If the inner relation contains only
354 : * null-keyed tuples that we need to emit, we'll fall through
355 : * and do the outer-relation scan. In principle we could go
356 : * emit those tuples then quit, but it would complicate the
357 : * state machine logic. The case seems rare enough to not be
358 : * worth optimizing.)
359 : */
360 18929 : if (hashtable->totalTuples == 0 &&
361 3801 : hashNode->null_tuple_store == NULL &&
362 3773 : !HJ_FILL_OUTER(node))
363 : {
364 3507 : if (parallel)
365 : {
366 : /*
367 : * Advance the build barrier to PHJ_BUILD_RUN before
368 : * proceeding so we can negotiate resource cleanup.
369 : */
370 0 : Barrier *build_barrier = ¶llel_state->build_barrier;
371 :
372 0 : while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
373 0 : BarrierArriveAndWait(build_barrier, 0);
374 : }
375 3507 : return NULL;
376 : }
377 :
378 : /*
379 : * need to remember whether nbatch has increased since we
380 : * began scanning the outer relation
381 : */
382 15422 : hashtable->nbatch_outstart = hashtable->nbatch;
383 :
384 : /*
385 : * Reset OuterNotEmpty for scan. (It's OK if we fetched a
386 : * tuple above, because ExecHashJoinOuterGetTuple will
387 : * immediately set it again.)
388 : */
389 15422 : node->hj_OuterNotEmpty = false;
390 :
391 15422 : if (parallel)
392 276 : {
393 : Barrier *build_barrier;
394 :
395 276 : build_barrier = ¶llel_state->build_barrier;
396 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
397 : BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
398 : BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
399 276 : if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
400 : {
401 : /*
402 : * If multi-batch, we need to hash the outer relation
403 : * up front.
404 : */
405 228 : if (hashtable->nbatch > 1)
406 123 : ExecParallelHashJoinPartitionOuter(node);
407 228 : BarrierArriveAndWait(build_barrier,
408 : WAIT_EVENT_HASH_BUILD_HASH_OUTER);
409 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
410 : }
411 :
412 : /*
413 : * Each backend should now select a batch to work on.
414 : * However, if we've already collected some null-keyed
415 : * tuples, dump them first. (That is critical when we
416 : * arrive late enough that no more batches are available;
417 : * otherwise we'd fail to dump those tuples at all.)
418 : */
419 276 : hashtable->curbatch = -1;
420 :
421 276 : if (node->hj_NullOuterTupleStore)
422 4 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
423 272 : else if (hashNode->null_tuple_store)
424 10 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
425 : else
426 262 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
427 :
428 276 : continue;
429 : }
430 : else
431 15146 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
432 :
433 : pg_fallthrough;
434 :
435 13654378 : case HJ_NEED_NEW_OUTER:
436 :
437 : /*
438 : * We don't have an outer tuple, try to get the next one
439 : */
440 13654378 : if (parallel)
441 : outerTupleSlot =
442 1444662 : ExecParallelHashJoinOuterGetTuple(outerNode, node,
443 : &hashvalue);
444 : else
445 : outerTupleSlot =
446 12209716 : ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
447 :
448 13654378 : if (TupIsNull(outerTupleSlot))
449 : {
450 : /* end of batch, or maybe whole join */
451 17395 : if (HJ_FILL_INNER(node))
452 : {
453 : /* set up to scan for unmatched inner tuples */
454 2575 : if (parallel)
455 : {
456 : /*
457 : * Only one process is currently allowed to handle
458 : * each batch's unmatched tuples, in a parallel
459 : * join. However, each process must deal with any
460 : * null-keyed tuples it found.
461 : */
462 70 : if (ExecParallelPrepHashTableForUnmatched(node))
463 44 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
464 26 : else if (node->hj_NullOuterTupleStore)
465 5 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
466 21 : else if (hashNode->null_tuple_store)
467 10 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
468 : else
469 11 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
470 : }
471 : else
472 : {
473 2505 : ExecPrepHashTableForUnmatched(node);
474 2505 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
475 : }
476 : }
477 : else
478 : {
479 : /* might have outer null-keyed tuples to fill */
480 : Assert(hashNode->null_tuple_store == NULL);
481 14820 : if (node->hj_NullOuterTupleStore)
482 33 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
483 : else
484 14787 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
485 : }
486 17395 : continue;
487 : }
488 :
489 13636983 : econtext->ecxt_outertuple = outerTupleSlot;
490 13636983 : node->hj_MatchedOuter = false;
491 :
492 : /*
493 : * Find the corresponding bucket for this tuple in the main
494 : * hash table or skew hash table.
495 : */
496 13636983 : node->hj_CurHashValue = hashvalue;
497 13636983 : ExecHashGetBucketAndBatch(hashtable, hashvalue,
498 : &node->hj_CurBucketNo, &batchno);
499 13636983 : node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
500 : hashvalue);
501 13636983 : node->hj_CurTuple = NULL;
502 :
503 : /*
504 : * The tuple might not belong to the current batch (where
505 : * "current batch" includes the skew buckets if any).
506 : */
507 13636983 : if (batchno != hashtable->curbatch &&
508 959656 : node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
509 958856 : {
510 : bool shouldFree;
511 958856 : MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
512 : &shouldFree);
513 :
514 : /*
515 : * Need to postpone this outer tuple to a later batch.
516 : * Save it in the corresponding outer-batch file.
517 : */
518 : Assert(parallel_state == NULL);
519 : Assert(batchno > hashtable->curbatch);
520 958856 : ExecHashJoinSaveTuple(mintuple, hashvalue,
521 958856 : &hashtable->outerBatchFile[batchno],
522 : hashtable);
523 :
524 958856 : if (shouldFree)
525 958856 : heap_free_minimal_tuple(mintuple);
526 :
527 : /* Loop around, staying in HJ_NEED_NEW_OUTER state */
528 958856 : continue;
529 : }
530 :
531 : /* OK, let's scan the bucket for matches */
532 12678127 : node->hj_JoinState = HJ_SCAN_BUCKET;
533 :
534 : pg_fallthrough;
535 :
536 17513056 : case HJ_SCAN_BUCKET:
537 :
538 : /*
539 : * Scan the selected hash bucket for matches to current outer
540 : */
541 17513056 : if (parallel)
542 : {
543 2804080 : if (!ExecParallelScanHashBucket(node, econtext))
544 : {
545 : /* out of matches; check for possible outer-join fill */
546 1444028 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
547 1444028 : continue;
548 : }
549 : }
550 : else
551 : {
552 14708976 : if (!ExecScanHashBucket(node, econtext))
553 : {
554 : /* out of matches; check for possible outer-join fill */
555 7678891 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
556 7678891 : continue;
557 : }
558 : }
559 :
560 : /*
561 : * In a right-semijoin, we only need the first match for each
562 : * inner tuple.
563 : */
564 8390441 : if (node->js.jointype == JOIN_RIGHT_SEMI &&
565 304 : HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
566 48 : continue;
567 :
568 : /*
569 : * We've got a match, but still need to test non-hashed quals.
570 : * ExecScanHashBucket already set up all the state needed to
571 : * call ExecQual.
572 : *
573 : * If we pass the qual, then save state for next call and have
574 : * ExecProject form the projection, store it in the tuple
575 : * table, and return the slot.
576 : *
577 : * Only the joinquals determine tuple match status, but all
578 : * quals must pass to actually return the tuple.
579 : */
580 8390089 : if (joinqual == NULL || ExecQual(joinqual, econtext))
581 : {
582 8279777 : node->hj_MatchedOuter = true;
583 :
584 : /*
585 : * This is really only needed if HJ_FILL_INNER(node) or if
586 : * we are in a right-semijoin, but we'll avoid the branch
587 : * and just set it always.
588 : */
589 8279777 : if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
590 4028365 : HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
591 :
592 : /* In an antijoin, we never return a matched tuple */
593 8279777 : if (node->js.jointype == JOIN_ANTI)
594 : {
595 873733 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
596 873733 : continue;
597 : }
598 :
599 : /*
600 : * If we only need to consider the first matching inner
601 : * tuple, then advance to next outer tuple after we've
602 : * processed this one.
603 : */
604 7406044 : if (node->js.single_match)
605 2681397 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
606 :
607 : /*
608 : * In a right-antijoin, we never return a matched tuple.
609 : * If it's not an inner_unique join, we need to stay on
610 : * the current outer tuple to continue scanning the inner
611 : * side for matches.
612 : */
613 7406044 : if (node->js.jointype == JOIN_RIGHT_ANTI)
614 18009 : continue;
615 :
616 7388035 : if (otherqual == NULL || ExecQual(otherqual, econtext))
617 7261091 : return ExecProject(node->js.ps.ps_ProjInfo);
618 : else
619 126944 : InstrCountFiltered2(node, 1);
620 : }
621 : else
622 110312 : InstrCountFiltered1(node, 1);
623 237256 : break;
624 :
625 9122919 : case HJ_FILL_OUTER_TUPLE:
626 :
627 : /*
628 : * The current outer tuple has run out of matches, so check
629 : * whether to emit a dummy outer-join tuple. Whether we emit
630 : * one or not, the next state is NEED_NEW_OUTER.
631 : */
632 9122919 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
633 :
634 9122919 : if (!node->hj_MatchedOuter &&
635 5802891 : HJ_FILL_OUTER(node))
636 : {
637 : /*
638 : * Generate a fake join tuple with nulls for the inner
639 : * tuple, and return it if it passes the non-join quals.
640 : */
641 1610760 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
642 :
643 1610760 : if (otherqual == NULL || ExecQual(otherqual, econtext))
644 687990 : return ExecProject(node->js.ps.ps_ProjInfo);
645 : else
646 922770 : InstrCountFiltered2(node, 1);
647 : }
648 8434929 : break;
649 :
650 331752 : case HJ_FILL_INNER_TUPLES:
651 :
652 : /*
653 : * We have finished a batch, but we are doing
654 : * right/right-anti/full join, so any unmatched inner tuples
655 : * in the hashtable have to be emitted before we continue to
656 : * the next batch.
657 : */
658 583456 : if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
659 251704 : : ExecScanHashTableForUnmatched(node, econtext)))
660 : {
661 : /* no more unmatched tuples, but maybe there are nulls */
662 2545 : if (node->hj_NullOuterTupleStore)
663 44 : node->hj_JoinState = HJ_FILL_OUTER_NULL_TUPLES;
664 2501 : else if (hashNode->null_tuple_store)
665 50 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
666 : else
667 2451 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
668 2545 : continue;
669 : }
670 :
671 : /*
672 : * Generate a fake join tuple with nulls for the outer tuple,
673 : * and return it if it passes the non-join quals.
674 : */
675 329207 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
676 :
677 329207 : if (otherqual == NULL || ExecQual(otherqual, econtext))
678 324128 : return ExecProject(node->js.ps.ps_ProjInfo);
679 : else
680 5079 : InstrCountFiltered2(node, 1);
681 5079 : break;
682 :
683 207 : case HJ_FILL_OUTER_NULL_TUPLES:
684 :
685 : /*
686 : * We have finished a batch, but we are doing left/full join,
687 : * so any null-keyed outer tuples have to be emitted before we
688 : * continue to the next batch.
689 : *
690 : * (We could delay this till the end of the join, but there
691 : * seems little percentage in that.)
692 : *
693 : * We have to use tuplestore_gettupleslot_force because
694 : * hj_OuterTupleSlot may not be able to store a MinimalTuple.
695 : */
696 426 : while (tuplestore_gettupleslot_force(node->hj_NullOuterTupleStore,
697 : true, false,
698 : node->hj_OuterTupleSlot))
699 : {
700 : /*
701 : * Generate a fake join tuple with nulls for the inner
702 : * tuple, and return it if it passes the non-join quals.
703 : */
704 133 : econtext->ecxt_outertuple = node->hj_OuterTupleSlot;
705 133 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
706 :
707 133 : if (otherqual == NULL || ExecQual(otherqual, econtext))
708 121 : return ExecProject(node->js.ps.ps_ProjInfo);
709 : else
710 12 : InstrCountFiltered2(node, 1);
711 :
712 12 : ResetExprContext(econtext);
713 :
714 : /* allow this loop to be cancellable */
715 12 : CHECK_FOR_INTERRUPTS();
716 : }
717 :
718 : /* We don't need the tuplestore any more, so discard it. */
719 86 : tuplestore_end(node->hj_NullOuterTupleStore);
720 86 : node->hj_NullOuterTupleStore = NULL;
721 :
722 : /* Fill inner tuples too if it's a full join, else advance. */
723 86 : if (hashNode->null_tuple_store)
724 26 : node->hj_JoinState = HJ_FILL_INNER_NULL_TUPLES;
725 : else
726 60 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
727 86 : break;
728 :
729 160 : case HJ_FILL_INNER_NULL_TUPLES:
730 :
731 : /*
732 : * We have finished a batch, but we are doing
733 : * right/right-anti/full join, so any null-keyed inner tuples
734 : * have to be emitted before we continue to the next batch.
735 : *
736 : * (We could delay this till the end of the join, but there
737 : * seems little percentage in that.)
738 : */
739 324 : while (tuplestore_gettupleslot(hashNode->null_tuple_store,
740 : true, false,
741 : node->hj_HashTupleSlot))
742 : {
743 : /*
744 : * Generate a fake join tuple with nulls for the outer
745 : * tuple, and return it if it passes the non-join quals.
746 : */
747 72 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
748 72 : econtext->ecxt_innertuple = node->hj_HashTupleSlot;
749 :
750 72 : if (otherqual == NULL || ExecQual(otherqual, econtext))
751 68 : return ExecProject(node->js.ps.ps_ProjInfo);
752 : else
753 4 : InstrCountFiltered2(node, 1);
754 :
755 4 : ResetExprContext(econtext);
756 :
757 : /* allow this loop to be cancellable */
758 4 : CHECK_FOR_INTERRUPTS();
759 : }
760 :
761 : /*
762 : * Ideally we'd discard the tuplestore now, but we can't
763 : * because we might need it for rescans.
764 : */
765 :
766 : /* Now we can advance to the next batch. */
767 92 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
768 92 : break;
769 :
770 17663 : case HJ_NEED_NEW_BATCH:
771 :
772 : /*
773 : * Try to advance to next batch. Done if there are no more.
774 : */
775 17663 : if (parallel)
776 : {
777 910 : if (!ExecParallelHashJoinNewBatch(node))
778 276 : return NULL; /* end of parallel-aware join */
779 : }
780 : else
781 : {
782 16753 : if (!ExecHashJoinNewBatch(node))
783 15932 : return NULL; /* end of parallel-oblivious join */
784 : }
785 1455 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
786 1455 : break;
787 :
788 0 : default:
789 0 : elog(ERROR, "unrecognized hashjoin state: %d",
790 : (int) node->hj_JoinState);
791 : }
792 : }
793 : }
794 :
795 : /* ----------------------------------------------------------------
796 : * ExecHashJoin
797 : *
798 : * Parallel-oblivious version.
799 : * ----------------------------------------------------------------
800 : */
801 : static TupleTableSlot * /* return: a tuple or NULL */
802 6775756 : ExecHashJoin(PlanState *pstate)
803 : {
804 : /*
805 : * On sufficiently smart compilers this should be inlined with the
806 : * parallel-aware branches removed.
807 : */
808 6775756 : return ExecHashJoinImpl(pstate, false);
809 : }
810 :
811 : /* ----------------------------------------------------------------
812 : * ExecParallelHashJoin
813 : *
814 : * Parallel-aware version.
815 : * ----------------------------------------------------------------
816 : */
817 : static TupleTableSlot * /* return: a tuple or NULL */
818 1520368 : ExecParallelHashJoin(PlanState *pstate)
819 : {
820 : /*
821 : * On sufficiently smart compilers this should be inlined with the
822 : * parallel-oblivious branches removed.
823 : */
824 1520368 : return ExecHashJoinImpl(pstate, true);
825 : }
826 :
827 : /* ----------------------------------------------------------------
828 : * ExecInitHashJoin
829 : *
830 : * Init routine for HashJoin node.
831 : * ----------------------------------------------------------------
832 : */
833 : HashJoinState *
834 26983 : ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
835 : {
836 : HashJoinState *hjstate;
837 : Plan *outerNode;
838 : Hash *hashNode;
839 : TupleDesc outerDesc,
840 : innerDesc;
841 : const TupleTableSlotOps *ops;
842 :
843 : /* check for unsupported flags */
844 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
845 :
846 : /*
847 : * create state structure
848 : */
849 26983 : hjstate = makeNode(HashJoinState);
850 26983 : hjstate->js.ps.plan = (Plan *) node;
851 26983 : hjstate->js.ps.state = estate;
852 :
853 : /*
854 : * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
855 : * where this function may be replaced with a parallel version, if we
856 : * managed to launch a parallel query.
857 : */
858 26983 : hjstate->js.ps.ExecProcNode = ExecHashJoin;
859 26983 : hjstate->js.jointype = node->join.jointype;
860 :
861 : /*
862 : * Miscellaneous initialization
863 : *
864 : * create expression context for node
865 : */
866 26983 : ExecAssignExprContext(estate, &hjstate->js.ps);
867 :
868 : /*
869 : * initialize child nodes
870 : *
871 : * Note: we could suppress the REWIND flag for the inner input, which
872 : * would amount to betting that the hash will be a single batch. Not
873 : * clear if this would be a win or not.
874 : */
875 26983 : outerNode = outerPlan(node);
876 26983 : hashNode = (Hash *) innerPlan(node);
877 :
878 26983 : outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
879 26983 : outerDesc = ExecGetResultType(outerPlanState(hjstate));
880 26983 : innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
881 26983 : innerDesc = ExecGetResultType(innerPlanState(hjstate));
882 :
883 : /*
884 : * Initialize result slot, type and projection.
885 : */
886 26983 : ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
887 26983 : ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
888 :
889 : /*
890 : * tuple table initialization
891 : */
 : /* NOTE(review): the outer slot uses the outer child's slot ops,
 : * presumably so outer tuples can be stored without conversion --
 : * confirm against ExecGetResultSlotOps callers. */
892 26983 : ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
893 26983 : hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
894 : ops);
895 :
896 : /*
897 : * detect whether we need only consider the first matching inner tuple
898 : */
899 37944 : hjstate->js.single_match = (node->join.inner_unique ||
900 10961 : node->join.jointype == JOIN_SEMI);
901 :
902 : /* set up null tuples for outer joins, if needed */
903 26983 : switch (node->join.jointype)
904 : {
905 18421 : case JOIN_INNER:
906 : case JOIN_SEMI:
907 : case JOIN_RIGHT_SEMI:
 : /* these join types never emit null-extended tuples */
908 18421 : break;
909 4422 : case JOIN_LEFT:
910 : case JOIN_ANTI:
 : /* may emit outer tuples with null-extended inner side */
911 4422 : hjstate->hj_NullInnerTupleSlot =
912 4422 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
913 4422 : break;
914 3414 : case JOIN_RIGHT:
915 : case JOIN_RIGHT_ANTI:
 : /* may emit inner tuples with null-extended outer side */
916 3414 : hjstate->hj_NullOuterTupleSlot =
917 3414 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
918 3414 : break;
919 726 : case JOIN_FULL:
 : /* may null-extend either side */
920 726 : hjstate->hj_NullOuterTupleSlot =
921 726 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
922 726 : hjstate->hj_NullInnerTupleSlot =
923 726 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
924 726 : break;
925 0 : default:
926 0 : elog(ERROR, "unrecognized join type: %d",
927 : (int) node->join.jointype);
928 : }
929 :
930 : /*
931 : * now for some voodoo. our temporary tuple slot is actually the result
932 : * tuple slot of the Hash node (which is our inner plan). we can do this
933 : * because Hash nodes don't return tuples via ExecProcNode() -- instead
934 : * the hash join node uses ExecScanHashBucket() to get at the contents of
935 : * the hash table. -cim 6/9/91
936 : */
937 : {
938 26983 : HashState *hashstate = (HashState *) innerPlanState(hjstate);
939 26983 : Hash *hash = (Hash *) hashstate->ps.plan;
940 26983 : TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
941 : Oid *outer_hashfuncid;
942 : Oid *inner_hashfuncid;
943 : bool *hash_strict;
944 : ListCell *lc;
945 : int nkeys;
946 :
947 :
948 26983 : hjstate->hj_HashTupleSlot = slot;
949 :
950 : /*
951 : * Build ExprStates to obtain hash values for either side of the join.
952 : * Note: must build the ExprStates before ExecHashTableCreate() so we
953 : * properly attribute any SubPlans that exist in the hash expressions
954 : * to the correct PlanState.
955 : */
956 26983 : nkeys = list_length(node->hashoperators);
957 :
958 26983 : outer_hashfuncid = palloc_array(Oid, nkeys);
959 26983 : inner_hashfuncid = palloc_array(Oid, nkeys);
960 26983 : hash_strict = palloc_array(bool, nkeys);
961 :
962 : /*
963 : * Determine the hash function for each side of the join for the given
964 : * join operator, and detect whether the join operator is strict.
965 : */
966 56794 : foreach(lc, node->hashoperators)
967 : {
968 29811 : Oid hashop = lfirst_oid(lc);
969 29811 : int i = foreach_current_index(lc);
970 :
971 29811 : if (!get_op_hash_functions(hashop,
972 29811 : &outer_hashfuncid[i],
973 29811 : &inner_hashfuncid[i]))
974 0 : elog(ERROR,
975 : "could not find hash function for hash operator %u",
976 : hashop);
977 29811 : hash_strict[i] = op_strict(hashop);
978 : }
979 :
980 : /*
981 : * Build an ExprState to generate the hash value for the expressions
982 : * on the outer side of the join.
983 : */
984 26983 : hjstate->hj_OuterHash =
985 26983 : ExecBuildHash32Expr(hjstate->js.ps.ps_ResultTupleDesc,
986 : hjstate->js.ps.resultops,
987 : outer_hashfuncid,
988 26983 : node->hashcollations,
989 26983 : node->hashkeys,
990 : hash_strict,
991 : &hjstate->js.ps,
992 : 0);
993 :
994 : /* As above, but for the inner side of the join */
995 26983 : hashstate->hash_expr =
996 26983 : ExecBuildHash32Expr(hashstate->ps.ps_ResultTupleDesc,
997 : hashstate->ps.resultops,
998 : inner_hashfuncid,
999 26983 : node->hashcollations,
1000 26983 : hash->hashkeys,
1001 : hash_strict,
1002 : &hashstate->ps,
1003 : 0);
1004 :
1005 : /* Remember whether we need to save tuples with null join keys */
 : /* (i.e. whether the join must null-extend that side's tuples) */
1006 26983 : hjstate->hj_KeepNullTuples = HJ_FILL_OUTER(hjstate);
1007 26983 : hashstate->keep_null_tuples = HJ_FILL_INNER(hjstate);
1008 :
1009 : /*
1010 : * Set up the skew table hash function while we have a record of the
1011 : * first key's hash function Oid.
1012 : */
1013 26983 : if (OidIsValid(hash->skewTable))
1014 : {
1015 19637 : hashstate->skew_hashfunction = palloc0_object(FmgrInfo);
1016 19637 : hashstate->skew_collation = linitial_oid(node->hashcollations);
1017 19637 : fmgr_info(outer_hashfuncid[0], hashstate->skew_hashfunction);
1018 : }
1019 :
1020 : /* no need to keep these */
1021 26983 : pfree(outer_hashfuncid);
1022 26983 : pfree(inner_hashfuncid);
1023 26983 : pfree(hash_strict);
1024 : }
1025 :
1026 : /*
1027 : * initialize child expressions
1028 : */
1029 26983 : hjstate->js.ps.qual =
1030 26983 : ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
1031 26983 : hjstate->js.joinqual =
1032 26983 : ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
1033 26983 : hjstate->hashclauses =
1034 26983 : ExecInitQual(node->hashclauses, (PlanState *) hjstate);
1035 :
1036 : /*
1037 : * initialize hash-specific info
1038 : */
1039 26983 : hjstate->hj_HashTable = NULL;
1040 26983 : hjstate->hj_NullOuterTupleStore = NULL;
1041 26983 : hjstate->hj_FirstOuterTupleSlot = NULL;
1042 :
1043 26983 : hjstate->hj_CurHashValue = 0;
1044 26983 : hjstate->hj_CurBucketNo = 0;
1045 26983 : hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1046 26983 : hjstate->hj_CurTuple = NULL;
1047 :
1048 26983 : hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
1049 26983 : hjstate->hj_MatchedOuter = false;
1050 26983 : hjstate->hj_OuterNotEmpty = false;
1051 :
1052 26983 : return hjstate;
1053 : }
1054 :
1055 : /* ----------------------------------------------------------------
1056 : * ExecEndHashJoin
1057 : *
1058 : * clean up routine for HashJoin node
1059 : * ----------------------------------------------------------------
1060 : */
1061 : void
1062 26911 : ExecEndHashJoin(HashJoinState *node)
1063 : {
 : /* the inner subplan of a hash join is always a Hash node */
1064 26911 : HashState *hashNode = castNode(HashState, innerPlanState(node));
1065 :
1066 : /*
1067 : * Free tuple stores if we made them (must do this before
1068 : * ExecHashTableDestroy deletes hashCxt)
1069 : */
1070 26911 : if (node->hj_NullOuterTupleStore)
1071 : {
1072 0 : tuplestore_end(node->hj_NullOuterTupleStore);
1073 0 : node->hj_NullOuterTupleStore = NULL;
1074 : }
1075 26911 : if (hashNode->null_tuple_store)
1076 : {
1077 64 : tuplestore_end(hashNode->null_tuple_store);
1078 64 : hashNode->null_tuple_store = NULL;
1079 : }
1080 :
1081 : /*
1082 : * Free hash table
1083 : */
1084 26911 : if (node->hj_HashTable)
1085 : {
1086 17961 : ExecHashTableDestroy(node->hj_HashTable);
1087 17961 : node->hj_HashTable = NULL;
1088 : }
1089 :
1090 : /*
1091 : * clean up subtrees
1092 : */
1093 26911 : ExecEndNode(outerPlanState(node));
1094 26911 : ExecEndNode(innerPlanState(node));
1095 26911 : }
1096 :
1097 : /*
1098 : * ExecHashJoinOuterGetTuple
1099 : *
1100 : * get the next outer tuple for a parallel oblivious hashjoin: either by
1101 : * executing the outer plan node in the first pass, or from the temp
1102 : * files for the hashjoin batches.
1103 : *
1104 : * Returns a null slot if no more outer tuples (within the current batch).
1105 : *
1106 : * On success, the tuple's hash value is stored at *hashvalue --- this is
1107 : * either originally computed, or re-read from the temp file.
1108 : */
1109 : static TupleTableSlot *
1110 12209716 : ExecHashJoinOuterGetTuple(PlanState *outerNode,
1111 : HashJoinState *hjstate,
1112 : uint32 *hashvalue)
1113 : {
1114 12209716 : HashJoinTable hashtable = hjstate->hj_HashTable;
1115 12209716 : int curbatch = hashtable->curbatch;
1116 : TupleTableSlot *slot;
1117 :
1118 12209716 : if (curbatch == 0) /* if it is the first pass */
1119 : {
1120 : /*
1121 : * Check to see if first outer tuple was already fetched by
1122 : * ExecHashJoin() and not used yet.
1123 : */
1124 11250039 : slot = hjstate->hj_FirstOuterTupleSlot;
1125 11250039 : if (!TupIsNull(slot))
1126 10973 : hjstate->hj_FirstOuterTupleSlot = NULL;
1127 : else
1128 11239066 : slot = ExecProcNode(outerNode);
1129 :
1130 11250775 : while (!TupIsNull(slot))
1131 : {
1132 : bool isnull;
1133 :
1134 : /*
1135 : * We have to compute the tuple's hash value.
1136 : */
1137 11234835 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1138 :
1139 11234835 : econtext->ecxt_outertuple = slot;
1140 :
 : /* reset per-tuple memory before evaluating the hash expression */
1141 11234835 : ResetExprContext(econtext);
1142 :
1143 11234835 : *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1144 : econtext,
1145 : &isnull));
1146 :
1147 11234835 : if (!isnull)
1148 : {
1149 : /* normal case with a non-null join key */
1150 : /* remember outer relation is not empty for possible rescan */
1151 11234099 : hjstate->hj_OuterNotEmpty = true;
1152 :
1153 11234099 : return slot;
1154 : }
1155 736 : else if (hjstate->hj_KeepNullTuples)
1156 : {
1157 : /* null join key, but we must save tuple to be emitted later */
 : /* (hj_KeepNullTuples is set when the join null-extends the
 : * outer side; see ExecInitHashJoin) */
1158 121 : if (hjstate->hj_NullOuterTupleStore == NULL)
1159 74 : hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
1160 121 : tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
1161 : }
1162 :
1163 : /*
1164 : * That tuple couldn't match because of a NULL, so discard it and
1165 : * continue with the next one.
1166 : */
1167 736 : slot = ExecProcNode(outerNode);
1168 : }
1169 : }
1170 959677 : else if (curbatch < hashtable->nbatch)
1171 : {
1172 959677 : BufFile *file = hashtable->outerBatchFile[curbatch];
1173 :
1174 : /*
1175 : * In outer-join cases, we could get here even though the batch file
1176 : * is empty.
1177 : */
1178 959677 : if (file == NULL)
1179 0 : return NULL;
1180 :
1181 959677 : slot = ExecHashJoinGetSavedTuple(hjstate,
1182 : file,
1183 : hashvalue,
1184 : hjstate->hj_OuterTupleSlot);
1185 959677 : if (!TupIsNull(slot))
1186 958856 : return slot;
1187 : }
1188 :
1189 : /* End of this batch */
1190 16761 : return NULL;
1191 : }
1192 :
1193 : /*
1194 : * ExecHashJoinOuterGetTuple variant for the parallel case.
1195 : */
1196 : static TupleTableSlot *
1197 1444662 : ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
1198 : HashJoinState *hjstate,
1199 : uint32 *hashvalue)
1200 : {
1201 1444662 : HashJoinTable hashtable = hjstate->hj_HashTable;
1202 1444662 : int curbatch = hashtable->curbatch;
1203 : TupleTableSlot *slot;
1204 :
1205 : /*
1206 : * In the Parallel Hash case we only run the outer plan directly for
1207 : * single-batch hash joins. Otherwise we have to go to batch files, even
1208 : * for batch 0.
1209 : */
1210 1444662 : if (curbatch == 0 && hashtable->nbatch == 1)
1211 : {
1212 644118 : slot = ExecProcNode(outerNode);
1213 :
1214 644138 : while (!TupIsNull(slot))
1215 : {
1216 : bool isnull;
1217 :
1218 644024 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1219 :
1220 644024 : econtext->ecxt_outertuple = slot;
1221 :
 : /* reset per-tuple memory before evaluating the hash expression */
1222 644024 : ResetExprContext(econtext);
1223 :
1224 644024 : *hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1225 : econtext,
1226 : &isnull));
1227 :
1228 644024 : if (!isnull)
1229 : {
1230 : /* normal case with a non-null join key */
1231 644004 : return slot;
1232 : }
1233 20 : else if (hjstate->hj_KeepNullTuples)
1234 : {
1235 : /* null join key, but we must save tuple to be emitted later */
1236 8 : if (hjstate->hj_NullOuterTupleStore == NULL)
1237 8 : hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
1238 8 : tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
1239 : }
1240 :
1241 : /*
1242 : * That tuple couldn't match because of a NULL, so discard it and
1243 : * continue with the next one.
1244 : */
1245 20 : slot = ExecProcNode(outerNode);
1246 : }
1247 : }
1248 800544 : else if (curbatch < hashtable->nbatch)
1249 : {
1250 : MinimalTuple tuple;
1251 :
1252 800544 : tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
1253 : hashvalue);
1254 800544 : if (tuple != NULL)
1255 : {
1256 800024 : ExecForceStoreMinimalTuple(tuple,
1257 : hjstate->hj_OuterTupleSlot,
1258 : false);
1259 800024 : slot = hjstate->hj_OuterTupleSlot;
1260 800024 : return slot;
1261 : }
1262 : else
 : /* shared tuplestore exhausted for this batch */
1263 520 : ExecClearTuple(hjstate->hj_OuterTupleSlot);
1264 : }
1265 :
1266 : /* End of this batch */
1267 634 : hashtable->batches[curbatch].outer_eof = true;
1268 :
1269 634 : return NULL;
1270 : }
1271 :
1272 : /*
1273 : * ExecHashJoinNewBatch
1274 : * switch to a new hashjoin batch
1275 : *
1276 : * Returns true if successful, false if there are no more batches.
1277 : */
1278 : static bool
1279 16753 : ExecHashJoinNewBatch(HashJoinState *hjstate)
1280 : {
1281 16753 : HashJoinTable hashtable = hjstate->hj_HashTable;
1282 : int nbatch;
1283 : int curbatch;
1284 : BufFile *innerFile;
1285 : TupleTableSlot *slot;
1286 : uint32 hashvalue;
1287 :
1288 16753 : nbatch = hashtable->nbatch;
1289 16753 : curbatch = hashtable->curbatch;
1290 :
1291 16753 : if (curbatch > 0)
1292 : {
1293 : /*
1294 : * We no longer need the previous outer batch file; close it right
1295 : * away to free disk space.
1296 : */
1297 821 : if (hashtable->outerBatchFile[curbatch])
1298 821 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1299 821 : hashtable->outerBatchFile[curbatch] = NULL;
1300 : }
1301 : else /* we just finished the first batch */
1302 : {
1303 : /*
1304 : * Reset some of the skew optimization state variables, since we no
1305 : * longer need to consider skew tuples after the first batch. The
1306 : * memory context reset we are about to do will release the skew
1307 : * hashtable itself.
1308 : */
1309 15932 : hashtable->skewEnabled = false;
1310 15932 : hashtable->skewBucket = NULL;
1311 15932 : hashtable->skewBucketNums = NULL;
1312 15932 : hashtable->nSkewBuckets = 0;
1313 15932 : hashtable->spaceUsedSkew = 0;
1314 : }
1315 :
1316 : /*
1317 : * We can always skip over any batches that are completely empty on both
1318 : * sides. We can sometimes skip over batches that are empty on only one
1319 : * side, but there are exceptions:
1320 : *
1321 : * 1. In a left/full outer join, we have to process outer batches even if
1322 : * the inner batch is empty. Similarly, in a right/right-anti/full outer
1323 : * join, we have to process inner batches even if the outer batch is
1324 : * empty.
1325 : *
1326 : * 2. If we have increased nbatch since the initial estimate, we have to
1327 : * scan inner batches since they might contain tuples that need to be
1328 : * reassigned to later inner batches.
1329 : *
1330 : * 3. Similarly, if we have increased nbatch since starting the outer
1331 : * scan, we have to rescan outer batches in case they contain tuples that
1332 : * need to be reassigned.
1333 : */
1334 16753 : curbatch++;
1335 16753 : while (curbatch < nbatch &&
1336 821 : (hashtable->outerBatchFile[curbatch] == NULL ||
1337 821 : hashtable->innerBatchFile[curbatch] == NULL))
1338 : {
1339 0 : if (hashtable->outerBatchFile[curbatch] &&
1340 0 : HJ_FILL_OUTER(hjstate))
1341 0 : break; /* must process due to rule 1 */
1342 0 : if (hashtable->innerBatchFile[curbatch] &&
1343 0 : HJ_FILL_INNER(hjstate))
1344 0 : break; /* must process due to rule 1 */
1345 0 : if (hashtable->innerBatchFile[curbatch] &&
1346 0 : nbatch != hashtable->nbatch_original)
1347 0 : break; /* must process due to rule 2 */
1348 0 : if (hashtable->outerBatchFile[curbatch] &&
1349 0 : nbatch != hashtable->nbatch_outstart)
1350 0 : break; /* must process due to rule 3 */
1351 : /* We can ignore this batch. */
1352 : /* Release associated temp files right away. */
1353 0 : if (hashtable->innerBatchFile[curbatch])
1354 0 : BufFileClose(hashtable->innerBatchFile[curbatch]);
1355 0 : hashtable->innerBatchFile[curbatch] = NULL;
1356 0 : if (hashtable->outerBatchFile[curbatch])
1357 0 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1358 0 : hashtable->outerBatchFile[curbatch] = NULL;
1359 0 : curbatch++;
1360 : }
1361 :
1362 16753 : if (curbatch >= nbatch)
1363 15932 : return false; /* no more batches */
1364 :
 : /* Found a batch to process; make it current. */
1365 821 : hashtable->curbatch = curbatch;
1366 :
1367 : /*
1368 : * Reload the hash table with the new inner batch (which could be empty)
1369 : */
1370 821 : ExecHashTableReset(hashtable);
1371 :
1372 821 : innerFile = hashtable->innerBatchFile[curbatch];
1373 :
1374 821 : if (innerFile != NULL)
1375 : {
1376 821 : if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
1377 0 : ereport(ERROR,
1378 : (errcode_for_file_access(),
1379 : errmsg("could not rewind hash-join temporary file")));
1380 :
1381 2353480 : while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1382 : innerFile,
1383 : &hashvalue,
1384 : hjstate->hj_HashTupleSlot)))
1385 : {
1386 : /*
1387 : * NOTE: some tuples may be sent to future batches. Also, it is
1388 : * possible for hashtable->nbatch to be increased here!
1389 : */
1390 2352659 : ExecHashTableInsert(hashtable, slot, hashvalue);
1391 : }
1392 :
1393 : /*
1394 : * after we build the hash table, the inner batch file is no longer
1395 : * needed
1396 : */
1397 821 : BufFileClose(innerFile);
1398 821 : hashtable->innerBatchFile[curbatch] = NULL;
1399 : }
1400 :
1401 : /*
1402 : * Rewind outer batch file (if present), so that we can start reading it.
1403 : */
1404 821 : if (hashtable->outerBatchFile[curbatch] != NULL)
1405 : {
1406 821 : if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
1407 0 : ereport(ERROR,
1408 : (errcode_for_file_access(),
1409 : errmsg("could not rewind hash-join temporary file")));
1410 : }
1411 :
1412 821 : return true;
1413 : }
1414 :
1415 : /*
1416 : * Choose a batch to work on, and attach to it. Returns true if successful,
1417 : * false if there are no more batches.
1418 : */
1419 : static bool
1420 910 : ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1421 : {
1422 910 : HashJoinTable hashtable = hjstate->hj_HashTable;
1423 : int start_batchno;
1424 : int batchno;
1425 :
1426 : /*
1427 : * If we are a very slow worker, MultiExecParallelHash could have observed
1428 : * build_barrier phase PHJ_BUILD_FREE and not bothered to set up batch
1429 : * accessors. In that case we must be done.
1430 : */
1431 910 : if (hashtable->batches == NULL)
1432 0 : return false;
1433 :
1434 : /*
1435 : * If we were already attached to a batch, remember not to bother checking
1436 : * it again, and detach from it (possibly freeing the hash table if we are
1437 : * last to detach).
1438 : */
1439 910 : if (hashtable->curbatch >= 0)
1440 : {
1441 608 : hashtable->batches[hashtable->curbatch].done = true;
1442 608 : ExecHashTableDetachBatch(hashtable);
1443 : }
1444 :
1445 : /*
1446 : * Search for a batch that isn't done. We use an atomic counter to start
1447 : * our search at a different batch in every participant when there are
1448 : * more batches than participants.
1449 : */
1450 910 : batchno = start_batchno =
1451 910 : pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1452 910 : hashtable->nbatch;
1453 : do
1454 : {
1455 : uint32 hashvalue;
1456 : MinimalTuple tuple;
1457 : TupleTableSlot *slot;
1458 :
1459 2219 : if (!hashtable->batches[batchno].done)
1460 : {
1461 : SharedTuplestoreAccessor *inner_tuples;
1462 1192 : Barrier *batch_barrier =
1463 1192 : &hashtable->batches[batchno].shared->batch_barrier;
1464 :
 : /* The case we attach in dictates how much setup work remains;
 : * each phase deliberately falls through to the next. */
1465 1192 : switch (BarrierAttach(batch_barrier))
1466 : {
1467 388 : case PHJ_BATCH_ELECT:
1468 :
1469 : /* One backend allocates the hash table. */
1470 388 : if (BarrierArriveAndWait(batch_barrier,
1471 : WAIT_EVENT_HASH_BATCH_ELECT))
1472 388 : ExecParallelHashTableAlloc(hashtable, batchno);
1473 : pg_fallthrough;
1474 :
1475 : case PHJ_BATCH_ALLOCATE:
1476 : /* Wait for allocation to complete. */
1477 388 : BarrierArriveAndWait(batch_barrier,
1478 : WAIT_EVENT_HASH_BATCH_ALLOCATE);
1479 : pg_fallthrough;
1480 :
1481 401 : case PHJ_BATCH_LOAD:
1482 : /* Start (or join in) loading tuples. */
1483 401 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1484 401 : inner_tuples = hashtable->batches[batchno].inner_tuples;
1485 401 : sts_begin_parallel_scan(inner_tuples);
1486 721513 : while ((tuple = sts_parallel_scan_next(inner_tuples,
1487 : &hashvalue)))
1488 : {
1489 721112 : ExecForceStoreMinimalTuple(tuple,
1490 : hjstate->hj_HashTupleSlot,
1491 : false);
1492 721112 : slot = hjstate->hj_HashTupleSlot;
1493 721112 : ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1494 : hashvalue);
1495 : }
1496 401 : sts_end_parallel_scan(inner_tuples);
1497 401 : BarrierArriveAndWait(batch_barrier,
1498 : WAIT_EVENT_HASH_BATCH_LOAD);
1499 : pg_fallthrough;
1500 :
1501 634 : case PHJ_BATCH_PROBE:
1502 :
1503 : /*
1504 : * This batch is ready to probe. Return control to
1505 : * caller. We stay attached to batch_barrier so that the
1506 : * hash table stays alive until everyone's finished
1507 : * probing it, but no participant is allowed to wait at
1508 : * this barrier again (or else a deadlock could occur).
1509 : * All attached participants must eventually detach from
1510 : * the barrier and one worker must advance the phase so
1511 : * that the final phase is reached.
1512 : */
1513 634 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1514 634 : sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1515 :
1516 634 : return true;
1517 2 : case PHJ_BATCH_SCAN:
1518 :
1519 : /*
1520 : * In principle, we could help scan for unmatched tuples,
1521 : * since that phase is already underway (the thing we
1522 : * can't do under current deadlock-avoidance rules is wait
1523 : * for others to arrive at PHJ_BATCH_SCAN, because
1524 : * PHJ_BATCH_PROBE emits tuples, but in this case we just
1525 : * got here without waiting). That is not yet done. For
1526 : * now, we just detach and go around again. We have to
1527 : * use ExecHashTableDetachBatch() because there's a small
1528 : * chance we'll be the last to detach, and then we're
1529 : * responsible for freeing memory.
1530 : */
1531 2 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1532 2 : hashtable->batches[batchno].done = true;
1533 2 : ExecHashTableDetachBatch(hashtable);
1534 2 : break;
1535 :
1536 556 : case PHJ_BATCH_FREE:
1537 :
1538 : /*
1539 : * Already done. Detach and go around again (if any
1540 : * remain).
1541 : */
1542 556 : BarrierDetach(batch_barrier);
1543 556 : hashtable->batches[batchno].done = true;
 : /* no longer attached to any batch */
1544 556 : hashtable->curbatch = -1;
1545 556 : break;
1546 :
1547 0 : default:
1548 0 : elog(ERROR, "unexpected batch phase %d",
1549 : BarrierPhase(batch_barrier));
1550 : }
1551 : }
1552 1585 : batchno = (batchno + 1) % hashtable->nbatch;
1553 1585 : } while (batchno != start_batchno);
1554 :
1555 276 : return false;
1556 : }
1557 :
1558 : /*
1559 : * ExecHashJoinSaveTuple
1560 : * save a tuple to a batch file.
1561 : *
1562 : * The data recorded in the file for each tuple is its hash value,
1563 : * then the tuple in MinimalTuple format.
1564 : *
1565 : * fileptr points to a batch file in one of the hashtable arrays.
1566 : *
1567 : * The batch files (and their buffers) are allocated in the spill context
1568 : * created for the hashtable.
1569 : */
1570 : void
1571 3311515 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1572 : BufFile **fileptr, HashJoinTable hashtable)
1573 : {
1574 3311515 : BufFile *file = *fileptr;
1575 :
1576 : /*
1577 : * The batch file is lazily created. If this is the first tuple written to
1578 : * this batch, the batch file is created and its buffer is allocated in
1579 : * the spillCxt context, NOT in the batchCxt.
1580 : *
1581 : * During the build phase, buffered files are created for inner batches.
1582 : * Each batch's buffered file is closed (and its buffer freed) after the
1583 : * batch is loaded into memory during the outer side scan. Therefore, it
1584 : * is necessary to allocate the batch file buffer in a memory context
1585 : * which outlives the batch itself.
1586 : *
1587 : * Also, we use spillCxt instead of hashCxt for a better accounting of the
1588 : * spilling memory consumption.
1589 : */
1590 3311515 : if (file == NULL)
1591 : {
1592 1642 : MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
1593 :
1594 1642 : file = BufFileCreateTemp(false);
1595 1642 : *fileptr = file;
1596 :
1597 1642 : MemoryContextSwitchTo(oldctx);
1598 : }
1599 :
 : /* On-disk layout must match ExecHashJoinGetSavedTuple: the uint32 hash
 : * value, then the minimal tuple whose first word is t_len. */
1600 3311515 : BufFileWrite(file, &hashvalue, sizeof(uint32));
1601 3311515 : BufFileWrite(file, tuple, tuple->t_len);
1602 3311515 : }
1603 :
1604 : /*
1605 : * ExecHashJoinGetSavedTuple
1606 : * read the next tuple from a batch file. Return NULL if no more.
1607 : *
1608 : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1609 : * itself is stored in the given slot.
1610 : */
1611 : static TupleTableSlot *
1612 3313157 : ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1613 : BufFile *file,
1614 : uint32 *hashvalue,
1615 : TupleTableSlot *tupleSlot)
1616 : {
1617 : uint32 header[2];
1618 : size_t nread;
1619 : MinimalTuple tuple;
1620 :
1621 : /*
1622 : * We check for interrupts here because this is typically taken as an
1623 : * alternative code path to an ExecProcNode() call, which would include
1624 : * such a check.
1625 : */
1626 3313157 : CHECK_FOR_INTERRUPTS();
1627 :
1628 : /*
1629 : * Since both the hash value and the MinimalTuple length word are uint32,
1630 : * we can read them both in one BufFileRead() call without any type
1631 : * cheating.
1632 : */
1633 3313157 : nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
1634 3313157 : if (nread == 0) /* end of file */
1635 : {
1636 1642 : ExecClearTuple(tupleSlot);
1637 1637 : return NULL;
1638 : }
1639 3311515 : *hashvalue = header[0];
 : /* header[1] is the tuple's t_len; allocate and re-read the remainder
 : * of the tuple past the length word we already consumed */
1640 3311515 : tuple = (MinimalTuple) palloc(header[1]);
1641 3311515 : tuple->t_len = header[1];
1642 3311515 : BufFileReadExact(file,
1643 : (char *) tuple + sizeof(uint32),
1644 3311515 : header[1] - sizeof(uint32));
 : /* pass shouldFree = true: the slot takes ownership of the palloc'd tuple */
1645 3311515 : ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1646 3311515 : return tupleSlot;
1647 : }
1648 :
1649 :
 : /* ----------------------------------------------------------------
 : * ExecReScanHashJoin
 : *
 : * rescan routine for HashJoin node; reuses the existing hash
 : * table when the join is single-batch and the inner subplan's
 : * parameters are unchanged, else rebuilds it.
 : * ----------------------------------------------------------------
 : */
1650 : void
1651 2233 : ExecReScanHashJoin(HashJoinState *node)
1652 : {
1653 2233 : PlanState *outerPlan = outerPlanState(node);
1654 2233 : PlanState *innerPlan = innerPlanState(node);
1655 :
1656 : /*
1657 : * We're always going to rescan the outer rel, so drop the associated
1658 : * null-keys tuplestore; we'll rebuild it during the rescan. (Must do
1659 : * this before ExecHashTableDestroy deletes hashCxt.)
1660 : */
1661 2233 : if (node->hj_NullOuterTupleStore)
1662 : {
1663 0 : tuplestore_end(node->hj_NullOuterTupleStore);
1664 0 : node->hj_NullOuterTupleStore = NULL;
1665 : }
1666 :
1667 : /*
1668 : * In a multi-batch join, we currently have to do rescans the hard way,
1669 : * primarily because batch temp files may have already been released. But
1670 : * if it's a single-batch join, and there is no parameter change for the
1671 : * inner subnode, then we can just re-use the existing hash table without
1672 : * rebuilding it.
1673 : */
1674 2233 : if (node->hj_HashTable != NULL)
1675 : {
1676 1857 : HashState *hashNode = castNode(HashState, innerPlan);
1677 :
1678 : Assert(hashNode->hashtable == node->hj_HashTable);
1679 :
1680 1857 : if (node->hj_HashTable->nbatch == 1 &&
1681 1857 : innerPlan->chgParam == NULL)
1682 : {
1683 : /*
1684 : * Okay to reuse the hash table; needn't rescan inner, either.
1685 : *
1686 : * However, if it's a right/right-anti/right-semi/full join, we'd
1687 : * better reset the inner-tuple match flags contained in the
1688 : * table.
1689 : */
1690 960 : if (HJ_FILL_INNER(node) || node->js.jointype == JOIN_RIGHT_SEMI)
1691 46 : ExecHashTableResetMatchFlags(node->hj_HashTable);
1692 :
1693 : /*
1694 : * Also, we need to reset our state about the emptiness of the
1695 : * outer relation, so that the new scan of the outer will update
1696 : * it correctly if it turns out to be empty this time. (There's no
1697 : * harm in clearing it now because ExecHashJoin won't need the
1698 : * info. In the other cases, where the hash table doesn't exist
1699 : * or we are destroying it, we leave this state alone because
1700 : * ExecHashJoin will need it the first time through.)
1701 : */
1702 960 : node->hj_OuterNotEmpty = false;
1703 :
1704 : /*
1705 : * Also, rewind inner null-key tuplestore so that we can return
1706 : * those tuples again.
1707 : */
1708 960 : if (hashNode->null_tuple_store)
1709 4 : tuplestore_rescan(hashNode->null_tuple_store);
1710 :
1711 : /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1712 960 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
1713 : }
1714 : else
1715 : {
1716 : /* must destroy and rebuild hash table */
1717 :
1718 : /* accumulate stats from old hash table, if wanted */
1719 : /* (this should match ExecShutdownHash) */
1720 897 : if (hashNode->ps.instrument && !hashNode->hinstrument)
1721 0 : hashNode->hinstrument = palloc0_object(HashInstrumentation);
1722 897 : if (hashNode->hinstrument)
1723 0 : ExecHashAccumInstrumentation(hashNode->hinstrument,
1724 : hashNode->hashtable);
1725 :
1726 : /* free inner null-key tuplestore before ExecHashTableDestroy */
1727 897 : if (hashNode->null_tuple_store)
1728 : {
1729 0 : tuplestore_end(hashNode->null_tuple_store);
1730 0 : hashNode->null_tuple_store = NULL;
1731 : }
1732 :
1733 : /* for safety, be sure to clear child plan node's pointer too */
1734 897 : hashNode->hashtable = NULL;
1735 :
1736 897 : ExecHashTableDestroy(node->hj_HashTable);
1737 897 : node->hj_HashTable = NULL;
1738 897 : node->hj_JoinState = HJ_BUILD_HASHTABLE;
1739 :
1740 : /*
1741 : * if chgParam of subnode is not null then plan will be re-scanned
1742 : * by first ExecProcNode.
1743 : */
1744 897 : if (innerPlan->chgParam == NULL)
1745 0 : ExecReScan(innerPlan);
1746 : }
1747 : }
1748 :
1749 : /* Always reset intra-tuple state */
1750 2233 : node->hj_CurHashValue = 0;
1751 2233 : node->hj_CurBucketNo = 0;
1752 2233 : node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1753 2233 : node->hj_CurTuple = NULL;
1754 :
1755 2233 : node->hj_MatchedOuter = false;
1756 2233 : node->hj_FirstOuterTupleSlot = NULL;
1757 :
1758 : /*
1759 : * if chgParam of subnode is not null then plan will be re-scanned by
1760 : * first ExecProcNode.
1761 : */
1762 2233 : if (outerPlan->chgParam == NULL)
1763 1476 : ExecReScan(outerPlan);
1764 2233 : }
1765 :
1766 : void
1767 24155 : ExecShutdownHashJoin(HashJoinState *node)
1768 : {
1769 24155 : if (node->hj_HashTable)
1770 : {
1771 : /*
1772 : * Detach from shared state before DSM memory goes away. This makes
1773 : * sure that we don't have any pointers into DSM memory by the time
1774 : * ExecEndHashJoin runs.
1775 : */
1776 17949 : ExecHashTableDetachBatch(node->hj_HashTable);
1777 17949 : ExecHashTableDetach(node->hj_HashTable);
1778 : }
1779 24155 : }
1780 :
1781 : static void
1782 123 : ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1783 : {
1784 123 : PlanState *outerState = outerPlanState(hjstate);
1785 123 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1786 123 : HashJoinTable hashtable = hjstate->hj_HashTable;
1787 : TupleTableSlot *slot;
1788 : int i;
1789 :
1790 : Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1791 :
1792 : /* Execute outer plan, writing all tuples to shared tuplestores. */
1793 : for (;;)
1794 800052 : {
1795 : bool isnull;
1796 : uint32 hashvalue;
1797 :
1798 800175 : slot = ExecProcNode(outerState);
1799 800175 : if (TupIsNull(slot))
1800 : break;
1801 800052 : econtext->ecxt_outertuple = slot;
1802 :
1803 800052 : ResetExprContext(econtext);
1804 :
1805 800052 : hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
1806 : econtext,
1807 : &isnull));
1808 :
1809 800052 : if (!isnull)
1810 : {
1811 : /* normal case with a non-null join key */
1812 : int batchno;
1813 : int bucketno;
1814 : bool shouldFree;
1815 800024 : MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1816 :
1817 800024 : ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1818 : &batchno);
1819 800024 : sts_puttuple(hashtable->batches[batchno].outer_tuples,
1820 : &hashvalue, mintup);
1821 :
1822 800024 : if (shouldFree)
1823 800024 : heap_free_minimal_tuple(mintup);
1824 : }
1825 28 : else if (hjstate->hj_KeepNullTuples)
1826 : {
1827 : /* null join key, but we must save tuple to be emitted later */
1828 4 : if (hjstate->hj_NullOuterTupleStore == NULL)
1829 4 : hjstate->hj_NullOuterTupleStore = ExecHashBuildNullTupleStore(hashtable);
1830 4 : tuplestore_puttupleslot(hjstate->hj_NullOuterTupleStore, slot);
1831 : }
1832 : /* else we can just discard the tuple immediately */
1833 :
1834 800052 : CHECK_FOR_INTERRUPTS();
1835 : }
1836 :
1837 : /* Make sure all outer partitions are readable by any backend. */
1838 1043 : for (i = 0; i < hashtable->nbatch; ++i)
1839 920 : sts_end_write(hashtable->batches[i].outer_tuples);
1840 123 : }
1841 :
/*
 * ExecHashJoinEstimate
 *		Reserve DSM space for this node's parallel hash join state.
 *
 * We need room for one ParallelHashJoinState chunk, filed under a single
 * shm_toc key (the plan node ID, inserted later in
 * ExecHashJoinInitializeDSM).
 */
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
1848 :
1849 : void
1850 84 : ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1851 : {
1852 84 : int plan_node_id = state->js.ps.plan->plan_node_id;
1853 : HashState *hashNode;
1854 : ParallelHashJoinState *pstate;
1855 :
1856 : /*
1857 : * Disable shared hash table mode if we failed to create a real DSM
1858 : * segment, because that means that we don't have a DSA area to work with.
1859 : */
1860 84 : if (pcxt->seg == NULL)
1861 0 : return;
1862 :
1863 84 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1864 :
1865 : /*
1866 : * Set up the state needed to coordinate access to the shared hash
1867 : * table(s), using the plan node ID as the toc key.
1868 : */
1869 84 : pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1870 84 : shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1871 :
1872 : /*
1873 : * Set up the shared hash join state with no batches initially.
1874 : * ExecHashTableCreate() will prepare at least one later and set nbatch
1875 : * and space_allowed.
1876 : */
1877 84 : pstate->nbatch = 0;
1878 84 : pstate->space_allowed = 0;
1879 84 : pstate->batches = InvalidDsaPointer;
1880 84 : pstate->old_batches = InvalidDsaPointer;
1881 84 : pstate->nbuckets = 0;
1882 84 : pstate->growth = PHJ_GROWTH_OK;
1883 84 : pstate->chunk_work_queue = InvalidDsaPointer;
1884 84 : pg_atomic_init_u32(&pstate->distributor, 0);
1885 84 : pstate->nparticipants = pcxt->nworkers + 1;
1886 84 : pstate->total_tuples = 0;
1887 84 : LWLockInitialize(&pstate->lock,
1888 : LWTRANCHE_PARALLEL_HASH_JOIN);
1889 84 : BarrierInit(&pstate->build_barrier, 0);
1890 84 : BarrierInit(&pstate->grow_batches_barrier, 0);
1891 84 : BarrierInit(&pstate->grow_buckets_barrier, 0);
1892 :
1893 : /* Set up the space we'll use for shared temporary files. */
1894 84 : SharedFileSetInit(&pstate->fileset, pcxt->seg);
1895 :
1896 : /* Initialize the shared state in the hash node. */
1897 84 : hashNode = (HashState *) innerPlanState(state);
1898 84 : hashNode->parallel_state = pstate;
1899 : }
1900 :
1901 : /* ----------------------------------------------------------------
1902 : * ExecHashJoinReInitializeDSM
1903 : *
1904 : * Reset shared state before beginning a fresh scan.
1905 : * ----------------------------------------------------------------
1906 : */
1907 : void
1908 32 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1909 : {
1910 32 : int plan_node_id = state->js.ps.plan->plan_node_id;
1911 : ParallelHashJoinState *pstate;
1912 : HashState *hashNode;
1913 :
1914 : /* Nothing to do if we failed to create a DSM segment. */
1915 32 : if (pcxt->seg == NULL)
1916 0 : return;
1917 :
1918 32 : pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
1919 :
1920 : /*
1921 : * It would be possible to reuse the shared hash table in single-batch
1922 : * cases by resetting and then fast-forwarding build_barrier to
1923 : * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
1924 : * currently shared hash tables are already freed by now (by the last
1925 : * participant to detach from the batch). We could consider keeping it
1926 : * around for single-batch joins. We'd also need to adjust
1927 : * finalize_plan() so that it doesn't record a dummy dependency for
1928 : * Parallel Hash nodes, preventing the rescan optimization. For now we
1929 : * don't try.
1930 : */
1931 :
1932 : /* Detach, freeing any remaining shared memory. */
1933 32 : if (state->hj_HashTable != NULL)
1934 : {
1935 0 : ExecHashTableDetachBatch(state->hj_HashTable);
1936 0 : ExecHashTableDetach(state->hj_HashTable);
1937 : }
1938 :
1939 : /* Clear any shared batch files. */
1940 32 : SharedFileSetDeleteAll(&pstate->fileset);
1941 :
1942 : /* We'd better clear our local null-key tuplestores, too. */
1943 32 : if (state->hj_NullOuterTupleStore)
1944 : {
1945 0 : tuplestore_end(state->hj_NullOuterTupleStore);
1946 0 : state->hj_NullOuterTupleStore = NULL;
1947 : }
1948 32 : hashNode = (HashState *) innerPlanState(state);
1949 32 : if (hashNode->null_tuple_store)
1950 : {
1951 0 : tuplestore_end(hashNode->null_tuple_store);
1952 0 : hashNode->null_tuple_store = NULL;
1953 : }
1954 :
1955 :
1956 : /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
1957 32 : BarrierInit(&pstate->build_barrier, 0);
1958 : }
1959 :
1960 : void
1961 212 : ExecHashJoinInitializeWorker(HashJoinState *state,
1962 : ParallelWorkerContext *pwcxt)
1963 : {
1964 : HashState *hashNode;
1965 212 : int plan_node_id = state->js.ps.plan->plan_node_id;
1966 : ParallelHashJoinState *pstate =
1967 212 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1968 :
1969 : /* Attach to the space for shared temporary files. */
1970 212 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1971 :
1972 : /* Attach to the shared state in the hash node. */
1973 212 : hashNode = (HashState *) innerPlanState(state);
1974 212 : hashNode->parallel_state = pstate;
1975 :
1976 212 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1977 212 : }
|