Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeRecursiveunion.c
4 : * routines to handle RecursiveUnion nodes.
5 : *
6 : * To implement UNION (without ALL), we need a hashtable that stores tuples
7 : * already seen. The hash key is computed from the grouping columns.
8 : *
9 : *
10 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : *
14 : * IDENTIFICATION
15 : * src/backend/executor/nodeRecursiveunion.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
19 : #include "postgres.h"
20 :
21 : #include "executor/executor.h"
22 : #include "executor/nodeRecursiveunion.h"
23 : #include "miscadmin.h"
24 : #include "utils/memutils.h"
25 :
26 :
27 :
28 : /*
29 : * Initialize the hash table to empty.
30 : */
31 : static void
32 364 : build_hash_table(RecursiveUnionState *rustate)
33 : {
34 364 : RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 364 : TupleDesc desc = ExecGetResultType(outerPlanState(rustate));
36 :
37 : Assert(node->numCols > 0);
38 : Assert(node->numGroups > 0);
39 :
40 : /*
41 : * If both child plans deliver the same fixed tuple slot type, we can tell
42 : * BuildTupleHashTable to expect that slot type as input. Otherwise,
43 : * we'll pass NULL denoting that any slot type is possible.
44 : */
45 364 : rustate->hashtable = BuildTupleHashTable(&rustate->ps,
46 : desc,
47 : ExecGetCommonChildSlotOps(&rustate->ps),
48 : node->numCols,
49 : node->dupColIdx,
50 364 : rustate->eqfuncoids,
51 : rustate->hashfunctions,
52 : node->dupCollations,
53 : node->numGroups,
54 : 0,
55 364 : rustate->ps.state->es_query_cxt,
56 : rustate->tableContext,
57 : rustate->tempContext,
58 : false);
59 364 : }
60 :
61 :
62 : /* ----------------------------------------------------------------
63 : * ExecRecursiveUnion(node)
64 : *
65 : * Scans the recursive query sequentially and returns the next
66 : * qualifying tuple.
67 : *
68 : * 1. evaluate non recursive term and assign the result to RT
69 : *
70 : * 2. execute recursive terms
71 : *
72 : * 2.1 WT := RT
73 : * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
74 : * 2.3 replace the name of recursive term with WT
75 : * 2.4 evaluate the recursive term and store into WT
76 : * 2.5 append WT to RT
77 : * 2.6 go back to 2.2
78 : * ----------------------------------------------------------------
79 : */
80 : static TupleTableSlot *
81 44890 : ExecRecursiveUnion(PlanState *pstate)
82 : {
83 44890 : RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
84 44890 : PlanState *outerPlan = outerPlanState(node);
85 44890 : PlanState *innerPlan = innerPlanState(node);
86 44890 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
87 : TupleTableSlot *slot;
88 : bool isnew;
89 :
90 44890 : CHECK_FOR_INTERRUPTS();
91 :
92 : /* 1. Evaluate non-recursive term */
93 44890 : if (!node->recursing)
94 : {
95 : for (;;)
96 : {
97 11222 : slot = ExecProcNode(outerPlan);
98 11222 : if (TupIsNull(slot))
99 : break;
100 10454 : if (plan->numCols > 0)
101 : {
102 : /* Find or build hashtable entry for this tuple's group */
103 522 : LookupTupleHashEntry(node->hashtable, slot, &isnew, NULL);
104 : /* Must reset temp context after each hashtable lookup */
105 522 : MemoryContextReset(node->tempContext);
106 : /* Ignore tuple if already seen */
107 522 : if (!isnew)
108 18 : continue;
109 : }
110 : /* Each non-duplicate tuple goes to the working table ... */
111 10436 : tuplestore_puttupleslot(node->working_table, slot);
112 : /* ... and to the caller */
113 10436 : return slot;
114 : }
115 768 : node->recursing = true;
116 : }
117 :
118 : /* 2. Execute recursive term */
119 : for (;;)
120 : {
121 40840 : slot = ExecProcNode(innerPlan);
122 40840 : if (TupIsNull(slot))
123 : {
124 : Tuplestorestate *swaptemp;
125 :
126 : /* Done if there's nothing in the intermediate table */
127 7034 : if (node->intermediate_empty)
128 726 : break;
129 :
130 : /*
131 : * Now we let the intermediate table become the work table. We
132 : * need a fresh intermediate table, so delete the tuples from the
133 : * current working table and use that as the new intermediate
134 : * table. This saves a round of free/malloc from creating a new
135 : * tuple store.
136 : */
137 6308 : tuplestore_clear(node->working_table);
138 :
139 6308 : swaptemp = node->working_table;
140 6308 : node->working_table = node->intermediate_table;
141 6308 : node->intermediate_table = swaptemp;
142 :
143 : /* mark the intermediate table as empty */
144 6308 : node->intermediate_empty = true;
145 :
146 : /* reset the recursive term */
147 6308 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
148 : plan->wtParam);
149 :
150 : /* and continue fetching from recursive term */
151 6308 : continue;
152 : }
153 :
154 33806 : if (plan->numCols > 0)
155 : {
156 : /* Find or build hashtable entry for this tuple's group */
157 924 : LookupTupleHashEntry(node->hashtable, slot, &isnew, NULL);
158 : /* Must reset temp context after each hashtable lookup */
159 924 : MemoryContextReset(node->tempContext);
160 : /* Ignore tuple if already seen */
161 924 : if (!isnew)
162 78 : continue;
163 : }
164 :
165 : /* Else, tuple is good; stash it in intermediate table ... */
166 33728 : node->intermediate_empty = false;
167 33728 : tuplestore_puttupleslot(node->intermediate_table, slot);
168 : /* ... and return it */
169 33728 : return slot;
170 : }
171 :
172 726 : return NULL;
173 : }
174 :
175 : /* ----------------------------------------------------------------
176 : * ExecInitRecursiveUnion
177 : * ----------------------------------------------------------------
178 : */
179 : RecursiveUnionState *
180 816 : ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
181 : {
182 : RecursiveUnionState *rustate;
183 : ParamExecData *prmdata;
184 :
185 : /* check for unsupported flags */
186 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
187 :
188 : /*
189 : * create state structure
190 : */
191 816 : rustate = makeNode(RecursiveUnionState);
192 816 : rustate->ps.plan = (Plan *) node;
193 816 : rustate->ps.state = estate;
194 816 : rustate->ps.ExecProcNode = ExecRecursiveUnion;
195 :
196 816 : rustate->eqfuncoids = NULL;
197 816 : rustate->hashfunctions = NULL;
198 816 : rustate->hashtable = NULL;
199 816 : rustate->tempContext = NULL;
200 816 : rustate->tableContext = NULL;
201 :
202 : /* initialize processing state */
203 816 : rustate->recursing = false;
204 816 : rustate->intermediate_empty = true;
205 816 : rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
206 816 : rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
207 :
208 : /*
209 : * If hashing, we need a per-tuple memory context for comparisons, and a
210 : * longer-lived context to store the hash table. The table can't just be
211 : * kept in the per-query context because we want to be able to throw it
212 : * away when rescanning.
213 : */
214 816 : if (node->numCols > 0)
215 : {
216 364 : rustate->tempContext =
217 364 : AllocSetContextCreate(CurrentMemoryContext,
218 : "RecursiveUnion",
219 : ALLOCSET_DEFAULT_SIZES);
220 364 : rustate->tableContext =
221 364 : AllocSetContextCreate(CurrentMemoryContext,
222 : "RecursiveUnion hash table",
223 : ALLOCSET_DEFAULT_SIZES);
224 : }
225 :
226 : /*
227 : * Make the state structure available to descendant WorkTableScan nodes
228 : * via the Param slot reserved for it.
229 : */
230 816 : prmdata = &(estate->es_param_exec_vals[node->wtParam]);
231 : Assert(prmdata->execPlan == NULL);
232 816 : prmdata->value = PointerGetDatum(rustate);
233 816 : prmdata->isnull = false;
234 :
235 : /*
236 : * Miscellaneous initialization
237 : *
238 : * RecursiveUnion plans don't have expression contexts because they never
239 : * call ExecQual or ExecProject.
240 : */
241 : Assert(node->plan.qual == NIL);
242 :
243 : /*
244 : * RecursiveUnion nodes still have Result slots, which hold pointers to
245 : * tuples, so we have to initialize them.
246 : */
247 816 : ExecInitResultTypeTL(&rustate->ps);
248 :
249 : /*
250 : * Initialize result tuple type. (Note: we have to set up the result type
251 : * before initializing child nodes, because nodeWorktablescan.c expects it
252 : * to be valid.)
253 : */
254 816 : rustate->ps.ps_ProjInfo = NULL;
255 :
256 : /*
257 : * initialize child nodes
258 : */
259 816 : outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
260 816 : innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
261 :
262 : /*
263 : * If hashing, precompute fmgr lookup data for inner loop, and create the
264 : * hash table.
265 : */
266 816 : if (node->numCols > 0)
267 : {
268 364 : execTuplesHashPrepare(node->numCols,
269 364 : node->dupOperators,
270 : &rustate->eqfuncoids,
271 : &rustate->hashfunctions);
272 364 : build_hash_table(rustate);
273 : }
274 :
275 816 : return rustate;
276 : }
277 :
278 : /* ----------------------------------------------------------------
279 : * ExecEndRecursiveUnion
280 : *
281 : * frees any storage allocated through C routines.
282 : * ----------------------------------------------------------------
283 : */
284 : void
285 816 : ExecEndRecursiveUnion(RecursiveUnionState *node)
286 : {
287 : /* Release tuplestores */
288 816 : tuplestore_end(node->working_table);
289 816 : tuplestore_end(node->intermediate_table);
290 :
291 : /* free subsidiary stuff including hashtable */
292 816 : if (node->tempContext)
293 364 : MemoryContextDelete(node->tempContext);
294 816 : if (node->tableContext)
295 364 : MemoryContextDelete(node->tableContext);
296 :
297 : /*
298 : * close down subplans
299 : */
300 816 : ExecEndNode(outerPlanState(node));
301 816 : ExecEndNode(innerPlanState(node));
302 816 : }
303 :
304 : /* ----------------------------------------------------------------
305 : * ExecReScanRecursiveUnion
306 : *
307 : * Rescans the relation.
308 : * ----------------------------------------------------------------
309 : */
310 : void
311 12 : ExecReScanRecursiveUnion(RecursiveUnionState *node)
312 : {
313 12 : PlanState *outerPlan = outerPlanState(node);
314 12 : PlanState *innerPlan = innerPlanState(node);
315 12 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
316 :
317 : /*
318 : * Set recursive term's chgParam to tell it that we'll modify the working
319 : * table and therefore it has to rescan.
320 : */
321 12 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
322 :
323 : /*
324 : * if chgParam of subnode is not null then plan will be re-scanned by
325 : * first ExecProcNode. Because of above, we only have to do this to the
326 : * non-recursive term.
327 : */
328 12 : if (outerPlan->chgParam == NULL)
329 0 : ExecReScan(outerPlan);
330 :
331 : /* Release any hashtable storage */
332 12 : if (node->tableContext)
333 12 : MemoryContextReset(node->tableContext);
334 :
335 : /* Empty hashtable if needed */
336 12 : if (plan->numCols > 0)
337 12 : ResetTupleHashTable(node->hashtable);
338 :
339 : /* reset processing state */
340 12 : node->recursing = false;
341 12 : node->intermediate_empty = true;
342 12 : tuplestore_clear(node->working_table);
343 12 : tuplestore_clear(node->intermediate_table);
344 12 : }
|