Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeRecursiveunion.c
4 : * routines to handle RecursiveUnion nodes.
5 : *
6 : * To implement UNION (without ALL), we need a hashtable that stores tuples
7 : * already seen. The hash key is computed from the grouping columns.
8 : *
9 : *
10 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : *
14 : * IDENTIFICATION
15 : * src/backend/executor/nodeRecursiveunion.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
19 : #include "postgres.h"
20 :
21 : #include "executor/executor.h"
22 : #include "executor/nodeRecursiveunion.h"
23 : #include "miscadmin.h"
24 : #include "utils/memutils.h"
25 :
26 :
27 :
28 : /*
29 : * Initialize the hash table to empty.
30 : */
31 : static void
32 354 : build_hash_table(RecursiveUnionState *rustate)
33 : {
34 354 : RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 354 : TupleDesc desc = ExecGetResultType(outerPlanState(rustate));
36 :
37 : Assert(node->numCols > 0);
38 : Assert(node->numGroups > 0);
39 :
40 708 : rustate->hashtable = BuildTupleHashTableExt(&rustate->ps,
41 : desc,
42 : node->numCols,
43 : node->dupColIdx,
44 354 : rustate->eqfuncoids,
45 : rustate->hashfunctions,
46 : node->dupCollations,
47 : node->numGroups,
48 : 0,
49 354 : rustate->ps.state->es_query_cxt,
50 : rustate->tableContext,
51 : rustate->tempContext,
52 : false);
53 354 : }
54 :
55 :
56 : /* ----------------------------------------------------------------
57 : * ExecRecursiveUnion(node)
58 : *
59 : * Scans the recursive query sequentially and returns the next
60 : * qualifying tuple.
61 : *
62 : * 1. evaluate non recursive term and assign the result to RT
63 : *
64 : * 2. execute recursive terms
65 : *
66 : * 2.1 WT := RT
67 : * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
68 : * 2.3 replace the name of recursive term with WT
69 : * 2.4 evaluate the recursive term and store into WT
70 : * 2.5 append WT to RT
71 : * 2.6 go back to 2.2
72 : * ----------------------------------------------------------------
73 : */
74 : static TupleTableSlot *
75 44844 : ExecRecursiveUnion(PlanState *pstate)
76 : {
77 44844 : RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
78 44844 : PlanState *outerPlan = outerPlanState(node);
79 44844 : PlanState *innerPlan = innerPlanState(node);
80 44844 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
81 : TupleTableSlot *slot;
82 : bool isnew;
83 :
84 44844 : CHECK_FOR_INTERRUPTS();
85 :
86 : /* 1. Evaluate non-recursive term */
87 44844 : if (!node->recursing)
88 : {
89 : for (;;)
90 : {
91 11194 : slot = ExecProcNode(outerPlan);
92 11194 : if (TupIsNull(slot))
93 : break;
94 10436 : if (plan->numCols > 0)
95 : {
96 : /* Find or build hashtable entry for this tuple's group */
97 504 : LookupTupleHashEntry(node->hashtable, slot, &isnew, NULL);
98 : /* Must reset temp context after each hashtable lookup */
99 504 : MemoryContextReset(node->tempContext);
100 : /* Ignore tuple if already seen */
101 504 : if (!isnew)
102 12 : continue;
103 : }
104 : /* Each non-duplicate tuple goes to the working table ... */
105 10424 : tuplestore_puttupleslot(node->working_table, slot);
106 : /* ... and to the caller */
107 10424 : return slot;
108 : }
109 758 : node->recursing = true;
110 : }
111 :
112 : /* 2. Execute recursive term */
113 : for (;;)
114 : {
115 40782 : slot = ExecProcNode(innerPlan);
116 40782 : if (TupIsNull(slot))
117 : {
118 : Tuplestorestate *swaptemp;
119 :
120 : /* Done if there's nothing in the intermediate table */
121 7006 : if (node->intermediate_empty)
122 716 : break;
123 :
124 : /*
125 : * Now we let the intermediate table become the work table. We
126 : * need a fresh intermediate table, so delete the tuples from the
127 : * current working table and use that as the new intermediate
128 : * table. This saves a round of free/malloc from creating a new
129 : * tuple store.
130 : */
131 6290 : tuplestore_clear(node->working_table);
132 :
133 6290 : swaptemp = node->working_table;
134 6290 : node->working_table = node->intermediate_table;
135 6290 : node->intermediate_table = swaptemp;
136 :
137 : /* mark the intermediate table as empty */
138 6290 : node->intermediate_empty = true;
139 :
140 : /* reset the recursive term */
141 6290 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
142 : plan->wtParam);
143 :
144 : /* and continue fetching from recursive term */
145 6290 : continue;
146 : }
147 :
148 33776 : if (plan->numCols > 0)
149 : {
150 : /* Find or build hashtable entry for this tuple's group */
151 894 : LookupTupleHashEntry(node->hashtable, slot, &isnew, NULL);
152 : /* Must reset temp context after each hashtable lookup */
153 894 : MemoryContextReset(node->tempContext);
154 : /* Ignore tuple if already seen */
155 894 : if (!isnew)
156 72 : continue;
157 : }
158 :
159 : /* Else, tuple is good; stash it in intermediate table ... */
160 33704 : node->intermediate_empty = false;
161 33704 : tuplestore_puttupleslot(node->intermediate_table, slot);
162 : /* ... and return it */
163 33704 : return slot;
164 : }
165 :
166 716 : return NULL;
167 : }
168 :
169 : /* ----------------------------------------------------------------
170 : * ExecInitRecursiveUnion
171 : * ----------------------------------------------------------------
172 : */
173 : RecursiveUnionState *
174 806 : ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
175 : {
176 : RecursiveUnionState *rustate;
177 : ParamExecData *prmdata;
178 :
179 : /* check for unsupported flags */
180 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
181 :
182 : /*
183 : * create state structure
184 : */
185 806 : rustate = makeNode(RecursiveUnionState);
186 806 : rustate->ps.plan = (Plan *) node;
187 806 : rustate->ps.state = estate;
188 806 : rustate->ps.ExecProcNode = ExecRecursiveUnion;
189 :
190 806 : rustate->eqfuncoids = NULL;
191 806 : rustate->hashfunctions = NULL;
192 806 : rustate->hashtable = NULL;
193 806 : rustate->tempContext = NULL;
194 806 : rustate->tableContext = NULL;
195 :
196 : /* initialize processing state */
197 806 : rustate->recursing = false;
198 806 : rustate->intermediate_empty = true;
199 806 : rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
200 806 : rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
201 :
202 : /*
203 : * If hashing, we need a per-tuple memory context for comparisons, and a
204 : * longer-lived context to store the hash table. The table can't just be
205 : * kept in the per-query context because we want to be able to throw it
206 : * away when rescanning.
207 : */
208 806 : if (node->numCols > 0)
209 : {
210 354 : rustate->tempContext =
211 354 : AllocSetContextCreate(CurrentMemoryContext,
212 : "RecursiveUnion",
213 : ALLOCSET_DEFAULT_SIZES);
214 354 : rustate->tableContext =
215 354 : AllocSetContextCreate(CurrentMemoryContext,
216 : "RecursiveUnion hash table",
217 : ALLOCSET_DEFAULT_SIZES);
218 : }
219 :
220 : /*
221 : * Make the state structure available to descendant WorkTableScan nodes
222 : * via the Param slot reserved for it.
223 : */
224 806 : prmdata = &(estate->es_param_exec_vals[node->wtParam]);
225 : Assert(prmdata->execPlan == NULL);
226 806 : prmdata->value = PointerGetDatum(rustate);
227 806 : prmdata->isnull = false;
228 :
229 : /*
230 : * Miscellaneous initialization
231 : *
232 : * RecursiveUnion plans don't have expression contexts because they never
233 : * call ExecQual or ExecProject.
234 : */
235 : Assert(node->plan.qual == NIL);
236 :
237 : /*
238 : * RecursiveUnion nodes still have Result slots, which hold pointers to
239 : * tuples, so we have to initialize them.
240 : */
241 806 : ExecInitResultTypeTL(&rustate->ps);
242 :
243 : /*
244 : * Initialize result tuple type. (Note: we have to set up the result type
245 : * before initializing child nodes, because nodeWorktablescan.c expects it
246 : * to be valid.)
247 : */
248 806 : rustate->ps.ps_ProjInfo = NULL;
249 :
250 : /*
251 : * initialize child nodes
252 : */
253 806 : outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
254 806 : innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
255 :
256 : /*
257 : * If hashing, precompute fmgr lookup data for inner loop, and create the
258 : * hash table.
259 : */
260 806 : if (node->numCols > 0)
261 : {
262 354 : execTuplesHashPrepare(node->numCols,
263 354 : node->dupOperators,
264 : &rustate->eqfuncoids,
265 : &rustate->hashfunctions);
266 354 : build_hash_table(rustate);
267 : }
268 :
269 806 : return rustate;
270 : }
271 :
272 : /* ----------------------------------------------------------------
273 : * ExecEndRecursiveUnion
274 : *
275 : * frees any storage allocated through C routines.
276 : * ----------------------------------------------------------------
277 : */
278 : void
279 806 : ExecEndRecursiveUnion(RecursiveUnionState *node)
280 : {
281 : /* Release tuplestores */
282 806 : tuplestore_end(node->working_table);
283 806 : tuplestore_end(node->intermediate_table);
284 :
285 : /* free subsidiary stuff including hashtable */
286 806 : if (node->tempContext)
287 354 : MemoryContextDelete(node->tempContext);
288 806 : if (node->tableContext)
289 354 : MemoryContextDelete(node->tableContext);
290 :
291 : /*
292 : * close down subplans
293 : */
294 806 : ExecEndNode(outerPlanState(node));
295 806 : ExecEndNode(innerPlanState(node));
296 806 : }
297 :
298 : /* ----------------------------------------------------------------
299 : * ExecReScanRecursiveUnion
300 : *
301 : * Rescans the relation.
302 : * ----------------------------------------------------------------
303 : */
304 : void
305 12 : ExecReScanRecursiveUnion(RecursiveUnionState *node)
306 : {
307 12 : PlanState *outerPlan = outerPlanState(node);
308 12 : PlanState *innerPlan = innerPlanState(node);
309 12 : RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
310 :
311 : /*
312 : * Set recursive term's chgParam to tell it that we'll modify the working
313 : * table and therefore it has to rescan.
314 : */
315 12 : innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
316 :
317 : /*
318 : * if chgParam of subnode is not null then plan will be re-scanned by
319 : * first ExecProcNode. Because of above, we only have to do this to the
320 : * non-recursive term.
321 : */
322 12 : if (outerPlan->chgParam == NULL)
323 0 : ExecReScan(outerPlan);
324 :
325 : /* Release any hashtable storage */
326 12 : if (node->tableContext)
327 12 : MemoryContextReset(node->tableContext);
328 :
329 : /* Empty hashtable if needed */
330 12 : if (plan->numCols > 0)
331 12 : ResetTupleHashTable(node->hashtable);
332 :
333 : /* reset processing state */
334 12 : node->recursing = false;
335 12 : node->intermediate_empty = true;
336 12 : tuplestore_clear(node->working_table);
337 12 : tuplestore_clear(node->intermediate_table);
338 12 : }
|