Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeWindowAgg.c
4 : * routines to handle WindowAgg nodes.
5 : *
6 : * A WindowAgg node evaluates "window functions" across suitable partitions
7 : * of the input tuple set. Any one WindowAgg works for just a single window
8 : * specification, though it can evaluate multiple window functions sharing
9 : * identical window specifications. The input tuples are required to be
10 : * delivered in sorted order, with the PARTITION BY columns (if any) as
11 : * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 : * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 : * as needed, if a query involves more than one window specification.)
14 : *
15 : * Since window functions can require access to any or all of the rows in
16 : * the current partition, we accumulate rows of the partition into a
17 : * tuplestore. The window functions are called using the WindowObject API
18 : * so that they can access those rows as needed.
19 : *
20 : * We also support using plain aggregate functions as window functions.
21 : * For these, the regular Agg-node environment is emulated for each partition.
22 : * As required by the SQL spec, the output represents the value of the
23 : * aggregate function over all rows in the current row's window frame.
24 : *
25 : *
26 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
27 : * Portions Copyright (c) 1994, Regents of the University of California
28 : *
29 : * IDENTIFICATION
30 : * src/backend/executor/nodeWindowAgg.c
31 : *
32 : *-------------------------------------------------------------------------
33 : */
34 : #include "postgres.h"
35 :
36 : #include "access/htup_details.h"
37 : #include "catalog/objectaccess.h"
38 : #include "catalog/pg_aggregate.h"
39 : #include "catalog/pg_proc.h"
40 : #include "executor/executor.h"
41 : #include "executor/nodeWindowAgg.h"
42 : #include "miscadmin.h"
43 : #include "nodes/nodeFuncs.h"
44 : #include "optimizer/clauses.h"
45 : #include "optimizer/optimizer.h"
46 : #include "parser/parse_agg.h"
47 : #include "parser/parse_coerce.h"
48 : #include "utils/acl.h"
49 : #include "utils/builtins.h"
50 : #include "utils/datum.h"
51 : #include "utils/expandeddatum.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/memutils.h"
54 : #include "utils/regproc.h"
55 : #include "utils/syscache.h"
56 : #include "windowapi.h"
57 :
58 : /*
59 : * All the window function APIs are called with this object, which is passed
60 : * to window functions as fcinfo->context.
   : *
   : * eval_windowaggregates() also uses one of these (winstate->agg_winobj) to
   : * re-fetch rows on behalf of plain aggregates; there, markptr < 0 is taken
   : * to mean that no tuplestore mark pointer was created for the aggregates.
61 : */
62 : typedef struct WindowObjectData
63 : {
64 : NodeTag type;
65 : WindowAggState *winstate; /* parent WindowAggState */
66 : List *argstates; /* ExprState trees for fn's arguments */
67 : void *localmem; /* WinGetPartitionLocalMemory's chunk */
68 : int markptr; /* tuplestore mark pointer for this fn */
69 : int readptr; /* tuplestore read pointer for this fn */
70 : int64 markpos; /* row that markptr is positioned on */
71 : int64 seekpos; /* row that readptr is positioned on */
72 : } WindowObjectData;
73 :
74 : /*
75 : * We have one WindowStatePerFunc struct for each window function and
76 : * window aggregate handled by this node.
   : *
   : * For plain aggregates, numArguments is the number of aggregate input
   : * arguments: the transition and inverse transition functions are called
   : * with numArguments + 1 arguments, slot 0 being the transition value.
   : * winCollation is the collation passed to the transition, inverse
   : * transition and final function calls made in this file.
77 : */
78 : typedef struct WindowStatePerFuncData
79 : {
80 : /* Links to WindowFunc expr and state nodes this working state is for */
81 : WindowFuncExprState *wfuncstate;
82 : WindowFunc *wfunc;
83 :
84 : int numArguments; /* number of arguments */
85 :
86 : FmgrInfo flinfo; /* fmgr lookup data for window function */
87 :
88 : Oid winCollation; /* collation derived for window function */
89 :
90 : /*
91 : * We need the len and byval info for the result of each function in order
92 : * to know how to copy/delete values.
93 : */
94 : int16 resulttypeLen;
95 : bool resulttypeByVal;
96 :
97 : bool plain_agg; /* is it just a plain aggregate function? */
98 : int aggno; /* if so, index of its WindowStatePerAggData */
99 :
100 : WindowObject winobj; /* object used in window function API */
101 : } WindowStatePerFuncData;
102 :
103 : /*
104 : * For plain aggregate window functions, we also have one of these.
   : *
   : * How the working fields are maintained by the routines in this file:
   : * - initialize_windowaggregate() sets transValue from initValue, zeroes
   : *   transValueCount and clears resultValue to NULL.
   : * - advance_windowaggregate() bumps transValueCount for each row it
   : *   accumulates; advance_windowaggregate_base() decrements it for each
   : *   row removed through the inverse transition function.
   : * - eval_windowaggregates() returns resultValue/resultValueIsNull as the
   : *   aggregate's output when it determines that the frame is unchanged
   : *   from the previous row.
105 : */
106 : typedef struct WindowStatePerAggData
107 : {
108 : /* Oids of transition functions */
109 : Oid transfn_oid;
110 : Oid invtransfn_oid; /* may be InvalidOid */
111 : Oid finalfn_oid; /* may be InvalidOid */
112 :
113 : /*
114 : * fmgr lookup data for transition functions --- only valid when
115 : * corresponding oid is not InvalidOid. Note in particular that fn_strict
116 : * flags are kept here.
117 : */
118 : FmgrInfo transfn;
119 : FmgrInfo invtransfn;
120 : FmgrInfo finalfn;
121 :
122 : int numFinalArgs; /* number of arguments to pass to finalfn */
123 :
124 : /*
125 : * initial value from pg_aggregate entry
126 : */
127 : Datum initValue;
128 : bool initValueIsNull;
129 :
130 : /*
131 : * cached value for current frame boundaries
132 : */
133 : Datum resultValue;
134 : bool resultValueIsNull;
135 :
136 : /*
137 : * We need the len and byval info for the agg's input, result, and
138 : * transition data types in order to know how to copy/delete values.
139 : */
140 : int16 inputtypeLen,
141 : resulttypeLen,
142 : transtypeLen;
143 : bool inputtypeByVal,
144 : resulttypeByVal,
145 : transtypeByVal;
146 :
147 : int wfuncno; /* index of associated WindowStatePerFuncData */
148 :
149 : /* Context holding transition value and possibly other subsidiary data */
150 : MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
151 :
152 : /* Current transition value */
153 : Datum transValue; /* current transition value */
154 : bool transValueIsNull;
155 :
156 : int64 transValueCount; /* number of currently-aggregated rows */
157 :
158 : /* Data local to eval_windowaggregates() */
159 : bool restart; /* need to restart this agg in this cycle? */
160 : } WindowStatePerAggData;
161 : /* drivers for a plain aggregate's transition, inverse transition and final functions */
162 : static void initialize_windowaggregate(WindowAggState *winstate,
163 : WindowStatePerFunc perfuncstate,
164 : WindowStatePerAgg peraggstate);
165 : static void advance_windowaggregate(WindowAggState *winstate,
166 : WindowStatePerFunc perfuncstate,
167 : WindowStatePerAgg peraggstate);
168 : static bool advance_windowaggregate_base(WindowAggState *winstate,
169 : WindowStatePerFunc perfuncstate,
170 : WindowStatePerAgg peraggstate);
171 : static void finalize_windowaggregate(WindowAggState *winstate,
172 : WindowStatePerFunc perfuncstate,
173 : WindowStatePerAgg peraggstate,
174 : Datum *result, bool *isnull);
175 : /* evaluation of window aggregates and window functions */
176 : static void eval_windowaggregates(WindowAggState *winstate);
177 : static void eval_windowfunction(WindowAggState *winstate,
178 : WindowStatePerFunc perfuncstate,
179 : Datum *result, bool *isnull);
180 : /* partition-level helpers */
181 : static void begin_partition(WindowAggState *winstate);
182 : static void spool_tuples(WindowAggState *winstate, int64 pos);
183 : static void release_partition(WindowAggState *winstate);
184 : /* window frame position helpers */
185 : static int row_is_in_frame(WindowAggState *winstate, int64 pos,
186 : TupleTableSlot *slot);
187 : static void update_frameheadpos(WindowAggState *winstate);
188 : static void update_frametailpos(WindowAggState *winstate);
189 : static void update_grouptailpos(WindowAggState *winstate);
190 : /* per-aggregate setup */
191 : static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
192 : WindowFunc *wfunc,
193 : WindowStatePerAgg peraggstate);
194 : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
195 : /* row comparison and row fetch helpers */
196 : static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
197 : TupleTableSlot *slot2);
198 : static bool window_gettupleslot(WindowObject winobj, int64 pos,
199 : TupleTableSlot *slot);
200 :
201 :
202 : /*
203 : * initialize_windowaggregate
204 : * parallel to initialize_aggregates in nodeAgg.c
   : *
   : * (Re)starts one aggregate: transValue is set from initValue (copied into
   : * the aggregate's context unless the initial value is NULL),
   : * transValueCount is zeroed, and the cached resultValue is cleared to NULL.
   : *
   : * perfuncstate is not referenced by this function.
205 : */
206 : static void
207 3998 : initialize_windowaggregate(WindowAggState *winstate,
208 : WindowStatePerFunc perfuncstate,
209 : WindowStatePerAgg peraggstate)
210 : {
211 : MemoryContext oldContext;
212 :
213 : /*
214 : * If we're using a private aggcontext, we may reset it here. But if the
215 : * context is shared, we don't know which other aggregates may still need
216 : * it, so we must leave it to the caller to reset at an appropriate time.
217 : */
218 3998 : if (peraggstate->aggcontext != winstate->aggcontext)
219 2904 : MemoryContextReset(peraggstate->aggcontext);
220 :
221 3998 : if (peraggstate->initValueIsNull)
222 1504 : peraggstate->transValue = peraggstate->initValue; /* NULL initial value: assigned as-is, no copy */
223 : else
224 : {
225 2494 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
226 4988 : peraggstate->transValue = datumCopy(peraggstate->initValue,
227 2494 : peraggstate->transtypeByVal,
228 2494 : peraggstate->transtypeLen);
229 2494 : MemoryContextSwitchTo(oldContext);
230 : }
231 3998 : peraggstate->transValueIsNull = peraggstate->initValueIsNull;
232 3998 : peraggstate->transValueCount = 0;
233 3998 : peraggstate->resultValue = (Datum) 0;
234 3998 : peraggstate->resultValueIsNull = true;
235 3998 : }
236 :
237 : /*
238 : * advance_windowaggregate
239 : * parallel to advance_aggregates in nodeAgg.c
   : *
   : * Accumulates one input row into peraggstate's transition value. The
   : * FILTER expression and the aggregate's arguments are evaluated in
   : * winstate->tmpcontext, using its per-tuple memory (presumably the caller
   : * has already pointed that context at the row to add --- the setup is not
   : * done here; confirm against the caller).
   : *
   : * The row is skipped, leaving transValue and transValueCount untouched, if
   : * the FILTER rejects it, if the transfn is strict and any argument is
   : * NULL, or if the transfn is strict and transValue is NULL although rows
   : * have already been counted. With a strict transfn, a NULL transValue and
   : * a zero transValueCount, the first argument is copied into aggcontext as
   : * the new transValue and the transfn is not called.
   : *
   : * Raises ERROR if the transfn returns NULL while an inverse transition
   : * function exists.
240 : */
241 : static void
242 171542 : advance_windowaggregate(WindowAggState *winstate,
243 : WindowStatePerFunc perfuncstate,
244 : WindowStatePerAgg peraggstate)
245 : {
246 171542 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
247 171542 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
248 171542 : int numArguments = perfuncstate->numArguments;
249 : Datum newVal;
250 : ListCell *arg;
251 : int i;
252 : MemoryContext oldContext;
253 171542 : ExprContext *econtext = winstate->tmpcontext;
254 171542 : ExprState *filter = wfuncstate->aggfilter;
255 :
256 171542 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
257 :
258 : /* Skip anything FILTERed out */
259 171542 : if (filter)
260 : {
261 : bool isnull;
262 342 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
263 :
264 342 : if (isnull || !DatumGetBool(res))
265 : {
266 162 : MemoryContextSwitchTo(oldContext);
267 162 : return;
268 : }
269 : }
270 :
271 : /* We start from 1, since the 0th arg will be the transition value */
272 171380 : i = 1;
273 282310 : foreach(arg, wfuncstate->args)
274 : {
275 110930 : ExprState *argstate = (ExprState *) lfirst(arg);
276 :
277 110930 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
278 : &fcinfo->args[i].isnull);
279 110930 : i++;
280 : }
281 :
282 171380 : if (peraggstate->transfn.fn_strict)
283 : {
284 : /*
285 : * For a strict transfn, nothing happens when there's a NULL input; we
286 : * just keep the prior transValue. Note transValueCount doesn't
287 : * change either.
288 : */
289 102516 : for (i = 1; i <= numArguments; i++)
290 : {
291 21096 : if (fcinfo->args[i].isnull)
292 : {
293 198 : MemoryContextSwitchTo(oldContext);
294 198 : return;
295 : }
296 : }
297 :
298 : /*
299 : * For strict transition functions with initial value NULL we use the
300 : * first non-NULL input as the initial state. (We already checked
301 : * that the agg's input type is binary-compatible with its transtype,
302 : * so straight copy here is OK.)
303 : *
304 : * We must copy the datum into aggcontext if it is pass-by-ref. We do
305 : * not need to pfree the old transValue, since it's NULL.
306 : */
307 81420 : if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
308 : {
309 442 : MemoryContextSwitchTo(peraggstate->aggcontext);
310 884 : peraggstate->transValue = datumCopy(fcinfo->args[1].value,
311 442 : peraggstate->transtypeByVal,
312 442 : peraggstate->transtypeLen);
313 442 : peraggstate->transValueIsNull = false;
314 442 : peraggstate->transValueCount = 1;
315 442 : MemoryContextSwitchTo(oldContext);
316 442 : return;
317 : }
318 :
319 80978 : if (peraggstate->transValueIsNull)
320 : {
321 : /*
322 : * Don't call a strict function with NULL inputs. Note it is
323 : * possible to get here despite the above tests, if the transfn is
324 : * strict *and* returned a NULL on a prior cycle. If that happens
325 : * we will propagate the NULL all the way to the end. That can
326 : * only happen if there's no inverse transition function, though,
327 : * since we disallow transitions back to NULL when there is one.
328 : */
329 0 : MemoryContextSwitchTo(oldContext);
330 : Assert(!OidIsValid(peraggstate->invtransfn_oid));
331 0 : return;
332 : }
333 : }
334 :
335 : /*
336 : * OK to call the transition function. Set winstate->curaggcontext while
337 : * calling it, for possible use by AggCheckCallContext.
   : *
   : * The input arguments were already stored in args[1..] above; only
   : * args[0] (the transition value) remains to be filled in here.
338 : */
339 170740 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
340 : numArguments + 1,
341 : perfuncstate->winCollation,
342 : (void *) winstate, NULL);
343 170740 : fcinfo->args[0].value = peraggstate->transValue;
344 170740 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
345 170740 : winstate->curaggcontext = peraggstate->aggcontext;
346 170740 : newVal = FunctionCallInvoke(fcinfo);
347 170728 : winstate->curaggcontext = NULL;
348 :
349 : /*
350 : * Moving-aggregate transition functions must not return null, see
351 : * advance_windowaggregate_base().
352 : */
353 170728 : if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
354 0 : ereport(ERROR,
355 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
356 : errmsg("moving-aggregate transition function must not return null")));
357 :
358 : /*
359 : * We must track the number of rows included in transValue, since to
360 : * remove the last input, advance_windowaggregate_base() mustn't call the
361 : * inverse transition function, but simply reset transValue back to its
362 : * initial value.
363 : */
364 170728 : peraggstate->transValueCount++;
365 :
366 : /*
367 : * If pass-by-ref datatype, must copy the new value into aggcontext and
368 : * free the prior transValue. But if transfn returned a pointer to its
369 : * first input, we don't need to do anything. Also, if transfn returned a
370 : * pointer to a R/W expanded object that is already a child of the
371 : * aggcontext, assume we can adopt that value without copying it. (See
372 : * comments for ExecAggCopyTransValue, which this code duplicates.)
373 : */
374 179086 : if (!peraggstate->transtypeByVal &&
375 8358 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
376 : {
377 960 : if (!fcinfo->isnull)
378 : {
379 960 : MemoryContextSwitchTo(peraggstate->aggcontext);
380 966 : if (DatumIsReadWriteExpandedObject(newVal,
381 : false,
382 960 : peraggstate->transtypeLen) &&
383 6 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
384 : /* do nothing */ ;
385 : else
386 954 : newVal = datumCopy(newVal,
387 954 : peraggstate->transtypeByVal,
388 954 : peraggstate->transtypeLen);
389 : }
390 960 : if (!peraggstate->transValueIsNull)
391 : {
392 900 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
393 : false,
394 : peraggstate->transtypeLen))
395 0 : DeleteExpandedObject(peraggstate->transValue);
396 : else
397 900 : pfree(DatumGetPointer(peraggstate->transValue));
398 : }
399 : }
400 :
401 170728 : MemoryContextSwitchTo(oldContext);
402 170728 : peraggstate->transValue = newVal;
403 170728 : peraggstate->transValueIsNull = fcinfo->isnull;
404 : }
405 :
406 : /*
407 : * advance_windowaggregate_base
408 : * Remove the oldest tuple from an aggregation.
409 : *
410 : * This is very much like advance_windowaggregate, except that we will call
411 : * the inverse transition function (which caller must have checked is
412 : * available).
413 : *
414 : * Returns true if we successfully removed the current row from this
415 : * aggregate, false if not (in the latter case, caller is responsible
416 : * for cleaning up by restarting the aggregation).
   : *
   : * Note that true is also returned, without calling the inverse transition
   : * function, when the row is rejected by the FILTER or has a NULL argument
   : * for a strict inverse function (the state is left unchanged), and when
   : * the row being removed is the only one counted in transValue (the
   : * aggregate is then re-initialized instead). On a false return,
   : * transValue and transValueCount are left as they were.
   : *
   : * The row to remove is read through winstate->tmpcontext; the caller in
   : * eval_windowaggregates() sets its ecxt_outertuple before calling here.
417 : */
418 : static bool
419 4626 : advance_windowaggregate_base(WindowAggState *winstate,
420 : WindowStatePerFunc perfuncstate,
421 : WindowStatePerAgg peraggstate)
422 : {
423 4626 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
424 4626 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
425 4626 : int numArguments = perfuncstate->numArguments;
426 : Datum newVal;
427 : ListCell *arg;
428 : int i;
429 : MemoryContext oldContext;
430 4626 : ExprContext *econtext = winstate->tmpcontext;
431 4626 : ExprState *filter = wfuncstate->aggfilter;
432 :
433 4626 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
434 :
435 : /* Skip anything FILTERed out */
436 4626 : if (filter)
437 : {
438 : bool isnull;
439 102 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
440 :
441 102 : if (isnull || !DatumGetBool(res))
442 : {
443 48 : MemoryContextSwitchTo(oldContext);
444 48 : return true;
445 : }
446 : }
447 :
448 : /* We start from 1, since the 0th arg will be the transition value */
449 4578 : i = 1;
450 9138 : foreach(arg, wfuncstate->args)
451 : {
452 4560 : ExprState *argstate = (ExprState *) lfirst(arg);
453 :
454 4560 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
455 : &fcinfo->args[i].isnull);
456 4560 : i++;
457 : }
458 :
459 4578 : if (peraggstate->invtransfn.fn_strict)
460 : {
461 : /*
462 : * For a strict (inv)transfn, nothing happens when there's a NULL
463 : * input; we just keep the prior transValue. Note transValueCount
464 : * doesn't change either.
465 : */
466 5604 : for (i = 1; i <= numArguments; i++)
467 : {
468 2832 : if (fcinfo->args[i].isnull)
469 : {
470 78 : MemoryContextSwitchTo(oldContext);
471 78 : return true;
472 : }
473 : }
474 : }
475 :
476 : /* There should still be an added but not yet removed value */
477 : Assert(peraggstate->transValueCount > 0);
478 :
479 : /*
480 : * In moving-aggregate mode, the state must never be NULL, except possibly
481 : * before any rows have been aggregated (which is surely not the case at
482 : * this point). This restriction allows us to interpret a NULL result
483 : * from the inverse function as meaning "sorry, can't do an inverse
484 : * transition in this case". We already checked this in
485 : * advance_windowaggregate, but just for safety, check again.
486 : */
487 4500 : if (peraggstate->transValueIsNull)
488 0 : elog(ERROR, "aggregate transition value is NULL before inverse transition");
489 :
490 : /*
491 : * We mustn't use the inverse transition function to remove the last
492 : * input. Doing so would yield a non-NULL state, whereas we should be in
493 : * the initial state afterwards which may very well be NULL. So instead,
494 : * we simply re-initialize the aggregate in this case.
495 : */
496 4500 : if (peraggstate->transValueCount == 1)
497 : {
498 90 : MemoryContextSwitchTo(oldContext);
499 90 : initialize_windowaggregate(winstate,
500 90 : &winstate->perfunc[peraggstate->wfuncno],
501 : peraggstate);
502 90 : return true;
503 : }
504 :
505 : /*
506 : * OK to call the inverse transition function. Set
507 : * winstate->curaggcontext while calling it, for possible use by
508 : * AggCheckCallContext.
509 : */
510 4410 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
511 : numArguments + 1,
512 : perfuncstate->winCollation,
513 : (void *) winstate, NULL);
514 4410 : fcinfo->args[0].value = peraggstate->transValue;
515 4410 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
516 4410 : winstate->curaggcontext = peraggstate->aggcontext;
517 4410 : newVal = FunctionCallInvoke(fcinfo);
518 4410 : winstate->curaggcontext = NULL;
519 :
520 : /*
521 : * If the function returns NULL, report failure, forcing a restart.
   : * Nothing in peraggstate has been modified at this point.
522 : */
523 4410 : if (fcinfo->isnull)
524 : {
525 234 : MemoryContextSwitchTo(oldContext);
526 234 : return false;
527 : }
528 :
529 : /* Update number of rows included in transValue */
530 4176 : peraggstate->transValueCount--;
531 :
532 : /*
533 : * If pass-by-ref datatype, must copy the new value into aggcontext and
534 : * free the prior transValue. But if invtransfn returned a pointer to its
535 : * first input, we don't need to do anything. Also, if invtransfn
536 : * returned a pointer to a R/W expanded object that is already a child of
537 : * the aggcontext, assume we can adopt that value without copying it. (See
538 : * comments for ExecAggCopyTransValue, which this code duplicates.)
539 : *
540 : * Note: the checks for null values here will never fire, but it seems
541 : * best to have this stanza look just like advance_windowaggregate.
542 : */
543 6306 : if (!peraggstate->transtypeByVal &&
544 2130 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
545 : {
546 666 : if (!fcinfo->isnull)
547 : {
548 666 : MemoryContextSwitchTo(peraggstate->aggcontext);
549 666 : if (DatumIsReadWriteExpandedObject(newVal,
550 : false,
551 666 : peraggstate->transtypeLen) &&
552 0 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
553 : /* do nothing */ ;
554 : else
555 666 : newVal = datumCopy(newVal,
556 666 : peraggstate->transtypeByVal,
557 666 : peraggstate->transtypeLen);
558 : }
559 666 : if (!peraggstate->transValueIsNull)
560 : {
561 666 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
562 : false,
563 : peraggstate->transtypeLen))
564 0 : DeleteExpandedObject(peraggstate->transValue);
565 : else
566 666 : pfree(DatumGetPointer(peraggstate->transValue));
567 : }
568 : }
569 :
570 4176 : MemoryContextSwitchTo(oldContext);
571 4176 : peraggstate->transValue = newVal;
572 4176 : peraggstate->transValueIsNull = fcinfo->isnull;
573 :
574 4176 : return true;
575 : }
576 :
577 : /*
578 : * finalize_windowaggregate
579 : * parallel to finalize_aggregate in nodeAgg.c
   : *
   : * Computes the aggregate's current output into *result and *isnull,
   : * working in the per-tuple memory of the node's ps_ExprContext.
   : *
   : * With a final function: it is called with transValue (passed through
   : * MakeExpandedObjectReadOnly) as argument 0 and NULLs in any remaining
   : * argument positions up to numFinalArgs. A strict final function is not
   : * called when any of those arguments is NULL; the output is then NULL.
   : *
   : * Without a final function: the output is transValue itself, passed
   : * through MakeExpandedObjectReadOnly; no datumCopy is done here.
   : *
   : * peraggstate's transition state is not assigned to by this function.
580 : */
581 : static void
582 10590 : finalize_windowaggregate(WindowAggState *winstate,
583 : WindowStatePerFunc perfuncstate,
584 : WindowStatePerAgg peraggstate,
585 : Datum *result, bool *isnull)
586 : {
587 : MemoryContext oldContext;
588 :
589 10590 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
590 :
591 : /*
592 : * Apply the agg's finalfn if one is provided, else return transValue.
593 : */
594 10590 : if (OidIsValid(peraggstate->finalfn_oid))
595 : {
596 5884 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
597 5884 : int numFinalArgs = peraggstate->numFinalArgs;
598 : bool anynull;
599 : int i;
600 :
601 5884 : InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
602 : numFinalArgs,
603 : perfuncstate->winCollation,
604 : (void *) winstate, NULL);
605 5884 : fcinfo->args[0].value =
606 5884 : MakeExpandedObjectReadOnly(peraggstate->transValue,
607 : peraggstate->transValueIsNull,
608 : peraggstate->transtypeLen);
609 5884 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
610 5884 : anynull = peraggstate->transValueIsNull;
611 :
612 : /* Fill any remaining argument positions with nulls */
613 5984 : for (i = 1; i < numFinalArgs; i++)
614 : {
615 100 : fcinfo->args[i].value = (Datum) 0;
616 100 : fcinfo->args[i].isnull = true;
617 100 : anynull = true;
618 : }
619 :
620 5884 : if (fcinfo->flinfo->fn_strict && anynull)
621 : {
622 : /* don't call a strict function with NULL inputs */
623 0 : *result = (Datum) 0;
624 0 : *isnull = true;
625 : }
626 : else
627 : {
628 : Datum res;
629 :
630 5884 : winstate->curaggcontext = peraggstate->aggcontext;
631 5884 : res = FunctionCallInvoke(fcinfo);
632 5872 : winstate->curaggcontext = NULL;
633 5872 : *isnull = fcinfo->isnull;
634 5872 : *result = MakeExpandedObjectReadOnly(res,
635 : fcinfo->isnull,
636 : peraggstate->resulttypeLen);
637 : }
638 : }
639 : else
640 : {
641 4706 : *result =
642 4706 : MakeExpandedObjectReadOnly(peraggstate->transValue,
643 : peraggstate->transValueIsNull,
644 : peraggstate->transtypeLen);
645 4706 : *isnull = peraggstate->transValueIsNull;
646 : }
647 :
648 10578 : MemoryContextSwitchTo(oldContext);
649 10578 : }
650 :
651 : /*
652 : * eval_windowaggregates
653 : * evaluate plain aggregates being used as window functions
654 : *
655 : * This differs from nodeAgg.c in two ways. First, if the window's frame
656 : * start position moves, we use the inverse transition function (if it exists)
657 : * to remove rows from the transition value. And second, we expect to be
658 : * able to call aggregate final functions repeatedly after aggregating more
659 : * data onto the same transition value. This is not a behavior required by
660 : * nodeAgg.c.
661 : */
662 : static void
663 154168 : eval_windowaggregates(WindowAggState *winstate)
664 : {
665 : WindowStatePerAgg peraggstate;
666 : int wfuncno,
667 : numaggs,
668 : numaggs_restart,
669 : i;
670 : int64 aggregatedupto_nonrestarted;
671 : MemoryContext oldContext;
672 : ExprContext *econtext;
673 : WindowObject agg_winobj;
674 : TupleTableSlot *agg_row_slot;
675 : TupleTableSlot *temp_slot;
676 :
677 154168 : numaggs = winstate->numaggs;
678 154168 : if (numaggs == 0)
679 0 : return; /* nothing to do */
680 :
681 : /* final output execution is in ps_ExprContext */
682 154168 : econtext = winstate->ss.ps.ps_ExprContext;
683 154168 : agg_winobj = winstate->agg_winobj;
684 154168 : agg_row_slot = winstate->agg_row_slot;
685 154168 : temp_slot = winstate->temp_slot_1;
686 :
687 : /*
688 : * If the window's frame start clause is UNBOUNDED_PRECEDING and no
689 : * exclusion clause is specified, then the window frame consists of a
690 : * contiguous group of rows extending forward from the start of the
691 : * partition, and rows only enter the frame, never exit it, as the current
692 : * row advances forward. This makes it possible to use an incremental
693 : * strategy for evaluating aggregates: we run the transition function for
694 : * each row added to the frame, and run the final function whenever we
695 : * need the current aggregate value. This is considerably more efficient
696 : * than the naive approach of re-running the entire aggregate calculation
697 : * for each current row. It does assume that the final function doesn't
698 : * damage the running transition value, but we have the same assumption in
699 : * nodeAgg.c too (when it rescans an existing hash table).
700 : *
701 : * If the frame start does sometimes move, we can still optimize as above
702 : * whenever successive rows share the same frame head, but if the frame
703 : * head moves beyond the previous head we try to remove those rows using
704 : * the aggregate's inverse transition function. This function restores
705 : * the aggregate's current state to what it would be if the removed row
706 : * had never been aggregated in the first place. Inverse transition
707 : * functions may optionally return NULL, indicating that the function was
708 : * unable to remove the tuple from aggregation. If this happens, or if
709 : * the aggregate doesn't have an inverse transition function at all, we
710 : * must perform the aggregation all over again for all tuples within the
711 : * new frame boundaries.
712 : *
713 : * If there's any exclusion clause, then we may have to aggregate over a
714 : * non-contiguous set of rows, so we punt and recalculate for every row.
715 : * (For some frame end choices, it might be that the frame is always
716 : * contiguous anyway, but that's an optimization to investigate later.)
717 : *
718 : * In many common cases, multiple rows share the same frame and hence the
719 : * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
720 : * window, then all rows are peers and so they all have window frame equal
721 : * to the whole partition.) We optimize such cases by calculating the
722 : * aggregate value once when we reach the first row of a peer group, and
723 : * then returning the saved value for all subsequent rows.
724 : *
725 : * 'aggregatedupto' keeps track of the first row that has not yet been
726 : * accumulated into the aggregate transition values. Whenever we start a
727 : * new peer group, we accumulate forward to the end of the peer group.
728 : */
729 :
730 : /*
731 : * First, update the frame head position.
732 : *
733 : * The frame head should never move backwards, and the code below wouldn't
734 : * cope if it did, so for safety we complain if it does.
735 : */
736 154168 : update_frameheadpos(winstate);
737 154162 : if (winstate->frameheadpos < winstate->aggregatedbase)
738 0 : elog(ERROR, "window frame head moved backward");
739 :
740 : /*
741 : * If the frame didn't change compared to the previous row, we can re-use
742 : * the result values that were previously saved at the bottom of this
743 : * function. Since we don't know the current frame's end yet, this is not
744 : * possible to check for fully. But if the frame end mode is UNBOUNDED
745 : * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
746 : * current row lies within the previous row's frame, then the two frames'
747 : * ends must coincide. Note that on the first row aggregatedbase ==
748 : * aggregatedupto, meaning this test must fail, so we don't need to check
749 : * the "there was no previous row" case explicitly here.
750 : */
751 154162 : if (winstate->aggregatedbase == winstate->frameheadpos &&
752 150384 : (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
753 148464 : FRAMEOPTION_END_CURRENT_ROW)) &&
754 148464 : !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
755 148284 : winstate->aggregatedbase <= winstate->currentpos &&
756 148248 : winstate->aggregatedupto > winstate->currentpos)
757 : {
758 290624 : for (i = 0; i < numaggs; i++)
759 : {
760 145318 : peraggstate = &winstate->peragg[i];
761 145318 : wfuncno = peraggstate->wfuncno;
762 145318 : econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
763 145318 : econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
764 : }
765 145306 : return;
766 : }
767 :
768 : /*----------
769 : * Initialize restart flags.
770 : *
771 : * We restart the aggregation:
772 : * - if we're processing the first row in the partition, or
773 : * - if the frame's head moved and we cannot use an inverse
774 : * transition function, or
775 : * - we have an EXCLUSION clause, or
776 : * - if the new frame doesn't overlap the old one
777 : *
778 : * Note that we don't strictly need to restart in the last case, but if
779 : * we're going to remove all rows from the aggregation anyway, a restart
780 : * surely is faster.
781 : *----------
782 : */
783 8856 : numaggs_restart = 0;
784 19470 : for (i = 0; i < numaggs; i++)
785 : {
786 10614 : peraggstate = &winstate->peragg[i];
787 10614 : if (winstate->currentpos == 0 ||
788 8600 : (winstate->aggregatedbase != winstate->frameheadpos &&
789 5206 : !OidIsValid(peraggstate->invtransfn_oid)) ||
790 8524 : (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
791 7408 : winstate->aggregatedupto <= winstate->frameheadpos)
792 : {
793 3674 : peraggstate->restart = true;
794 3674 : numaggs_restart++;
795 : }
796 : else
797 6940 : peraggstate->restart = false;
798 : }
799 :
800 : /*
801 : * If we have any possibly-moving aggregates, attempt to advance
802 : * aggregatedbase to match the frame's head by removing input rows that
803 : * fell off the top of the frame from the aggregations. This can fail,
804 : * i.e. advance_windowaggregate_base() can return false, in which case
805 : * we'll restart that aggregate below.
806 : */
807 12024 : while (numaggs_restart < numaggs &&
808 8530 : winstate->aggregatedbase < winstate->frameheadpos)
809 : {
810 : /*
811 : * Fetch the next tuple of those being removed. This should never fail
812 : * as we should have been here before.
813 : */
814 3168 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
815 : temp_slot))
816 0 : elog(ERROR, "could not re-fetch previously fetched frame row");
817 :
818 : /* Set tuple context for evaluation of aggregate arguments */
819 3168 : winstate->tmpcontext->ecxt_outertuple = temp_slot;
820 :
821 : /*
822 : * Perform the inverse transition for each aggregate function in the
823 : * window, unless it has already been marked as needing a restart.
824 : */
825 7806 : for (i = 0; i < numaggs; i++)
826 : {
827 : bool ok;
828 :
829 4638 : peraggstate = &winstate->peragg[i];
830 4638 : if (peraggstate->restart)
831 12 : continue;
832 :
833 4626 : wfuncno = peraggstate->wfuncno;
834 4626 : ok = advance_windowaggregate_base(winstate,
835 4626 : &winstate->perfunc[wfuncno],
836 : peraggstate);
837 4626 : if (!ok)
838 : {
839 : /* Inverse transition function has failed, must restart */
840 234 : peraggstate->restart = true;
841 234 : numaggs_restart++;
842 : }
843 : }
844 :
845 : /* Reset per-input-tuple context after each tuple */
846 3168 : ResetExprContext(winstate->tmpcontext);
847 :
848 : /* And advance the aggregated-row state */
849 3168 : winstate->aggregatedbase++;
850 3168 : ExecClearTuple(temp_slot);
851 : }
852 :
853 : /*
854 : * If we successfully advanced the base rows of all the aggregates,
855 : * aggregatedbase now equals frameheadpos; but if we failed for any, we
856 : * must forcibly update aggregatedbase.
857 : */
858 8856 : winstate->aggregatedbase = winstate->frameheadpos;
859 :
860 : /*
861 : * If we created a mark pointer for aggregates, keep it pushed up to frame
862 : * head, so that tuplestore can discard unnecessary rows.
863 : */
864 8856 : if (agg_winobj->markptr >= 0)
865 6224 : WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
866 :
867 : /*
868 : * Now restart the aggregates that require it.
869 : *
870 : * We assume that aggregates using the shared context always restart if
871 : * *any* aggregate restarts, and we may thus clean up the shared
872 : * aggcontext if that is the case. Private aggcontexts are reset by
873 : * initialize_windowaggregate() if their owning aggregate restarts. If we
874 : * aren't restarting an aggregate, we need to free any previously saved
875 : * result for it, else we'll leak memory.
876 : */
877 8856 : if (numaggs_restart > 0)
878 3706 : MemoryContextReset(winstate->aggcontext);
879 19470 : for (i = 0; i < numaggs; i++)
880 : {
881 10614 : peraggstate = &winstate->peragg[i];
882 :
883 : /* Aggregates using the shared ctx must restart if *any* agg does */
884 : Assert(peraggstate->aggcontext != winstate->aggcontext ||
885 : numaggs_restart == 0 ||
886 : peraggstate->restart);
887 :
888 10614 : if (peraggstate->restart)
889 : {
890 3908 : wfuncno = peraggstate->wfuncno;
891 3908 : initialize_windowaggregate(winstate,
892 3908 : &winstate->perfunc[wfuncno],
893 : peraggstate);
894 : }
895 6706 : else if (!peraggstate->resultValueIsNull)
896 : {
897 6472 : if (!peraggstate->resulttypeByVal)
898 2152 : pfree(DatumGetPointer(peraggstate->resultValue));
899 6472 : peraggstate->resultValue = (Datum) 0;
900 6472 : peraggstate->resultValueIsNull = true;
901 : }
902 : }
903 :
904 : /*
905 : * Non-restarted aggregates now contain the rows between aggregatedbase
906 : * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
907 : * contain no rows. If there are any restarted aggregates, we must thus
908 : * begin aggregating anew at frameheadpos, otherwise we may simply
909 : * continue at aggregatedupto. We must remember the old value of
910 : * aggregatedupto to know how long to skip advancing non-restarted
911 : * aggregates. If we modify aggregatedupto, we must also clear
912 : * agg_row_slot, per the loop invariant below.
913 : */
914 8856 : aggregatedupto_nonrestarted = winstate->aggregatedupto;
915 8856 : if (numaggs_restart > 0 &&
916 3706 : winstate->aggregatedupto != winstate->frameheadpos)
917 : {
918 1398 : winstate->aggregatedupto = winstate->frameheadpos;
919 1398 : ExecClearTuple(agg_row_slot);
920 : }
921 :
922 : /*
923 : * Advance until we reach a row not in frame (or end of partition).
924 : *
925 : * Note the loop invariant: agg_row_slot is either empty or holds the row
926 : * at position aggregatedupto. We advance aggregatedupto after processing
927 : * a row.
928 : */
929 : for (;;)
930 170132 : {
931 : int ret;
932 :
933 : /* Fetch next row if we didn't already */
934 178988 : if (TupIsNull(agg_row_slot))
935 : {
936 175126 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
937 : agg_row_slot))
938 4130 : break; /* must be end of partition */
939 : }
940 :
941 : /*
942 : * Exit loop if no more rows can be in frame. Skip aggregation if
943 : * current row is not in frame but there might be more in the frame.
944 : */
945 174858 : ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
946 174846 : if (ret < 0)
947 4702 : break;
948 170144 : if (ret == 0)
949 1896 : goto next_tuple;
950 :
951 : /* Set tuple context for evaluation of aggregate arguments */
952 168248 : winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
953 :
954 : /* Accumulate row into the aggregates */
955 358474 : for (i = 0; i < numaggs; i++)
956 : {
957 190238 : peraggstate = &winstate->peragg[i];
958 :
959 : /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
960 190238 : if (!peraggstate->restart &&
961 118348 : winstate->aggregatedupto < aggregatedupto_nonrestarted)
962 18696 : continue;
963 :
964 171542 : wfuncno = peraggstate->wfuncno;
965 171542 : advance_windowaggregate(winstate,
966 171542 : &winstate->perfunc[wfuncno],
967 : peraggstate);
968 : }
969 :
970 168236 : next_tuple:
971 : /* Reset per-input-tuple context after each tuple */
972 170132 : ResetExprContext(winstate->tmpcontext);
973 :
974 : /* And advance the aggregated-row state */
975 170132 : winstate->aggregatedupto++;
976 170132 : ExecClearTuple(agg_row_slot);
977 : }
978 :
979 : /* The frame's end is not supposed to move backwards, ever */
980 : Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
981 :
982 : /*
983 : * finalize aggregates and fill result/isnull fields.
984 : */
985 19410 : for (i = 0; i < numaggs; i++)
986 : {
987 : Datum *result;
988 : bool *isnull;
989 :
990 10590 : peraggstate = &winstate->peragg[i];
991 10590 : wfuncno = peraggstate->wfuncno;
992 10590 : result = &econtext->ecxt_aggvalues[wfuncno];
993 10590 : isnull = &econtext->ecxt_aggnulls[wfuncno];
994 10590 : finalize_windowaggregate(winstate,
995 10590 : &winstate->perfunc[wfuncno],
996 : peraggstate,
997 : result, isnull);
998 :
999 : /*
1000 : * save the result in case next row shares the same frame.
1001 : *
1002 : * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1003 : * advance that the next row can't possibly share the same frame. Is
1004 : * it worth detecting that and skipping this code?
1005 : */
1006 10578 : if (!peraggstate->resulttypeByVal && !*isnull)
1007 : {
1008 2752 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1009 2752 : peraggstate->resultValue =
1010 2752 : datumCopy(*result,
1011 2752 : peraggstate->resulttypeByVal,
1012 2752 : peraggstate->resulttypeLen);
1013 2752 : MemoryContextSwitchTo(oldContext);
1014 : }
1015 : else
1016 : {
1017 7826 : peraggstate->resultValue = *result;
1018 : }
1019 10578 : peraggstate->resultValueIsNull = *isnull;
1020 : }
1021 : }
1022 :
1023 : /*
1024 : * eval_windowfunction
1025 : *
1026 : * Arguments of window functions are not evaluated here, because a window
1027 : * function can need random access to arbitrary rows in the partition.
1028 : * The window function uses the special WinGetFuncArgInPartition and
1029 : * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1030 : * it wants.
1031 : */
1032 : static void
1033 814884 : eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1034 : Datum *result, bool *isnull)
1035 : {
1036 814884 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1037 : MemoryContext oldContext;
1038 :
1039 814884 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1040 :
1041 : /*
1042 : * We don't pass any normal arguments to a window function, but we do pass
1043 : * it the number of arguments, in order to permit window function
1044 : * implementations to support varying numbers of arguments. The real info
1045 : * goes through the WindowObject, which is passed via fcinfo->context.
1046 : */
1047 814884 : InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1048 : perfuncstate->numArguments,
1049 : perfuncstate->winCollation,
1050 : (void *) perfuncstate->winobj, NULL);
1051 : /* Just in case, make all the regular argument slots be null */
1052 1007676 : for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1053 192792 : fcinfo->args[argno].isnull = true;
1054 : /* Window functions don't have a current aggregate context, either */
1055 814884 : winstate->curaggcontext = NULL;
1056 :
1057 814884 : *result = FunctionCallInvoke(fcinfo);
1058 814794 : *isnull = fcinfo->isnull;
1059 :
1060 : /*
1061 : * The window function might have returned a pass-by-ref result that's
1062 : * just a pointer into one of the WindowObject's temporary slots. That's
1063 : * not a problem if it's the only window function using the WindowObject;
1064 : * but if there's more than one function, we'd better copy the result to
1065 : * ensure it's not clobbered by later window functions.
1066 : */
1067 814794 : if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1068 1008 : winstate->numfuncs > 1)
1069 96 : *result = datumCopy(*result,
1070 96 : perfuncstate->resulttypeByVal,
1071 96 : perfuncstate->resulttypeLen);
1072 :
1073 814794 : MemoryContextSwitchTo(oldContext);
1074 814794 : }
1075 :
1076 : /*
1077 : * prepare_tuplestore
1078 : * Prepare the tuplestore and all of the required read pointers for the
1079 : * WindowAggState's frameOptions.
1080 : *
1081 : * Note: We use pg_noinline to avoid bloating the calling function with code
1082 : * which is only called once.
1083 : */
1084 : static pg_noinline void
1085 2078 : prepare_tuplestore(WindowAggState *winstate)
1086 : {
1087 2078 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1088 2078 : int frameOptions = winstate->frameOptions;
1089 2078 : int numfuncs = winstate->numfuncs;
1090 :
1091 : /* we shouldn't be called if this was done already */
1092 : Assert(winstate->buffer == NULL);
1093 :
1094 : /* Create new tuplestore */
1095 2078 : winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1096 :
1097 : /*
1098 : * Set up read pointers for the tuplestore. The current pointer doesn't
1099 : * need BACKWARD capability, but the per-window-function read pointers do,
1100 : * and the aggregate pointer does if we might need to restart aggregation.
1101 : */
1102 2078 : winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1103 :
1104 : /* reset default REWIND capability bit for current ptr */
1105 2078 : tuplestore_set_eflags(winstate->buffer, 0);
1106 :
1107 : /* create read pointers for aggregates, if needed */
1108 2078 : if (winstate->numaggs > 0)
1109 : {
1110 1112 : WindowObject agg_winobj = winstate->agg_winobj;
1111 1112 : int readptr_flags = 0;
1112 :
1113 : /*
1114 : * If the frame head is potentially movable, or we have an EXCLUSION
1115 : * clause, we might need to restart aggregation ...
1116 : */
1117 1112 : if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1118 378 : (frameOptions & FRAMEOPTION_EXCLUSION))
1119 : {
1120 : /* ... so create a mark pointer to track the frame head */
1121 752 : agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1122 : /* and the read pointer will need BACKWARD capability */
1123 752 : readptr_flags |= EXEC_FLAG_BACKWARD;
1124 : }
1125 :
1126 1112 : agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1127 : readptr_flags);
1128 : }
1129 :
1130 : /* create mark and read pointers for each real window function */
1131 4768 : for (int i = 0; i < numfuncs; i++)
1132 : {
1133 2690 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1134 :
1135 2690 : if (!perfuncstate->plain_agg)
1136 : {
1137 1494 : WindowObject winobj = perfuncstate->winobj;
1138 :
1139 1494 : winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1140 : 0);
1141 1494 : winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1142 : EXEC_FLAG_BACKWARD);
1143 : }
1144 : }
1145 :
1146 : /*
1147 : * If we are in RANGE or GROUPS mode, then determining frame boundaries
1148 : * requires physical access to the frame endpoint rows, except in certain
1149 : * degenerate cases. We create read pointers to point to those rows, to
1150 : * simplify access and ensure that the tuplestore doesn't discard the
1151 : * endpoint rows prematurely. (Must create pointers in exactly the same
1152 : * cases that update_frameheadpos and update_frametailpos need them.)
1153 : */
1154 2078 : winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1155 :
1156 2078 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1157 : {
1158 1226 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1159 68 : node->ordNumCols != 0) ||
1160 1158 : (frameOptions & FRAMEOPTION_START_OFFSET))
1161 722 : winstate->framehead_ptr =
1162 722 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1163 1226 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1164 456 : node->ordNumCols != 0) ||
1165 918 : (frameOptions & FRAMEOPTION_END_OFFSET))
1166 1022 : winstate->frametail_ptr =
1167 1022 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1168 : }
1169 :
1170 : /*
1171 : * If we have an exclusion clause that requires knowing the boundaries of
1172 : * the current row's peer group, we create a read pointer to track the
1173 : * tail position of the peer group (i.e., first row of the next peer
1174 : * group). The head position does not require its own pointer because we
1175 : * maintain that as a side effect of advancing the current row.
1176 : */
1177 2078 : winstate->grouptail_ptr = -1;
1178 :
1179 2078 : if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1180 180 : FRAMEOPTION_EXCLUDE_TIES)) &&
1181 180 : node->ordNumCols != 0)
1182 : {
1183 168 : winstate->grouptail_ptr =
1184 168 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1185 : }
1186 2078 : }
1187 :
1188 : /*
1189 : * begin_partition
1190 : * Start buffering rows of the next partition.
1191 : */
1192 : static void
1193 3370 : begin_partition(WindowAggState *winstate)
1194 : {
1195 3370 : PlanState *outerPlan = outerPlanState(winstate);
1196 3370 : int numfuncs = winstate->numfuncs;
1197 :
1198 3370 : winstate->partition_spooled = false;
1199 3370 : winstate->framehead_valid = false;
1200 3370 : winstate->frametail_valid = false;
1201 3370 : winstate->grouptail_valid = false;
1202 3370 : winstate->spooled_rows = 0;
1203 3370 : winstate->currentpos = 0;
1204 3370 : winstate->frameheadpos = 0;
1205 3370 : winstate->frametailpos = 0;
1206 3370 : winstate->currentgroup = 0;
1207 3370 : winstate->frameheadgroup = 0;
1208 3370 : winstate->frametailgroup = 0;
1209 3370 : winstate->groupheadpos = 0;
1210 3370 : winstate->grouptailpos = -1; /* see update_grouptailpos */
1211 3370 : ExecClearTuple(winstate->agg_row_slot);
1212 3370 : if (winstate->framehead_slot)
1213 1024 : ExecClearTuple(winstate->framehead_slot);
1214 3370 : if (winstate->frametail_slot)
1215 1696 : ExecClearTuple(winstate->frametail_slot);
1216 :
1217 : /*
1218 : * If this is the very first partition, we need to fetch the first input
1219 : * row to store in first_part_slot.
1220 : */
1221 3370 : if (TupIsNull(winstate->first_part_slot))
1222 : {
1223 2156 : TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1224 :
1225 2156 : if (!TupIsNull(outerslot))
1226 2138 : ExecCopySlot(winstate->first_part_slot, outerslot);
1227 : else
1228 : {
1229 : /* outer plan is empty, so we have nothing to do */
1230 18 : winstate->partition_spooled = true;
1231 18 : winstate->more_partitions = false;
1232 18 : return;
1233 : }
1234 : }
1235 :
1236 : /* Create new tuplestore if not done already. */
1237 3352 : if (unlikely(winstate->buffer == NULL))
1238 2078 : prepare_tuplestore(winstate);
1239 :
1240 3352 : winstate->next_partition = false;
1241 :
1242 3352 : if (winstate->numaggs > 0)
1243 : {
1244 1846 : WindowObject agg_winobj = winstate->agg_winobj;
1245 :
1246 : /* reset mark and see positions for aggregate functions */
1247 1846 : agg_winobj->markpos = -1;
1248 1846 : agg_winobj->seekpos = -1;
1249 :
1250 : /* Also reset the row counters for aggregates */
1251 1846 : winstate->aggregatedbase = 0;
1252 1846 : winstate->aggregatedupto = 0;
1253 : }
1254 :
1255 : /* reset mark and seek positions for each real window function */
1256 7544 : for (int i = 0; i < numfuncs; i++)
1257 : {
1258 4192 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1259 :
1260 4192 : if (!perfuncstate->plain_agg)
1261 : {
1262 2172 : WindowObject winobj = perfuncstate->winobj;
1263 :
1264 2172 : winobj->markpos = -1;
1265 2172 : winobj->seekpos = -1;
1266 : }
1267 : }
1268 :
1269 : /*
1270 : * Store the first tuple into the tuplestore (it's always available now;
1271 : * we either read it above, or saved it at the end of previous partition)
1272 : */
1273 3352 : tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1274 3352 : winstate->spooled_rows++;
1275 : }
1276 :
1277 : /*
1278 : * Read tuples from the outer node, up to and including position 'pos', and
1279 : * store them into the tuplestore. If pos is -1, reads the whole partition.
1280 : */
1281 : static void
1282 1682530 : spool_tuples(WindowAggState *winstate, int64 pos)
1283 : {
1284 1682530 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1285 : PlanState *outerPlan;
1286 : TupleTableSlot *outerslot;
1287 : MemoryContext oldcontext;
1288 :
1289 1682530 : if (!winstate->buffer)
1290 6 : return; /* just a safety check */
1291 1682524 : if (winstate->partition_spooled)
1292 110548 : return; /* whole partition done already */
1293 :
1294 : /*
1295 : * When in pass-through mode we can just exhaust all tuples in the current
1296 : * partition. We don't need these tuples for any further window function
1297 : * evaluation, however, we do need to keep them around if we're not the
1298 : * top-level window as another WindowAgg node above must see these.
1299 : */
1300 1571976 : if (winstate->status != WINDOWAGG_RUN)
1301 : {
1302 : Assert(winstate->status == WINDOWAGG_PASSTHROUGH ||
1303 : winstate->status == WINDOWAGG_PASSTHROUGH_STRICT);
1304 :
1305 18 : pos = -1;
1306 : }
1307 :
1308 : /*
1309 : * If the tuplestore has spilled to disk, alternate reading and writing
1310 : * becomes quite expensive due to frequent buffer flushes. It's cheaper
1311 : * to force the entire partition to get spooled in one go.
1312 : *
1313 : * XXX this is a horrid kluge --- it'd be better to fix the performance
1314 : * problem inside tuplestore. FIXME
1315 : */
1316 1571958 : else if (!tuplestore_in_memory(winstate->buffer))
1317 12 : pos = -1;
1318 :
1319 1571976 : outerPlan = outerPlanState(winstate);
1320 :
1321 : /* Must be in query context to call outerplan */
1322 1571976 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1323 :
1324 2533122 : while (winstate->spooled_rows <= pos || pos == -1)
1325 : {
1326 964348 : outerslot = ExecProcNode(outerPlan);
1327 964348 : if (TupIsNull(outerslot))
1328 : {
1329 : /* reached the end of the last partition */
1330 1988 : winstate->partition_spooled = true;
1331 1988 : winstate->more_partitions = false;
1332 1988 : break;
1333 : }
1334 :
1335 962360 : if (node->partNumCols > 0)
1336 : {
1337 135620 : ExprContext *econtext = winstate->tmpcontext;
1338 :
1339 135620 : econtext->ecxt_innertuple = winstate->first_part_slot;
1340 135620 : econtext->ecxt_outertuple = outerslot;
1341 :
1342 : /* Check if this tuple still belongs to the current partition */
1343 135620 : if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1344 : {
1345 : /*
1346 : * end of partition; copy the tuple for the next cycle.
1347 : */
1348 1214 : ExecCopySlot(winstate->first_part_slot, outerslot);
1349 1214 : winstate->partition_spooled = true;
1350 1214 : winstate->more_partitions = true;
1351 1214 : break;
1352 : }
1353 : }
1354 :
1355 : /*
1356 : * Remember the tuple unless we're the top-level window and we're in
1357 : * pass-through mode.
1358 : */
1359 961146 : if (winstate->status != WINDOWAGG_PASSTHROUGH_STRICT)
1360 : {
1361 : /* Still in partition, so save it into the tuplestore */
1362 961134 : tuplestore_puttupleslot(winstate->buffer, outerslot);
1363 961134 : winstate->spooled_rows++;
1364 : }
1365 : }
1366 :
1367 1571976 : MemoryContextSwitchTo(oldcontext);
1368 : }
1369 :
1370 : /*
1371 : * release_partition
1372 : * clear information kept within a partition, including
1373 : * tuplestore and aggregate results.
1374 : */
1375 : static void
1376 5616 : release_partition(WindowAggState *winstate)
1377 : {
1378 : int i;
1379 :
1380 12582 : for (i = 0; i < winstate->numfuncs; i++)
1381 : {
1382 6966 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1383 :
1384 : /* Release any partition-local state of this window function */
1385 6966 : if (perfuncstate->winobj)
1386 3504 : perfuncstate->winobj->localmem = NULL;
1387 : }
1388 :
1389 : /*
1390 : * Release all partition-local memory (in particular, any partition-local
1391 : * state that we might have trashed our pointers to in the above loop, and
1392 : * any aggregate temp data). We don't rely on retail pfree because some
1393 : * aggregates might have allocated data we don't have direct pointers to.
1394 : */
1395 5616 : MemoryContextReset(winstate->partcontext);
1396 5616 : MemoryContextReset(winstate->aggcontext);
1397 9078 : for (i = 0; i < winstate->numaggs; i++)
1398 : {
1399 3462 : if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1400 1926 : MemoryContextReset(winstate->peragg[i].aggcontext);
1401 : }
1402 :
1403 5616 : if (winstate->buffer)
1404 3256 : tuplestore_clear(winstate->buffer);
1405 5616 : winstate->partition_spooled = false;
1406 5616 : winstate->next_partition = true;
1407 5616 : }
1408 :
1409 : /*
1410 : * row_is_in_frame
1411 : * Determine whether a row is in the current row's window frame according
1412 : * to our window framing rule
1413 : *
1414 : * The caller must have already determined that the row is in the partition
1415 : * and fetched it into a slot. This function just encapsulates the framing
1416 : * rules.
1417 : *
1418 : * Returns:
1419 : * -1, if the row is out of frame and no succeeding rows can be in frame
1420 : * 0, if the row is out of frame but succeeding rows might be in frame
1421 : * 1, if the row is in frame
1422 : *
1423 : * May clobber winstate->temp_slot_2.
1424 : */
1425 : static int
1426 182982 : row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1427 : {
1428 182982 : int frameOptions = winstate->frameOptions;
1429 :
1430 : Assert(pos >= 0); /* else caller error */
1431 :
1432 : /*
1433 : * First, check frame starting conditions. We might as well delegate this
1434 : * to update_frameheadpos always; it doesn't add any notable cost.
1435 : */
1436 182982 : update_frameheadpos(winstate);
1437 182982 : if (pos < winstate->frameheadpos)
1438 144 : return 0;
1439 :
1440 : /*
1441 : * Okay so far, now check frame ending conditions. Here, we avoid calling
1442 : * update_frametailpos in simple cases, so as not to spool tuples further
1443 : * ahead than necessary.
1444 : */
1445 182838 : if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1446 : {
1447 152046 : if (frameOptions & FRAMEOPTION_ROWS)
1448 : {
1449 : /* rows after current row are out of frame */
1450 2208 : if (pos > winstate->currentpos)
1451 972 : return -1;
1452 : }
1453 149838 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1454 : {
1455 : /* following row that is not peer is out of frame */
1456 149838 : if (pos > winstate->currentpos &&
1457 146486 : !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1458 1264 : return -1;
1459 : }
1460 : else
1461 : Assert(false);
1462 : }
1463 30792 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1464 : {
1465 17934 : if (frameOptions & FRAMEOPTION_ROWS)
1466 : {
1467 3948 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1468 :
1469 : /* rows after current row + offset are out of frame */
1470 3948 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1471 114 : offset = -offset;
1472 :
1473 3948 : if (pos > winstate->currentpos + offset)
1474 1152 : return -1;
1475 : }
1476 13986 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1477 : {
1478 : /* hard cases, so delegate to update_frametailpos */
1479 13986 : update_frametailpos(winstate);
1480 13944 : if (pos >= winstate->frametailpos)
1481 1470 : return -1;
1482 : }
1483 : else
1484 : Assert(false);
1485 : }
1486 :
1487 : /* Check exclusion clause */
1488 177938 : if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1489 : {
1490 2466 : if (pos == winstate->currentpos)
1491 420 : return 0;
1492 : }
1493 175472 : else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1494 172610 : ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1495 2970 : pos != winstate->currentpos))
1496 : {
1497 5292 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1498 :
1499 : /* If no ORDER BY, all rows are peers with each other */
1500 5292 : if (node->ordNumCols == 0)
1501 468 : return 0;
1502 : /* Otherwise, check the group boundaries */
1503 4824 : if (pos >= winstate->groupheadpos)
1504 : {
1505 2592 : update_grouptailpos(winstate);
1506 2592 : if (pos < winstate->grouptailpos)
1507 1008 : return 0;
1508 : }
1509 : }
1510 :
1511 : /* If we get here, it's in frame */
1512 176042 : return 1;
1513 : }
1514 :
1515 : /*
1516 : * update_frameheadpos
1517 : * make frameheadpos valid for the current row
1518 : *
1519 : * Note that frameheadpos is computed without regard for any window exclusion
1520 : * clause; the current row and/or its peers are considered part of the frame
1521 : * for this purpose even if they must be excluded later.
1522 : *
1523 : * May clobber winstate->temp_slot_2.
1524 : */
1525 : static void
1526 348072 : update_frameheadpos(WindowAggState *winstate)
1527 : {
1528 348072 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1529 348072 : int frameOptions = winstate->frameOptions;
1530 : MemoryContext oldcontext;
1531 :
1532 348072 : if (winstate->framehead_valid)
1533 189170 : return; /* already known for current row */
1534 :
1535 : /* We may be called in a short-lived context */
1536 158902 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1537 :
1538 158902 : if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1539 : {
1540 : /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1541 148622 : winstate->frameheadpos = 0;
1542 148622 : winstate->framehead_valid = true;
1543 : }
1544 10280 : else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1545 : {
1546 2804 : if (frameOptions & FRAMEOPTION_ROWS)
1547 : {
1548 : /* In ROWS mode, frame head is the same as current */
1549 2376 : winstate->frameheadpos = winstate->currentpos;
1550 2376 : winstate->framehead_valid = true;
1551 : }
1552 428 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1553 : {
1554 : /* If no ORDER BY, all rows are peers with each other */
1555 428 : if (node->ordNumCols == 0)
1556 : {
1557 0 : winstate->frameheadpos = 0;
1558 0 : winstate->framehead_valid = true;
1559 0 : MemoryContextSwitchTo(oldcontext);
1560 0 : return;
1561 : }
1562 :
1563 : /*
1564 : * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1565 : * first row that is a peer of current row. We keep a copy of the
1566 : * last-known frame head row in framehead_slot, and advance as
1567 : * necessary. Note that if we reach end of partition, we will
1568 : * leave frameheadpos = end+1 and framehead_slot empty.
1569 : */
1570 428 : tuplestore_select_read_pointer(winstate->buffer,
1571 : winstate->framehead_ptr);
1572 428 : if (winstate->frameheadpos == 0 &&
1573 212 : TupIsNull(winstate->framehead_slot))
1574 : {
1575 : /* fetch first row into framehead_slot, if we didn't already */
1576 82 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1577 : winstate->framehead_slot))
1578 0 : elog(ERROR, "unexpected end of tuplestore");
1579 : }
1580 :
1581 744 : while (!TupIsNull(winstate->framehead_slot))
1582 : {
1583 744 : if (are_peers(winstate, winstate->framehead_slot,
1584 : winstate->ss.ss_ScanTupleSlot))
1585 428 : break; /* this row is the correct frame head */
1586 : /* Note we advance frameheadpos even if the fetch fails */
1587 316 : winstate->frameheadpos++;
1588 316 : spool_tuples(winstate, winstate->frameheadpos);
1589 316 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1590 : winstate->framehead_slot))
1591 0 : break; /* end of partition */
1592 : }
1593 428 : winstate->framehead_valid = true;
1594 : }
1595 : else
1596 : Assert(false);
1597 : }
1598 7476 : else if (frameOptions & FRAMEOPTION_START_OFFSET)
1599 : {
1600 7476 : if (frameOptions & FRAMEOPTION_ROWS)
1601 : {
1602 : /* In ROWS mode, bound is physically n before/after current */
1603 1572 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1604 :
1605 1572 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1606 1512 : offset = -offset;
1607 :
1608 1572 : winstate->frameheadpos = winstate->currentpos + offset;
1609 : /* frame head can't go before first row */
1610 1572 : if (winstate->frameheadpos < 0)
1611 228 : winstate->frameheadpos = 0;
1612 1344 : else if (winstate->frameheadpos > winstate->currentpos + 1)
1613 : {
1614 : /* make sure frameheadpos is not past end of partition */
1615 0 : spool_tuples(winstate, winstate->frameheadpos - 1);
1616 0 : if (winstate->frameheadpos > winstate->spooled_rows)
1617 0 : winstate->frameheadpos = winstate->spooled_rows;
1618 : }
1619 1572 : winstate->framehead_valid = true;
1620 : }
1621 5904 : else if (frameOptions & FRAMEOPTION_RANGE)
1622 : {
1623 : /*
1624 : * In RANGE START_OFFSET mode, frame head is the first row that
1625 : * satisfies the in_range constraint relative to the current row.
1626 : * We keep a copy of the last-known frame head row in
1627 : * framehead_slot, and advance as necessary. Note that if we
1628 : * reach end of partition, we will leave frameheadpos = end+1 and
1629 : * framehead_slot empty.
1630 : */
1631 4524 : int sortCol = node->ordColIdx[0];
1632 : bool sub,
1633 : less;
1634 :
1635 : /* We must have an ordering column */
1636 : Assert(node->ordNumCols == 1);
1637 :
1638 : /* Precompute flags for in_range checks */
1639 4524 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1640 3702 : sub = true; /* subtract startOffset from current row */
1641 : else
1642 822 : sub = false; /* add it */
1643 4524 : less = false; /* normally, we want frame head >= sum */
1644 : /* If sort order is descending, flip both flags */
1645 4524 : if (!winstate->inRangeAsc)
1646 : {
1647 654 : sub = !sub;
1648 654 : less = true;
1649 : }
1650 :
1651 4524 : tuplestore_select_read_pointer(winstate->buffer,
1652 : winstate->framehead_ptr);
1653 4524 : if (winstate->frameheadpos == 0 &&
1654 2502 : TupIsNull(winstate->framehead_slot))
1655 : {
1656 : /* fetch first row into framehead_slot, if we didn't already */
1657 570 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1658 : winstate->framehead_slot))
1659 0 : elog(ERROR, "unexpected end of tuplestore");
1660 : }
1661 :
1662 7266 : while (!TupIsNull(winstate->framehead_slot))
1663 : {
1664 : Datum headval,
1665 : currval;
1666 : bool headisnull,
1667 : currisnull;
1668 :
1669 7062 : headval = slot_getattr(winstate->framehead_slot, sortCol,
1670 : &headisnull);
1671 7062 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1672 : &currisnull);
1673 7062 : if (headisnull || currisnull)
1674 : {
1675 : /* order of the rows depends only on nulls_first */
1676 108 : if (winstate->inRangeNullsFirst)
1677 : {
1678 : /* advance head if head is null and curr is not */
1679 48 : if (!headisnull || currisnull)
1680 : break;
1681 : }
1682 : else
1683 : {
1684 : /* advance head if head is not null and curr is null */
1685 60 : if (headisnull || !currisnull)
1686 : break;
1687 : }
1688 : }
1689 : else
1690 : {
1691 6954 : if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1692 : winstate->inRangeColl,
1693 : headval,
1694 : currval,
1695 : winstate->startOffsetValue,
1696 : BoolGetDatum(sub),
1697 : BoolGetDatum(less))))
1698 4170 : break; /* this row is the correct frame head */
1699 : }
1700 : /* Note we advance frameheadpos even if the fetch fails */
1701 2796 : winstate->frameheadpos++;
1702 2796 : spool_tuples(winstate, winstate->frameheadpos);
1703 2796 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1704 : winstate->framehead_slot))
1705 54 : break; /* end of partition */
1706 : }
1707 4476 : winstate->framehead_valid = true;
1708 : }
1709 1380 : else if (frameOptions & FRAMEOPTION_GROUPS)
1710 : {
1711 : /*
1712 : * In GROUPS START_OFFSET mode, frame head is the first row of the
1713 : * first peer group whose number satisfies the offset constraint.
1714 : * We keep a copy of the last-known frame head row in
1715 : * framehead_slot, and advance as necessary. Note that if we
1716 : * reach end of partition, we will leave frameheadpos = end+1 and
1717 : * framehead_slot empty.
1718 : */
1719 1380 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1720 : int64 minheadgroup;
1721 :
1722 1380 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1723 1128 : minheadgroup = winstate->currentgroup - offset;
1724 : else
1725 252 : minheadgroup = winstate->currentgroup + offset;
1726 :
1727 1380 : tuplestore_select_read_pointer(winstate->buffer,
1728 : winstate->framehead_ptr);
1729 1380 : if (winstate->frameheadpos == 0 &&
1730 750 : TupIsNull(winstate->framehead_slot))
1731 : {
1732 : /* fetch first row into framehead_slot, if we didn't already */
1733 372 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1734 : winstate->framehead_slot))
1735 0 : elog(ERROR, "unexpected end of tuplestore");
1736 : }
1737 :
1738 2142 : while (!TupIsNull(winstate->framehead_slot))
1739 : {
1740 2118 : if (winstate->frameheadgroup >= minheadgroup)
1741 1320 : break; /* this row is the correct frame head */
1742 798 : ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1743 : /* Note we advance frameheadpos even if the fetch fails */
1744 798 : winstate->frameheadpos++;
1745 798 : spool_tuples(winstate, winstate->frameheadpos);
1746 798 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1747 : winstate->framehead_slot))
1748 36 : break; /* end of partition */
1749 762 : if (!are_peers(winstate, winstate->temp_slot_2,
1750 : winstate->framehead_slot))
1751 522 : winstate->frameheadgroup++;
1752 : }
1753 1380 : ExecClearTuple(winstate->temp_slot_2);
1754 1380 : winstate->framehead_valid = true;
1755 : }
1756 : else
1757 : Assert(false);
1758 : }
1759 : else
1760 : Assert(false);
1761 :
1762 158854 : MemoryContextSwitchTo(oldcontext);
1763 : }
1764 :
1765 : /*
1766 : * update_frametailpos
1767 : * make frametailpos valid for the current row (does nothing if frametail_valid is already set)
1768 : *
1769 : * frametailpos is the position just past the frame's last row (for example, currentpos + 1 when a ROWS frame ends at CURRENT ROW). Note that frametailpos is computed without regard for any window exclusion
1770 : * clause; the current row and/or its peers are considered part of the frame
1771 : * for this purpose even if they must be excluded later.
1772 : *
1773 : * May clobber winstate->temp_slot_2 (GROUPS END_OFFSET mode copies the previous tail row into it to detect peer-group boundaries, then clears it).
1774 : */
1775 : static void
1776 148688 : update_frametailpos(WindowAggState *winstate)
1777 : {
1778 148688 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1779 148688 : int frameOptions = winstate->frameOptions;
1780 : MemoryContext oldcontext;
1781 :
1782 148688 : if (winstate->frametail_valid)
1783 17820 : return; /* already known for current row */
1784 :
1785 : /* We may be called in a short-lived context, so work in per-query memory; the previous context is restored on every exit path below */
1786 130868 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1787 :
1788 130868 : if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1789 : {
1790 : /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame; spool_tuples(-1) presumably reads the rest of the partition (helper defined elsewhere - confirm) */
1791 180 : spool_tuples(winstate, -1);
1792 180 : winstate->frametailpos = winstate->spooled_rows;
1793 180 : winstate->frametail_valid = true;
1794 : }
1795 130688 : else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1796 : {
1797 124304 : if (frameOptions & FRAMEOPTION_ROWS)
1798 : {
1799 : /* In ROWS mode, exactly the rows up to current are in frame */
1800 120 : winstate->frametailpos = winstate->currentpos + 1;
1801 120 : winstate->frametail_valid = true;
1802 : }
1803 124184 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1804 : {
1805 : /* If no ORDER BY, all rows are peers with each other, so the frame tail is set to the number of spooled rows */
1806 124184 : if (node->ordNumCols == 0)
1807 : {
1808 60 : spool_tuples(winstate, -1);
1809 60 : winstate->frametailpos = winstate->spooled_rows;
1810 60 : winstate->frametail_valid = true;
1811 60 : MemoryContextSwitchTo(oldcontext);
1812 60 : return;
1813 : }
1814 :
1815 : /*
1816 : * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1817 : * row that is a peer of current row, frame tail is the row after
1818 : * that (if any). We keep a copy of the last-known frame tail row
1819 : * in frametail_slot, and advance as necessary. Note that if we
1820 : * reach end of partition, we will leave frametailpos = end+1 and
1821 : * frametail_slot empty.
1822 : */
1823 124124 : tuplestore_select_read_pointer(winstate->buffer,
1824 : winstate->frametail_ptr);
1825 124124 : if (winstate->frametailpos == 0 &&
1826 676 : TupIsNull(winstate->frametail_slot))
1827 : {
1828 : /* fetch first row into frametail_slot, if we didn't already */
1829 676 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1830 : winstate->frametail_slot))
1831 0 : elog(ERROR, "unexpected end of tuplestore");
1832 : }
1833 :
1834 247584 : while (!TupIsNull(winstate->frametail_slot))
1835 : {
1836 223464 : if (winstate->frametailpos > winstate->currentpos &&
1837 219700 : !are_peers(winstate, winstate->frametail_slot,
1838 : winstate->ss.ss_ScanTupleSlot))
1839 99340 : break; /* this row is the frame tail: it lies past currentpos and is not a peer of the current row */
1840 : /* Note we advance frametailpos even if the fetch fails */
1841 124124 : winstate->frametailpos++;
1842 124124 : spool_tuples(winstate, winstate->frametailpos);
1843 124124 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1844 : winstate->frametail_slot))
1845 664 : break; /* end of partition */
1846 : }
1847 124124 : winstate->frametail_valid = true;
1848 : }
1849 : else
1850 : Assert(false);
1851 : }
1852 6384 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1853 : {
1854 6384 : if (frameOptions & FRAMEOPTION_ROWS)
1855 : {
1856 : /* In ROWS mode, bound is physically n before/after current */
1857 180 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1858 :
1859 180 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1860 0 : offset = -offset;
1861 :
1862 180 : winstate->frametailpos = winstate->currentpos + offset + 1; /* NOTE(review): plain addition; calculate_frame_offsets only rejects negative offsets, so an offset near the int64 maximum could overflow here - confirm whether a guard is needed */
1863 : /* smallest allowable value of frametailpos is 0 */
1864 180 : if (winstate->frametailpos < 0)
1865 0 : winstate->frametailpos = 0;
1866 180 : else if (winstate->frametailpos > winstate->currentpos + 1)
1867 : {
1868 : /* make sure frametailpos is not past end of partition */
1869 180 : spool_tuples(winstate, winstate->frametailpos - 1);
1870 180 : if (winstate->frametailpos > winstate->spooled_rows)
1871 36 : winstate->frametailpos = winstate->spooled_rows;
1872 : }
1873 180 : winstate->frametail_valid = true;
1874 : }
1875 6204 : else if (frameOptions & FRAMEOPTION_RANGE)
1876 : {
1877 : /*
1878 : * In RANGE END_OFFSET mode, frame end is the last row that
1879 : * satisfies the in_range constraint relative to the current row,
1880 : * frame tail is the row after that (if any). We keep a copy of
1881 : * the last-known frame tail row in frametail_slot, and advance as
1882 : * necessary. Note that if we reach end of partition, we will
1883 : * leave frametailpos = end+1 and frametail_slot empty.
1884 : */
1885 4884 : int sortCol = node->ordColIdx[0];
1886 : bool sub,
1887 : less;
1888 :
1889 : /* We must have an ordering column */
1890 : Assert(node->ordNumCols == 1);
1891 :
1892 : /* Precompute flags for in_range checks (passed as the last two arguments of endInRangeFunc below) */
1893 4884 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1894 912 : sub = true; /* subtract endOffset from current row */
1895 : else
1896 3972 : sub = false; /* add it */
1897 4884 : less = true; /* normally, we want frame tail <= sum */
1898 : /* If sort order is descending, flip both flags */
1899 4884 : if (!winstate->inRangeAsc)
1900 : {
1901 690 : sub = !sub;
1902 690 : less = false;
1903 : }
1904 :
1905 4884 : tuplestore_select_read_pointer(winstate->buffer,
1906 : winstate->frametail_ptr);
1907 4884 : if (winstate->frametailpos == 0 &&
1908 822 : TupIsNull(winstate->frametail_slot))
1909 : {
1910 : /* fetch first row into frametail_slot, if we didn't already */
1911 588 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1912 : winstate->frametail_slot))
1913 0 : elog(ERROR, "unexpected end of tuplestore");
1914 : }
1915 :
1916 9006 : while (!TupIsNull(winstate->frametail_slot))
1917 : {
1918 : Datum tailval,
1919 : currval;
1920 : bool tailisnull,
1921 : currisnull;
1922 :
1923 7440 : tailval = slot_getattr(winstate->frametail_slot, sortCol,
1924 : &tailisnull);
1925 7440 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1926 : &currisnull);
1927 7440 : if (tailisnull || currisnull)
1928 : {
1929 : /* order of the rows depends only on nulls_first; the in_range function is not called when either value is null */
1930 108 : if (winstate->inRangeNullsFirst)
1931 : {
1932 : /* advance tail if tail is null or curr is not */
1933 48 : if (!tailisnull)
1934 3270 : break;
1935 : }
1936 : else
1937 : {
1938 : /* advance tail if tail is not null or curr is null */
1939 60 : if (!currisnull)
1940 36 : break;
1941 : }
1942 : }
1943 : else
1944 : {
1945 7332 : if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1946 : winstate->inRangeColl,
1947 : tailval,
1948 : currval,
1949 : winstate->endOffsetValue,
1950 : BoolGetDatum(sub),
1951 : BoolGetDatum(less))))
1952 2730 : break; /* in_range test returned false: this row is the correct frame tail */
1953 : }
1954 : /* Note we advance frametailpos even if the fetch fails */
1955 4602 : winstate->frametailpos++;
1956 4602 : spool_tuples(winstate, winstate->frametailpos);
1957 4602 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1958 : winstate->frametail_slot))
1959 480 : break; /* end of partition */
1960 : }
1961 4836 : winstate->frametail_valid = true;
1962 : }
1963 1320 : else if (frameOptions & FRAMEOPTION_GROUPS)
1964 : {
1965 : /*
1966 : * In GROUPS END_OFFSET mode, frame end is the last row of the
1967 : * last peer group whose number satisfies the offset constraint,
1968 : * and frame tail is the row after that (if any). We keep a copy
1969 : * of the last-known frame tail row in frametail_slot, and advance
1970 : * as necessary. Note that if we reach end of partition, we will
1971 : * leave frametailpos = end+1 and frametail_slot empty.
1972 : */
1973 1320 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1974 : int64 maxtailgroup;
1975 :
1976 1320 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1977 72 : maxtailgroup = winstate->currentgroup - offset;
1978 : else
1979 1248 : maxtailgroup = winstate->currentgroup + offset; /* NOTE(review): unchecked int64 addition; a very large offset could overflow - confirm */
1980 :
1981 1320 : tuplestore_select_read_pointer(winstate->buffer,
1982 : winstate->frametail_ptr);
1983 1320 : if (winstate->frametailpos == 0 &&
1984 384 : TupIsNull(winstate->frametail_slot))
1985 : {
1986 : /* fetch first row into frametail_slot, if we didn't already */
1987 366 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1988 : winstate->frametail_slot))
1989 0 : elog(ERROR, "unexpected end of tuplestore");
1990 : }
1991 :
1992 2268 : while (!TupIsNull(winstate->frametail_slot))
1993 : {
1994 2040 : if (winstate->frametailgroup > maxtailgroup)
1995 744 : break; /* this row is the correct frame tail */
1996 1296 : ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot); /* remember the previous tail row for the peer test below */
1997 : /* Note we advance frametailpos even if the fetch fails */
1998 1296 : winstate->frametailpos++;
1999 1296 : spool_tuples(winstate, winstate->frametailpos);
2000 1296 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2001 : winstate->frametail_slot))
2002 348 : break; /* end of partition */
2003 948 : if (!are_peers(winstate, winstate->temp_slot_2,
2004 : winstate->frametail_slot))
2005 600 : winstate->frametailgroup++; /* the newly fetched row is not a peer of the previous one, so it starts a new group */
2006 : }
2007 1320 : ExecClearTuple(winstate->temp_slot_2);
2008 1320 : winstate->frametail_valid = true;
2009 : }
2010 : else
2011 : Assert(false);
2012 : }
2013 : else
2014 : Assert(false);
2015 :
2016 130760 : MemoryContextSwitchTo(oldcontext);
2017 : }
2018 :
2019 : /*
2020 : * update_grouptailpos
2021 : * make grouptailpos valid for the current row (does nothing if grouptail_valid is already set)
2022 : *
2023 : * May clobber winstate->temp_slot_2 (each row fetched through grouptail_ptr is read into it for the peer test, and it is cleared before returning).
2024 : */
2025 : static void
2026 4872 : update_grouptailpos(WindowAggState *winstate)
2027 : {
2028 4872 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2029 : MemoryContext oldcontext;
2030 :
2031 4872 : if (winstate->grouptail_valid)
2032 3954 : return; /* already known for current row */
2033 :
2034 : /* We may be called in a short-lived context, so work in per-query memory; the previous context is restored on both exit paths below */
2035 918 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2036 :
2037 : /* If no ORDER BY, all rows are peers with each other, so the group tail is set to the number of spooled rows */
2038 918 : if (node->ordNumCols == 0)
2039 : {
2040 0 : spool_tuples(winstate, -1);
2041 0 : winstate->grouptailpos = winstate->spooled_rows;
2042 0 : winstate->grouptail_valid = true;
2043 0 : MemoryContextSwitchTo(oldcontext);
2044 0 : return;
2045 : }
2046 :
2047 : /*
2048 : * Because grouptail_valid is reset only when current row advances into a
2049 : * new peer group, we always reach here knowing that grouptailpos needs to
2050 : * be advanced by at least one row. Hence, unlike the otherwise similar
2051 : * case for frame tail tracking, we do not need persistent storage of the
2052 : * group tail row.
2053 : */
2054 : Assert(winstate->grouptailpos <= winstate->currentpos);
2055 918 : tuplestore_select_read_pointer(winstate->buffer,
2056 : winstate->grouptail_ptr);
2057 : for (;;)
2058 : {
2059 : /* Note we advance grouptailpos even if the fetch fails */
2060 1758 : winstate->grouptailpos++;
2061 1758 : spool_tuples(winstate, winstate->grouptailpos);
2062 1758 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2063 : winstate->temp_slot_2))
2064 258 : break; /* end of partition */
2065 1500 : if (winstate->grouptailpos > winstate->currentpos &&
2066 1242 : !are_peers(winstate, winstate->temp_slot_2,
2067 : winstate->ss.ss_ScanTupleSlot))
2068 660 : break; /* this row is the group tail: past currentpos and not a peer of the current row */
2069 : }
2070 918 : ExecClearTuple(winstate->temp_slot_2);
2071 918 : winstate->grouptail_valid = true;
2072 :
2073 918 : MemoryContextSwitchTo(oldcontext);
2074 : }
2075 :
2076 : /*
2077 : * calculate_frame_offsets
2078 : * Determine the startOffsetValue and endOffsetValue values for the
2079 : * WindowAgg's frame options. Each offset expression that the frame options call for is evaluated once, must not be NULL, and is copied with datumCopy; in ROWS and GROUPS modes a negative offset raises an error. Clears all_first on completion. pstate must be a WindowAggState.
2080 : */
2081 : static pg_noinline void
2082 2156 : calculate_frame_offsets(PlanState *pstate)
2083 : {
2084 2156 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2085 : ExprContext *econtext;
2086 2156 : int frameOptions = winstate->frameOptions;
2087 : Datum value;
2088 : bool isnull;
2089 : int16 len;
2090 : bool byval;
2091 :
2092 : /* Ensure we've not been called before for this scan */
2093 : Assert(winstate->all_first);
2094 :
2095 2156 : econtext = winstate->ss.ps.ps_ExprContext;
2096 :
2097 2156 : if (frameOptions & FRAMEOPTION_START_OFFSET)
2098 : {
2099 : Assert(winstate->startOffset != NULL);
2100 816 : value = ExecEvalExprSwitchContext(winstate->startOffset,
2101 : econtext,
2102 : &isnull);
2103 816 : if (isnull)
2104 0 : ereport(ERROR,
2105 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2106 : errmsg("frame starting offset must not be null")));
2107 : /* copy value into query-lifespan context (the type's length and by-value flag are looked up from the expression's result type) */
2108 816 : get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2109 : &len,
2110 : &byval);
2111 816 : winstate->startOffsetValue = datumCopy(value, byval, len);
2112 816 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2113 : {
2114 : /* value is known to be int8; only negativity is checked here, no upper bound is imposed */
2115 300 : int64 offset = DatumGetInt64(value);
2116 :
2117 300 : if (offset < 0)
2118 0 : ereport(ERROR,
2119 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2120 : errmsg("frame starting offset must not be negative")));
2121 : }
2122 : }
2123 :
2124 2156 : if (frameOptions & FRAMEOPTION_END_OFFSET)
2125 : {
2126 : Assert(winstate->endOffset != NULL);
2127 912 : value = ExecEvalExprSwitchContext(winstate->endOffset,
2128 : econtext,
2129 : &isnull);
2130 912 : if (isnull)
2131 0 : ereport(ERROR,
2132 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2133 : errmsg("frame ending offset must not be null")));
2134 : /* copy value into query-lifespan context (same steps as for the starting offset above) */
2135 912 : get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2136 : &len,
2137 : &byval);
2138 912 : winstate->endOffsetValue = datumCopy(value, byval, len);
2139 912 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2140 : {
2141 : /* value is known to be int8; only negativity is checked here, no upper bound is imposed */
2142 330 : int64 offset = DatumGetInt64(value);
2143 :
2144 330 : if (offset < 0)
2145 0 : ereport(ERROR,
2146 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2147 : errmsg("frame ending offset must not be negative")));
2148 : }
2149 : }
2150 2156 : winstate->all_first = false; /* offsets are now computed; ExecWindowAgg calls this function only while all_first is set */
2151 2156 : }
2152 :
2153 : /* -----------------
2154 : * ExecWindowAgg
2155 : *
2156 : * ExecWindowAgg receives tuples from its outer subplan and
2157 : * stores them into a tuplestore, then processes window functions.
2158 : * Each call returns the projected slot for the next row that survives the runcondition and qual (rows can be filtered out, see the loop below),
2159 : * or NULL once the node's status is WINDOWAGG_DONE. pstate must be a WindowAggState.
2160 : * -----------------
2161 : */
2162 : static TupleTableSlot *
2163 846294 : ExecWindowAgg(PlanState *pstate)
2164 : {
2165 846294 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2166 : TupleTableSlot *slot;
2167 : ExprContext *econtext;
2168 : int i;
2169 : int numfuncs;
2170 :
2171 846294 : CHECK_FOR_INTERRUPTS();
2172 :
2173 846294 : if (winstate->status == WINDOWAGG_DONE)
2174 0 : return NULL;
2175 :
2176 : /*
2177 : * Compute frame offset values, if any, during first call (or after a
2178 : * rescan). These are assumed to hold constant throughout the scan; if
2179 : * user gives us a volatile expression, we'll only use its initial value.
2180 : */
2181 846294 : if (unlikely(winstate->all_first))
2182 2156 : calculate_frame_offsets(pstate);
2183 :
2184 : /* We need to loop as the runCondition or qual may filter out tuples */
2185 : for (;;)
2186 : {
2187 846366 : if (winstate->next_partition)
2188 : {
2189 : /* Initialize for first partition and set current row = 0 */
2190 2156 : begin_partition(winstate);
2191 : /* If there are no input rows, we'll detect that and exit below */
2192 : }
2193 : else
2194 : {
2195 : /* Advance current row within partition */
2196 844210 : winstate->currentpos++;
2197 : /* This might mean that the frame moves, too */
2198 844210 : winstate->framehead_valid = false;
2199 844210 : winstate->frametail_valid = false;
2200 : /* we don't need to invalidate grouptail here; see below (it is reset only when the current row is found to start a new peer group) */
2201 : }
2202 :
2203 : /*
2204 : * Spool all tuples up to and including the current row, if we haven't
2205 : * already
2206 : */
2207 846366 : spool_tuples(winstate, winstate->currentpos);
2208 :
2209 : /* Move to the next partition if we reached the end of this partition */
2210 846366 : if (winstate->partition_spooled &&
2211 56520 : winstate->currentpos >= winstate->spooled_rows)
2212 : {
2213 3190 : release_partition(winstate);
2214 :
2215 3190 : if (winstate->more_partitions)
2216 : {
2217 1214 : begin_partition(winstate);
2218 : Assert(winstate->spooled_rows > 0);
2219 :
2220 : /* Come out of pass-through mode when changing partition */
2221 1214 : winstate->status = WINDOWAGG_RUN;
2222 : }
2223 : else
2224 : {
2225 : /* No further partitions? We're done */
2226 1976 : winstate->status = WINDOWAGG_DONE;
2227 1976 : return NULL;
2228 : }
2229 : }
2230 :
2231 : /* final output execution is in ps_ExprContext */
2232 844390 : econtext = winstate->ss.ps.ps_ExprContext;
2233 :
2234 : /* Clear the per-output-tuple context for current row */
2235 844390 : ResetExprContext(econtext);
2236 :
2237 : /*
2238 : * Read the current row from the tuplestore, and save in
2239 : * ScanTupleSlot. (We can't rely on the outerplan's output slot
2240 : * because we may have to read beyond the current row. Also, we have
2241 : * to actually copy the row out of the tuplestore, since window
2242 : * function evaluation might cause the tuplestore to dump its state to
2243 : * disk.)
2244 : *
2245 : * In GROUPS mode, or when tracking a group-oriented exclusion clause,
2246 : * we must also detect entering a new peer group and update associated
2247 : * state when that happens. We use temp_slot_2 to temporarily hold
2248 : * the previous row for this purpose.
2249 : *
2250 : * Current row must be in the tuplestore, since we spooled it above.
2251 : */
2252 844390 : tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2253 844390 : if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2254 : FRAMEOPTION_EXCLUDE_GROUP |
2255 2898 : FRAMEOPTION_EXCLUDE_TIES)) &&
2256 2898 : winstate->currentpos > 0)
2257 : {
2258 2358 : ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2259 2358 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2260 : winstate->ss.ss_ScanTupleSlot))
2261 0 : elog(ERROR, "unexpected end of tuplestore");
2262 2358 : if (!are_peers(winstate, winstate->temp_slot_2,
2263 : winstate->ss.ss_ScanTupleSlot))
2264 : {
2265 1242 : winstate->currentgroup++;
2266 1242 : winstate->groupheadpos = winstate->currentpos;
2267 1242 : winstate->grouptail_valid = false;
2268 : }
2269 2358 : ExecClearTuple(winstate->temp_slot_2);
2270 : }
2271 : else
2272 : {
2273 842032 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2274 : winstate->ss.ss_ScanTupleSlot))
2275 0 : elog(ERROR, "unexpected end of tuplestore");
2276 : }
2277 :
2278 : /* don't evaluate the window functions when we're in pass-through mode */
2279 844390 : if (winstate->status == WINDOWAGG_RUN)
2280 : {
2281 : /*
2282 : * Evaluate true window functions
2283 : */
2284 844336 : numfuncs = winstate->numfuncs;
2285 1815068 : for (i = 0; i < numfuncs; i++)
2286 : {
2287 970822 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2288 :
2289 970822 : if (perfuncstate->plain_agg)
2290 155938 : continue; /* plain aggregates are skipped here and left to the "Evaluate aggregates" step below */
2291 814884 : eval_windowfunction(winstate, perfuncstate,
2292 814884 : &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2293 814884 : &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2294 : }
2295 :
2296 : /*
2297 : * Evaluate aggregates
2298 : */
2299 844246 : if (winstate->numaggs > 0)
2300 154168 : eval_windowaggregates(winstate);
2301 : }
2302 :
2303 : /*
2304 : * If we have created auxiliary read pointers for the frame or group
2305 : * boundaries, force them to be kept up-to-date, because we don't know
2306 : * whether the window function(s) will do anything that requires that.
2307 : * Failing to advance the pointers would result in being unable to
2308 : * trim data from the tuplestore, which is bad. (If we could know in
2309 : * advance whether the window functions will use frame boundary info,
2310 : * we could skip creating these pointers in the first place ... but
2311 : * unfortunately the window function API doesn't require that.)
2312 : */
2313 844258 : if (winstate->framehead_ptr >= 0)
2314 6236 : update_frameheadpos(winstate);
2315 844258 : if (winstate->frametail_ptr >= 0)
2316 130280 : update_frametailpos(winstate);
2317 844258 : if (winstate->grouptail_ptr >= 0)
2318 1500 : update_grouptailpos(winstate);
2319 :
2320 : /*
2321 : * Truncate any no-longer-needed rows from the tuplestore.
2322 : */
2323 844258 : tuplestore_trim(winstate->buffer);
2324 :
2325 : /*
2326 : * Form and return a projection tuple using the windowfunc results and
2327 : * the current row. Setting ecxt_outertuple arranges that any Vars
2328 : * will be evaluated with respect to that row.
2329 : */
2330 844258 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2331 :
2332 844258 : slot = ExecProject(winstate->ss.ps.ps_ProjInfo);
2333 :
2334 844258 : if (winstate->status == WINDOWAGG_RUN)
2335 : {
2336 844204 : econtext->ecxt_scantuple = slot; /* the runcondition and qual below are evaluated with the projected tuple as the scan tuple */
2337 :
2338 : /*
2339 : * Now evaluate the run condition to see if we need to go into
2340 : * pass-through mode, or maybe stop completely.
2341 : */
2342 844204 : if (!ExecQual(winstate->runcondition, econtext))
2343 : {
2344 : /*
2345 : * Determine which mode to move into. If there is no
2346 : * PARTITION BY clause and we're the top-level WindowAgg then
2347 : * we're done. This tuple and any future tuples cannot
2348 : * possibly match the runcondition. However, when there is a
2349 : * PARTITION BY clause or we're not the top-level window we
2350 : * can't just stop as we need to either process other
2351 : * partitions or ensure WindowAgg nodes above us receive all
2352 : * of the tuples they need to process their WindowFuncs.
2353 : */
2354 84 : if (winstate->use_pass_through)
2355 : {
2356 : /*
2357 : * STRICT pass-through mode is required for the top window
2358 : * when there is a PARTITION BY clause. Otherwise we must
2359 : * ensure we store tuples that don't match the
2360 : * runcondition so they're available to WindowAggs above.
2361 : */
2362 42 : if (winstate->top_window)
2363 : {
2364 24 : winstate->status = WINDOWAGG_PASSTHROUGH_STRICT;
2365 24 : continue; /* discard this row; later rows are also skipped by the loop until status returns to WINDOWAGG_RUN at a partition change */
2366 : }
2367 : else
2368 : {
2369 18 : winstate->status = WINDOWAGG_PASSTHROUGH;
2370 :
2371 : /*
2372 : * If we're not the top-window, we'd better NULLify
2373 : * the aggregate results. In pass-through mode we no
2374 : * longer update these and this avoids the old stale
2375 : * results lingering. Some of these might be byref
2376 : * types so we can't have them pointing to free'd
2377 : * memory. The planner insisted that quals used in
2378 : * the runcondition are strict, so the top-level
2379 : * WindowAgg will filter these NULLs out in the filter
2380 : * clause.
2381 : */
2382 18 : numfuncs = winstate->numfuncs;
2383 72 : for (i = 0; i < numfuncs; i++)
2384 : {
2385 54 : econtext->ecxt_aggvalues[i] = (Datum) 0;
2386 54 : econtext->ecxt_aggnulls[i] = true;
2387 : }
2388 : }
2389 : }
2390 : else
2391 : {
2392 : /*
2393 : * Pass-through not required. We can just return NULL.
2394 : * Nothing else will match the runcondition.
2395 : */
2396 42 : winstate->status = WINDOWAGG_DONE;
2397 42 : return NULL;
2398 : }
2399 : }
2400 :
2401 : /*
2402 : * Filter out any tuples we don't need in the top-level WindowAgg.
2403 : */
2404 844138 : if (!ExecQual(winstate->ss.ps.qual, econtext))
2405 : {
2406 18 : InstrCountFiltered1(winstate, 1);
2407 18 : continue;
2408 : }
2409 :
2410 844120 : break; /* tuple passed the qual: leave the loop and return it */
2411 : }
2412 :
2413 : /*
2414 : * When not in WINDOWAGG_RUN mode, we must still return this tuple if
2415 : * we're anything apart from the top window.
2416 : */
2417 54 : else if (!winstate->top_window)
2418 24 : break;
2419 : }
2420 :
2421 844144 : return slot;
2422 : }
2423 :
2424 : /* -----------------
2425 : * ExecInitWindowAgg
2426 : *
2427 : * Creates the run-time information for the WindowAgg node produced by the
2428 : * planner and initializes its outer subtree
2429 : * -----------------
2430 : */
2431 : WindowAggState *
2432 2480 : ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2433 : {
2434 : WindowAggState *winstate;
2435 : Plan *outerPlan;
2436 : ExprContext *econtext;
2437 : ExprContext *tmpcontext;
2438 : WindowStatePerFunc perfunc;
2439 : WindowStatePerAgg peragg;
2440 2480 : int frameOptions = node->frameOptions;
2441 : int numfuncs,
2442 : wfuncno,
2443 : numaggs,
2444 : aggno;
2445 : TupleDesc scanDesc;
2446 : ListCell *l;
2447 :
2448 : /* check for unsupported flags */
2449 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2450 :
2451 : /*
2452 : * create state structure
2453 : */
2454 2480 : winstate = makeNode(WindowAggState);
2455 2480 : winstate->ss.ps.plan = (Plan *) node;
2456 2480 : winstate->ss.ps.state = estate;
2457 2480 : winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2458 :
2459 : /* copy frame options to state node for easy access */
2460 2480 : winstate->frameOptions = frameOptions;
2461 :
2462 : /*
2463 : * Create expression contexts. We need two, one for per-input-tuple
2464 : * processing and one for per-output-tuple processing. We cheat a little
2465 : * by using ExecAssignExprContext() to build both.
2466 : */
2467 2480 : ExecAssignExprContext(estate, &winstate->ss.ps);
2468 2480 : tmpcontext = winstate->ss.ps.ps_ExprContext;
2469 2480 : winstate->tmpcontext = tmpcontext;
2470 2480 : ExecAssignExprContext(estate, &winstate->ss.ps);
2471 :
2472 : /* Create long-lived context for storage of partition-local memory etc */
2473 2480 : winstate->partcontext =
2474 2480 : AllocSetContextCreate(CurrentMemoryContext,
2475 : "WindowAgg Partition",
2476 : ALLOCSET_DEFAULT_SIZES);
2477 :
2478 : /*
2479 : * Create mid-lived context for aggregate trans values etc.
2480 : *
2481 : * Note that moving aggregates each use their own private context, not
2482 : * this one.
2483 : */
2484 2480 : winstate->aggcontext =
2485 2480 : AllocSetContextCreate(CurrentMemoryContext,
2486 : "WindowAgg Aggregates",
2487 : ALLOCSET_DEFAULT_SIZES);
2488 :
2489 : /* Only the top-level WindowAgg may have a qual */
2490 : Assert(node->plan.qual == NIL || node->topWindow);
2491 :
2492 : /* Initialize the qual */
2493 2480 : winstate->ss.ps.qual = ExecInitQual(node->plan.qual,
2494 : (PlanState *) winstate);
2495 :
2496 : /*
2497 : * Setup the run condition, if we received one from the query planner.
2498 : * When set, this may allow us to move into pass-through mode so that we
2499 : * don't have to perform any further evaluation of WindowFuncs in the
2500 : * current partition or possibly stop returning tuples altogether when all
2501 : * tuples are in the same partition.
2502 : */
2503 2480 : winstate->runcondition = ExecInitQual(node->runCondition,
2504 : (PlanState *) winstate);
2505 :
2506 : /*
2507 : * When we're not the top-level WindowAgg node or we are but have a
2508 : * PARTITION BY clause we must move into one of the WINDOWAGG_PASSTHROUGH*
2509 : * modes when the runCondition becomes false.
2510 : */
2511 2480 : winstate->use_pass_through = !node->topWindow || node->partNumCols > 0;
2512 :
2513 : /* remember if we're the top-window or we are below the top-window */
2514 2480 : winstate->top_window = node->topWindow;
2515 :
2516 : /*
2517 : * initialize child nodes
2518 : */
2519 2480 : outerPlan = outerPlan(node);
2520 2480 : outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2521 :
2522 : /*
2523 : * initialize source tuple type (which is also the tuple type that we'll
2524 : * store in the tuplestore and use in all our working slots).
2525 : */
2526 2480 : ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2527 2480 : scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2528 :
2529 : /* the outer tuple isn't the child's tuple, but always a minimal tuple */
2530 2480 : winstate->ss.ps.outeropsset = true;
2531 2480 : winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2532 2480 : winstate->ss.ps.outeropsfixed = true;
2533 :
2534 : /*
2535 : * tuple table initialization
2536 : */
2537 2480 : winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2538 : &TTSOpsMinimalTuple);
2539 2480 : winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2540 : &TTSOpsMinimalTuple);
2541 2480 : winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2542 : &TTSOpsMinimalTuple);
2543 2480 : winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2544 : &TTSOpsMinimalTuple);
2545 :
2546 : /*
2547 : * create frame head and tail slots only if needed (must create slots in
2548 : * exactly the same cases that update_frameheadpos and update_frametailpos
2549 : * need them)
2550 : */
2551 2480 : winstate->framehead_slot = winstate->frametail_slot = NULL;
2552 :
2553 2480 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2554 : {
2555 1466 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2556 76 : node->ordNumCols != 0) ||
2557 1390 : (frameOptions & FRAMEOPTION_START_OFFSET))
2558 742 : winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2559 : &TTSOpsMinimalTuple);
2560 1466 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2561 682 : node->ordNumCols != 0) ||
2562 1024 : (frameOptions & FRAMEOPTION_END_OFFSET))
2563 1168 : winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2564 : &TTSOpsMinimalTuple);
2565 : }
2566 :
2567 : /*
2568 : * Initialize result slot, type and projection.
2569 : */
2570 2480 : ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2571 2480 : ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2572 :
2573 : /* Set up data for comparing tuples */
2574 2480 : if (node->partNumCols > 0)
2575 614 : winstate->partEqfunction =
2576 614 : execTuplesMatchPrepare(scanDesc,
2577 : node->partNumCols,
2578 614 : node->partColIdx,
2579 614 : node->partOperators,
2580 614 : node->partCollations,
2581 : &winstate->ss.ps);
2582 :
2583 2480 : if (node->ordNumCols > 0)
2584 2078 : winstate->ordEqfunction =
2585 2078 : execTuplesMatchPrepare(scanDesc,
2586 : node->ordNumCols,
2587 2078 : node->ordColIdx,
2588 2078 : node->ordOperators,
2589 2078 : node->ordCollations,
2590 : &winstate->ss.ps);
2591 :
2592 : /*
2593 : * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2594 : */
2595 2480 : numfuncs = winstate->numfuncs;
2596 2480 : numaggs = winstate->numaggs;
2597 2480 : econtext = winstate->ss.ps.ps_ExprContext;
2598 2480 : econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
2599 2480 : econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
2600 :
2601 : /*
2602 : * allocate per-wfunc/per-agg state information.
2603 : */
2604 2480 : perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
2605 2480 : peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
2606 2480 : winstate->perfunc = perfunc;
2607 2480 : winstate->peragg = peragg;
2608 :
2609 2480 : wfuncno = -1;
2610 2480 : aggno = -1;
2611 5632 : foreach(l, winstate->funcs)
2612 : {
2613 3152 : WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2614 3152 : WindowFunc *wfunc = wfuncstate->wfunc;
2615 : WindowStatePerFunc perfuncstate;
2616 : AclResult aclresult;
2617 : int i;
2618 :
2619 3152 : if (wfunc->winref != node->winref) /* planner screwed up? */
2620 0 : elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2621 : wfunc->winref, node->winref);
2622 :
2623 : /* Look for a previous duplicate window function */
2624 3956 : for (i = 0; i <= wfuncno; i++)
2625 : {
2626 810 : if (equal(wfunc, perfunc[i].wfunc) &&
2627 6 : !contain_volatile_functions((Node *) wfunc))
2628 6 : break;
2629 : }
2630 3152 : if (i <= wfuncno)
2631 : {
2632 : /* Found a match to an existing entry, so just mark it */
2633 6 : wfuncstate->wfuncno = i;
2634 6 : continue;
2635 : }
2636 :
2637 : /* Nope, so assign a new PerAgg record */
2638 3146 : perfuncstate = &perfunc[++wfuncno];
2639 :
2640 : /* Mark WindowFunc state node with assigned index in the result array */
2641 3146 : wfuncstate->wfuncno = wfuncno;
2642 :
2643 : /* Check permission to call window function */
2644 3146 : aclresult = object_aclcheck(ProcedureRelationId, wfunc->winfnoid, GetUserId(),
2645 : ACL_EXECUTE);
2646 3146 : if (aclresult != ACLCHECK_OK)
2647 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2648 0 : get_func_name(wfunc->winfnoid));
2649 3146 : InvokeFunctionExecuteHook(wfunc->winfnoid);
2650 :
2651 : /* Fill in the perfuncstate data */
2652 3146 : perfuncstate->wfuncstate = wfuncstate;
2653 3146 : perfuncstate->wfunc = wfunc;
2654 3146 : perfuncstate->numArguments = list_length(wfuncstate->args);
2655 3146 : perfuncstate->winCollation = wfunc->inputcollid;
2656 :
2657 3146 : get_typlenbyval(wfunc->wintype,
2658 : &perfuncstate->resulttypeLen,
2659 : &perfuncstate->resulttypeByVal);
2660 :
2661 : /*
2662 : * If it's really just a plain aggregate function, we'll emulate the
2663 : * Agg environment for it.
2664 : */
2665 3146 : perfuncstate->plain_agg = wfunc->winagg;
2666 3146 : if (wfunc->winagg)
2667 : {
2668 : WindowStatePerAgg peraggstate;
2669 :
2670 1448 : perfuncstate->aggno = ++aggno;
2671 1448 : peraggstate = &winstate->peragg[aggno];
2672 1448 : initialize_peragg(winstate, wfunc, peraggstate);
2673 1448 : peraggstate->wfuncno = wfuncno;
2674 : }
2675 : else
2676 : {
2677 1698 : WindowObject winobj = makeNode(WindowObjectData);
2678 :
2679 1698 : winobj->winstate = winstate;
2680 1698 : winobj->argstates = wfuncstate->args;
2681 1698 : winobj->localmem = NULL;
2682 1698 : perfuncstate->winobj = winobj;
2683 :
2684 : /* It's a real window function, so set up to call it. */
2685 1698 : fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2686 : econtext->ecxt_per_query_memory);
2687 1698 : fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2688 : }
2689 : }
2690 :
2691 : /* Update numfuncs, numaggs to match number of unique functions found */
2692 2480 : winstate->numfuncs = wfuncno + 1;
2693 2480 : winstate->numaggs = aggno + 1;
2694 :
2695 : /* Set up WindowObject for aggregates, if needed */
2696 2480 : if (winstate->numaggs > 0)
2697 : {
2698 1364 : WindowObject agg_winobj = makeNode(WindowObjectData);
2699 :
2700 1364 : agg_winobj->winstate = winstate;
2701 1364 : agg_winobj->argstates = NIL;
2702 1364 : agg_winobj->localmem = NULL;
2703 : /* make sure markptr = -1 to invalidate. It may not get used */
2704 1364 : agg_winobj->markptr = -1;
2705 1364 : agg_winobj->readptr = -1;
2706 1364 : winstate->agg_winobj = agg_winobj;
2707 : }
2708 :
2709 : /* Set the status to running */
2710 2480 : winstate->status = WINDOWAGG_RUN;
2711 :
2712 : /* initialize frame bound offset expressions */
2713 2480 : winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2714 : (PlanState *) winstate);
2715 2480 : winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2716 : (PlanState *) winstate);
2717 :
2718 : /* Lookup in_range support functions if needed */
2719 2480 : if (OidIsValid(node->startInRangeFunc))
2720 522 : fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2721 2480 : if (OidIsValid(node->endInRangeFunc))
2722 588 : fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2723 2480 : winstate->inRangeColl = node->inRangeColl;
2724 2480 : winstate->inRangeAsc = node->inRangeAsc;
2725 2480 : winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2726 :
2727 2480 : winstate->all_first = true;
2728 2480 : winstate->partition_spooled = false;
2729 2480 : winstate->more_partitions = false;
2730 2480 : winstate->next_partition = true;
2731 :
2732 2480 : return winstate;
2733 : }
2734 :
2735 : /* -----------------
2736 : * ExecEndWindowAgg
2737 : * -----------------
2738 : */
2739 : void
2740 2348 : ExecEndWindowAgg(WindowAggState *node)
2741 : {
2742 : PlanState *outerPlan;
2743 : int i;
2744 :
2745 2348 : if (node->buffer != NULL)
2746 : {
2747 1946 : tuplestore_end(node->buffer);
2748 :
2749 : /* nullify so that release_partition skips the tuplestore_clear() */
2750 1946 : node->buffer = NULL;
2751 : }
2752 :
2753 2348 : release_partition(node);
2754 :
2755 3754 : for (i = 0; i < node->numaggs; i++)
2756 : {
2757 1406 : if (node->peragg[i].aggcontext != node->aggcontext)
2758 780 : MemoryContextDelete(node->peragg[i].aggcontext);
2759 : }
2760 2348 : MemoryContextDelete(node->partcontext);
2761 2348 : MemoryContextDelete(node->aggcontext);
2762 :
2763 2348 : pfree(node->perfunc);
2764 2348 : pfree(node->peragg);
2765 :
2766 2348 : outerPlan = outerPlanState(node);
2767 2348 : ExecEndNode(outerPlan);
2768 2348 : }
2769 :
2770 : /* -----------------
2771 : * ExecReScanWindowAgg
2772 : * -----------------
2773 : */
2774 : void
2775 78 : ExecReScanWindowAgg(WindowAggState *node)
2776 : {
2777 78 : PlanState *outerPlan = outerPlanState(node);
2778 78 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
2779 :
2780 78 : node->status = WINDOWAGG_RUN;
2781 78 : node->all_first = true;
2782 :
2783 : /* release tuplestore et al */
2784 78 : release_partition(node);
2785 :
2786 : /* release all temp tuples, but especially first_part_slot */
2787 78 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
2788 78 : ExecClearTuple(node->first_part_slot);
2789 78 : ExecClearTuple(node->agg_row_slot);
2790 78 : ExecClearTuple(node->temp_slot_1);
2791 78 : ExecClearTuple(node->temp_slot_2);
2792 78 : if (node->framehead_slot)
2793 0 : ExecClearTuple(node->framehead_slot);
2794 78 : if (node->frametail_slot)
2795 6 : ExecClearTuple(node->frametail_slot);
2796 :
2797 : /* Forget current wfunc values */
2798 156 : MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2799 78 : MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2800 :
2801 : /*
2802 : * if chgParam of subnode is not null then plan will be re-scanned by
2803 : * first ExecProcNode.
2804 : */
2805 78 : if (outerPlan->chgParam == NULL)
2806 6 : ExecReScan(outerPlan);
2807 78 : }
2808 :
2809 : /*
2810 : * initialize_peragg
2811 : *
2812 : * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2813 : */
2814 : static WindowStatePerAggData *
2815 1448 : initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2816 : WindowStatePerAgg peraggstate)
2817 : {
2818 : Oid inputTypes[FUNC_MAX_ARGS];
2819 : int numArguments;
2820 : HeapTuple aggTuple;
2821 : Form_pg_aggregate aggform;
2822 : Oid aggtranstype;
2823 : AttrNumber initvalAttNo;
2824 : AclResult aclresult;
2825 : bool use_ma_code;
2826 : Oid transfn_oid,
2827 : invtransfn_oid,
2828 : finalfn_oid;
2829 : bool finalextra;
2830 : char finalmodify;
2831 : Expr *transfnexpr,
2832 : *invtransfnexpr,
2833 : *finalfnexpr;
2834 : Datum textInitVal;
2835 : int i;
2836 : ListCell *lc;
2837 :
2838 1448 : numArguments = list_length(wfunc->args);
2839 :
2840 1448 : i = 0;
2841 2770 : foreach(lc, wfunc->args)
2842 : {
2843 1322 : inputTypes[i++] = exprType((Node *) lfirst(lc));
2844 : }
2845 :
2846 1448 : aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2847 1448 : if (!HeapTupleIsValid(aggTuple))
2848 0 : elog(ERROR, "cache lookup failed for aggregate %u",
2849 : wfunc->winfnoid);
2850 1448 : aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2851 :
2852 : /*
2853 : * Figure out whether we want to use the moving-aggregate implementation,
2854 : * and collect the right set of fields from the pg_aggregate entry.
2855 : *
2856 : * It's possible that an aggregate would supply a safe moving-aggregate
2857 : * implementation and an unsafe normal one, in which case our hand is
2858 : * forced. Otherwise, if the frame head can't move, we don't need
2859 : * moving-aggregate code. Even if we'd like to use it, don't do so if the
2860 : * aggregate's arguments (and FILTER clause if any) contain any calls to
2861 : * volatile functions. Otherwise, the difference between restarting and
2862 : * not restarting the aggregation would be user-visible.
2863 : *
2864 : * We also don't risk using moving aggregates when there are subplans in
2865 : * the arguments or FILTER clause. This is partly because
2866 : * contain_volatile_functions() doesn't look inside subplans; but there
2867 : * are other reasons why a subplan's output might be volatile. For
2868 : * example, syncscan mode can render the results nonrepeatable.
2869 : */
2870 1448 : if (!OidIsValid(aggform->aggminvtransfn))
2871 184 : use_ma_code = false; /* sine qua non */
2872 1264 : else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2873 1264 : aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2874 0 : use_ma_code = true; /* decision forced by safety */
2875 1264 : else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2876 460 : use_ma_code = false; /* non-moving frame head */
2877 804 : else if (contain_volatile_functions((Node *) wfunc))
2878 12 : use_ma_code = false; /* avoid possible behavioral change */
2879 792 : else if (contain_subplans((Node *) wfunc))
2880 0 : use_ma_code = false; /* subplans might contain volatile functions */
2881 : else
2882 792 : use_ma_code = true; /* yes, let's use it */
2883 1448 : if (use_ma_code)
2884 : {
2885 792 : peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2886 792 : peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2887 792 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2888 792 : finalextra = aggform->aggmfinalextra;
2889 792 : finalmodify = aggform->aggmfinalmodify;
2890 792 : aggtranstype = aggform->aggmtranstype;
2891 792 : initvalAttNo = Anum_pg_aggregate_aggminitval;
2892 : }
2893 : else
2894 : {
2895 656 : peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2896 656 : peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2897 656 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2898 656 : finalextra = aggform->aggfinalextra;
2899 656 : finalmodify = aggform->aggfinalmodify;
2900 656 : aggtranstype = aggform->aggtranstype;
2901 656 : initvalAttNo = Anum_pg_aggregate_agginitval;
2902 : }
2903 :
2904 : /*
2905 : * ExecInitWindowAgg already checked permission to call aggregate function
2906 : * ... but we still need to check the component functions
2907 : */
2908 :
2909 : /* Check that aggregate owner has permission to call component fns */
2910 : {
2911 : HeapTuple procTuple;
2912 : Oid aggOwner;
2913 :
2914 1448 : procTuple = SearchSysCache1(PROCOID,
2915 : ObjectIdGetDatum(wfunc->winfnoid));
2916 1448 : if (!HeapTupleIsValid(procTuple))
2917 0 : elog(ERROR, "cache lookup failed for function %u",
2918 : wfunc->winfnoid);
2919 1448 : aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2920 1448 : ReleaseSysCache(procTuple);
2921 :
2922 1448 : aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner,
2923 : ACL_EXECUTE);
2924 1448 : if (aclresult != ACLCHECK_OK)
2925 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2926 0 : get_func_name(transfn_oid));
2927 1448 : InvokeFunctionExecuteHook(transfn_oid);
2928 :
2929 1448 : if (OidIsValid(invtransfn_oid))
2930 : {
2931 792 : aclresult = object_aclcheck(ProcedureRelationId, invtransfn_oid, aggOwner,
2932 : ACL_EXECUTE);
2933 792 : if (aclresult != ACLCHECK_OK)
2934 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2935 0 : get_func_name(invtransfn_oid));
2936 792 : InvokeFunctionExecuteHook(invtransfn_oid);
2937 : }
2938 :
2939 1448 : if (OidIsValid(finalfn_oid))
2940 : {
2941 836 : aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
2942 : ACL_EXECUTE);
2943 836 : if (aclresult != ACLCHECK_OK)
2944 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2945 0 : get_func_name(finalfn_oid));
2946 836 : InvokeFunctionExecuteHook(finalfn_oid);
2947 : }
2948 : }
2949 :
2950 : /*
2951 : * If the selected finalfn isn't read-only, we can't run this aggregate as
2952 : * a window function. This is a user-facing error, so we take a bit more
2953 : * care with the error message than elsewhere in this function.
2954 : */
2955 1448 : if (finalmodify != AGGMODIFY_READ_ONLY)
2956 0 : ereport(ERROR,
2957 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2958 : errmsg("aggregate function %s does not support use as a window function",
2959 : format_procedure(wfunc->winfnoid))));
2960 :
2961 : /* Detect how many arguments to pass to the finalfn */
2962 1448 : if (finalextra)
2963 26 : peraggstate->numFinalArgs = numArguments + 1;
2964 : else
2965 1422 : peraggstate->numFinalArgs = 1;
2966 :
2967 : /* resolve actual type of transition state, if polymorphic */
2968 1448 : aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2969 : aggtranstype,
2970 : inputTypes,
2971 : numArguments);
2972 :
2973 : /* build expression trees using actual argument & result types */
2974 1448 : build_aggregate_transfn_expr(inputTypes,
2975 : numArguments,
2976 : 0, /* no ordered-set window functions yet */
2977 : false, /* no variadic window functions yet */
2978 : aggtranstype,
2979 : wfunc->inputcollid,
2980 : transfn_oid,
2981 : invtransfn_oid,
2982 : &transfnexpr,
2983 : &invtransfnexpr);
2984 :
2985 : /* set up infrastructure for calling the transfn(s) and finalfn */
2986 1448 : fmgr_info(transfn_oid, &peraggstate->transfn);
2987 1448 : fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2988 :
2989 1448 : if (OidIsValid(invtransfn_oid))
2990 : {
2991 792 : fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2992 792 : fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2993 : }
2994 :
2995 1448 : if (OidIsValid(finalfn_oid))
2996 : {
2997 836 : build_aggregate_finalfn_expr(inputTypes,
2998 : peraggstate->numFinalArgs,
2999 : aggtranstype,
3000 : wfunc->wintype,
3001 : wfunc->inputcollid,
3002 : finalfn_oid,
3003 : &finalfnexpr);
3004 836 : fmgr_info(finalfn_oid, &peraggstate->finalfn);
3005 836 : fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
3006 : }
3007 :
3008 : /* get info about relevant datatypes */
3009 1448 : get_typlenbyval(wfunc->wintype,
3010 : &peraggstate->resulttypeLen,
3011 : &peraggstate->resulttypeByVal);
3012 1448 : get_typlenbyval(aggtranstype,
3013 : &peraggstate->transtypeLen,
3014 : &peraggstate->transtypeByVal);
3015 :
3016 : /*
3017 : * initval is potentially null, so don't try to access it as a struct
3018 : * field. Must do it the hard way with SysCacheGetAttr.
3019 : */
3020 1448 : textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
3021 : &peraggstate->initValueIsNull);
3022 :
3023 1448 : if (peraggstate->initValueIsNull)
3024 754 : peraggstate->initValue = (Datum) 0;
3025 : else
3026 694 : peraggstate->initValue = GetAggInitVal(textInitVal,
3027 : aggtranstype);
3028 :
3029 : /*
3030 : * If the transfn is strict and the initval is NULL, make sure input type
3031 : * and transtype are the same (or at least binary-compatible), so that
3032 : * it's OK to use the first input value as the initial transValue. This
3033 : * should have been checked at agg definition time, but we must check
3034 : * again in case the transfn's strictness property has been changed.
3035 : */
3036 1448 : if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
3037 : {
3038 152 : if (numArguments < 1 ||
3039 152 : !IsBinaryCoercible(inputTypes[0], aggtranstype))
3040 0 : ereport(ERROR,
3041 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3042 : errmsg("aggregate %u needs to have compatible input type and transition type",
3043 : wfunc->winfnoid)));
3044 : }
3045 :
3046 : /*
3047 : * Insist that forward and inverse transition functions have the same
3048 : * strictness setting. Allowing them to differ would require handling
3049 : * more special cases in advance_windowaggregate and
3050 : * advance_windowaggregate_base, for no discernible benefit. This should
3051 : * have been checked at agg definition time, but we must check again in
3052 : * case either function's strictness property has been changed.
3053 : */
3054 1448 : if (OidIsValid(invtransfn_oid) &&
3055 792 : peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
3056 0 : ereport(ERROR,
3057 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3058 : errmsg("strictness of aggregate's forward and inverse transition functions must match")));
3059 :
3060 : /*
3061 : * Moving aggregates use their own aggcontext.
3062 : *
3063 : * This is necessary because they might restart at different times, so we
3064 : * might never be able to reset the shared context otherwise. We can't
3065 : * make it the aggregates' responsibility to clean up after themselves,
3066 : * because strict aggregates must be restarted whenever we remove their
3067 : * last non-NULL input, which the aggregate won't be aware is happening.
3068 : * Also, just pfree()ing the transValue upon restarting wouldn't help,
3069 : * since we'd miss any indirectly referenced data. We could, in theory,
3070 : * make the memory allocation rules for moving aggregates different than
3071 : * they have historically been for plain aggregates, but that seems grotty
3072 : * and likely to lead to memory leaks.
3073 : */
3074 1448 : if (OidIsValid(invtransfn_oid))
3075 792 : peraggstate->aggcontext =
3076 792 : AllocSetContextCreate(CurrentMemoryContext,
3077 : "WindowAgg Per Aggregate",
3078 : ALLOCSET_DEFAULT_SIZES);
3079 : else
3080 656 : peraggstate->aggcontext = winstate->aggcontext;
3081 :
3082 1448 : ReleaseSysCache(aggTuple);
3083 :
3084 1448 : return peraggstate;
3085 : }
3086 :
3087 : static Datum
3088 694 : GetAggInitVal(Datum textInitVal, Oid transtype)
3089 : {
3090 : Oid typinput,
3091 : typioparam;
3092 : char *strInitVal;
3093 : Datum initVal;
3094 :
3095 694 : getTypeInputInfo(transtype, &typinput, &typioparam);
3096 694 : strInitVal = TextDatumGetCString(textInitVal);
3097 694 : initVal = OidInputFunctionCall(typinput, strInitVal,
3098 : typioparam, -1);
3099 694 : pfree(strInitVal);
3100 694 : return initVal;
3101 : }
3102 :
3103 : /*
3104 : * are_peers
3105 : * compare two rows to see if they are equal according to the ORDER BY clause
3106 : *
3107 : * NB: this does not consider the window frame mode.
3108 : */
3109 : static bool
3110 537546 : are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
3111 : TupleTableSlot *slot2)
3112 : {
3113 537546 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
3114 537546 : ExprContext *econtext = winstate->tmpcontext;
3115 :
3116 : /* If no ORDER BY, all rows are peers with each other */
3117 537546 : if (node->ordNumCols == 0)
3118 24994 : return true;
3119 :
3120 512552 : econtext->ecxt_outertuple = slot1;
3121 512552 : econtext->ecxt_innertuple = slot2;
3122 512552 : return ExecQualAndReset(winstate->ordEqfunction, econtext);
3123 : }
3124 :
3125 : /*
3126 : * window_gettupleslot
3127 : * Fetch the pos'th tuple of the current partition into the slot,
3128 : * using the winobj's read pointer
3129 : *
3130 : * Returns true if successful, false if no such row
3131 : */
3132 : static bool
3133 700204 : window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
3134 : {
3135 700204 : WindowAggState *winstate = winobj->winstate;
3136 : MemoryContext oldcontext;
3137 :
3138 : /* often called repeatedly in a row */
3139 700204 : CHECK_FOR_INTERRUPTS();
3140 :
3141 : /* Don't allow passing -1 to spool_tuples here */
3142 700204 : if (pos < 0)
3143 312 : return false;
3144 :
3145 : /* If necessary, fetch the tuple into the spool */
3146 699892 : spool_tuples(winstate, pos);
3147 :
3148 699892 : if (pos >= winstate->spooled_rows)
3149 4532 : return false;
3150 :
3151 695360 : if (pos < winobj->markpos)
3152 0 : elog(ERROR, "cannot fetch row before WindowObject's mark position");
3153 :
3154 695360 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3155 :
3156 695360 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3157 :
3158 : /*
3159 : * Advance or rewind until we are within one tuple of the one we want.
3160 : */
3161 695360 : if (winobj->seekpos < pos - 1)
3162 : {
3163 2244 : if (!tuplestore_skiptuples(winstate->buffer,
3164 2244 : pos - 1 - winobj->seekpos,
3165 : true))
3166 0 : elog(ERROR, "unexpected end of tuplestore");
3167 2244 : winobj->seekpos = pos - 1;
3168 : }
3169 693116 : else if (winobj->seekpos > pos + 1)
3170 : {
3171 2716 : if (!tuplestore_skiptuples(winstate->buffer,
3172 2716 : winobj->seekpos - (pos + 1),
3173 : false))
3174 0 : elog(ERROR, "unexpected end of tuplestore");
3175 2716 : winobj->seekpos = pos + 1;
3176 : }
3177 690400 : else if (winobj->seekpos == pos)
3178 : {
3179 : /*
3180 : * There's no API to refetch the tuple at the current position. We
3181 : * have to move one tuple forward, and then one backward. (We don't
3182 : * do it the other way because we might try to fetch the row before
3183 : * our mark, which isn't allowed.) XXX this case could stand to be
3184 : * optimized.
3185 : */
3186 172254 : tuplestore_advance(winstate->buffer, true);
3187 172254 : winobj->seekpos++;
3188 : }
3189 :
3190 : /*
3191 : * Now we should be on the tuple immediately before or after the one we
3192 : * want, so just fetch forwards or backwards as appropriate.
3193 : *
3194 : * Notice that we tell tuplestore_gettupleslot to make a physical copy of
3195 : * the fetched tuple. This ensures that the slot's contents remain valid
3196 : * through manipulations of the tuplestore, which some callers depend on.
3197 : */
3198 695360 : if (winobj->seekpos > pos)
3199 : {
3200 175102 : if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
3201 0 : elog(ERROR, "unexpected end of tuplestore");
3202 175102 : winobj->seekpos--;
3203 : }
3204 : else
3205 : {
3206 520258 : if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
3207 0 : elog(ERROR, "unexpected end of tuplestore");
3208 520258 : winobj->seekpos++;
3209 : }
3210 :
3211 : Assert(winobj->seekpos == pos);
3212 :
3213 695360 : MemoryContextSwitchTo(oldcontext);
3214 :
3215 695360 : return true;
3216 : }
3217 :
3218 :
3219 : /***********************************************************************
3220 : * API exposed to window functions
3221 : ***********************************************************************/
3222 :
3223 :
3224 : /*
3225 : * WinGetPartitionLocalMemory
3226 : * Get working memory that lives till end of partition processing
3227 : *
3228 : * On first call within a given partition, this allocates and zeroes the
3229 : * requested amount of space. Subsequent calls just return the same chunk.
3230 : *
3231 : * Memory obtained this way is normally used to hold state that should be
3232 : * automatically reset for each new partition. If a window function wants
3233 : * to hold state across the whole query, fcinfo->fn_extra can be used in the
3234 : * usual way for that.
3235 : */
3236 : void *
3237 331470 : WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3238 : {
3239 : Assert(WindowObjectIsValid(winobj));
3240 331470 : if (winobj->localmem == NULL)
3241 444 : winobj->localmem =
3242 444 : MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3243 331470 : return winobj->localmem;
3244 : }
3245 :
3246 : /*
3247 : * WinGetCurrentPosition
3248 : * Return the current row's position (counting from 0) within the current
3249 : * partition.
3250 : */
3251 : int64
3252 757602 : WinGetCurrentPosition(WindowObject winobj)
3253 : {
3254 : Assert(WindowObjectIsValid(winobj));
3255 757602 : return winobj->winstate->currentpos;
3256 : }
3257 :
3258 : /*
3259 : * WinGetPartitionRowCount
3260 : * Return total number of rows contained in the current partition.
3261 : *
3262 : * Note: this is a relatively expensive operation because it forces the
3263 : * whole partition to be "spooled" into the tuplestore at once. Once
3264 : * executed, however, additional calls within the same partition are cheap.
3265 : */
3266 : int64
3267 162 : WinGetPartitionRowCount(WindowObject winobj)
3268 : {
3269 : Assert(WindowObjectIsValid(winobj));
3270 162 : spool_tuples(winobj->winstate, -1);
3271 162 : return winobj->winstate->spooled_rows;
3272 : }
3273 :
3274 : /*
3275 : * WinSetMarkPosition
3276 : * Set the "mark" position for the window object, which is the oldest row
3277 : * number (counting from 0) it is allowed to fetch during all subsequent
3278 : * operations within the current partition.
3279 : *
3280 : * Window functions do not have to call this, but are encouraged to move the
3281 : * mark forward when possible to keep the tuplestore size down and prevent
3282 : * having to spill rows to disk.
3283 : */
3284 : void
3285 819638 : WinSetMarkPosition(WindowObject winobj, int64 markpos)
3286 : {
3287 : WindowAggState *winstate;
3288 :
3289 : Assert(WindowObjectIsValid(winobj));
3290 819638 : winstate = winobj->winstate;
3291 :
3292 819638 : if (markpos < winobj->markpos)
3293 0 : elog(ERROR, "cannot move WindowObject's mark position backward");
3294 819638 : tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3295 819638 : if (markpos > winobj->markpos)
3296 : {
3297 814100 : tuplestore_skiptuples(winstate->buffer,
3298 814100 : markpos - winobj->markpos,
3299 : true);
3300 814100 : winobj->markpos = markpos;
3301 : }
3302 819638 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3303 819638 : if (markpos > winobj->seekpos)
3304 : {
3305 462142 : tuplestore_skiptuples(winstate->buffer,
3306 462142 : markpos - winobj->seekpos,
3307 : true);
3308 462142 : winobj->seekpos = markpos;
3309 : }
3310 819638 : }
3311 :
3312 : /*
3313 : * WinRowsArePeers
3314 : * Compare two rows (specified by absolute position in partition) to see
3315 : * if they are equal according to the ORDER BY clause.
3316 : *
3317 : * NB: this does not consider the window frame mode.
3318 : */
3319 : bool
3320 165306 : WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3321 : {
3322 : WindowAggState *winstate;
3323 : WindowAgg *node;
3324 : TupleTableSlot *slot1;
3325 : TupleTableSlot *slot2;
3326 : bool res;
3327 :
3328 : Assert(WindowObjectIsValid(winobj));
3329 165306 : winstate = winobj->winstate;
3330 165306 : node = (WindowAgg *) winstate->ss.ps.plan;
3331 :
3332 : /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3333 165306 : if (node->ordNumCols == 0)
3334 0 : return true;
3335 :
3336 : /*
3337 : * Note: OK to use temp_slot_2 here because we aren't calling any
3338 : * frame-related functions (those tend to clobber temp_slot_2).
3339 : */
3340 165306 : slot1 = winstate->temp_slot_1;
3341 165306 : slot2 = winstate->temp_slot_2;
3342 :
3343 165306 : if (!window_gettupleslot(winobj, pos1, slot1))
3344 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3345 : pos1);
3346 165306 : if (!window_gettupleslot(winobj, pos2, slot2))
3347 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3348 : pos2);
3349 :
3350 165306 : res = are_peers(winstate, slot1, slot2);
3351 :
3352 165306 : ExecClearTuple(slot1);
3353 165306 : ExecClearTuple(slot2);
3354 :
3355 165306 : return res;
3356 : }
3357 :
3358 : /*
3359 : * WinGetFuncArgInPartition
3360 : * Evaluate a window function's argument expression on a specified
3361 : * row of the partition. The row is identified in lseek(2) style,
3362 : * i.e. relative to the current, first, or last row.
3363 : *
3364 : * argno: argument number to evaluate (counted from 0)
3365 : * relpos: signed rowcount offset from the seek position
3366 : * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3367 : * set_mark: If the row is found and set_mark is true, the mark is moved to
3368 : * the row as a side-effect.
3369 : * isnull: output argument, receives isnull status of result
3370 : * isout: output argument, set to indicate whether target row position
3371 : * is out of partition (can pass NULL if caller doesn't care about this)
3372 : *
3373 : * Specifying a nonexistent row is not an error, it just causes a null result
3374 : * (plus setting *isout true, if isout isn't NULL).
3375 : */
3376 : Datum
3377 182778 : WinGetFuncArgInPartition(WindowObject winobj, int argno,
3378 : int relpos, int seektype, bool set_mark,
3379 : bool *isnull, bool *isout)
3380 : {
3381 : WindowAggState *winstate;
3382 : ExprContext *econtext;
3383 : TupleTableSlot *slot;
3384 : bool gottuple;
3385 : int64 abs_pos;
3386 :
3387 : Assert(WindowObjectIsValid(winobj));
3388 182778 : winstate = winobj->winstate;
3389 182778 : econtext = winstate->ss.ps.ps_ExprContext;
3390 182778 : slot = winstate->temp_slot_1;
3391 :
3392 182778 : switch (seektype)
3393 : {
3394 182778 : case WINDOW_SEEK_CURRENT:
3395 182778 : abs_pos = winstate->currentpos + relpos;
3396 182778 : break;
3397 0 : case WINDOW_SEEK_HEAD:
3398 0 : abs_pos = relpos;
3399 0 : break;
3400 0 : case WINDOW_SEEK_TAIL:
3401 0 : spool_tuples(winstate, -1);
3402 0 : abs_pos = winstate->spooled_rows - 1 + relpos;
3403 0 : break;
3404 0 : default:
3405 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3406 : abs_pos = 0; /* keep compiler quiet */
3407 : break;
3408 : }
3409 :
3410 182778 : gottuple = window_gettupleslot(winobj, abs_pos, slot);
3411 :
3412 182778 : if (!gottuple)
3413 : {
3414 318 : if (isout)
3415 318 : *isout = true;
3416 318 : *isnull = true;
3417 318 : return (Datum) 0;
3418 : }
3419 : else
3420 : {
3421 182460 : if (isout)
3422 182460 : *isout = false;
3423 182460 : if (set_mark)
3424 182304 : WinSetMarkPosition(winobj, abs_pos);
3425 182460 : econtext->ecxt_outertuple = slot;
3426 182460 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3427 : econtext, isnull);
3428 : }
3429 : }
3430 :
3431 : /*
3432 : * WinGetFuncArgInFrame
3433 : * Evaluate a window function's argument expression on a specified
3434 : * row of the window frame. The row is identified in lseek(2) style,
3435 : * i.e. relative to the first or last row of the frame. (We do not
3436 : * support WINDOW_SEEK_CURRENT here, because it's not very clear what
3437 : * that should mean if the current row isn't part of the frame.)
3438 : *
3439 : * argno: argument number to evaluate (counted from 0)
3440 : * relpos: signed rowcount offset from the seek position
3441 : * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3442 : * set_mark: If the row is found/in frame and set_mark is true, the mark is
3443 : * moved to the row as a side-effect.
3444 : * isnull: output argument, receives isnull status of result
3445 : * isout: output argument, set to indicate whether target row position
3446 : * is out of frame (can pass NULL if caller doesn't care about this)
3447 : *
3448 : * Specifying a nonexistent or not-in-frame row is not an error, it just
3449 : * causes a null result (plus setting *isout true, if isout isn't NULL).
3450 : *
3451 : * Note that some exclusion-clause options lead to situations where the
3452 : * rows that are in-frame are not consecutive in the partition. But we
3453 : * count only in-frame rows when measuring relpos.
3454 : *
3455 : * The set_mark flag is interpreted as meaning that the caller will specify
3456 : * a constant (or, perhaps, monotonically increasing) relpos in successive
3457 : * calls, so that *if there is no exclusion clause* there will be no need
3458 : * to fetch a row before the previously fetched row. But we do not expect
3459 : * the caller to know how to account for exclusion clauses. Therefore,
3460 : * if there is an exclusion clause we take responsibility for adjusting the
3461 : * mark request to something that will be safe given the above assumption
3462 : * about relpos.
3463 : */
3464 : Datum
3465 8628 : WinGetFuncArgInFrame(WindowObject winobj, int argno,
3466 : int relpos, int seektype, bool set_mark,
3467 : bool *isnull, bool *isout)
3468 : {
3469 : WindowAggState *winstate;
3470 : ExprContext *econtext;
3471 : TupleTableSlot *slot;
3472 : int64 abs_pos;
3473 : int64 mark_pos;
3474 :
3475 : Assert(WindowObjectIsValid(winobj));
3476 8628 : winstate = winobj->winstate;
3477 8628 : econtext = winstate->ss.ps.ps_ExprContext;
3478 8628 : slot = winstate->temp_slot_1;
3479 :
3480 8628 : switch (seektype)
3481 : {
3482 0 : case WINDOW_SEEK_CURRENT:
3483 0 : elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3484 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3485 : break;
3486 4206 : case WINDOW_SEEK_HEAD:
3487 : /* rejecting relpos < 0 is easy and simplifies code below */
3488 4206 : if (relpos < 0)
3489 0 : goto out_of_frame;
3490 4206 : update_frameheadpos(winstate);
3491 4164 : abs_pos = winstate->frameheadpos + relpos;
3492 4164 : mark_pos = abs_pos;
3493 :
3494 : /*
3495 : * Account for exclusion option if one is active, but advance only
3496 : * abs_pos not mark_pos. This prevents changes of the current
3497 : * row's peer group from resulting in trying to fetch a row before
3498 : * some previous mark position.
3499 : *
3500 : * Note that in some corner cases such as current row being
3501 : * outside frame, these calculations are theoretically too simple,
3502 : * but it doesn't matter because we'll end up deciding the row is
3503 : * out of frame. We do not attempt to avoid fetching rows past
3504 : * end of frame; that would happen in some cases anyway.
3505 : */
3506 4164 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3507 : {
3508 3504 : case 0:
3509 : /* no adjustment needed */
3510 3504 : break;
3511 240 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3512 240 : if (abs_pos >= winstate->currentpos &&
3513 186 : winstate->currentpos >= winstate->frameheadpos)
3514 66 : abs_pos++;
3515 240 : break;
3516 120 : case FRAMEOPTION_EXCLUDE_GROUP:
3517 120 : update_grouptailpos(winstate);
3518 120 : if (abs_pos >= winstate->groupheadpos &&
3519 72 : winstate->grouptailpos > winstate->frameheadpos)
3520 : {
3521 72 : int64 overlapstart = Max(winstate->groupheadpos,
3522 : winstate->frameheadpos);
3523 :
3524 72 : abs_pos += winstate->grouptailpos - overlapstart;
3525 : }
3526 120 : break;
3527 300 : case FRAMEOPTION_EXCLUDE_TIES:
3528 300 : update_grouptailpos(winstate);
3529 300 : if (abs_pos >= winstate->groupheadpos &&
3530 204 : winstate->grouptailpos > winstate->frameheadpos)
3531 : {
3532 84 : int64 overlapstart = Max(winstate->groupheadpos,
3533 : winstate->frameheadpos);
3534 :
3535 84 : if (abs_pos == overlapstart)
3536 84 : abs_pos = winstate->currentpos;
3537 : else
3538 0 : abs_pos += winstate->grouptailpos - overlapstart - 1;
3539 : }
3540 300 : break;
3541 0 : default:
3542 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
3543 : winstate->frameOptions);
3544 : break;
3545 : }
3546 4164 : break;
3547 4422 : case WINDOW_SEEK_TAIL:
3548 : /* rejecting relpos > 0 is easy and simplifies code below */
3549 4422 : if (relpos > 0)
3550 0 : goto out_of_frame;
3551 4422 : update_frametailpos(winstate);
3552 4416 : abs_pos = winstate->frametailpos - 1 + relpos;
3553 :
3554 : /*
3555 : * Account for exclusion option if one is active. If there is no
3556 : * exclusion, we can safely set the mark at the accessed row. But
3557 : * if there is, we can only mark the frame start, because we can't
3558 : * be sure how far back in the frame the exclusion might cause us
3559 : * to fetch in future. Furthermore, we have to actually check
3560 : * against frameheadpos here, since it's unsafe to try to fetch a
3561 : * row before frame start if the mark might be there already.
3562 : */
3563 4416 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3564 : {
3565 3936 : case 0:
3566 : /* no adjustment needed */
3567 3936 : mark_pos = abs_pos;
3568 3936 : break;
3569 120 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3570 120 : if (abs_pos <= winstate->currentpos &&
3571 12 : winstate->currentpos < winstate->frametailpos)
3572 12 : abs_pos--;
3573 120 : update_frameheadpos(winstate);
3574 120 : if (abs_pos < winstate->frameheadpos)
3575 6 : goto out_of_frame;
3576 114 : mark_pos = winstate->frameheadpos;
3577 114 : break;
3578 240 : case FRAMEOPTION_EXCLUDE_GROUP:
3579 240 : update_grouptailpos(winstate);
3580 240 : if (abs_pos < winstate->grouptailpos &&
3581 54 : winstate->groupheadpos < winstate->frametailpos)
3582 : {
3583 54 : int64 overlapend = Min(winstate->grouptailpos,
3584 : winstate->frametailpos);
3585 :
3586 54 : abs_pos -= overlapend - winstate->groupheadpos;
3587 : }
3588 240 : update_frameheadpos(winstate);
3589 240 : if (abs_pos < winstate->frameheadpos)
3590 54 : goto out_of_frame;
3591 186 : mark_pos = winstate->frameheadpos;
3592 186 : break;
3593 120 : case FRAMEOPTION_EXCLUDE_TIES:
3594 120 : update_grouptailpos(winstate);
3595 120 : if (abs_pos < winstate->grouptailpos &&
3596 36 : winstate->groupheadpos < winstate->frametailpos)
3597 : {
3598 36 : int64 overlapend = Min(winstate->grouptailpos,
3599 : winstate->frametailpos);
3600 :
3601 36 : if (abs_pos == overlapend - 1)
3602 36 : abs_pos = winstate->currentpos;
3603 : else
3604 0 : abs_pos -= overlapend - 1 - winstate->groupheadpos;
3605 : }
3606 120 : update_frameheadpos(winstate);
3607 120 : if (abs_pos < winstate->frameheadpos)
3608 0 : goto out_of_frame;
3609 120 : mark_pos = winstate->frameheadpos;
3610 120 : break;
3611 0 : default:
3612 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
3613 : winstate->frameOptions);
3614 : mark_pos = 0; /* keep compiler quiet */
3615 : break;
3616 : }
3617 4356 : break;
3618 0 : default:
3619 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3620 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3621 : break;
3622 : }
3623 :
3624 8520 : if (!window_gettupleslot(winobj, abs_pos, slot))
3625 396 : goto out_of_frame;
3626 :
3627 : /* The code above does not detect all out-of-frame cases, so check */
3628 8124 : if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3629 300 : goto out_of_frame;
3630 :
3631 7794 : if (isout)
3632 0 : *isout = false;
3633 7794 : if (set_mark)
3634 7752 : WinSetMarkPosition(winobj, mark_pos);
3635 7794 : econtext->ecxt_outertuple = slot;
3636 7794 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3637 : econtext, isnull);
3638 :
3639 756 : out_of_frame:
3640 756 : if (isout)
3641 0 : *isout = true;
3642 756 : *isnull = true;
3643 756 : return (Datum) 0;
3644 : }
3645 :
3646 : /*
3647 : * WinGetFuncArgCurrent
3648 : * Evaluate a window function's argument expression on the current row.
3649 : *
3650 : * argno: argument number to evaluate (counted from 0)
3651 : * isnull: output argument, receives isnull status of result
3652 : *
3653 : * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3654 : * WinGetFuncArgInFrame targeting the current row, because it will succeed
3655 : * even if the WindowObject's mark has been set beyond the current row.
3656 : * This should generally be used for "ordinary" arguments of a window
3657 : * function, such as the offset argument of lead() or lag().
3658 : */
3659 : Datum
3660 1164 : WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3661 : {
3662 : WindowAggState *winstate;
3663 : ExprContext *econtext;
3664 :
3665 : Assert(WindowObjectIsValid(winobj));
3666 1164 : winstate = winobj->winstate;
3667 :
3668 1164 : econtext = winstate->ss.ps.ps_ExprContext;
3669 :
3670 1164 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3671 1164 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3672 : econtext, isnull);
3673 : }
|