Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeWindowAgg.c
4 : * routines to handle WindowAgg nodes.
5 : *
6 : * A WindowAgg node evaluates "window functions" across suitable partitions
7 : * of the input tuple set. Any one WindowAgg works for just a single window
8 : * specification, though it can evaluate multiple window functions sharing
9 : * identical window specifications. The input tuples are required to be
10 : * delivered in sorted order, with the PARTITION BY columns (if any) as
11 : * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 : * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 : * as needed, if a query involves more than one window specification.)
14 : *
15 : * Since window functions can require access to any or all of the rows in
16 : * the current partition, we accumulate rows of the partition into a
17 : * tuplestore. The window functions are called using the WindowObject API
18 : * so that they can access those rows as needed.
19 : *
20 : * We also support using plain aggregate functions as window functions.
21 : * For these, the regular Agg-node environment is emulated for each partition.
22 : * As required by the SQL spec, the output represents the value of the
23 : * aggregate function over all rows in the current row's window frame.
24 : *
25 : *
26 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
27 : * Portions Copyright (c) 1994, Regents of the University of California
28 : *
29 : * IDENTIFICATION
30 : * src/backend/executor/nodeWindowAgg.c
31 : *
32 : *-------------------------------------------------------------------------
33 : */
34 : #include "postgres.h"
35 :
36 : #include "access/htup_details.h"
37 : #include "catalog/objectaccess.h"
38 : #include "catalog/pg_aggregate.h"
39 : #include "catalog/pg_proc.h"
40 : #include "common/int.h"
41 : #include "executor/executor.h"
42 : #include "executor/instrument.h"
43 : #include "executor/nodeWindowAgg.h"
44 : #include "miscadmin.h"
45 : #include "nodes/nodeFuncs.h"
46 : #include "optimizer/clauses.h"
47 : #include "optimizer/optimizer.h"
48 : #include "parser/parse_agg.h"
49 : #include "parser/parse_coerce.h"
50 : #include "utils/acl.h"
51 : #include "utils/builtins.h"
52 : #include "utils/datum.h"
53 : #include "utils/expandeddatum.h"
54 : #include "utils/lsyscache.h"
55 : #include "utils/memutils.h"
56 : #include "utils/regproc.h"
57 : #include "utils/syscache.h"
58 : #include "utils/tuplestore.h"
59 : #include "windowapi.h"
60 :
61 : /*
62 : * All the window function APIs are called with this object, which is passed
63 : * to window functions as fcinfo->context.
64 : */
65 : typedef struct WindowObjectData
66 : {
67 : NodeTag type;
68 : WindowAggState *winstate; /* parent WindowAggState */
69 : List *argstates; /* ExprState trees for fn's arguments */
70 : void *localmem; /* WinGetPartitionLocalMemory's chunk */
71 : int markptr; /* tuplestore mark pointer for this fn */
72 : int readptr; /* tuplestore read pointer for this fn */
73 : int64 markpos; /* row that markptr is positioned on */
74 : int64 seekpos; /* row that readptr is positioned on */
75 : uint8 **notnull_info; /* not null info for each func args */
76 : int64 *num_notnull_info; /* track size (number of tuples in
77 : * partition) of the notnull_info array
78 : * for each func args */
79 : bool *notnull_info_cacheable; /* can we cache notnull_info? */
80 :
81 : /*
82 : * Null treatment options. One of: NO_NULLTREATMENT, PARSER_IGNORE_NULLS,
83 : * PARSER_RESPECT_NULLS or IGNORE_NULLS.
84 : */
85 : int ignore_nulls;
86 : } WindowObjectData;
87 :
88 : /*
89 : * We have one WindowStatePerFunc struct for each window function and
90 : * window aggregate handled by this node.
91 : */
92 : typedef struct WindowStatePerFuncData
93 : {
94 : /* Links to WindowFunc expr and state nodes this working state is for */
95 : WindowFuncExprState *wfuncstate;
96 : WindowFunc *wfunc;
97 :
98 : int numArguments; /* number of arguments */
99 :
100 : FmgrInfo flinfo; /* fmgr lookup data for window function */
101 :
102 : Oid winCollation; /* collation derived for window function */
103 :
104 : /*
105 : * We need the len and byval info for the result of each function in order
106 : * to know how to copy/delete values.
107 : */
108 : int16 resulttypeLen;
109 : bool resulttypeByVal;
110 :
111 : bool plain_agg; /* is it just a plain aggregate function? */
112 : int aggno; /* if so, index of its WindowStatePerAggData */
113 : uint8 ignore_nulls; /* ignore nulls */
114 :
115 : WindowObject winobj; /* object used in window function API */
116 : } WindowStatePerFuncData;
117 :
118 : /*
119 : * For plain aggregate window functions, we also have one of these.
120 : */
121 : typedef struct WindowStatePerAggData
122 : {
123 : /* Oids of transition functions */
124 : Oid transfn_oid;
125 : Oid invtransfn_oid; /* may be InvalidOid */
126 : Oid finalfn_oid; /* may be InvalidOid */
127 :
128 : /*
129 : * fmgr lookup data for transition functions --- only valid when
130 : * corresponding oid is not InvalidOid. Note in particular that fn_strict
131 : * flags are kept here.
132 : */
133 : FmgrInfo transfn;
134 : FmgrInfo invtransfn;
135 : FmgrInfo finalfn;
136 :
137 : int numFinalArgs; /* number of arguments to pass to finalfn */
138 :
139 : /*
140 : * initial value from pg_aggregate entry
141 : */
142 : Datum initValue;
143 : bool initValueIsNull;
144 :
145 : /*
146 : * cached value for current frame boundaries
147 : */
148 : Datum resultValue;
149 : bool resultValueIsNull;
150 :
151 : /*
152 : * We need the len and byval info for the agg's input, result, and
153 : * transition data types in order to know how to copy/delete values.
154 : */
155 : int16 inputtypeLen,
156 : resulttypeLen,
157 : transtypeLen;
158 : bool inputtypeByVal,
159 : resulttypeByVal,
160 : transtypeByVal;
161 :
162 : int wfuncno; /* index of associated WindowStatePerFuncData */
163 :
164 : /* Context holding transition value and possibly other subsidiary data */
165 : MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
166 :
167 : /* Current transition value */
168 : Datum transValue; /* current transition value */
169 : bool transValueIsNull;
170 :
171 : int64 transValueCount; /* number of currently-aggregated rows */
172 :
173 : /* Data local to eval_windowaggregates() */
174 : bool restart; /* need to restart this agg in this cycle? */
175 : } WindowStatePerAggData;
176 :
177 : static void initialize_windowaggregate(WindowAggState *winstate,
178 : WindowStatePerFunc perfuncstate,
179 : WindowStatePerAgg peraggstate);
180 : static void advance_windowaggregate(WindowAggState *winstate,
181 : WindowStatePerFunc perfuncstate,
182 : WindowStatePerAgg peraggstate);
183 : static bool advance_windowaggregate_base(WindowAggState *winstate,
184 : WindowStatePerFunc perfuncstate,
185 : WindowStatePerAgg peraggstate);
186 : static void finalize_windowaggregate(WindowAggState *winstate,
187 : WindowStatePerFunc perfuncstate,
188 : WindowStatePerAgg peraggstate,
189 : Datum *result, bool *isnull);
190 :
191 : static void eval_windowaggregates(WindowAggState *winstate);
192 : static void eval_windowfunction(WindowAggState *winstate,
193 : WindowStatePerFunc perfuncstate,
194 : Datum *result, bool *isnull);
195 :
196 : static void begin_partition(WindowAggState *winstate);
197 : static void spool_tuples(WindowAggState *winstate, int64 pos);
198 : static void release_partition(WindowAggState *winstate);
199 :
200 : static int row_is_in_frame(WindowObject winobj, int64 pos,
201 : TupleTableSlot *slot, bool fetch_tuple);
202 : static void update_frameheadpos(WindowAggState *winstate);
203 : static void update_frametailpos(WindowAggState *winstate);
204 : static void update_grouptailpos(WindowAggState *winstate);
205 :
206 : static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
207 : WindowFunc *wfunc,
208 : WindowStatePerAgg peraggstate);
209 : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
210 :
211 : static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
212 : TupleTableSlot *slot2);
213 : static bool window_gettupleslot(WindowObject winobj, int64 pos,
214 : TupleTableSlot *slot);
215 :
216 : static Datum ignorenulls_getfuncarginframe(WindowObject winobj, int argno,
217 : int relpos, int seektype,
218 : bool set_mark, bool *isnull,
219 : bool *isout);
220 : static Datum gettuple_eval_partition(WindowObject winobj, int argno,
221 : int64 abs_pos, bool *isnull,
222 : bool *isout);
223 : static void init_notnull_info(WindowObject winobj,
224 : WindowStatePerFunc perfuncstate);
225 : static void grow_notnull_info(WindowObject winobj,
226 : int64 pos, int argno);
227 : static uint8 get_notnull_info(WindowObject winobj,
228 : int64 pos, int argno);
229 : static void put_notnull_info(WindowObject winobj,
230 : int64 pos, int argno, bool isnull);
231 :
232 : /*
233 : * Not null info bit array consists of 2-bit items
234 : */
235 : #define NN_UNKNOWN 0x00 /* value not calculated yet */
236 : #define NN_NULL 0x01 /* NULL */
237 : #define NN_NOTNULL 0x02 /* NOT NULL */
238 : #define NN_MASK 0x03 /* mask for NOT NULL MAP */
239 : #define NN_BITS_PER_MEMBER 2 /* number of bits in not null map */
240 : /* number of items per variable */
241 : #define NN_ITEM_PER_VAR (BITS_PER_BYTE / NN_BITS_PER_MEMBER)
242 : /* convert map position to byte offset */
243 : #define NN_POS_TO_BYTES(pos) ((pos) / NN_ITEM_PER_VAR)
244 : /* bytes offset to map position */
245 : #define NN_BYTES_TO_POS(bytes) ((bytes) * NN_ITEM_PER_VAR)
246 : /* calculate shift bits */
247 : #define NN_SHIFT(pos) ((pos) % NN_ITEM_PER_VAR) * NN_BITS_PER_MEMBER
248 :
249 : /*
250 : * initialize_windowaggregate
251 : * parallel to initialize_aggregates in nodeAgg.c
252 : */
253 : static void
254 2831 : initialize_windowaggregate(WindowAggState *winstate,
255 : WindowStatePerFunc perfuncstate,
256 : WindowStatePerAgg peraggstate)
257 : {
258 : MemoryContext oldContext;
259 :
260 : /*
261 : * If we're using a private aggcontext, we may reset it here. But if the
262 : * context is shared, we don't know which other aggregates may still need
263 : * it, so we must leave it to the caller to reset at an appropriate time.
264 : */
265 2831 : if (peraggstate->aggcontext != winstate->aggcontext)
266 2068 : MemoryContextReset(peraggstate->aggcontext);
267 :
268 2831 : if (peraggstate->initValueIsNull)
269 1045 : peraggstate->transValue = peraggstate->initValue;
270 : else
271 : {
272 1786 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
273 3572 : peraggstate->transValue = datumCopy(peraggstate->initValue,
274 1786 : peraggstate->transtypeByVal,
275 1786 : peraggstate->transtypeLen);
276 1786 : MemoryContextSwitchTo(oldContext);
277 : }
278 2831 : peraggstate->transValueIsNull = peraggstate->initValueIsNull;
279 2831 : peraggstate->transValueCount = 0;
280 2831 : peraggstate->resultValue = (Datum) 0;
281 2831 : peraggstate->resultValueIsNull = true;
282 2831 : }
283 :
284 : /*
285 : * advance_windowaggregate
286 : * parallel to advance_aggregates in nodeAgg.c
287 : */
288 : static void
289 119315 : advance_windowaggregate(WindowAggState *winstate,
290 : WindowStatePerFunc perfuncstate,
291 : WindowStatePerAgg peraggstate)
292 : {
293 119315 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
294 119315 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
295 119315 : int numArguments = perfuncstate->numArguments;
296 : Datum newVal;
297 : ListCell *arg;
298 : int i;
299 : MemoryContext oldContext;
300 119315 : ExprContext *econtext = winstate->tmpcontext;
301 119315 : ExprState *filter = wfuncstate->aggfilter;
302 :
303 119315 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
304 :
305 : /* Skip anything FILTERed out */
306 119315 : if (filter)
307 : {
308 : bool isnull;
309 228 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
310 :
311 228 : if (isnull || !DatumGetBool(res))
312 : {
313 108 : MemoryContextSwitchTo(oldContext);
314 108 : return;
315 : }
316 : }
317 :
318 : /* We start from 1, since the 0th arg will be the transition value */
319 119207 : i = 1;
320 198194 : foreach(arg, wfuncstate->args)
321 : {
322 78987 : ExprState *argstate = (ExprState *) lfirst(arg);
323 :
324 78987 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
325 : &fcinfo->args[i].isnull);
326 78987 : i++;
327 : }
328 :
329 119207 : if (peraggstate->transfn.fn_strict)
330 : {
331 : /*
332 : * For a strict transfn, nothing happens when there's a NULL input; we
333 : * just keep the prior transValue. Note transValueCount doesn't
334 : * change either.
335 : */
336 69990 : for (i = 1; i <= numArguments; i++)
337 : {
338 14887 : if (fcinfo->args[i].isnull)
339 : {
340 132 : MemoryContextSwitchTo(oldContext);
341 132 : return;
342 : }
343 : }
344 :
345 : /*
346 : * For strict transition functions with initial value NULL we use the
347 : * first non-NULL input as the initial state. (We already checked
348 : * that the agg's input type is binary-compatible with its transtype,
349 : * so straight copy here is OK.)
350 : *
351 : * We must copy the datum into aggcontext if it is pass-by-ref. We do
352 : * not need to pfree the old transValue, since it's NULL.
353 : */
354 55103 : if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
355 : {
356 305 : MemoryContextSwitchTo(peraggstate->aggcontext);
357 610 : peraggstate->transValue = datumCopy(fcinfo->args[1].value,
358 305 : peraggstate->transtypeByVal,
359 305 : peraggstate->transtypeLen);
360 305 : peraggstate->transValueIsNull = false;
361 305 : peraggstate->transValueCount = 1;
362 305 : MemoryContextSwitchTo(oldContext);
363 305 : return;
364 : }
365 :
366 54798 : if (peraggstate->transValueIsNull)
367 : {
368 : /*
369 : * Don't call a strict function with NULL inputs. Note it is
370 : * possible to get here despite the above tests, if the transfn is
371 : * strict *and* returned a NULL on a prior cycle. If that happens
372 : * we will propagate the NULL all the way to the end. That can
373 : * only happen if there's no inverse transition function, though,
374 : * since we disallow transitions back to NULL when there is one.
375 : */
376 0 : MemoryContextSwitchTo(oldContext);
377 : Assert(!OidIsValid(peraggstate->invtransfn_oid));
378 0 : return;
379 : }
380 : }
381 :
382 : /*
383 : * OK to call the transition function. Set winstate->curaggcontext while
384 : * calling it, for possible use by AggCheckCallContext.
385 : */
386 118770 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
387 : numArguments + 1,
388 : perfuncstate->winCollation,
389 : (Node *) winstate, NULL);
390 118770 : fcinfo->args[0].value = peraggstate->transValue;
391 118770 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
392 118770 : winstate->curaggcontext = peraggstate->aggcontext;
393 118770 : newVal = FunctionCallInvoke(fcinfo);
394 118762 : winstate->curaggcontext = NULL;
395 :
396 : /*
397 : * Moving-aggregate transition functions must not return null, see
398 : * advance_windowaggregate_base().
399 : */
400 118762 : if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
401 0 : ereport(ERROR,
402 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
403 : errmsg("moving-aggregate transition function must not return null")));
404 :
405 : /*
406 : * We must track the number of rows included in transValue, since to
407 : * remove the last input, advance_windowaggregate_base() mustn't call the
408 : * inverse transition function, but simply reset transValue back to its
409 : * initial value.
410 : */
411 118762 : peraggstate->transValueCount++;
412 :
413 : /*
414 : * If pass-by-ref datatype, must copy the new value into aggcontext and
415 : * free the prior transValue. But if transfn returned a pointer to its
416 : * first input, we don't need to do anything. Also, if transfn returned a
417 : * pointer to a R/W expanded object that is already a child of the
418 : * aggcontext, assume we can adopt that value without copying it. (See
419 : * comments for ExecAggCopyTransValue, which this code duplicates.)
420 : */
421 124554 : if (!peraggstate->transtypeByVal &&
422 5792 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
423 : {
424 640 : if (!fcinfo->isnull)
425 : {
426 640 : MemoryContextSwitchTo(peraggstate->aggcontext);
427 640 : if (DatumIsReadWriteExpandedObject(newVal,
428 : false,
429 644 : peraggstate->transtypeLen) &&
430 4 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
431 : /* do nothing */ ;
432 : else
433 636 : newVal = datumCopy(newVal,
434 636 : peraggstate->transtypeByVal,
435 636 : peraggstate->transtypeLen);
436 : }
437 640 : if (!peraggstate->transValueIsNull)
438 : {
439 600 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
440 : false,
441 : peraggstate->transtypeLen))
442 0 : DeleteExpandedObject(peraggstate->transValue);
443 : else
444 600 : pfree(DatumGetPointer(peraggstate->transValue));
445 : }
446 : }
447 :
448 118762 : MemoryContextSwitchTo(oldContext);
449 118762 : peraggstate->transValue = newVal;
450 118762 : peraggstate->transValueIsNull = fcinfo->isnull;
451 : }
452 :
453 : /*
454 : * advance_windowaggregate_base
455 : * Remove the oldest tuple from an aggregation.
456 : *
457 : * This is very much like advance_windowaggregate, except that we will call
458 : * the inverse transition function (which caller must have checked is
459 : * available).
460 : *
461 : * Returns true if we successfully removed the current row from this
462 : * aggregate, false if not (in the latter case, caller is responsible
463 : * for cleaning up by restarting the aggregation).
464 : */
465 : static bool
466 3116 : advance_windowaggregate_base(WindowAggState *winstate,
467 : WindowStatePerFunc perfuncstate,
468 : WindowStatePerAgg peraggstate)
469 : {
470 3116 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
471 3116 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
472 3116 : int numArguments = perfuncstate->numArguments;
473 : Datum newVal;
474 : ListCell *arg;
475 : int i;
476 : MemoryContext oldContext;
477 3116 : ExprContext *econtext = winstate->tmpcontext;
478 3116 : ExprState *filter = wfuncstate->aggfilter;
479 :
480 3116 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
481 :
482 : /* Skip anything FILTERed out */
483 3116 : if (filter)
484 : {
485 : bool isnull;
486 68 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
487 :
488 68 : if (isnull || !DatumGetBool(res))
489 : {
490 32 : MemoryContextSwitchTo(oldContext);
491 32 : return true;
492 : }
493 : }
494 :
495 : /* We start from 1, since the 0th arg will be the transition value */
496 3084 : i = 1;
497 6156 : foreach(arg, wfuncstate->args)
498 : {
499 3072 : ExprState *argstate = (ExprState *) lfirst(arg);
500 :
501 3072 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
502 : &fcinfo->args[i].isnull);
503 3072 : i++;
504 : }
505 :
506 3084 : if (peraggstate->invtransfn.fn_strict)
507 : {
508 : /*
509 : * For a strict (inv)transfn, nothing happens when there's a NULL
510 : * input; we just keep the prior transValue. Note transValueCount
511 : * doesn't change either.
512 : */
513 3800 : for (i = 1; i <= numArguments; i++)
514 : {
515 1920 : if (fcinfo->args[i].isnull)
516 : {
517 52 : MemoryContextSwitchTo(oldContext);
518 52 : return true;
519 : }
520 : }
521 : }
522 :
523 : /* There should still be an added but not yet removed value */
524 : Assert(peraggstate->transValueCount > 0);
525 :
526 : /*
527 : * In moving-aggregate mode, the state must never be NULL, except possibly
528 : * before any rows have been aggregated (which is surely not the case at
529 : * this point). This restriction allows us to interpret a NULL result
530 : * from the inverse function as meaning "sorry, can't do an inverse
531 : * transition in this case". We already checked this in
532 : * advance_windowaggregate, but just for safety, check again.
533 : */
534 3032 : if (peraggstate->transValueIsNull)
535 0 : elog(ERROR, "aggregate transition value is NULL before inverse transition");
536 :
537 : /*
538 : * We mustn't use the inverse transition function to remove the last
539 : * input. Doing so would yield a non-NULL state, whereas we should be in
540 : * the initial state afterwards which may very well be NULL. So instead,
541 : * we simply re-initialize the aggregate in this case.
542 : */
543 3032 : if (peraggstate->transValueCount == 1)
544 : {
545 60 : MemoryContextSwitchTo(oldContext);
546 60 : initialize_windowaggregate(winstate,
547 60 : &winstate->perfunc[peraggstate->wfuncno],
548 : peraggstate);
549 60 : return true;
550 : }
551 :
552 : /*
553 : * OK to call the inverse transition function. Set
554 : * winstate->curaggcontext while calling it, for possible use by
555 : * AggCheckCallContext.
556 : */
557 2972 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
558 : numArguments + 1,
559 : perfuncstate->winCollation,
560 : (Node *) winstate, NULL);
561 2972 : fcinfo->args[0].value = peraggstate->transValue;
562 2972 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
563 2972 : winstate->curaggcontext = peraggstate->aggcontext;
564 2972 : newVal = FunctionCallInvoke(fcinfo);
565 2972 : winstate->curaggcontext = NULL;
566 :
567 : /*
568 : * If the function returns NULL, report failure, forcing a restart.
569 : */
570 2972 : if (fcinfo->isnull)
571 : {
572 164 : MemoryContextSwitchTo(oldContext);
573 164 : return false;
574 : }
575 :
576 : /* Update number of rows included in transValue */
577 2808 : peraggstate->transValueCount--;
578 :
579 : /*
580 : * If pass-by-ref datatype, must copy the new value into aggcontext and
581 : * free the prior transValue. But if invtransfn returned a pointer to its
582 : * first input, we don't need to do anything. Also, if invtransfn
583 : * returned a pointer to a R/W expanded object that is already a child of
584 : * the aggcontext, assume we can adopt that value without copying it. (See
585 : * comments for ExecAggCopyTransValue, which this code duplicates.)
586 : *
587 : * Note: the checks for null values here will never fire, but it seems
588 : * best to have this stanza look just like advance_windowaggregate.
589 : */
590 4260 : if (!peraggstate->transtypeByVal &&
591 1452 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
592 : {
593 444 : if (!fcinfo->isnull)
594 : {
595 444 : MemoryContextSwitchTo(peraggstate->aggcontext);
596 444 : if (DatumIsReadWriteExpandedObject(newVal,
597 : false,
598 444 : peraggstate->transtypeLen) &&
599 0 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
600 : /* do nothing */ ;
601 : else
602 444 : newVal = datumCopy(newVal,
603 444 : peraggstate->transtypeByVal,
604 444 : peraggstate->transtypeLen);
605 : }
606 444 : if (!peraggstate->transValueIsNull)
607 : {
608 444 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
609 : false,
610 : peraggstate->transtypeLen))
611 0 : DeleteExpandedObject(peraggstate->transValue);
612 : else
613 444 : pfree(DatumGetPointer(peraggstate->transValue));
614 : }
615 : }
616 :
617 2808 : MemoryContextSwitchTo(oldContext);
618 2808 : peraggstate->transValue = newVal;
619 2808 : peraggstate->transValueIsNull = fcinfo->isnull;
620 :
621 2808 : return true;
622 : }
623 :
624 : /*
625 : * finalize_windowaggregate
626 : * parallel to finalize_aggregate in nodeAgg.c
627 : */
628 : static void
629 7323 : finalize_windowaggregate(WindowAggState *winstate,
630 : WindowStatePerFunc perfuncstate,
631 : WindowStatePerAgg peraggstate,
632 : Datum *result, bool *isnull)
633 : {
634 : MemoryContext oldContext;
635 :
636 7323 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
637 :
638 : /*
639 : * Apply the agg's finalfn if one is provided, else return transValue.
640 : */
641 7323 : if (OidIsValid(peraggstate->finalfn_oid))
642 : {
643 4156 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
644 4156 : int numFinalArgs = peraggstate->numFinalArgs;
645 : bool anynull;
646 : int i;
647 :
648 4156 : InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
649 : numFinalArgs,
650 : perfuncstate->winCollation,
651 : (Node *) winstate, NULL);
652 4156 : fcinfo->args[0].value =
653 4156 : MakeExpandedObjectReadOnly(peraggstate->transValue,
654 : peraggstate->transValueIsNull,
655 : peraggstate->transtypeLen);
656 4156 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
657 4156 : anynull = peraggstate->transValueIsNull;
658 :
659 : /* Fill any remaining argument positions with nulls */
660 4216 : for (i = 1; i < numFinalArgs; i++)
661 : {
662 60 : fcinfo->args[i].value = (Datum) 0;
663 60 : fcinfo->args[i].isnull = true;
664 60 : anynull = true;
665 : }
666 :
667 4156 : if (fcinfo->flinfo->fn_strict && anynull)
668 : {
669 : /* don't call a strict function with NULL inputs */
670 0 : *result = (Datum) 0;
671 0 : *isnull = true;
672 : }
673 : else
674 : {
675 : Datum res;
676 :
677 4156 : winstate->curaggcontext = peraggstate->aggcontext;
678 4156 : res = FunctionCallInvoke(fcinfo);
679 4148 : winstate->curaggcontext = NULL;
680 4148 : *isnull = fcinfo->isnull;
681 4148 : *result = MakeExpandedObjectReadOnly(res,
682 : fcinfo->isnull,
683 : peraggstate->resulttypeLen);
684 : }
685 : }
686 : else
687 : {
688 3167 : *result =
689 3167 : MakeExpandedObjectReadOnly(peraggstate->transValue,
690 : peraggstate->transValueIsNull,
691 : peraggstate->transtypeLen);
692 3167 : *isnull = peraggstate->transValueIsNull;
693 : }
694 :
695 7315 : MemoryContextSwitchTo(oldContext);
696 7315 : }
697 :
698 : /*
699 : * eval_windowaggregates
700 : * evaluate plain aggregates being used as window functions
701 : *
702 : * This differs from nodeAgg.c in two ways. First, if the window's frame
703 : * start position moves, we use the inverse transition function (if it exists)
704 : * to remove rows from the transition value. And second, we expect to be
705 : * able to call aggregate final functions repeatedly after aggregating more
706 : * data onto the same transition value. This is not a behavior required by
707 : * nodeAgg.c.
708 : */
709 : static void
710 107065 : eval_windowaggregates(WindowAggState *winstate)
711 : {
712 : WindowStatePerAgg peraggstate;
713 : int wfuncno,
714 : numaggs,
715 : numaggs_restart,
716 : i;
717 : int64 aggregatedupto_nonrestarted;
718 : MemoryContext oldContext;
719 : ExprContext *econtext;
720 : WindowObject agg_winobj;
721 : TupleTableSlot *agg_row_slot;
722 : TupleTableSlot *temp_slot;
723 :
724 107065 : numaggs = winstate->numaggs;
725 107065 : if (numaggs == 0)
726 0 : return; /* nothing to do */
727 :
728 : /* final output execution is in ps_ExprContext */
729 107065 : econtext = winstate->ss.ps.ps_ExprContext;
730 107065 : agg_winobj = winstate->agg_winobj;
731 107065 : agg_row_slot = winstate->agg_row_slot;
732 107065 : temp_slot = winstate->temp_slot_1;
733 :
734 : /*
735 : * If the window's frame start clause is UNBOUNDED_PRECEDING and no
736 : * exclusion clause is specified, then the window frame consists of a
737 : * contiguous group of rows extending forward from the start of the
738 : * partition, and rows only enter the frame, never exit it, as the current
739 : * row advances forward. This makes it possible to use an incremental
740 : * strategy for evaluating aggregates: we run the transition function for
741 : * each row added to the frame, and run the final function whenever we
742 : * need the current aggregate value. This is considerably more efficient
743 : * than the naive approach of re-running the entire aggregate calculation
744 : * for each current row. It does assume that the final function doesn't
745 : * damage the running transition value, but we have the same assumption in
746 : * nodeAgg.c too (when it rescans an existing hash table).
747 : *
748 : * If the frame start does sometimes move, we can still optimize as above
749 : * whenever successive rows share the same frame head, but if the frame
750 : * head moves beyond the previous head we try to remove those rows using
751 : * the aggregate's inverse transition function. This function restores
752 : * the aggregate's current state to what it would be if the removed row
753 : * had never been aggregated in the first place. Inverse transition
754 : * functions may optionally return NULL, indicating that the function was
755 : * unable to remove the tuple from aggregation. If this happens, or if
756 : * the aggregate doesn't have an inverse transition function at all, we
757 : * must perform the aggregation all over again for all tuples within the
758 : * new frame boundaries.
759 : *
760 : * If there's any exclusion clause, then we may have to aggregate over a
761 : * non-contiguous set of rows, so we punt and recalculate for every row.
762 : * (For some frame end choices, it might be that the frame is always
763 : * contiguous anyway, but that's an optimization to investigate later.)
764 : *
765 : * In many common cases, multiple rows share the same frame and hence the
766 : * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
767 : * window, then all rows are peers and so they all have window frame equal
768 : * to the whole partition.) We optimize such cases by calculating the
769 : * aggregate value once when we reach the first row of a peer group, and
770 : * then returning the saved value for all subsequent rows.
771 : *
772 : * 'aggregatedupto' keeps track of the first row that has not yet been
773 : * accumulated into the aggregate transition values. Whenever we start a
774 : * new peer group, we accumulate forward to the end of the peer group.
775 : */
776 :
777 : /*
778 : * First, update the frame head position.
779 : *
780 : * The frame head should never move backwards, and the code below wouldn't
781 : * cope if it did, so for safety we complain if it does.
782 : */
783 107065 : update_frameheadpos(winstate);
784 107061 : if (winstate->frameheadpos < winstate->aggregatedbase)
785 0 : elog(ERROR, "window frame head moved backward");
786 :
787 : /*
788 : * If the frame didn't change compared to the previous row, we can re-use
789 : * the result values that were previously saved at the bottom of this
790 : * function. Since we don't know the current frame's end yet, this is not
791 : * possible to check for fully. But if the frame end mode is UNBOUNDED
792 : * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
793 : * current row lies within the previous row's frame, then the two frames'
794 : * ends must coincide. Note that on the first row aggregatedbase ==
795 : * aggregatedupto, meaning this test must fail, so we don't need to check
796 : * the "there was no previous row" case explicitly here.
797 : */
798 107061 : if (winstate->aggregatedbase == winstate->frameheadpos &&
799 104489 : (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
800 103141 : FRAMEOPTION_END_CURRENT_ROW)) &&
801 103141 : !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
802 103021 : winstate->aggregatedbase <= winstate->currentpos &&
803 102961 : winstate->aggregatedupto > winstate->currentpos)
804 : {
805 201828 : for (i = 0; i < numaggs; i++)
806 : {
807 100918 : peraggstate = &winstate->peragg[i];
808 100918 : wfuncno = peraggstate->wfuncno;
809 100918 : econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
810 100918 : econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
811 : }
812 100910 : return;
813 : }
814 :
815 : /*----------
816 : * Initialize restart flags.
817 : *
818 : * We restart the aggregation:
819 : * - if we're processing the first row in the partition, or
820 : * - if the frame's head moved and we cannot use an inverse
821 : * transition function, or
822 : * - we have an EXCLUSION clause, or
823 : * - if the new frame doesn't overlap the old one
824 : *
825 : * Note that we don't strictly need to restart in the last case, but if
826 : * we're going to remove all rows from the aggregation anyway, a restart
827 : * surely is faster.
828 : *----------
829 : */
830 6151 : numaggs_restart = 0;
831 13490 : for (i = 0; i < numaggs; i++)
832 : {
833 7339 : peraggstate = &winstate->peragg[i];
834 7339 : if (winstate->currentpos == 0 ||
835 5944 : (winstate->aggregatedbase != winstate->frameheadpos &&
836 3516 : !OidIsValid(peraggstate->invtransfn_oid)) ||
837 5896 : (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
838 5116 : winstate->aggregatedupto <= winstate->frameheadpos)
839 : {
840 2607 : peraggstate->restart = true;
841 2607 : numaggs_restart++;
842 : }
843 : else
844 4732 : peraggstate->restart = false;
845 : }
846 :
847 : /*
848 : * If we have any possibly-moving aggregates, attempt to advance
849 : * aggregatedbase to match the frame's head by removing input rows that
850 : * fell off the top of the frame from the aggregations. This can fail,
851 : * i.e. advance_windowaggregate_base() can return false, in which case
852 : * we'll restart that aggregate below.
853 : */
854 8295 : while (numaggs_restart < numaggs &&
855 5824 : winstate->aggregatedbase < winstate->frameheadpos)
856 : {
857 : /*
858 : * Fetch the next tuple of those being removed. This should never fail
859 : * as we should have been here before.
860 : */
861 2144 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
862 : temp_slot))
863 0 : elog(ERROR, "could not re-fetch previously fetched frame row");
864 :
865 : /* Set tuple context for evaluation of aggregate arguments */
866 2144 : winstate->tmpcontext->ecxt_outertuple = temp_slot;
867 :
868 : /*
869 : * Perform the inverse transition for each aggregate function in the
870 : * window, unless it has already been marked as needing a restart.
871 : */
872 5268 : for (i = 0; i < numaggs; i++)
873 : {
874 : bool ok;
875 :
876 3124 : peraggstate = &winstate->peragg[i];
877 3124 : if (peraggstate->restart)
878 8 : continue;
879 :
880 3116 : wfuncno = peraggstate->wfuncno;
881 3116 : ok = advance_windowaggregate_base(winstate,
882 3116 : &winstate->perfunc[wfuncno],
883 : peraggstate);
884 3116 : if (!ok)
885 : {
886 : /* Inverse transition function has failed, must restart */
887 164 : peraggstate->restart = true;
888 164 : numaggs_restart++;
889 : }
890 : }
891 :
892 : /* Reset per-input-tuple context after each tuple */
893 2144 : ResetExprContext(winstate->tmpcontext);
894 :
895 : /* And advance the aggregated-row state */
896 2144 : winstate->aggregatedbase++;
897 2144 : ExecClearTuple(temp_slot);
898 : }
899 :
900 : /*
901 : * If we successfully advanced the base rows of all the aggregates,
902 : * aggregatedbase now equals frameheadpos; but if we failed for any, we
903 : * must forcibly update aggregatedbase.
904 : */
905 6151 : winstate->aggregatedbase = winstate->frameheadpos;
906 :
907 : /*
908 : * If we created a mark pointer for aggregates, keep it pushed up to frame
909 : * head, so that tuplestore can discard unnecessary rows.
910 : */
911 6151 : if (agg_winobj->markptr >= 0)
912 4306 : WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
913 :
914 : /*
915 : * Now restart the aggregates that require it.
916 : *
917 : * We assume that aggregates using the shared context always restart if
918 : * *any* aggregate restarts, and we may thus clean up the shared
919 : * aggcontext if that is the case. Private aggcontexts are reset by
920 : * initialize_windowaggregate() if their owning aggregate restarts. If we
921 : * aren't restarting an aggregate, we need to free any previously saved
922 : * result for it, else we'll leak memory.
923 : */
924 6151 : if (numaggs_restart > 0)
925 2619 : MemoryContextReset(winstate->aggcontext);
926 13490 : for (i = 0; i < numaggs; i++)
927 : {
928 7339 : peraggstate = &winstate->peragg[i];
929 :
930 : /* Aggregates using the shared ctx must restart if *any* agg does */
931 : Assert(peraggstate->aggcontext != winstate->aggcontext ||
932 : numaggs_restart == 0 ||
933 : peraggstate->restart);
934 :
935 7339 : if (peraggstate->restart)
936 : {
937 2771 : wfuncno = peraggstate->wfuncno;
938 2771 : initialize_windowaggregate(winstate,
939 2771 : &winstate->perfunc[wfuncno],
940 : peraggstate);
941 : }
942 4568 : else if (!peraggstate->resultValueIsNull)
943 : {
944 4412 : if (!peraggstate->resulttypeByVal)
945 1504 : pfree(DatumGetPointer(peraggstate->resultValue));
946 4412 : peraggstate->resultValue = (Datum) 0;
947 4412 : peraggstate->resultValueIsNull = true;
948 : }
949 : }
950 :
951 : /*
952 : * Non-restarted aggregates now contain the rows between aggregatedbase
953 : * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
954 : * contain no rows. If there are any restarted aggregates, we must thus
955 : * begin aggregating anew at frameheadpos, otherwise we may simply
956 : * continue at aggregatedupto. We must remember the old value of
957 : * aggregatedupto to know how long to skip advancing non-restarted
958 : * aggregates. If we modify aggregatedupto, we must also clear
959 : * agg_row_slot, per the loop invariant below.
960 : */
961 6151 : aggregatedupto_nonrestarted = winstate->aggregatedupto;
962 6151 : if (numaggs_restart > 0 &&
963 2619 : winstate->aggregatedupto != winstate->frameheadpos)
964 : {
965 980 : winstate->aggregatedupto = winstate->frameheadpos;
966 980 : ExecClearTuple(agg_row_slot);
967 : }
968 :
969 : /*
970 : * Advance until we reach a row not in frame (or end of partition).
971 : *
972 : * Note the loop invariant: agg_row_slot is either empty or holds the row
973 : * at position aggregatedupto. We advance aggregatedupto after processing
974 : * a row.
975 : */
976 : for (;;)
977 118328 : {
978 : int ret;
979 :
980 : /* Fetch next row if we didn't already */
981 124479 : if (TupIsNull(agg_row_slot))
982 : {
983 121835 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
984 : agg_row_slot))
985 2931 : break; /* must be end of partition */
986 : }
987 :
988 : /*
989 : * Exit loop if no more rows can be in frame. Skip aggregation if
990 : * current row is not in frame but there might be more in the frame.
991 : */
992 121548 : ret = row_is_in_frame(agg_winobj, winstate->aggregatedupto,
993 : agg_row_slot, false);
994 121540 : if (ret < 0)
995 3204 : break;
996 118336 : if (ret == 0)
997 1304 : goto next_tuple;
998 :
999 : /* Set tuple context for evaluation of aggregate arguments */
1000 117032 : winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
1001 :
1002 : /* Accumulate row into the aggregates */
1003 249786 : for (i = 0; i < numaggs; i++)
1004 : {
1005 132762 : peraggstate = &winstate->peragg[i];
1006 :
1007 : /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
1008 132762 : if (!peraggstate->restart &&
1009 79951 : winstate->aggregatedupto < aggregatedupto_nonrestarted)
1010 13447 : continue;
1011 :
1012 119315 : wfuncno = peraggstate->wfuncno;
1013 119315 : advance_windowaggregate(winstate,
1014 119315 : &winstate->perfunc[wfuncno],
1015 : peraggstate);
1016 : }
1017 :
1018 117024 : next_tuple:
1019 : /* Reset per-input-tuple context after each tuple */
1020 118328 : ResetExprContext(winstate->tmpcontext);
1021 :
1022 : /* And advance the aggregated-row state */
1023 118328 : winstate->aggregatedupto++;
1024 118328 : ExecClearTuple(agg_row_slot);
1025 : }
1026 :
1027 : /* The frame's end is not supposed to move backwards, ever */
1028 : Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
1029 :
1030 : /*
1031 : * finalize aggregates and fill result/isnull fields.
1032 : */
1033 13450 : for (i = 0; i < numaggs; i++)
1034 : {
1035 : Datum *result;
1036 : bool *isnull;
1037 :
1038 7323 : peraggstate = &winstate->peragg[i];
1039 7323 : wfuncno = peraggstate->wfuncno;
1040 7323 : result = &econtext->ecxt_aggvalues[wfuncno];
1041 7323 : isnull = &econtext->ecxt_aggnulls[wfuncno];
1042 7323 : finalize_windowaggregate(winstate,
1043 7323 : &winstate->perfunc[wfuncno],
1044 : peraggstate,
1045 : result, isnull);
1046 :
1047 : /*
1048 : * save the result in case next row shares the same frame.
1049 : *
1050 : * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1051 : * advance that the next row can't possibly share the same frame. Is
1052 : * it worth detecting that and skipping this code?
1053 : */
1054 7315 : if (!peraggstate->resulttypeByVal && !*isnull)
1055 : {
1056 1908 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1057 1908 : peraggstate->resultValue =
1058 1908 : datumCopy(*result,
1059 1908 : peraggstate->resulttypeByVal,
1060 1908 : peraggstate->resulttypeLen);
1061 1908 : MemoryContextSwitchTo(oldContext);
1062 : }
1063 : else
1064 : {
1065 5407 : peraggstate->resultValue = *result;
1066 : }
1067 7315 : peraggstate->resultValueIsNull = *isnull;
1068 : }
1069 : }
1070 :
1071 : /*
1072 : * eval_windowfunction
1073 : *
1074 : * Arguments of window functions are not evaluated here, because a window
1075 : * function can need random access to arbitrary rows in the partition.
1076 : * The window function uses the special WinGetFuncArgInPartition and
1077 : * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1078 : * it wants.
1079 : */
1080 : static void
1081 580715 : eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1082 : Datum *result, bool *isnull)
1083 : {
1084 580715 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1085 : MemoryContext oldContext;
1086 :
1087 580715 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1088 :
1089 : /*
1090 : * We don't pass any normal arguments to a window function, but we do pass
1091 : * it the number of arguments, in order to permit window function
1092 : * implementations to support varying numbers of arguments. The real info
1093 : * goes through the WindowObject, which is passed via fcinfo->context.
1094 : */
1095 580715 : InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1096 : perfuncstate->numArguments,
1097 : perfuncstate->winCollation,
1098 : (Node *) perfuncstate->winobj, NULL);
1099 : /* Just in case, make all the regular argument slots be null */
1100 747039 : for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1101 166324 : fcinfo->args[argno].isnull = true;
1102 : /* Window functions don't have a current aggregate context, either */
1103 580715 : winstate->curaggcontext = NULL;
1104 :
1105 580715 : *result = FunctionCallInvoke(fcinfo);
1106 580607 : *isnull = fcinfo->isnull;
1107 :
1108 : /*
1109 : * The window function might have returned a pass-by-ref result that's
1110 : * just a pointer into one of the WindowObject's temporary slots. That's
1111 : * not a problem if it's the only window function using the WindowObject;
1112 : * but if there's more than one function, we'd better copy the result to
1113 : * ensure it's not clobbered by later window functions.
1114 : */
1115 580607 : if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1116 680 : winstate->numfuncs > 1)
1117 72 : *result = datumCopy(*result,
1118 72 : perfuncstate->resulttypeByVal,
1119 72 : perfuncstate->resulttypeLen);
1120 :
1121 580607 : MemoryContextSwitchTo(oldContext);
1122 580607 : }
1123 :
1124 : /*
1125 : * prepare_tuplestore
1126 : * Prepare the tuplestore and all of the required read pointers for the
1127 : * WindowAggState's frameOptions.
1128 : *
1129 : * Note: We use pg_noinline to avoid bloating the calling function with code
1130 : * which is only called once.
1131 : */
1132 : static pg_noinline void
1133 1565 : prepare_tuplestore(WindowAggState *winstate)
1134 : {
1135 1565 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1136 1565 : int frameOptions = winstate->frameOptions;
1137 1565 : int numfuncs = winstate->numfuncs;
1138 :
1139 : /* we shouldn't be called if this was done already */
1140 : Assert(winstate->buffer == NULL);
1141 :
1142 : /* Create new tuplestore */
1143 1565 : winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1144 :
1145 : /*
1146 : * Set up read pointers for the tuplestore. The current pointer doesn't
1147 : * need BACKWARD capability, but the per-window-function read pointers do,
1148 : * and the aggregate pointer does if we might need to restart aggregation.
1149 : */
1150 1565 : winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1151 :
1152 : /* reset default REWIND capability bit for current ptr */
1153 1565 : tuplestore_set_eflags(winstate->buffer, 0);
1154 :
1155 : /* create read pointers for aggregates, if needed */
1156 1565 : if (winstate->numaggs > 0)
1157 : {
1158 775 : WindowObject agg_winobj = winstate->agg_winobj;
1159 775 : int readptr_flags = 0;
1160 :
1161 : /*
1162 : * If the frame head is potentially movable, or we have an EXCLUSION
1163 : * clause, we might need to restart aggregation ...
1164 : */
1165 775 : if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1166 270 : (frameOptions & FRAMEOPTION_EXCLUSION))
1167 : {
1168 : /* ... so create a mark pointer to track the frame head */
1169 517 : agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1170 : /* and the read pointer will need BACKWARD capability */
1171 517 : readptr_flags |= EXEC_FLAG_BACKWARD;
1172 : }
1173 :
1174 775 : agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1175 : readptr_flags);
1176 : }
1177 :
1178 : /* create mark and read pointers for each real window function */
1179 3638 : for (int i = 0; i < numfuncs; i++)
1180 : {
1181 2073 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1182 :
1183 2073 : if (!perfuncstate->plain_agg)
1184 : {
1185 1234 : WindowObject winobj = perfuncstate->winobj;
1186 :
1187 1234 : winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1188 : 0);
1189 1234 : winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1190 : EXEC_FLAG_BACKWARD);
1191 : }
1192 : }
1193 :
1194 : /*
1195 : * If we are in RANGE or GROUPS mode, then determining frame boundaries
1196 : * requires physical access to the frame endpoint rows, except in certain
1197 : * degenerate cases. We create read pointers to point to those rows, to
1198 : * simplify access and ensure that the tuplestore doesn't discard the
1199 : * endpoint rows prematurely. (Must create pointers in exactly the same
1200 : * cases that update_frameheadpos and update_frametailpos need them.)
1201 : */
1202 1565 : winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1203 :
1204 1565 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1205 : {
1206 863 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1207 49 : node->ordNumCols != 0) ||
1208 814 : (frameOptions & FRAMEOPTION_START_OFFSET))
1209 489 : winstate->framehead_ptr =
1210 489 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1211 863 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1212 342 : node->ordNumCols != 0) ||
1213 634 : (frameOptions & FRAMEOPTION_END_OFFSET))
1214 709 : winstate->frametail_ptr =
1215 709 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1216 : }
1217 :
1218 : /*
1219 : * If we have an exclusion clause that requires knowing the boundaries of
1220 : * the current row's peer group, we create a read pointer to track the
1221 : * tail position of the peer group (i.e., first row of the next peer
1222 : * group). The head position does not require its own pointer because we
1223 : * maintain that as a side effect of advancing the current row.
1224 : */
1225 1565 : winstate->grouptail_ptr = -1;
1226 :
1227 1565 : if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1228 120 : FRAMEOPTION_EXCLUDE_TIES)) &&
1229 120 : node->ordNumCols != 0)
1230 : {
1231 112 : winstate->grouptail_ptr =
1232 112 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1233 : }
1234 1565 : }
1235 :
1236 : /*
1237 : * begin_partition
1238 : * Start buffering rows of the next partition.
1239 : */
1240 : static void
1241 2443 : begin_partition(WindowAggState *winstate)
1242 : {
1243 2443 : PlanState *outerPlan = outerPlanState(winstate);
1244 2443 : int numfuncs = winstate->numfuncs;
1245 :
1246 2443 : winstate->partition_spooled = false;
1247 2443 : winstate->framehead_valid = false;
1248 2443 : winstate->frametail_valid = false;
1249 2443 : winstate->grouptail_valid = false;
1250 2443 : winstate->spooled_rows = 0;
1251 2443 : winstate->currentpos = 0;
1252 2443 : winstate->frameheadpos = 0;
1253 2443 : winstate->frametailpos = 0;
1254 2443 : winstate->currentgroup = 0;
1255 2443 : winstate->frameheadgroup = 0;
1256 2443 : winstate->frametailgroup = 0;
1257 2443 : winstate->groupheadpos = 0;
1258 2443 : winstate->grouptailpos = -1; /* see update_grouptailpos */
1259 2443 : ExecClearTuple(winstate->agg_row_slot);
1260 2443 : if (winstate->framehead_slot)
1261 690 : ExecClearTuple(winstate->framehead_slot);
1262 2443 : if (winstate->frametail_slot)
1263 1158 : ExecClearTuple(winstate->frametail_slot);
1264 :
1265 : /*
1266 : * If this is the very first partition, we need to fetch the first input
1267 : * row to store in first_part_slot.
1268 : */
1269 2443 : if (TupIsNull(winstate->first_part_slot))
1270 : {
1271 1617 : TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1272 :
1273 1617 : if (!TupIsNull(outerslot))
1274 1605 : ExecCopySlot(winstate->first_part_slot, outerslot);
1275 : else
1276 : {
1277 : /* outer plan is empty, so we have nothing to do */
1278 12 : winstate->partition_spooled = true;
1279 12 : winstate->more_partitions = false;
1280 12 : return;
1281 : }
1282 : }
1283 :
1284 : /* Create new tuplestore if not done already. */
1285 2431 : if (unlikely(winstate->buffer == NULL))
1286 1565 : prepare_tuplestore(winstate);
1287 :
1288 2431 : winstate->next_partition = false;
1289 :
1290 2431 : if (winstate->numaggs > 0)
1291 : {
1292 1267 : WindowObject agg_winobj = winstate->agg_winobj;
1293 :
1294 : /* reset mark and see positions for aggregate functions */
1295 1267 : agg_winobj->markpos = -1;
1296 1267 : agg_winobj->seekpos = -1;
1297 :
1298 : /* Also reset the row counters for aggregates */
1299 1267 : winstate->aggregatedbase = 0;
1300 1267 : winstate->aggregatedupto = 0;
1301 : }
1302 :
1303 : /* reset mark and seek positions for each real window function */
1304 5574 : for (int i = 0; i < numfuncs; i++)
1305 : {
1306 3143 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1307 :
1308 3143 : if (!perfuncstate->plain_agg)
1309 : {
1310 1744 : WindowObject winobj = perfuncstate->winobj;
1311 :
1312 1744 : winobj->markpos = -1;
1313 1744 : winobj->seekpos = -1;
1314 :
1315 : /* reset null map */
1316 1744 : if (winobj->ignore_nulls == IGNORE_NULLS ||
1317 1724 : winobj->ignore_nulls == PARSER_IGNORE_NULLS)
1318 : {
1319 156 : int numargs = perfuncstate->numArguments;
1320 :
1321 360 : for (int j = 0; j < numargs; j++)
1322 : {
1323 204 : int n = winobj->num_notnull_info[j];
1324 :
1325 204 : if (n > 0)
1326 20 : memset(winobj->notnull_info[j], 0,
1327 20 : NN_POS_TO_BYTES(n));
1328 : }
1329 : }
1330 : }
1331 : }
1332 :
1333 : /*
1334 : * Store the first tuple into the tuplestore (it's always available now;
1335 : * we either read it above, or saved it at the end of previous partition)
1336 : */
1337 2431 : tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1338 2431 : winstate->spooled_rows++;
1339 : }
1340 :
1341 : /*
1342 : * Read tuples from the outer node, up to and including position 'pos', and
1343 : * store them into the tuplestore. If pos is -1, reads the whole partition.
1344 : */
1345 : static void
1346 1241604 : spool_tuples(WindowAggState *winstate, int64 pos)
1347 : {
1348 1241604 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1349 : PlanState *outerPlan;
1350 : TupleTableSlot *outerslot;
1351 : MemoryContext oldcontext;
1352 :
1353 1241604 : if (!winstate->buffer)
1354 4 : return; /* just a safety check */
1355 1241600 : if (winstate->partition_spooled)
1356 83806 : return; /* whole partition done already */
1357 :
1358 : /*
1359 : * When in pass-through mode we can just exhaust all tuples in the current
1360 : * partition. We don't need these tuples for any further window function
1361 : * evaluation, however, we do need to keep them around if we're not the
1362 : * top-level window as another WindowAgg node above must see these.
1363 : */
1364 1157794 : if (winstate->status != WINDOWAGG_RUN)
1365 : {
1366 : Assert(winstate->status == WINDOWAGG_PASSTHROUGH ||
1367 : winstate->status == WINDOWAGG_PASSTHROUGH_STRICT);
1368 :
1369 20 : pos = -1;
1370 : }
1371 :
1372 : /*
1373 : * If the tuplestore has spilled to disk, alternate reading and writing
1374 : * becomes quite expensive due to frequent buffer flushes. It's cheaper
1375 : * to force the entire partition to get spooled in one go.
1376 : *
1377 : * XXX this is a horrid kluge --- it'd be better to fix the performance
1378 : * problem inside tuplestore. FIXME
1379 : */
1380 1157774 : else if (!tuplestore_in_memory(winstate->buffer))
1381 8 : pos = -1;
1382 :
1383 1157794 : outerPlan = outerPlanState(winstate);
1384 :
1385 : /* Must be in query context to call outerplan */
1386 1157794 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1387 :
1388 2997141 : while (winstate->spooled_rows <= pos || pos == -1)
1389 : {
1390 683852 : outerslot = ExecProcNode(outerPlan);
1391 683852 : if (TupIsNull(outerslot))
1392 : {
1393 : /* reached the end of the last partition */
1394 1473 : winstate->partition_spooled = true;
1395 1473 : winstate->more_partitions = false;
1396 1473 : break;
1397 : }
1398 :
1399 682379 : if (node->partNumCols > 0)
1400 : {
1401 92455 : ExprContext *econtext = winstate->tmpcontext;
1402 :
1403 92455 : econtext->ecxt_innertuple = winstate->first_part_slot;
1404 92455 : econtext->ecxt_outertuple = outerslot;
1405 :
1406 : /* Check if this tuple still belongs to the current partition */
1407 92455 : if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1408 : {
1409 : /*
1410 : * end of partition; copy the tuple for the next cycle.
1411 : */
1412 826 : ExecCopySlot(winstate->first_part_slot, outerslot);
1413 826 : winstate->partition_spooled = true;
1414 826 : winstate->more_partitions = true;
1415 826 : break;
1416 : }
1417 : }
1418 :
1419 : /*
1420 : * Remember the tuple unless we're the top-level window and we're in
1421 : * pass-through mode.
1422 : */
1423 681553 : if (winstate->status != WINDOWAGG_PASSTHROUGH_STRICT)
1424 : {
1425 : /* Still in partition, so save it into the tuplestore */
1426 681545 : tuplestore_puttupleslot(winstate->buffer, outerslot);
1427 681545 : winstate->spooled_rows++;
1428 : }
1429 : }
1430 :
1431 1157794 : MemoryContextSwitchTo(oldcontext);
1432 : }
1433 :
1434 : /*
1435 : * release_partition
1436 : * clear information kept within a partition, including
1437 : * tuplestore and aggregate results.
1438 : */
1439 : static void
1440 4113 : release_partition(WindowAggState *winstate)
1441 : {
1442 : int i;
1443 :
1444 9390 : for (i = 0; i < winstate->numfuncs; i++)
1445 : {
1446 5277 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1447 :
1448 : /* Release any partition-local state of this window function */
1449 5277 : if (perfuncstate->winobj)
1450 2779 : perfuncstate->winobj->localmem = NULL;
1451 : }
1452 :
1453 : /*
1454 : * Release all partition-local memory (in particular, any partition-local
1455 : * state that we might have trashed our pointers to in the above loop, and
1456 : * any aggregate temp data). We don't rely on retail pfree because some
1457 : * aggregates might have allocated data we don't have direct pointers to.
1458 : */
1459 4113 : MemoryContextReset(winstate->partcontext);
1460 4113 : MemoryContextReset(winstate->aggcontext);
1461 6611 : for (i = 0; i < winstate->numaggs; i++)
1462 : {
1463 2498 : if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1464 1320 : MemoryContextReset(winstate->peragg[i].aggcontext);
1465 : }
1466 :
1467 4113 : if (winstate->buffer)
1468 2319 : tuplestore_clear(winstate->buffer);
1469 4113 : winstate->partition_spooled = false;
1470 4113 : winstate->next_partition = true;
1471 4113 : }
1472 :
1473 : /*
1474 : * row_is_in_frame
1475 : * Determine whether a row is in the current row's window frame according
1476 : * to our window framing rule
1477 : *
1478 : * The caller must have already determined that the row is in the partition
1479 : * and fetched it into a slot if fetch_tuple is false.
1480 : * This function just encapsulates the framing rules.
1481 : *
1482 : * Returns:
1483 : * -1, if the row is out of frame and no succeeding rows can be in frame
1484 : * 0, if the row is out of frame but succeeding rows might be in frame
1485 : * 1, if the row is in frame
1486 : *
1487 : * May clobber winstate->temp_slot_2.
1488 : */
1489 : static int
1490 128832 : row_is_in_frame(WindowObject winobj, int64 pos, TupleTableSlot *slot,
1491 : bool fetch_tuple)
1492 : {
1493 128832 : WindowAggState *winstate = winobj->winstate;
1494 128832 : int frameOptions = winstate->frameOptions;
1495 :
1496 : Assert(pos >= 0); /* else caller error */
1497 :
1498 : /*
1499 : * First, check frame starting conditions. We might as well delegate this
1500 : * to update_frameheadpos always; it doesn't add any notable cost.
1501 : */
1502 128832 : update_frameheadpos(winstate);
1503 128832 : if (pos < winstate->frameheadpos)
1504 96 : return 0;
1505 :
1506 : /*
1507 : * Okay so far, now check frame ending conditions. Here, we avoid calling
1508 : * update_frametailpos in simple cases, so as not to spool tuples further
1509 : * ahead than necessary.
1510 : */
1511 128736 : if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1512 : {
1513 105635 : if (frameOptions & FRAMEOPTION_ROWS)
1514 : {
1515 : /* rows after current row are out of frame */
1516 1544 : if (pos > winstate->currentpos)
1517 656 : return -1;
1518 : }
1519 104091 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1520 : {
1521 : /* following row that is not peer is out of frame */
1522 104091 : if (pos > winstate->currentpos)
1523 : {
1524 101766 : if (fetch_tuple) /* need to fetch tuple? */
1525 0 : if (!window_gettupleslot(winobj, pos, slot))
1526 0 : return -1;
1527 101766 : if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1528 912 : return -1;
1529 : }
1530 : }
1531 : else
1532 : Assert(false);
1533 : }
1534 23101 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1535 : {
1536 13572 : if (frameOptions & FRAMEOPTION_ROWS)
1537 : {
1538 4208 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1539 4208 : int64 frameendpos = 0;
1540 :
1541 : /* rows after current row + offset are out of frame */
1542 4208 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1543 76 : offset = -offset;
1544 :
1545 : /*
1546 : * If we have an overflow, it means the frame end is beyond the
1547 : * range of int64. Since currentpos >= 0, this can only be a
1548 : * positive overflow. We treat this as meaning that the frame
1549 : * extends to end of partition.
1550 : */
1551 4208 : if (!pg_add_s64_overflow(winstate->currentpos, offset,
1552 3996 : &frameendpos) &&
1553 3996 : pos > frameendpos)
1554 804 : return -1;
1555 : }
1556 9364 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1557 : {
1558 : /* hard cases, so delegate to update_frametailpos */
1559 9364 : update_frametailpos(winstate);
1560 9336 : if (pos >= winstate->frametailpos)
1561 980 : return -1;
1562 : }
1563 : else
1564 : Assert(false);
1565 : }
1566 :
1567 : /* Check exclusion clause */
1568 125356 : if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1569 : {
1570 2220 : if (pos == winstate->currentpos)
1571 372 : return 0;
1572 : }
1573 123136 : else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1574 121228 : ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1575 1980 : pos != winstate->currentpos))
1576 : {
1577 3528 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1578 :
1579 : /* If no ORDER BY, all rows are peers with each other */
1580 3528 : if (node->ordNumCols == 0)
1581 312 : return 0;
1582 : /* Otherwise, check the group boundaries */
1583 3216 : if (pos >= winstate->groupheadpos)
1584 : {
1585 1728 : update_grouptailpos(winstate);
1586 1728 : if (pos < winstate->grouptailpos)
1587 672 : return 0;
1588 : }
1589 : }
1590 :
1591 : /* If we get here, it's in frame */
1592 124000 : return 1;
1593 : }
1594 :
1595 : /*
1596 : * update_frameheadpos
1597 : * make frameheadpos valid for the current row
1598 : *
1599 : * Note that frameheadpos is computed without regard for any window exclusion
1600 : * clause; the current row and/or its peers are considered part of the frame
1601 : * for this purpose even if they must be excluded later.
1602 : *
1603 : * May clobber winstate->temp_slot_2.
1604 : */
1605 : static void
1606 243935 : update_frameheadpos(WindowAggState *winstate)
1607 : {
1608 243935 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1609 243935 : int frameOptions = winstate->frameOptions;
1610 : MemoryContext oldcontext;
1611 :
1612 243935 : if (winstate->framehead_valid)
1613 133234 : return; /* already known for current row */
1614 :
1615 : /* We may be called in a short-lived context */
1616 110701 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1617 :
1618 110701 : if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1619 : {
1620 : /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1621 103371 : winstate->frameheadpos = 0;
1622 103371 : winstate->framehead_valid = true;
1623 : }
1624 7330 : else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1625 : {
1626 1986 : if (frameOptions & FRAMEOPTION_ROWS)
1627 : {
1628 : /* In ROWS mode, frame head is the same as current */
1629 1664 : winstate->frameheadpos = winstate->currentpos;
1630 1664 : winstate->framehead_valid = true;
1631 : }
1632 322 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1633 : {
1634 : /* If no ORDER BY, all rows are peers with each other */
1635 322 : if (node->ordNumCols == 0)
1636 : {
1637 0 : winstate->frameheadpos = 0;
1638 0 : winstate->framehead_valid = true;
1639 0 : MemoryContextSwitchTo(oldcontext);
1640 0 : return;
1641 : }
1642 :
1643 : /*
1644 : * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1645 : * first row that is a peer of current row. We keep a copy of the
1646 : * last-known frame head row in framehead_slot, and advance as
1647 : * necessary. Note that if we reach end of partition, we will
1648 : * leave frameheadpos = end+1 and framehead_slot empty.
1649 : */
1650 322 : tuplestore_select_read_pointer(winstate->buffer,
1651 : winstate->framehead_ptr);
1652 322 : if (winstate->frameheadpos == 0 &&
1653 156 : TupIsNull(winstate->framehead_slot))
1654 : {
1655 : /* fetch first row into framehead_slot, if we didn't already */
1656 58 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1657 : winstate->framehead_slot))
1658 0 : elog(ERROR, "unexpected end of tuplestore");
1659 : }
1660 :
1661 562 : while (!TupIsNull(winstate->framehead_slot))
1662 : {
1663 562 : if (are_peers(winstate, winstate->framehead_slot,
1664 : winstate->ss.ss_ScanTupleSlot))
1665 322 : break; /* this row is the correct frame head */
1666 : /* Note we advance frameheadpos even if the fetch fails */
1667 240 : winstate->frameheadpos++;
1668 240 : spool_tuples(winstate, winstate->frameheadpos);
1669 240 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1670 : winstate->framehead_slot))
1671 0 : break; /* end of partition */
1672 : }
1673 322 : winstate->framehead_valid = true;
1674 : }
1675 : else
1676 : Assert(false);
1677 : }
1678 5344 : else if (frameOptions & FRAMEOPTION_START_OFFSET)
1679 : {
1680 5344 : if (frameOptions & FRAMEOPTION_ROWS)
1681 : {
1682 : /* In ROWS mode, bound is physically n before/after current */
1683 1368 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1684 :
1685 1368 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1686 1288 : offset = -offset;
1687 :
1688 : /*
1689 : * If we have an overflow, it means the frame head is beyond the
1690 : * range of int64. Since currentpos >= 0, this can only be a
1691 : * positive overflow. We treat this as being beyond end of
1692 : * partition.
1693 : */
1694 1368 : if (pg_add_s64_overflow(winstate->currentpos, offset,
1695 : &winstate->frameheadpos))
1696 36 : winstate->frameheadpos = PG_INT64_MAX;
1697 :
1698 : /* frame head can't go before first row */
1699 1368 : if (winstate->frameheadpos < 0)
1700 224 : winstate->frameheadpos = 0;
1701 1144 : else if (winstate->frameheadpos > winstate->currentpos + 1)
1702 : {
1703 : /* make sure frameheadpos is not past end of partition */
1704 40 : spool_tuples(winstate, winstate->frameheadpos - 1);
1705 40 : if (winstate->frameheadpos > winstate->spooled_rows)
1706 40 : winstate->frameheadpos = winstate->spooled_rows;
1707 : }
1708 1368 : winstate->framehead_valid = true;
1709 : }
1710 3976 : else if (frameOptions & FRAMEOPTION_RANGE)
1711 : {
1712 : /*
1713 : * In RANGE START_OFFSET mode, frame head is the first row that
1714 : * satisfies the in_range constraint relative to the current row.
1715 : * We keep a copy of the last-known frame head row in
1716 : * framehead_slot, and advance as necessary. Note that if we
1717 : * reach end of partition, we will leave frameheadpos = end+1 and
1718 : * framehead_slot empty.
1719 : */
1720 3016 : int sortCol = node->ordColIdx[0];
1721 : bool sub,
1722 : less;
1723 :
1724 : /* We must have an ordering column */
1725 : Assert(node->ordNumCols == 1);
1726 :
1727 : /* Precompute flags for in_range checks */
1728 3016 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1729 2468 : sub = true; /* subtract startOffset from current row */
1730 : else
1731 548 : sub = false; /* add it */
1732 3016 : less = false; /* normally, we want frame head >= sum */
1733 : /* If sort order is descending, flip both flags */
1734 3016 : if (!winstate->inRangeAsc)
1735 : {
1736 436 : sub = !sub;
1737 436 : less = true;
1738 : }
1739 :
1740 3016 : tuplestore_select_read_pointer(winstate->buffer,
1741 : winstate->framehead_ptr);
1742 3016 : if (winstate->frameheadpos == 0 &&
1743 1668 : TupIsNull(winstate->framehead_slot))
1744 : {
1745 : /* fetch first row into framehead_slot, if we didn't already */
1746 380 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1747 : winstate->framehead_slot))
1748 0 : elog(ERROR, "unexpected end of tuplestore");
1749 : }
1750 :
1751 4844 : while (!TupIsNull(winstate->framehead_slot))
1752 : {
1753 : Datum headval,
1754 : currval;
1755 : bool headisnull,
1756 : currisnull;
1757 :
1758 4708 : headval = slot_getattr(winstate->framehead_slot, sortCol,
1759 : &headisnull);
1760 4708 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1761 : &currisnull);
1762 4708 : if (headisnull || currisnull)
1763 : {
1764 : /* order of the rows depends only on nulls_first */
1765 72 : if (winstate->inRangeNullsFirst)
1766 : {
1767 : /* advance head if head is null and curr is not */
1768 32 : if (!headisnull || currisnull)
1769 : break;
1770 : }
1771 : else
1772 : {
1773 : /* advance head if head is not null and curr is null */
1774 40 : if (headisnull || !currisnull)
1775 : break;
1776 : }
1777 : }
1778 : else
1779 : {
1780 4636 : if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1781 : winstate->inRangeColl,
1782 : headval,
1783 : currval,
1784 : winstate->startOffsetValue,
1785 : BoolGetDatum(sub),
1786 : BoolGetDatum(less))))
1787 2780 : break; /* this row is the correct frame head */
1788 : }
1789 : /* Note we advance frameheadpos even if the fetch fails */
1790 1864 : winstate->frameheadpos++;
1791 1864 : spool_tuples(winstate, winstate->frameheadpos);
1792 1864 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1793 : winstate->framehead_slot))
1794 36 : break; /* end of partition */
1795 : }
1796 2984 : winstate->framehead_valid = true;
1797 : }
1798 960 : else if (frameOptions & FRAMEOPTION_GROUPS)
1799 : {
1800 : /*
1801 : * In GROUPS START_OFFSET mode, frame head is the first row of the
1802 : * first peer group whose number satisfies the offset constraint.
1803 : * We keep a copy of the last-known frame head row in
1804 : * framehead_slot, and advance as necessary. Note that if we
1805 : * reach end of partition, we will leave frameheadpos = end+1 and
1806 : * framehead_slot empty.
1807 : */
1808 960 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1809 960 : int64 minheadgroup = 0;
1810 :
1811 960 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1812 752 : minheadgroup = winstate->currentgroup - offset;
1813 : else
1814 : {
1815 : /*
1816 : * If we have an overflow, it means the target group is beyond
1817 : * the range of int64. We treat this as "infinity", which
1818 : * ensures the loop below advances to end of partition.
1819 : */
1820 208 : if (pg_add_s64_overflow(winstate->currentgroup, offset,
1821 : &minheadgroup))
1822 28 : minheadgroup = PG_INT64_MAX;
1823 : }
1824 :
1825 960 : tuplestore_select_read_pointer(winstate->buffer,
1826 : winstate->framehead_ptr);
1827 960 : if (winstate->frameheadpos == 0 &&
1828 504 : TupIsNull(winstate->framehead_slot))
1829 : {
1830 : /* fetch first row into framehead_slot, if we didn't already */
1831 252 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1832 : winstate->framehead_slot))
1833 0 : elog(ERROR, "unexpected end of tuplestore");
1834 : }
1835 :
1836 2464 : while (!TupIsNull(winstate->framehead_slot))
1837 : {
1838 1452 : if (winstate->frameheadgroup >= minheadgroup)
1839 880 : break; /* this row is the correct frame head */
1840 572 : ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1841 : /* Note we advance frameheadpos even if the fetch fails */
1842 572 : winstate->frameheadpos++;
1843 572 : spool_tuples(winstate, winstate->frameheadpos);
1844 572 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1845 : winstate->framehead_slot))
1846 28 : break; /* end of partition */
1847 544 : if (!are_peers(winstate, winstate->temp_slot_2,
1848 : winstate->framehead_slot))
1849 360 : winstate->frameheadgroup++;
1850 : }
1851 960 : ExecClearTuple(winstate->temp_slot_2);
1852 960 : winstate->framehead_valid = true;
1853 : }
1854 : else
1855 : Assert(false);
1856 : }
1857 : else
1858 : Assert(false);
1859 :
1860 110669 : MemoryContextSwitchTo(oldcontext);
1861 : }
1862 :
1863 : /*
1864 : * update_frametailpos
1865 : * make frametailpos valid for the current row
1866 : *
1867 : * Note that frametailpos is computed without regard for any window exclusion
1868 : * clause; the current row and/or its peers are considered part of the frame
1869 : * for this purpose even if they must be excluded later.
1870 : *
1871 : * May clobber winstate->temp_slot_2.
1872 : */
1873 : static void
1874 135378 : update_frametailpos(WindowAggState *winstate)
1875 : {
1876 135378 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1877 135378 : int frameOptions = winstate->frameOptions;
1878 : MemoryContext oldcontext;
1879 :
1880 135378 : if (winstate->frametail_valid)
1881 12000 : return; /* already known for current row */
1882 :
1883 : /* We may be called in a short-lived context */
1884 123378 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1885 :
1886 123378 : if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1887 : {
1888 : /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1889 160 : spool_tuples(winstate, -1);
1890 160 : winstate->frametailpos = winstate->spooled_rows;
1891 160 : winstate->frametail_valid = true;
1892 : }
1893 123218 : else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1894 : {
1895 118722 : if (frameOptions & FRAMEOPTION_ROWS)
1896 : {
1897 : /* In ROWS mode, exactly the rows up to current are in frame */
1898 80 : winstate->frametailpos = winstate->currentpos + 1;
1899 80 : winstate->frametail_valid = true;
1900 : }
1901 118642 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1902 : {
1903 : /* If no ORDER BY, all rows are peers with each other */
1904 118642 : if (node->ordNumCols == 0)
1905 : {
1906 40 : spool_tuples(winstate, -1);
1907 40 : winstate->frametailpos = winstate->spooled_rows;
1908 40 : winstate->frametail_valid = true;
1909 40 : MemoryContextSwitchTo(oldcontext);
1910 40 : return;
1911 : }
1912 :
1913 : /*
1914 : * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1915 : * row that is a peer of current row, frame tail is the row after
1916 : * that (if any). We keep a copy of the last-known frame tail row
1917 : * in frametail_slot, and advance as necessary. Note that if we
1918 : * reach end of partition, we will leave frametailpos = end+1 and
1919 : * frametail_slot empty.
1920 : */
1921 118602 : tuplestore_select_read_pointer(winstate->buffer,
1922 : winstate->frametail_ptr);
1923 118602 : if (winstate->frametailpos == 0 &&
1924 474 : TupIsNull(winstate->frametail_slot))
1925 : {
1926 : /* fetch first row into frametail_slot, if we didn't already */
1927 474 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1928 : winstate->frametail_slot))
1929 0 : elog(ERROR, "unexpected end of tuplestore");
1930 : }
1931 :
1932 236738 : while (!TupIsNull(winstate->frametail_slot))
1933 : {
1934 220658 : if (winstate->frametailpos > winstate->currentpos &&
1935 182296 : !are_peers(winstate, winstate->frametail_slot,
1936 : winstate->ss.ss_ScanTupleSlot))
1937 102056 : break; /* this row is the frame tail */
1938 : /* Note we advance frametailpos even if the fetch fails */
1939 118602 : winstate->frametailpos++;
1940 118602 : spool_tuples(winstate, winstate->frametailpos);
1941 118602 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1942 : winstate->frametail_slot))
1943 466 : break; /* end of partition */
1944 : }
1945 118602 : winstate->frametail_valid = true;
1946 : }
1947 : else
1948 : Assert(false);
1949 : }
1950 4496 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1951 : {
1952 4496 : if (frameOptions & FRAMEOPTION_ROWS)
1953 : {
1954 : /* In ROWS mode, bound is physically n before/after current */
1955 320 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1956 :
1957 320 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1958 0 : offset = -offset;
1959 :
1960 : /*
1961 : * If we have an overflow, it means the frame tail is beyond the
1962 : * range of int64. Since currentpos >= 0, this can only be a
1963 : * positive overflow. We treat this as being beyond end of
1964 : * partition.
1965 : */
1966 320 : if (pg_add_s64_overflow(winstate->currentpos, offset,
1967 284 : &winstate->frametailpos) ||
1968 284 : pg_add_s64_overflow(winstate->frametailpos, 1,
1969 : &winstate->frametailpos))
1970 40 : winstate->frametailpos = PG_INT64_MAX;
1971 :
1972 : /* smallest allowable value of frametailpos is 0 */
1973 320 : if (winstate->frametailpos < 0)
1974 0 : winstate->frametailpos = 0;
1975 320 : else if (winstate->frametailpos > winstate->currentpos + 1)
1976 : {
1977 : /* make sure frametailpos is not past end of partition */
1978 320 : spool_tuples(winstate, winstate->frametailpos - 1);
1979 320 : if (winstate->frametailpos > winstate->spooled_rows)
1980 104 : winstate->frametailpos = winstate->spooled_rows;
1981 : }
1982 320 : winstate->frametail_valid = true;
1983 : }
1984 4176 : else if (frameOptions & FRAMEOPTION_RANGE)
1985 : {
1986 : /*
1987 : * In RANGE END_OFFSET mode, frame end is the last row that
1988 : * satisfies the in_range constraint relative to the current row,
1989 : * frame tail is the row after that (if any). We keep a copy of
1990 : * the last-known frame tail row in frametail_slot, and advance as
1991 : * necessary. Note that if we reach end of partition, we will
1992 : * leave frametailpos = end+1 and frametail_slot empty.
1993 : */
1994 3256 : int sortCol = node->ordColIdx[0];
1995 : bool sub,
1996 : less;
1997 :
1998 : /* We must have an ordering column */
1999 : Assert(node->ordNumCols == 1);
2000 :
2001 : /* Precompute flags for in_range checks */
2002 3256 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
2003 608 : sub = true; /* subtract endOffset from current row */
2004 : else
2005 2648 : sub = false; /* add it */
2006 3256 : less = true; /* normally, we want frame tail <= sum */
2007 : /* If sort order is descending, flip both flags */
2008 3256 : if (!winstate->inRangeAsc)
2009 : {
2010 460 : sub = !sub;
2011 460 : less = false;
2012 : }
2013 :
2014 3256 : tuplestore_select_read_pointer(winstate->buffer,
2015 : winstate->frametail_ptr);
2016 3256 : if (winstate->frametailpos == 0 &&
2017 548 : TupIsNull(winstate->frametail_slot))
2018 : {
2019 : /* fetch first row into frametail_slot, if we didn't already */
2020 392 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2021 : winstate->frametail_slot))
2022 0 : elog(ERROR, "unexpected end of tuplestore");
2023 : }
2024 :
2025 6004 : while (!TupIsNull(winstate->frametail_slot))
2026 : {
2027 : Datum tailval,
2028 : currval;
2029 : bool tailisnull,
2030 : currisnull;
2031 :
2032 4960 : tailval = slot_getattr(winstate->frametail_slot, sortCol,
2033 : &tailisnull);
2034 4960 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
2035 : &currisnull);
2036 4960 : if (tailisnull || currisnull)
2037 : {
2038 : /* order of the rows depends only on nulls_first */
2039 72 : if (winstate->inRangeNullsFirst)
2040 : {
2041 : /* advance tail if tail is null or curr is not */
2042 32 : if (!tailisnull)
2043 2180 : break;
2044 : }
2045 : else
2046 : {
2047 : /* advance tail if tail is not null or curr is null */
2048 40 : if (!currisnull)
2049 24 : break;
2050 : }
2051 : }
2052 : else
2053 : {
2054 4888 : if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
2055 : winstate->inRangeColl,
2056 : tailval,
2057 : currval,
2058 : winstate->endOffsetValue,
2059 : BoolGetDatum(sub),
2060 : BoolGetDatum(less))))
2061 1820 : break; /* this row is the correct frame tail */
2062 : }
2063 : /* Note we advance frametailpos even if the fetch fails */
2064 3068 : winstate->frametailpos++;
2065 3068 : spool_tuples(winstate, winstate->frametailpos);
2066 3068 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2067 : winstate->frametail_slot))
2068 320 : break; /* end of partition */
2069 : }
2070 3224 : winstate->frametail_valid = true;
2071 : }
2072 920 : else if (frameOptions & FRAMEOPTION_GROUPS)
2073 : {
2074 : /*
2075 : * In GROUPS END_OFFSET mode, frame end is the last row of the
2076 : * last peer group whose number satisfies the offset constraint,
2077 : * and frame tail is the row after that (if any). We keep a copy
2078 : * of the last-known frame tail row in frametail_slot, and advance
2079 : * as necessary. Note that if we reach end of partition, we will
2080 : * leave frametailpos = end+1 and frametail_slot empty.
2081 : */
2082 920 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
2083 920 : int64 maxtailgroup = 0;
2084 :
2085 920 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
2086 48 : maxtailgroup = winstate->currentgroup - offset;
2087 : else
2088 : {
2089 : /*
2090 : * If we have an overflow, it means the target group is beyond
2091 : * the range of int64. We treat this as "infinity", which
2092 : * ensures the loop below advances to end of partition.
2093 : */
2094 872 : if (pg_add_s64_overflow(winstate->currentgroup, offset,
2095 : &maxtailgroup))
2096 28 : maxtailgroup = PG_INT64_MAX;
2097 : }
2098 :
2099 920 : tuplestore_select_read_pointer(winstate->buffer,
2100 : winstate->frametail_ptr);
2101 920 : if (winstate->frametailpos == 0 &&
2102 260 : TupIsNull(winstate->frametail_slot))
2103 : {
2104 : /* fetch first row into frametail_slot, if we didn't already */
2105 248 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2106 : winstate->frametail_slot))
2107 0 : elog(ERROR, "unexpected end of tuplestore");
2108 : }
2109 :
2110 2508 : while (!TupIsNull(winstate->frametail_slot))
2111 : {
2112 1400 : if (winstate->frametailgroup > maxtailgroup)
2113 496 : break; /* this row is the correct frame tail */
2114 904 : ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
2115 : /* Note we advance frametailpos even if the fetch fails */
2116 904 : winstate->frametailpos++;
2117 904 : spool_tuples(winstate, winstate->frametailpos);
2118 904 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2119 : winstate->frametail_slot))
2120 236 : break; /* end of partition */
2121 668 : if (!are_peers(winstate, winstate->temp_slot_2,
2122 : winstate->frametail_slot))
2123 412 : winstate->frametailgroup++;
2124 : }
2125 920 : ExecClearTuple(winstate->temp_slot_2);
2126 920 : winstate->frametail_valid = true;
2127 : }
2128 : else
2129 : Assert(false);
2130 : }
2131 : else
2132 : Assert(false);
2133 :
2134 123306 : MemoryContextSwitchTo(oldcontext);
2135 : }
2136 :
2137 : /*
2138 : * update_grouptailpos
2139 : * make grouptailpos valid for the current row
2140 : *
2141 : * May clobber winstate->temp_slot_2.
2142 : */
2143 : static void
2144 3248 : update_grouptailpos(WindowAggState *winstate)
2145 : {
2146 3248 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2147 : MemoryContext oldcontext;
2148 :
2149 3248 : if (winstate->grouptail_valid)
2150 2636 : return; /* already known for current row */
2151 :
2152 : /* We may be called in a short-lived context */
2153 612 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2154 :
2155 : /* If no ORDER BY, all rows are peers with each other */
2156 612 : if (node->ordNumCols == 0)
2157 : {
2158 0 : spool_tuples(winstate, -1);
2159 0 : winstate->grouptailpos = winstate->spooled_rows;
2160 0 : winstate->grouptail_valid = true;
2161 0 : MemoryContextSwitchTo(oldcontext);
2162 0 : return;
2163 : }
2164 :
2165 : /*
2166 : * Because grouptail_valid is reset only when current row advances into a
2167 : * new peer group, we always reach here knowing that grouptailpos needs to
2168 : * be advanced by at least one row. Hence, unlike the otherwise similar
2169 : * case for frame tail tracking, we do not need persistent storage of the
2170 : * group tail row.
2171 : */
2172 : Assert(winstate->grouptailpos <= winstate->currentpos);
2173 612 : tuplestore_select_read_pointer(winstate->buffer,
2174 : winstate->grouptail_ptr);
2175 : for (;;)
2176 : {
2177 : /* Note we advance grouptailpos even if the fetch fails */
2178 1172 : winstate->grouptailpos++;
2179 1172 : spool_tuples(winstate, winstate->grouptailpos);
2180 1172 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2181 : winstate->temp_slot_2))
2182 172 : break; /* end of partition */
2183 1000 : if (winstate->grouptailpos > winstate->currentpos &&
2184 828 : !are_peers(winstate, winstate->temp_slot_2,
2185 : winstate->ss.ss_ScanTupleSlot))
2186 440 : break; /* this row is the group tail */
2187 : }
2188 612 : ExecClearTuple(winstate->temp_slot_2);
2189 612 : winstate->grouptail_valid = true;
2190 :
2191 612 : MemoryContextSwitchTo(oldcontext);
2192 : }
2193 :
2194 : /*
2195 : * calculate_frame_offsets
2196 : * Determine the startOffsetValue and endOffsetValue values for the
2197 : * WindowAgg's frame options.
2198 : */
2199 : static pg_noinline void
2200 1617 : calculate_frame_offsets(PlanState *pstate)
2201 : {
2202 1617 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2203 : ExprContext *econtext;
2204 1617 : int frameOptions = winstate->frameOptions;
2205 : Datum value;
2206 : bool isnull;
2207 : int16 len;
2208 : bool byval;
2209 :
2210 : /* Ensure we've not been called before for this scan */
2211 : Assert(winstate->all_first);
2212 :
2213 1617 : econtext = winstate->ss.ps.ps_ExprContext;
2214 :
2215 1617 : if (frameOptions & FRAMEOPTION_START_OFFSET)
2216 : {
2217 : Assert(winstate->startOffset != NULL);
2218 584 : value = ExecEvalExprSwitchContext(winstate->startOffset,
2219 : econtext,
2220 : &isnull);
2221 584 : if (isnull)
2222 0 : ereport(ERROR,
2223 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2224 : errmsg("frame starting offset must not be null")));
2225 : /* copy value into query-lifespan context */
2226 584 : get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2227 : &len,
2228 : &byval);
2229 584 : winstate->startOffsetValue = datumCopy(value, byval, len);
2230 584 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2231 : {
2232 : /* value is known to be int8 */
2233 240 : int64 offset = DatumGetInt64(value);
2234 :
2235 240 : if (offset < 0)
2236 0 : ereport(ERROR,
2237 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2238 : errmsg("frame starting offset must not be negative")));
2239 : }
2240 : }
2241 :
2242 1617 : if (frameOptions & FRAMEOPTION_END_OFFSET)
2243 : {
2244 : Assert(winstate->endOffset != NULL);
2245 656 : value = ExecEvalExprSwitchContext(winstate->endOffset,
2246 : econtext,
2247 : &isnull);
2248 656 : if (isnull)
2249 0 : ereport(ERROR,
2250 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2251 : errmsg("frame ending offset must not be null")));
2252 : /* copy value into query-lifespan context */
2253 656 : get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2254 : &len,
2255 : &byval);
2256 656 : winstate->endOffsetValue = datumCopy(value, byval, len);
2257 656 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2258 : {
2259 : /* value is known to be int8 */
2260 268 : int64 offset = DatumGetInt64(value);
2261 :
2262 268 : if (offset < 0)
2263 0 : ereport(ERROR,
2264 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2265 : errmsg("frame ending offset must not be negative")));
2266 : }
2267 : }
2268 1617 : winstate->all_first = false;
2269 1617 : }
2270 :
2271 : /* -----------------
2272 : * ExecWindowAgg
2273 : *
2274 : * ExecWindowAgg receives tuples from its outer subplan and
2275 : * stores them into a tuplestore, then processes window functions.
2276 : * This node doesn't reduce nor qualify any row so the number of
2277 : * returned rows is exactly the same as its outer subplan's result.
2278 : * -----------------
2279 : */
2280 : static TupleTableSlot *
2281 605129 : ExecWindowAgg(PlanState *pstate)
2282 : {
2283 605129 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2284 : TupleTableSlot *slot;
2285 : ExprContext *econtext;
2286 : int i;
2287 : int numfuncs;
2288 :
2289 605129 : CHECK_FOR_INTERRUPTS();
2290 :
2291 605129 : if (winstate->status == WINDOWAGG_DONE)
2292 0 : return NULL;
2293 :
2294 : /*
2295 : * Compute frame offset values, if any, during first call (or after a
2296 : * rescan). These are assumed to hold constant throughout the scan; if
2297 : * user gives us a volatile expression, we'll only use its initial value.
2298 : */
2299 605129 : if (unlikely(winstate->all_first))
2300 1617 : calculate_frame_offsets(pstate);
2301 :
2302 : /* We need to loop as the runCondition or qual may filter out tuples */
2303 : for (;;)
2304 : {
2305 605217 : if (winstate->next_partition)
2306 : {
2307 : /* Initialize for first partition and set current row = 0 */
2308 1617 : begin_partition(winstate);
2309 : /* If there are no input rows, we'll detect that and exit below */
2310 : }
2311 : else
2312 : {
2313 : /* Advance current row within partition */
2314 603600 : winstate->currentpos++;
2315 : /* This might mean that the frame moves, too */
2316 603600 : winstate->framehead_valid = false;
2317 603600 : winstate->frametail_valid = false;
2318 : /* we don't need to invalidate grouptail here; see below */
2319 : }
2320 :
2321 : /*
2322 : * Spool all tuples up to and including the current row, if we haven't
2323 : * already
2324 : */
2325 605217 : spool_tuples(winstate, winstate->currentpos);
2326 :
2327 : /* Move to the next partition if we reached the end of this partition */
2328 605217 : if (winstate->partition_spooled &&
2329 42229 : winstate->currentpos >= winstate->spooled_rows)
2330 : {
2331 2275 : release_partition(winstate);
2332 :
2333 2275 : if (winstate->more_partitions)
2334 : {
2335 826 : begin_partition(winstate);
2336 : Assert(winstate->spooled_rows > 0);
2337 :
2338 : /* Come out of pass-through mode when changing partition */
2339 826 : winstate->status = WINDOWAGG_RUN;
2340 : }
2341 : else
2342 : {
2343 : /* No further partitions? We're done */
2344 1449 : winstate->status = WINDOWAGG_DONE;
2345 1449 : return NULL;
2346 : }
2347 : }
2348 :
2349 : /* final output execution is in ps_ExprContext */
2350 603768 : econtext = winstate->ss.ps.ps_ExprContext;
2351 :
2352 : /* Clear the per-output-tuple context for current row */
2353 603768 : ResetExprContext(econtext);
2354 :
2355 : /*
2356 : * Read the current row from the tuplestore, and save in
2357 : * ScanTupleSlot. (We can't rely on the outerplan's output slot
2358 : * because we may have to read beyond the current row. Also, we have
2359 : * to actually copy the row out of the tuplestore, since window
2360 : * function evaluation might cause the tuplestore to dump its state to
2361 : * disk.)
2362 : *
2363 : * In GROUPS mode, or when tracking a group-oriented exclusion clause,
2364 : * we must also detect entering a new peer group and update associated
2365 : * state when that happens. We use temp_slot_2 to temporarily hold
2366 : * the previous row for this purpose.
2367 : *
2368 : * Current row must be in the tuplestore, since we spooled it above.
2369 : */
2370 603768 : tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2371 603768 : if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2372 : FRAMEOPTION_EXCLUDE_GROUP |
2373 2012 : FRAMEOPTION_EXCLUDE_TIES)) &&
2374 2012 : winstate->currentpos > 0)
2375 : {
2376 1644 : ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2377 1644 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2378 : winstate->ss.ss_ScanTupleSlot))
2379 0 : elog(ERROR, "unexpected end of tuplestore");
2380 1644 : if (!are_peers(winstate, winstate->temp_slot_2,
2381 : winstate->ss.ss_ScanTupleSlot))
2382 : {
2383 852 : winstate->currentgroup++;
2384 852 : winstate->groupheadpos = winstate->currentpos;
2385 852 : winstate->grouptail_valid = false;
2386 : }
2387 1644 : ExecClearTuple(winstate->temp_slot_2);
2388 : }
2389 : else
2390 : {
2391 602124 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2392 : winstate->ss.ss_ScanTupleSlot))
2393 0 : elog(ERROR, "unexpected end of tuplestore");
2394 : }
2395 :
2396 : /* don't evaluate the window functions when we're in pass-through mode */
2397 603768 : if (winstate->status == WINDOWAGG_RUN)
2398 : {
2399 : /*
2400 : * Evaluate true window functions
2401 : */
2402 603724 : numfuncs = winstate->numfuncs;
2403 1292592 : for (i = 0; i < numfuncs; i++)
2404 : {
2405 688976 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2406 :
2407 688976 : if (perfuncstate->plain_agg)
2408 108261 : continue;
2409 580715 : eval_windowfunction(winstate, perfuncstate,
2410 580715 : &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2411 580715 : &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2412 : }
2413 :
2414 : /*
2415 : * Evaluate aggregates
2416 : */
2417 603616 : if (winstate->numaggs > 0)
2418 107065 : eval_windowaggregates(winstate);
2419 : }
2420 :
2421 : /*
2422 : * If we have created auxiliary read pointers for the frame or group
2423 : * boundaries, force them to be kept up-to-date, because we don't know
2424 : * whether the window function(s) will do anything that requires that.
2425 : * Failing to advance the pointers would result in being unable to
2426 : * trim data from the tuplestore, which is bad. (If we could know in
2427 : * advance whether the window functions will use frame boundary info,
2428 : * we could skip creating these pointers in the first place ... but
2429 : * unfortunately the window function API doesn't require that.)
2430 : */
2431 603632 : if (winstate->framehead_ptr >= 0)
2432 4234 : update_frameheadpos(winstate);
2433 603632 : if (winstate->frametail_ptr >= 0)
2434 122746 : update_frametailpos(winstate);
2435 603632 : if (winstate->grouptail_ptr >= 0)
2436 1000 : update_grouptailpos(winstate);
2437 :
2438 : /*
2439 : * Truncate any no-longer-needed rows from the tuplestore.
2440 : */
2441 603632 : tuplestore_trim(winstate->buffer);
2442 :
2443 : /*
2444 : * Form and return a projection tuple using the windowfunc results and
2445 : * the current row. Setting ecxt_outertuple arranges that any Vars
2446 : * will be evaluated with respect to that row.
2447 : */
2448 603632 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2449 :
2450 603632 : slot = ExecProject(winstate->ss.ps.ps_ProjInfo);
2451 :
2452 603632 : if (winstate->status == WINDOWAGG_RUN)
2453 : {
2454 603588 : econtext->ecxt_scantuple = slot;
2455 :
2456 : /*
2457 : * Now evaluate the run condition to see if we need to go into
2458 : * pass-through mode, or maybe stop completely.
2459 : */
2460 603588 : if (!ExecQual(winstate->runcondition, econtext))
2461 : {
2462 : /*
2463 : * Determine which mode to move into. If there is no
2464 : * PARTITION BY clause and we're the top-level WindowAgg then
2465 : * we're done. This tuple and any future tuples cannot
2466 : * possibly match the runcondition. However, when there is a
2467 : * PARTITION BY clause or we're not the top-level window we
2468 : * can't just stop as we need to either process other
2469 : * partitions or ensure WindowAgg nodes above us receive all
2470 : * of the tuples they need to process their WindowFuncs.
2471 : */
2472 88 : if (winstate->use_pass_through)
2473 : {
2474 : /*
2475 : * When switching into a pass-through mode, we'd better
2476 : * NULLify the aggregate results as these are no longer
2477 : * updated and NULLifying them avoids the old stale
2478 : * results lingering. Some of these might be byref types
2479 : * so we can't have them pointing to free'd memory. The
2480 : * planner insisted that quals used in the runcondition
2481 : * are strict, so the top-level WindowAgg will always
2482 : * filter these NULLs out in the filter clause.
2483 : */
2484 60 : numfuncs = winstate->numfuncs;
2485 176 : for (i = 0; i < numfuncs; i++)
2486 : {
2487 116 : econtext->ecxt_aggvalues[i] = (Datum) 0;
2488 116 : econtext->ecxt_aggnulls[i] = true;
2489 : }
2490 :
2491 : /*
2492 : * STRICT pass-through mode is required for the top window
2493 : * when there is a PARTITION BY clause. Otherwise we must
2494 : * ensure we store tuples that don't match the
2495 : * runcondition so they're available to WindowAggs above.
2496 : */
2497 60 : if (winstate->top_window)
2498 : {
2499 48 : winstate->status = WINDOWAGG_PASSTHROUGH_STRICT;
2500 48 : continue;
2501 : }
2502 : else
2503 : {
2504 12 : winstate->status = WINDOWAGG_PASSTHROUGH;
2505 : }
2506 : }
2507 : else
2508 : {
2509 : /*
2510 : * Pass-through not required. We can just return NULL.
2511 : * Nothing else will match the runcondition.
2512 : */
2513 28 : winstate->status = WINDOWAGG_DONE;
2514 28 : return NULL;
2515 : }
2516 : }
2517 :
2518 : /*
2519 : * Filter out any tuples we don't need in the top-level WindowAgg.
2520 : */
2521 603512 : if (!ExecQual(winstate->ss.ps.qual, econtext))
2522 : {
2523 12 : InstrCountFiltered1(winstate, 1);
2524 12 : continue;
2525 : }
2526 :
2527 603500 : break;
2528 : }
2529 :
2530 : /*
2531 : * When not in WINDOWAGG_RUN mode, we must still return this tuple if
2532 : * we're anything apart from the top window.
2533 : */
2534 44 : else if (!winstate->top_window)
2535 16 : break;
2536 : }
2537 :
2538 603516 : return slot;
2539 : }
2540 :
2541 : /* -----------------
2542 : * ExecInitWindowAgg
2543 : *
2544 : * Creates the run-time information for the WindowAgg node produced by the
2545 : * planner and initializes its outer subtree
2546 : * -----------------
2547 : */
2548 : WindowAggState *
2549 1922 : ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2550 : {
2551 : WindowAggState *winstate;
2552 : Plan *outerPlan;
2553 : ExprContext *econtext;
2554 : ExprContext *tmpcontext;
2555 : WindowStatePerFunc perfunc;
2556 : WindowStatePerAgg peragg;
2557 1922 : int frameOptions = node->frameOptions;
2558 : int numfuncs,
2559 : wfuncno,
2560 : numaggs,
2561 : aggno;
2562 : TupleDesc scanDesc;
2563 : ListCell *l;
2564 :
2565 : /* check for unsupported flags */
2566 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2567 :
2568 : /*
2569 : * create state structure
2570 : */
2571 1922 : winstate = makeNode(WindowAggState);
2572 1922 : winstate->ss.ps.plan = (Plan *) node;
2573 1922 : winstate->ss.ps.state = estate;
2574 1922 : winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2575 :
2576 : /* copy frame options to state node for easy access */
2577 1922 : winstate->frameOptions = frameOptions;
2578 :
2579 : /*
2580 : * Create expression contexts. We need two, one for per-input-tuple
2581 : * processing and one for per-output-tuple processing. We cheat a little
2582 : * by using ExecAssignExprContext() to build both.
2583 : */
2584 1922 : ExecAssignExprContext(estate, &winstate->ss.ps);
2585 1922 : tmpcontext = winstate->ss.ps.ps_ExprContext;
2586 1922 : winstate->tmpcontext = tmpcontext;
2587 1922 : ExecAssignExprContext(estate, &winstate->ss.ps);
2588 :
2589 : /* Create long-lived context for storage of partition-local memory etc */
2590 1922 : winstate->partcontext =
2591 1922 : AllocSetContextCreate(CurrentMemoryContext,
2592 : "WindowAgg Partition",
2593 : ALLOCSET_DEFAULT_SIZES);
2594 :
2595 : /*
2596 : * Create mid-lived context for aggregate trans values etc.
2597 : *
2598 : * Note that moving aggregates each use their own private context, not
2599 : * this one.
2600 : */
2601 1922 : winstate->aggcontext =
2602 1922 : AllocSetContextCreate(CurrentMemoryContext,
2603 : "WindowAgg Aggregates",
2604 : ALLOCSET_DEFAULT_SIZES);
2605 :
2606 : /* Only the top-level WindowAgg may have a qual */
2607 : Assert(node->plan.qual == NIL || node->topWindow);
2608 :
2609 : /* Initialize the qual */
2610 1922 : winstate->ss.ps.qual = ExecInitQual(node->plan.qual,
2611 : (PlanState *) winstate);
2612 :
2613 : /*
2614 : * Setup the run condition, if we received one from the query planner.
2615 : * When set, this may allow us to move into pass-through mode so that we
2616 : * don't have to perform any further evaluation of WindowFuncs in the
2617 : * current partition or possibly stop returning tuples altogether when all
2618 : * tuples are in the same partition.
2619 : */
2620 1922 : winstate->runcondition = ExecInitQual(node->runCondition,
2621 : (PlanState *) winstate);
2622 :
2623 : /*
2624 : * When we're not the top-level WindowAgg node or we are but have a
2625 : * PARTITION BY clause we must move into one of the WINDOWAGG_PASSTHROUGH*
2626 : * modes when the runCondition becomes false.
2627 : */
2628 1922 : winstate->use_pass_through = !node->topWindow || node->partNumCols > 0;
2629 :
2630 : /* remember if we're the top-window or we are below the top-window */
2631 1922 : winstate->top_window = node->topWindow;
2632 :
2633 : /*
2634 : * initialize child nodes
2635 : */
2636 1922 : outerPlan = outerPlan(node);
2637 1922 : outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2638 :
2639 : /*
2640 : * initialize source tuple type (which is also the tuple type that we'll
2641 : * store in the tuplestore and use in all our working slots).
2642 : */
2643 1922 : ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2644 1922 : scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2645 :
2646 : /* the outer tuple isn't the child's tuple, but always a minimal tuple */
2647 1922 : winstate->ss.ps.outeropsset = true;
2648 1922 : winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2649 1922 : winstate->ss.ps.outeropsfixed = true;
2650 :
2651 : /*
2652 : * tuple table initialization
2653 : */
2654 1922 : winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2655 : &TTSOpsMinimalTuple);
2656 1922 : winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2657 : &TTSOpsMinimalTuple);
2658 1922 : winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2659 : &TTSOpsMinimalTuple);
2660 1922 : winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2661 : &TTSOpsMinimalTuple);
2662 :
2663 : /*
2664 : * create frame head and tail slots only if needed (must create slots in
2665 : * exactly the same cases that update_frameheadpos and update_frametailpos
2666 : * need them)
2667 : */
2668 1922 : winstate->framehead_slot = winstate->frametail_slot = NULL;
2669 :
2670 1922 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2671 : {
2672 1103 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2673 54 : node->ordNumCols != 0) ||
2674 1049 : (frameOptions & FRAMEOPTION_START_OFFSET))
2675 502 : winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2676 : &TTSOpsMinimalTuple);
2677 1103 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2678 573 : node->ordNumCols != 0) ||
2679 769 : (frameOptions & FRAMEOPTION_END_OFFSET))
2680 822 : winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2681 : &TTSOpsMinimalTuple);
2682 : }
2683 :
2684 : /*
2685 : * Initialize result slot, type and projection.
2686 : */
2687 1922 : ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2688 1922 : ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2689 :
2690 : /* Set up data for comparing tuples */
2691 1922 : if (node->partNumCols > 0)
2692 452 : winstate->partEqfunction =
2693 452 : execTuplesMatchPrepare(scanDesc,
2694 : node->partNumCols,
2695 452 : node->partColIdx,
2696 452 : node->partOperators,
2697 452 : node->partCollations,
2698 : &winstate->ss.ps);
2699 :
2700 1922 : if (node->ordNumCols > 0)
2701 1493 : winstate->ordEqfunction =
2702 1493 : execTuplesMatchPrepare(scanDesc,
2703 : node->ordNumCols,
2704 1493 : node->ordColIdx,
2705 1493 : node->ordOperators,
2706 1493 : node->ordCollations,
2707 : &winstate->ss.ps);
2708 :
2709 : /*
2710 : * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2711 : */
2712 1922 : numfuncs = winstate->numfuncs;
2713 1922 : numaggs = winstate->numaggs;
2714 1922 : econtext = winstate->ss.ps.ps_ExprContext;
2715 1922 : econtext->ecxt_aggvalues = palloc0_array(Datum, numfuncs);
2716 1922 : econtext->ecxt_aggnulls = palloc0_array(bool, numfuncs);
2717 :
2718 : /*
2719 : * allocate per-wfunc/per-agg state information.
2720 : */
2721 1922 : perfunc = palloc0_array(WindowStatePerFuncData, numfuncs);
2722 1922 : peragg = palloc0_array(WindowStatePerAggData, numaggs);
2723 1922 : winstate->perfunc = perfunc;
2724 1922 : winstate->peragg = peragg;
2725 :
2726 1922 : wfuncno = -1;
2727 1922 : aggno = -1;
2728 4408 : foreach(l, winstate->funcs)
2729 : {
2730 2486 : WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2731 2486 : WindowFunc *wfunc = wfuncstate->wfunc;
2732 : WindowStatePerFunc perfuncstate;
2733 : AclResult aclresult;
2734 : int i;
2735 :
2736 2486 : if (wfunc->winref != node->winref) /* planner screwed up? */
2737 0 : elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2738 : wfunc->winref, node->winref);
2739 :
2740 : /*
2741 : * Look for a previous duplicate window function, which needs the same
2742 : * ignore_nulls value
2743 : */
2744 3222 : for (i = 0; i <= wfuncno; i++)
2745 : {
2746 744 : if (equal(wfunc, perfunc[i].wfunc) &&
2747 8 : !contain_volatile_functions((Node *) wfunc))
2748 8 : break;
2749 : }
2750 2486 : if (i <= wfuncno && wfunc->ignore_nulls == perfunc[i].ignore_nulls)
2751 : {
2752 : /* Found a match to an existing entry, so just mark it */
2753 8 : wfuncstate->wfuncno = i;
2754 8 : continue;
2755 : }
2756 :
2757 : /* Nope, so assign a new PerAgg record */
2758 2478 : perfuncstate = &perfunc[++wfuncno];
2759 :
2760 : /* Mark WindowFunc state node with assigned index in the result array */
2761 2478 : wfuncstate->wfuncno = wfuncno;
2762 :
2763 : /* Check permission to call window function */
2764 2478 : aclresult = object_aclcheck(ProcedureRelationId, wfunc->winfnoid, GetUserId(),
2765 : ACL_EXECUTE);
2766 2478 : if (aclresult != ACLCHECK_OK)
2767 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2768 0 : get_func_name(wfunc->winfnoid));
2769 2478 : InvokeFunctionExecuteHook(wfunc->winfnoid);
2770 :
2771 : /* Fill in the perfuncstate data */
2772 2478 : perfuncstate->wfuncstate = wfuncstate;
2773 2478 : perfuncstate->wfunc = wfunc;
2774 2478 : perfuncstate->numArguments = list_length(wfuncstate->args);
2775 2478 : perfuncstate->winCollation = wfunc->inputcollid;
2776 :
2777 2478 : get_typlenbyval(wfunc->wintype,
2778 : &perfuncstate->resulttypeLen,
2779 : &perfuncstate->resulttypeByVal);
2780 :
2781 : /*
2782 : * If it's really just a plain aggregate function, we'll emulate the
2783 : * Agg environment for it.
2784 : */
2785 2478 : perfuncstate->plain_agg = wfunc->winagg;
2786 2478 : if (wfunc->winagg)
2787 : {
2788 : WindowStatePerAgg peraggstate;
2789 :
2790 1103 : perfuncstate->aggno = ++aggno;
2791 1103 : peraggstate = &winstate->peragg[aggno];
2792 1103 : initialize_peragg(winstate, wfunc, peraggstate);
2793 1103 : peraggstate->wfuncno = wfuncno;
2794 : }
2795 : else
2796 : {
2797 1375 : WindowObject winobj = makeNode(WindowObjectData);
2798 :
2799 1375 : winobj->winstate = winstate;
2800 1375 : winobj->argstates = wfuncstate->args;
2801 1375 : winobj->localmem = NULL;
2802 1375 : perfuncstate->winobj = winobj;
2803 1375 : winobj->ignore_nulls = wfunc->ignore_nulls;
2804 1375 : init_notnull_info(winobj, perfuncstate);
2805 :
2806 : /* It's a real window function, so set up to call it. */
2807 1375 : fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2808 : econtext->ecxt_per_query_memory);
2809 1375 : fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2810 : }
2811 : }
2812 :
2813 : /* Update numfuncs, numaggs to match number of unique functions found */
2814 1922 : winstate->numfuncs = wfuncno + 1;
2815 1922 : winstate->numaggs = aggno + 1;
2816 :
2817 : /* Set up WindowObject for aggregates, if needed */
2818 1922 : if (winstate->numaggs > 0)
2819 : {
2820 1027 : WindowObject agg_winobj = makeNode(WindowObjectData);
2821 :
2822 1027 : agg_winobj->winstate = winstate;
2823 1027 : agg_winobj->argstates = NIL;
2824 1027 : agg_winobj->localmem = NULL;
2825 : /* make sure markptr = -1 to invalidate. It may not get used */
2826 1027 : agg_winobj->markptr = -1;
2827 1027 : agg_winobj->readptr = -1;
2828 1027 : winstate->agg_winobj = agg_winobj;
2829 : }
2830 :
2831 : /* Set the status to running */
2832 1922 : winstate->status = WINDOWAGG_RUN;
2833 :
2834 : /* initialize frame bound offset expressions */
2835 1922 : winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2836 : (PlanState *) winstate);
2837 1922 : winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2838 : (PlanState *) winstate);
2839 :
2840 : /* Lookup in_range support functions if needed */
2841 1922 : if (OidIsValid(node->startInRangeFunc))
2842 348 : fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2843 1922 : if (OidIsValid(node->endInRangeFunc))
2844 392 : fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2845 1922 : winstate->inRangeColl = node->inRangeColl;
2846 1922 : winstate->inRangeAsc = node->inRangeAsc;
2847 1922 : winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2848 :
2849 1922 : winstate->all_first = true;
2850 1922 : winstate->partition_spooled = false;
2851 1922 : winstate->more_partitions = false;
2852 1922 : winstate->next_partition = true;
2853 :
2854 1922 : return winstate;
2855 : }
2856 :
2857 : /* -----------------
2858 : * ExecEndWindowAgg
2859 : * -----------------
2860 : */
2861 : void
2862 1786 : ExecEndWindowAgg(WindowAggState *node)
2863 : {
2864 : PlanState *outerPlan;
2865 : int i;
2866 :
2867 1786 : if (node->buffer != NULL)
2868 : {
2869 1429 : tuplestore_end(node->buffer);
2870 :
2871 : /* nullify so that release_partition skips the tuplestore_clear() */
2872 1429 : node->buffer = NULL;
2873 : }
2874 :
2875 1786 : release_partition(node);
2876 :
2877 2861 : for (i = 0; i < node->numaggs; i++)
2878 : {
2879 1075 : if (node->peragg[i].aggcontext != node->aggcontext)
2880 540 : MemoryContextDelete(node->peragg[i].aggcontext);
2881 : }
2882 1786 : MemoryContextDelete(node->partcontext);
2883 1786 : MemoryContextDelete(node->aggcontext);
2884 :
2885 1786 : pfree(node->perfunc);
2886 1786 : pfree(node->peragg);
2887 :
2888 1786 : outerPlan = outerPlanState(node);
2889 1786 : ExecEndNode(outerPlan);
2890 1786 : }
2891 :
2892 : /* -----------------
2893 : * ExecReScanWindowAgg
2894 : * -----------------
2895 : */
2896 : void
2897 52 : ExecReScanWindowAgg(WindowAggState *node)
2898 : {
2899 52 : PlanState *outerPlan = outerPlanState(node);
2900 52 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
2901 :
2902 52 : node->status = WINDOWAGG_RUN;
2903 52 : node->all_first = true;
2904 :
2905 : /* release tuplestore et al */
2906 52 : release_partition(node);
2907 :
2908 : /* release all temp tuples, but especially first_part_slot */
2909 52 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
2910 52 : ExecClearTuple(node->first_part_slot);
2911 52 : ExecClearTuple(node->agg_row_slot);
2912 52 : ExecClearTuple(node->temp_slot_1);
2913 52 : ExecClearTuple(node->temp_slot_2);
2914 52 : if (node->framehead_slot)
2915 0 : ExecClearTuple(node->framehead_slot);
2916 52 : if (node->frametail_slot)
2917 4 : ExecClearTuple(node->frametail_slot);
2918 :
2919 : /* Forget current wfunc values */
2920 104 : MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2921 52 : MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2922 :
2923 : /*
2924 : * if chgParam of subnode is not null then plan will be re-scanned by
2925 : * first ExecProcNode.
2926 : */
2927 52 : if (outerPlan->chgParam == NULL)
2928 4 : ExecReScan(outerPlan);
2929 52 : }
2930 :
2931 : /*
2932 : * initialize_peragg
2933 : *
2934 : * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2935 : */
2936 : static WindowStatePerAggData *
2937 1103 : initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2938 : WindowStatePerAgg peraggstate)
2939 : {
2940 : Oid inputTypes[FUNC_MAX_ARGS];
2941 : int numArguments;
2942 : HeapTuple aggTuple;
2943 : Form_pg_aggregate aggform;
2944 : Oid aggtranstype;
2945 : AttrNumber initvalAttNo;
2946 : AclResult aclresult;
2947 : bool use_ma_code;
2948 : Oid transfn_oid,
2949 : invtransfn_oid,
2950 : finalfn_oid;
2951 : bool finalextra;
2952 : char finalmodify;
2953 : Expr *transfnexpr,
2954 : *invtransfnexpr,
2955 : *finalfnexpr;
2956 : Datum textInitVal;
2957 : int i;
2958 : ListCell *lc;
2959 :
2960 1103 : numArguments = list_length(wfunc->args);
2961 :
2962 1103 : i = 0;
2963 2130 : foreach(lc, wfunc->args)
2964 : {
2965 1027 : inputTypes[i++] = exprType((Node *) lfirst(lc));
2966 : }
2967 :
2968 1103 : aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2969 1103 : if (!HeapTupleIsValid(aggTuple))
2970 0 : elog(ERROR, "cache lookup failed for aggregate %u",
2971 : wfunc->winfnoid);
2972 1103 : aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2973 :
2974 : /*
2975 : * Figure out whether we want to use the moving-aggregate implementation,
2976 : * and collect the right set of fields from the pg_aggregate entry.
2977 : *
2978 : * It's possible that an aggregate would supply a safe moving-aggregate
2979 : * implementation and an unsafe normal one, in which case our hand is
2980 : * forced. Otherwise, if the frame head can't move, we don't need
2981 : * moving-aggregate code. Even if we'd like to use it, don't do so if the
2982 : * aggregate's arguments (and FILTER clause if any) contain any calls to
2983 : * volatile functions. Otherwise, the difference between restarting and
2984 : * not restarting the aggregation would be user-visible.
2985 : *
2986 : * We also don't risk using moving aggregates when there are subplans in
2987 : * the arguments or FILTER clause. This is partly because
2988 : * contain_volatile_functions() doesn't look inside subplans; but there
2989 : * are other reasons why a subplan's output might be volatile. For
2990 : * example, syncscan mode can render the results nonrepeatable.
2991 : */
2992 1103 : if (!OidIsValid(aggform->aggminvtransfn))
2993 189 : use_ma_code = false; /* sine qua non */
2994 914 : else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2995 914 : aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2996 0 : use_ma_code = true; /* decision forced by safety */
2997 914 : else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2998 358 : use_ma_code = false; /* non-moving frame head */
2999 556 : else if (contain_volatile_functions((Node *) wfunc))
3000 8 : use_ma_code = false; /* avoid possible behavioral change */
3001 548 : else if (contain_subplans((Node *) wfunc))
3002 0 : use_ma_code = false; /* subplans might contain volatile functions */
3003 : else
3004 548 : use_ma_code = true; /* yes, let's use it */
3005 1103 : if (use_ma_code)
3006 : {
3007 548 : peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
3008 548 : peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
3009 548 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
3010 548 : finalextra = aggform->aggmfinalextra;
3011 548 : finalmodify = aggform->aggmfinalmodify;
3012 548 : aggtranstype = aggform->aggmtranstype;
3013 548 : initvalAttNo = Anum_pg_aggregate_aggminitval;
3014 : }
3015 : else
3016 : {
3017 555 : peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
3018 555 : peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
3019 555 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3020 555 : finalextra = aggform->aggfinalextra;
3021 555 : finalmodify = aggform->aggfinalmodify;
3022 555 : aggtranstype = aggform->aggtranstype;
3023 555 : initvalAttNo = Anum_pg_aggregate_agginitval;
3024 : }
3025 :
3026 : /*
3027 : * ExecInitWindowAgg already checked permission to call aggregate function
3028 : * ... but we still need to check the component functions
3029 : */
3030 :
3031 : /* Check that aggregate owner has permission to call component fns */
3032 : {
3033 : HeapTuple procTuple;
3034 : Oid aggOwner;
3035 :
3036 1103 : procTuple = SearchSysCache1(PROCOID,
3037 : ObjectIdGetDatum(wfunc->winfnoid));
3038 1103 : if (!HeapTupleIsValid(procTuple))
3039 0 : elog(ERROR, "cache lookup failed for function %u",
3040 : wfunc->winfnoid);
3041 1103 : aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3042 1103 : ReleaseSysCache(procTuple);
3043 :
3044 1103 : aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner,
3045 : ACL_EXECUTE);
3046 1103 : if (aclresult != ACLCHECK_OK)
3047 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
3048 0 : get_func_name(transfn_oid));
3049 1103 : InvokeFunctionExecuteHook(transfn_oid);
3050 :
3051 1103 : if (OidIsValid(invtransfn_oid))
3052 : {
3053 548 : aclresult = object_aclcheck(ProcedureRelationId, invtransfn_oid, aggOwner,
3054 : ACL_EXECUTE);
3055 548 : if (aclresult != ACLCHECK_OK)
3056 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
3057 0 : get_func_name(invtransfn_oid));
3058 548 : InvokeFunctionExecuteHook(invtransfn_oid);
3059 : }
3060 :
3061 1103 : if (OidIsValid(finalfn_oid))
3062 : {
3063 584 : aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
3064 : ACL_EXECUTE);
3065 584 : if (aclresult != ACLCHECK_OK)
3066 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
3067 0 : get_func_name(finalfn_oid));
3068 584 : InvokeFunctionExecuteHook(finalfn_oid);
3069 : }
3070 : }
3071 :
3072 : /*
3073 : * If the selected finalfn isn't read-only, we can't run this aggregate as
3074 : * a window function. This is a user-facing error, so we take a bit more
3075 : * care with the error message than elsewhere in this function.
3076 : */
3077 1103 : if (finalmodify != AGGMODIFY_READ_ONLY)
3078 0 : ereport(ERROR,
3079 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3080 : errmsg("aggregate function %s does not support use as a window function",
3081 : format_procedure(wfunc->winfnoid))));
3082 :
3083 : /* Detect how many arguments to pass to the finalfn */
3084 1103 : if (finalextra)
3085 16 : peraggstate->numFinalArgs = numArguments + 1;
3086 : else
3087 1087 : peraggstate->numFinalArgs = 1;
3088 :
3089 : /* resolve actual type of transition state, if polymorphic */
3090 1103 : aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
3091 : aggtranstype,
3092 : inputTypes,
3093 : numArguments);
3094 :
3095 : /* build expression trees using actual argument & result types */
3096 1103 : build_aggregate_transfn_expr(inputTypes,
3097 : numArguments,
3098 : 0, /* no ordered-set window functions yet */
3099 : false, /* no variadic window functions yet */
3100 : aggtranstype,
3101 : wfunc->inputcollid,
3102 : transfn_oid,
3103 : invtransfn_oid,
3104 : &transfnexpr,
3105 : &invtransfnexpr);
3106 :
3107 : /* set up infrastructure for calling the transfn(s) and finalfn */
3108 1103 : fmgr_info(transfn_oid, &peraggstate->transfn);
3109 1103 : fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
3110 :
3111 1103 : if (OidIsValid(invtransfn_oid))
3112 : {
3113 548 : fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
3114 548 : fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
3115 : }
3116 :
3117 1103 : if (OidIsValid(finalfn_oid))
3118 : {
3119 584 : build_aggregate_finalfn_expr(inputTypes,
3120 : peraggstate->numFinalArgs,
3121 : aggtranstype,
3122 : wfunc->wintype,
3123 : wfunc->inputcollid,
3124 : finalfn_oid,
3125 : &finalfnexpr);
3126 584 : fmgr_info(finalfn_oid, &peraggstate->finalfn);
3127 584 : fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
3128 : }
3129 :
3130 : /* get info about relevant datatypes */
3131 1103 : get_typlenbyval(wfunc->wintype,
3132 : &peraggstate->resulttypeLen,
3133 : &peraggstate->resulttypeByVal);
3134 1103 : get_typlenbyval(aggtranstype,
3135 : &peraggstate->transtypeLen,
3136 : &peraggstate->transtypeByVal);
3137 :
3138 : /*
3139 : * initval is potentially null, so don't try to access it as a struct
3140 : * field. Must do it the hard way with SysCacheGetAttr.
3141 : */
3142 1103 : textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
3143 : &peraggstate->initValueIsNull);
3144 :
3145 1103 : if (peraggstate->initValueIsNull)
3146 617 : peraggstate->initValue = (Datum) 0;
3147 : else
3148 486 : peraggstate->initValue = GetAggInitVal(textInitVal,
3149 : aggtranstype);
3150 :
3151 : /*
3152 : * If the transfn is strict and the initval is NULL, make sure input type
3153 : * and transtype are the same (or at least binary-compatible), so that
3154 : * it's OK to use the first input value as the initial transValue. This
3155 : * should have been checked at agg definition time, but we must check
3156 : * again in case the transfn's strictness property has been changed.
3157 : */
3158 1103 : if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
3159 : {
3160 161 : if (numArguments < 1 ||
3161 161 : !IsBinaryCoercible(inputTypes[0], aggtranstype))
3162 0 : ereport(ERROR,
3163 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3164 : errmsg("aggregate %u needs to have compatible input type and transition type",
3165 : wfunc->winfnoid)));
3166 : }
3167 :
3168 : /*
3169 : * Insist that forward and inverse transition functions have the same
3170 : * strictness setting. Allowing them to differ would require handling
3171 : * more special cases in advance_windowaggregate and
3172 : * advance_windowaggregate_base, for no discernible benefit. This should
3173 : * have been checked at agg definition time, but we must check again in
3174 : * case either function's strictness property has been changed.
3175 : */
3176 1103 : if (OidIsValid(invtransfn_oid) &&
3177 548 : peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
3178 0 : ereport(ERROR,
3179 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3180 : errmsg("strictness of aggregate's forward and inverse transition functions must match")));
3181 :
3182 : /*
3183 : * Moving aggregates use their own aggcontext.
3184 : *
3185 : * This is necessary because they might restart at different times, so we
3186 : * might never be able to reset the shared context otherwise. We can't
3187 : * make it the aggregates' responsibility to clean up after themselves,
3188 : * because strict aggregates must be restarted whenever we remove their
3189 : * last non-NULL input, which the aggregate won't be aware is happening.
3190 : * Also, just pfree()ing the transValue upon restarting wouldn't help,
3191 : * since we'd miss any indirectly referenced data. We could, in theory,
3192 : * make the memory allocation rules for moving aggregates different than
3193 : * they have historically been for plain aggregates, but that seems grotty
3194 : * and likely to lead to memory leaks.
3195 : */
3196 1103 : if (OidIsValid(invtransfn_oid))
3197 548 : peraggstate->aggcontext =
3198 548 : AllocSetContextCreate(CurrentMemoryContext,
3199 : "WindowAgg Per Aggregate",
3200 : ALLOCSET_DEFAULT_SIZES);
3201 : else
3202 555 : peraggstate->aggcontext = winstate->aggcontext;
3203 :
3204 1103 : ReleaseSysCache(aggTuple);
3205 :
3206 1103 : return peraggstate;
3207 : }
3208 :
3209 : static Datum
3210 486 : GetAggInitVal(Datum textInitVal, Oid transtype)
3211 : {
3212 : Oid typinput,
3213 : typioparam;
3214 : char *strInitVal;
3215 : Datum initVal;
3216 :
3217 486 : getTypeInputInfo(transtype, &typinput, &typioparam);
3218 486 : strInitVal = TextDatumGetCString(textInitVal);
3219 486 : initVal = OidInputFunctionCall(typinput, strInitVal,
3220 : typioparam, -1);
3221 486 : pfree(strInitVal);
3222 486 : return initVal;
3223 : }
3224 :
3225 : /*
3226 : * are_peers
3227 : * compare two rows to see if they are equal according to the ORDER BY clause
3228 : *
3229 : * NB: this does not consider the window frame mode.
3230 : */
3231 : static bool
3232 398503 : are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
3233 : TupleTableSlot *slot2)
3234 : {
3235 398503 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
3236 398503 : ExprContext *econtext = winstate->tmpcontext;
3237 :
3238 : /* If no ORDER BY, all rows are peers with each other */
3239 398503 : if (node->ordNumCols == 0)
3240 20702 : return true;
3241 :
3242 377801 : econtext->ecxt_outertuple = slot1;
3243 377801 : econtext->ecxt_innertuple = slot2;
3244 377801 : return ExecQualAndReset(winstate->ordEqfunction, econtext);
3245 : }
3246 :
3247 : /*
3248 : * window_gettupleslot
3249 : * Fetch the pos'th tuple of the current partition into the slot,
3250 : * using the winobj's read pointer
3251 : *
3252 : * Returns true if successful, false if no such row
3253 : */
3254 : static bool
3255 509469 : window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
3256 : {
3257 509469 : WindowAggState *winstate = winobj->winstate;
3258 : MemoryContext oldcontext;
3259 :
3260 : /* often called repeatedly in a row */
3261 509469 : CHECK_FOR_INTERRUPTS();
3262 :
3263 : /* Don't allow passing -1 to spool_tuples here */
3264 509469 : if (pos < 0)
3265 272 : return false;
3266 :
3267 : /* If necessary, fetch the tuple into the spool */
3268 509197 : spool_tuples(winstate, pos);
3269 :
3270 509197 : if (pos >= winstate->spooled_rows)
3271 3347 : return false;
3272 :
3273 505850 : if (pos < winobj->markpos)
3274 0 : elog(ERROR, "cannot fetch row before WindowObject's mark position");
3275 :
3276 505850 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3277 :
3278 505850 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3279 :
3280 : /*
3281 : * Advance or rewind until we are within one tuple of the one we want.
3282 : */
3283 505850 : if (winobj->seekpos < pos - 1)
3284 : {
3285 1668 : if (!tuplestore_skiptuples(winstate->buffer,
3286 1668 : pos - 1 - winobj->seekpos,
3287 : true))
3288 0 : elog(ERROR, "unexpected end of tuplestore");
3289 1668 : winobj->seekpos = pos - 1;
3290 : }
3291 504182 : else if (winobj->seekpos > pos + 1)
3292 : {
3293 1869 : if (!tuplestore_skiptuples(winstate->buffer,
3294 1869 : winobj->seekpos - (pos + 1),
3295 : false))
3296 0 : elog(ERROR, "unexpected end of tuplestore");
3297 1869 : winobj->seekpos = pos + 1;
3298 : }
3299 502313 : else if (winobj->seekpos == pos)
3300 : {
3301 : /*
3302 : * There's no API to refetch the tuple at the current position. We
3303 : * have to move one tuple forward, and then one backward. (We don't
3304 : * do it the other way because we might try to fetch the row before
3305 : * our mark, which isn't allowed.) XXX this case could stand to be
3306 : * optimized.
3307 : */
3308 115325 : tuplestore_advance(winstate->buffer, true);
3309 115325 : winobj->seekpos++;
3310 : }
3311 :
3312 : /*
3313 : * Now we should be on the tuple immediately before or after the one we
3314 : * want, so just fetch forwards or backwards as appropriate.
3315 : *
3316 : * Notice that we tell tuplestore_gettupleslot to make a physical copy of
3317 : * the fetched tuple. This ensures that the slot's contents remain valid
3318 : * through manipulations of the tuplestore, which some callers depend on.
3319 : */
3320 505850 : if (winobj->seekpos > pos)
3321 : {
3322 117397 : if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
3323 0 : elog(ERROR, "unexpected end of tuplestore");
3324 117397 : winobj->seekpos--;
3325 : }
3326 : else
3327 : {
3328 388453 : if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
3329 0 : elog(ERROR, "unexpected end of tuplestore");
3330 388453 : winobj->seekpos++;
3331 : }
3332 :
3333 : Assert(winobj->seekpos == pos);
3334 :
3335 505850 : MemoryContextSwitchTo(oldcontext);
3336 :
3337 505850 : return true;
3338 : }
3339 :
3340 : /*
3341 : * gettuple_eval_partition
3342 : * get tuple in a partition and evaluate the window function's argument
3343 : * expression on it.
3344 : */
3345 : static Datum
3346 158236 : gettuple_eval_partition(WindowObject winobj, int argno,
3347 : int64 abs_pos, bool *isnull, bool *isout)
3348 : {
3349 : WindowAggState *winstate;
3350 : ExprContext *econtext;
3351 : TupleTableSlot *slot;
3352 :
3353 158236 : winstate = winobj->winstate;
3354 158236 : slot = winstate->temp_slot_1;
3355 158236 : if (!window_gettupleslot(winobj, abs_pos, slot))
3356 : {
3357 : /* out of partition */
3358 404 : if (isout)
3359 404 : *isout = true;
3360 404 : *isnull = true;
3361 404 : return (Datum) 0;
3362 : }
3363 :
3364 157832 : if (isout)
3365 157832 : *isout = false;
3366 157832 : econtext = winstate->ss.ps.ps_ExprContext;
3367 157832 : econtext->ecxt_outertuple = slot;
3368 157832 : return ExecEvalExpr((ExprState *) list_nth
3369 157832 : (winobj->argstates, argno),
3370 : econtext, isnull);
3371 : }
3372 :
3373 : /*
3374 : * ignorenulls_getfuncarginframe
3375 : * For IGNORE NULLS, get the next nonnull value in the frame, moving forward
3376 : * or backward until we find a value or reach the frame's end.
3377 : */
3378 : static Datum
3379 680 : ignorenulls_getfuncarginframe(WindowObject winobj, int argno,
3380 : int relpos, int seektype, bool set_mark,
3381 : bool *isnull, bool *isout)
3382 : {
3383 : WindowAggState *winstate;
3384 : ExprContext *econtext;
3385 : TupleTableSlot *slot;
3386 : Datum datum;
3387 : int64 abs_pos;
3388 : int64 mark_pos;
3389 : int notnull_offset;
3390 : int notnull_relpos;
3391 : int forward;
3392 :
3393 : Assert(WindowObjectIsValid(winobj));
3394 680 : winstate = winobj->winstate;
3395 680 : econtext = winstate->ss.ps.ps_ExprContext;
3396 680 : slot = winstate->temp_slot_1;
3397 680 : datum = (Datum) 0;
3398 680 : notnull_offset = 0;
3399 680 : notnull_relpos = abs(relpos);
3400 :
3401 680 : switch (seektype)
3402 : {
3403 0 : case WINDOW_SEEK_CURRENT:
3404 0 : elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3405 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3406 : break;
3407 480 : case WINDOW_SEEK_HEAD:
3408 : /* rejecting relpos < 0 is easy and simplifies code below */
3409 480 : if (relpos < 0)
3410 0 : goto out_of_frame;
3411 480 : update_frameheadpos(winstate);
3412 480 : abs_pos = winstate->frameheadpos;
3413 480 : mark_pos = winstate->frameheadpos;
3414 480 : forward = 1;
3415 480 : break;
3416 200 : case WINDOW_SEEK_TAIL:
3417 : /* rejecting relpos > 0 is easy and simplifies code below */
3418 200 : if (relpos > 0)
3419 0 : goto out_of_frame;
3420 200 : update_frametailpos(winstate);
3421 200 : abs_pos = winstate->frametailpos - 1;
3422 200 : mark_pos = 0; /* keep compiler quiet */
3423 200 : forward = -1;
3424 200 : break;
3425 0 : default:
3426 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3427 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3428 : break;
3429 : }
3430 :
3431 : /*
3432 : * Get the next nonnull value in the frame, moving forward or backward
3433 : * until we find a value or reach the frame's end.
3434 : */
3435 : do
3436 : {
3437 : int inframe;
3438 : int v;
3439 :
3440 : /*
3441 : * Check apparent out of frame case. We need to do this because we
3442 : * may not call window_gettupleslot before row_is_in_frame, which
3443 : * supposes abs_pos is never negative.
3444 : */
3445 1600 : if (abs_pos < 0)
3446 8 : goto out_of_frame;
3447 :
3448 : /* check whether row is in frame */
3449 1592 : inframe = row_is_in_frame(winobj, abs_pos, slot, true);
3450 1592 : if (inframe == -1)
3451 44 : goto out_of_frame;
3452 1548 : else if (inframe == 0)
3453 52 : goto advance;
3454 :
3455 1496 : if (isout)
3456 0 : *isout = false;
3457 :
3458 1496 : v = get_notnull_info(winobj, abs_pos, argno);
3459 1496 : if (v == NN_NULL) /* this row is known to be NULL */
3460 400 : goto advance;
3461 :
3462 1096 : else if (v == NN_UNKNOWN) /* need to check NULL or not */
3463 : {
3464 588 : if (!window_gettupleslot(winobj, abs_pos, slot))
3465 20 : goto out_of_frame;
3466 :
3467 568 : econtext->ecxt_outertuple = slot;
3468 568 : datum = ExecEvalExpr(
3469 568 : (ExprState *) list_nth(winobj->argstates,
3470 : argno), econtext,
3471 : isnull);
3472 568 : if (!*isnull)
3473 332 : notnull_offset++;
3474 :
3475 : /* record the row status */
3476 568 : put_notnull_info(winobj, abs_pos, argno, *isnull);
3477 : }
3478 : else /* this row is known to be NOT NULL */
3479 : {
3480 508 : notnull_offset++;
3481 508 : if (notnull_offset > notnull_relpos)
3482 : {
3483 : /* to prepare exiting this loop, datum needs to be set */
3484 320 : if (!window_gettupleslot(winobj, abs_pos, slot))
3485 0 : goto out_of_frame;
3486 :
3487 320 : econtext->ecxt_outertuple = slot;
3488 320 : datum = ExecEvalExpr(
3489 320 : (ExprState *) list_nth
3490 320 : (winobj->argstates, argno),
3491 : econtext, isnull);
3492 : }
3493 : }
3494 188 : advance:
3495 1528 : abs_pos += forward;
3496 1528 : } while (notnull_offset <= notnull_relpos);
3497 :
3498 608 : if (set_mark)
3499 608 : WinSetMarkPosition(winobj, mark_pos);
3500 :
3501 608 : return datum;
3502 :
3503 72 : out_of_frame:
3504 72 : if (isout)
3505 0 : *isout = true;
3506 72 : *isnull = true;
3507 72 : return (Datum) 0;
3508 : }
3509 :
3510 :
3511 : /*
3512 : * init_notnull_info
3513 : * Initialize non null map.
3514 : */
3515 : static void
3516 1375 : init_notnull_info(WindowObject winobj, WindowStatePerFunc perfuncstate)
3517 : {
3518 1375 : int numargs = perfuncstate->numArguments;
3519 :
3520 1375 : if (winobj->ignore_nulls == PARSER_IGNORE_NULLS)
3521 : {
3522 136 : int argno = 0;
3523 : ListCell *lc;
3524 :
3525 136 : winobj->notnull_info = palloc0_array(uint8 *, numargs);
3526 136 : winobj->num_notnull_info = palloc0_array(int64, numargs);
3527 136 : winobj->notnull_info_cacheable = palloc_array(bool, numargs);
3528 :
3529 308 : foreach(lc, perfuncstate->wfunc->args)
3530 : {
3531 172 : Node *arg = (Node *) lfirst(lc);
3532 :
3533 172 : winobj->notnull_info_cacheable[argno] =
3534 336 : !contain_volatile_functions(arg) &&
3535 164 : !contain_subplans(arg);
3536 :
3537 172 : argno++;
3538 : }
3539 : }
3540 1375 : }
3541 :
3542 : /*
3543 : * grow_notnull_info
3544 : * expand notnull_info if necessary.
3545 : * pos: not null info position
3546 : * argno: argument number
3547 : */
3548 : static void
3549 2632 : grow_notnull_info(WindowObject winobj, int64 pos, int argno)
3550 : {
3551 : /* initial number of notnull info members */
3552 : #define INIT_NOT_NULL_INFO_NUM 128
3553 :
3554 2632 : if (pos >= winobj->num_notnull_info[argno])
3555 : {
3556 : /* We may be called in a short-lived context */
3557 100 : MemoryContext oldcontext = MemoryContextSwitchTo
3558 100 : (winobj->winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3559 :
3560 : for (;;)
3561 0 : {
3562 100 : Size oldsize = NN_POS_TO_BYTES
3563 : (winobj->num_notnull_info[argno]);
3564 : Size newsize;
3565 :
3566 100 : if (oldsize == 0) /* memory has not been allocated yet for this
3567 : * arg */
3568 : {
3569 100 : newsize = NN_POS_TO_BYTES(INIT_NOT_NULL_INFO_NUM);
3570 100 : winobj->notnull_info[argno] = palloc0(newsize);
3571 : }
3572 : else
3573 : {
3574 0 : newsize = oldsize * 2;
3575 0 : winobj->notnull_info[argno] =
3576 0 : repalloc0(winobj->notnull_info[argno], oldsize, newsize);
3577 : }
3578 100 : winobj->num_notnull_info[argno] = NN_BYTES_TO_POS(newsize);
3579 100 : if (winobj->num_notnull_info[argno] > pos)
3580 100 : break;
3581 : }
3582 100 : MemoryContextSwitchTo(oldcontext);
3583 : }
3584 2632 : }
3585 :
3586 : /*
3587 : * get_notnull_info
3588 : * retrieve a map
3589 : * pos: map position
3590 : * argno: argument number
3591 : */
3592 : static uint8
3593 1948 : get_notnull_info(WindowObject winobj, int64 pos, int argno)
3594 : {
3595 : uint8 *mbp;
3596 : uint8 mb;
3597 : int64 bpos;
3598 :
3599 1948 : if (!winobj->notnull_info_cacheable[argno])
3600 100 : return NN_UNKNOWN;
3601 :
3602 1848 : grow_notnull_info(winobj, pos, argno);
3603 1848 : bpos = NN_POS_TO_BYTES(pos);
3604 1848 : mbp = winobj->notnull_info[argno];
3605 1848 : mb = mbp[bpos];
3606 1848 : return (mb >> (NN_SHIFT(pos))) & NN_MASK;
3607 : }
3608 :
3609 : /*
3610 : * put_notnull_info
3611 : * update map
3612 : * pos: map position
3613 : * argno: argument number
3614 : * isnull: indicate NULL or NOT
3615 : */
3616 : static void
3617 876 : put_notnull_info(WindowObject winobj, int64 pos, int argno, bool isnull)
3618 : {
3619 : uint8 *mbp;
3620 : uint8 mb;
3621 : int64 bpos;
3622 876 : uint8 val = isnull ? NN_NULL : NN_NOTNULL;
3623 : int shift;
3624 :
3625 876 : if (!winobj->notnull_info_cacheable[argno])
3626 92 : return;
3627 :
3628 784 : grow_notnull_info(winobj, pos, argno);
3629 784 : bpos = NN_POS_TO_BYTES(pos);
3630 784 : mbp = winobj->notnull_info[argno];
3631 784 : mb = mbp[bpos];
3632 784 : shift = NN_SHIFT(pos);
3633 784 : mb &= ~(NN_MASK << shift); /* clear map */
3634 784 : mb |= (val << shift); /* update map */
3635 784 : mbp[bpos] = mb;
3636 : }
3637 :
3638 : /***********************************************************************
3639 : * API exposed to window functions
3640 : ***********************************************************************/
3641 :
3642 :
3643 : /*
3644 : * WinCheckAndInitializeNullTreatment
3645 : * Check null treatment clause and sets ignore_nulls
3646 : *
3647 : * Window functions should call this to check if they are being called with
3648 : * a null treatment clause when they don't allow it, or to set ignore_nulls.
3649 : */
3650 : void
3651 580715 : WinCheckAndInitializeNullTreatment(WindowObject winobj,
3652 : bool allowNullTreatment,
3653 : FunctionCallInfo fcinfo)
3654 : {
3655 : Assert(WindowObjectIsValid(winobj));
3656 580715 : if (winobj->ignore_nulls != NO_NULLTREATMENT && !allowNullTreatment)
3657 : {
3658 48 : const char *funcname = get_func_name(fcinfo->flinfo->fn_oid);
3659 :
3660 48 : if (!funcname)
3661 0 : elog(ERROR, "could not get function name");
3662 48 : ereport(ERROR,
3663 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3664 : errmsg("function %s does not allow RESPECT/IGNORE NULLS",
3665 : funcname)));
3666 : }
3667 580667 : else if (winobj->ignore_nulls == PARSER_IGNORE_NULLS)
3668 112 : winobj->ignore_nulls = IGNORE_NULLS;
3669 580667 : }
3670 :
3671 : /*
3672 : * WinGetPartitionLocalMemory
3673 : * Get working memory that lives till end of partition processing
3674 : *
3675 : * On first call within a given partition, this allocates and zeroes the
3676 : * requested amount of space. Subsequent calls just return the same chunk.
3677 : *
3678 : * Memory obtained this way is normally used to hold state that should be
3679 : * automatically reset for each new partition. If a window function wants
3680 : * to hold state across the whole query, fcinfo->fn_extra can be used in the
3681 : * usual way for that.
3682 : */
3683 : void *
3684 221280 : WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3685 : {
3686 : Assert(WindowObjectIsValid(winobj));
3687 221280 : if (winobj->localmem == NULL)
3688 295 : winobj->localmem =
3689 295 : MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3690 221280 : return winobj->localmem;
3691 : }
3692 :
3693 : /*
3694 : * WinGetCurrentPosition
3695 : * Return the current row's position (counting from 0) within the current
3696 : * partition.
3697 : */
3698 : int64
3699 501546 : WinGetCurrentPosition(WindowObject winobj)
3700 : {
3701 : Assert(WindowObjectIsValid(winobj));
3702 501546 : return winobj->winstate->currentpos;
3703 : }
3704 :
3705 : /*
3706 : * WinGetPartitionRowCount
3707 : * Return total number of rows contained in the current partition.
3708 : *
3709 : * Note: this is a relatively expensive operation because it forces the
3710 : * whole partition to be "spooled" into the tuplestore at once. Once
3711 : * executed, however, additional calls within the same partition are cheap.
3712 : */
3713 : int64
3714 208 : WinGetPartitionRowCount(WindowObject winobj)
3715 : {
3716 : Assert(WindowObjectIsValid(winobj));
3717 208 : spool_tuples(winobj->winstate, -1);
3718 208 : return winobj->winstate->spooled_rows;
3719 : }
3720 :
3721 : /*
3722 : * WinSetMarkPosition
3723 : * Set the "mark" position for the window object, which is the oldest row
3724 : * number (counting from 0) it is allowed to fetch during all subsequent
3725 : * operations within the current partition.
3726 : *
3727 : * Window functions do not have to call this, but are encouraged to move the
3728 : * mark forward when possible to keep the tuplestore size down and prevent
3729 : * having to spill rows to disk.
3730 : */
3731 : void
3732 583733 : WinSetMarkPosition(WindowObject winobj, int64 markpos)
3733 : {
3734 : WindowAggState *winstate;
3735 :
3736 : Assert(WindowObjectIsValid(winobj));
3737 583733 : winstate = winobj->winstate;
3738 :
3739 583733 : if (markpos < winobj->markpos)
3740 0 : elog(ERROR, "cannot move WindowObject's mark position backward");
3741 583733 : tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3742 583733 : if (markpos > winobj->markpos)
3743 : {
3744 579401 : tuplestore_skiptuples(winstate->buffer,
3745 579401 : markpos - winobj->markpos,
3746 : true);
3747 579401 : winobj->markpos = markpos;
3748 : }
3749 583733 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3750 583733 : if (markpos > winobj->seekpos)
3751 : {
3752 308334 : tuplestore_skiptuples(winstate->buffer,
3753 308334 : markpos - winobj->seekpos,
3754 : true);
3755 308334 : winobj->seekpos = markpos;
3756 : }
3757 583733 : }
3758 :
3759 : /*
3760 : * WinRowsArePeers
3761 : * Compare two rows (specified by absolute position in partition) to see
3762 : * if they are equal according to the ORDER BY clause.
3763 : *
3764 : * NB: this does not consider the window frame mode.
3765 : */
3766 : bool
3767 110375 : WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3768 : {
3769 : WindowAggState *winstate;
3770 : WindowAgg *node;
3771 : TupleTableSlot *slot1;
3772 : TupleTableSlot *slot2;
3773 : bool res;
3774 :
3775 : Assert(WindowObjectIsValid(winobj));
3776 110375 : winstate = winobj->winstate;
3777 110375 : node = (WindowAgg *) winstate->ss.ps.plan;
3778 :
3779 : /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3780 110375 : if (node->ordNumCols == 0)
3781 180 : return true;
3782 :
3783 : /*
3784 : * Note: OK to use temp_slot_2 here because we aren't calling any
3785 : * frame-related functions (those tend to clobber temp_slot_2).
3786 : */
3787 110195 : slot1 = winstate->temp_slot_1;
3788 110195 : slot2 = winstate->temp_slot_2;
3789 :
3790 110195 : if (!window_gettupleslot(winobj, pos1, slot1))
3791 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3792 : pos1);
3793 110195 : if (!window_gettupleslot(winobj, pos2, slot2))
3794 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3795 : pos2);
3796 :
3797 110195 : res = are_peers(winstate, slot1, slot2);
3798 :
3799 110195 : ExecClearTuple(slot1);
3800 110195 : ExecClearTuple(slot2);
3801 :
3802 110195 : return res;
3803 : }
3804 :
3805 : /*
3806 : * WinGetFuncArgInPartition
3807 : * Evaluate a window function's argument expression on a specified
3808 : * row of the partition. The row is identified in lseek(2) style,
3809 : * i.e. relative to the current, first, or last row.
3810 : *
3811 : * argno: argument number to evaluate (counted from 0)
3812 : * relpos: signed rowcount offset from the seek position
3813 : * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3814 : * set_mark: If the row is found and set_mark is true, the mark is moved to
3815 : * the row as a side-effect.
3816 : * isnull: output argument, receives isnull status of result
3817 : * isout: output argument, set to indicate whether target row position
3818 : * is out of partition (can pass NULL if caller doesn't care about this)
3819 : *
3820 : * Specifying a nonexistent row is not an error, it just causes a null result
3821 : * (plus setting *isout true, if isout isn't NULL).
3822 : */
3823 : Datum
3824 158060 : WinGetFuncArgInPartition(WindowObject winobj, int argno,
3825 : int relpos, int seektype, bool set_mark,
3826 : bool *isnull, bool *isout)
3827 : {
3828 : WindowAggState *winstate;
3829 : int64 abs_pos;
3830 : int64 mark_pos;
3831 : Datum datum;
3832 : bool null_treatment;
3833 : int notnull_offset;
3834 : int notnull_relpos;
3835 : int forward;
3836 : bool myisout;
3837 : bool got_datum;
3838 :
3839 : Assert(WindowObjectIsValid(winobj));
3840 158060 : winstate = winobj->winstate;
3841 :
3842 158060 : null_treatment = (winobj->ignore_nulls == IGNORE_NULLS && relpos != 0);
3843 :
3844 158060 : switch (seektype)
3845 : {
3846 158060 : case WINDOW_SEEK_CURRENT:
3847 158060 : if (null_treatment)
3848 340 : abs_pos = winstate->currentpos;
3849 : else
3850 157720 : abs_pos = winstate->currentpos + relpos;
3851 158060 : break;
3852 0 : case WINDOW_SEEK_HEAD:
3853 0 : if (null_treatment)
3854 0 : abs_pos = 0;
3855 : else
3856 0 : abs_pos = relpos;
3857 0 : break;
3858 0 : case WINDOW_SEEK_TAIL:
3859 0 : spool_tuples(winstate, -1);
3860 0 : abs_pos = winstate->spooled_rows - 1 + relpos;
3861 0 : break;
3862 0 : default:
3863 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3864 : abs_pos = 0; /* keep compiler quiet */
3865 : break;
3866 : }
3867 :
3868 : /* Easy case if IGNORE NULLS is not specified */
3869 158060 : if (!null_treatment)
3870 : {
3871 : /* get tuple and evaluate in partition */
3872 157720 : datum = gettuple_eval_partition(winobj, argno,
3873 : abs_pos, isnull, &myisout);
3874 157720 : if (!myisout && set_mark)
3875 157360 : WinSetMarkPosition(winobj, abs_pos);
3876 157720 : if (isout)
3877 157720 : *isout = myisout;
3878 157720 : return datum;
3879 : }
3880 :
3881 : /* Prepare for loop */
3882 340 : notnull_offset = 0;
3883 340 : notnull_relpos = abs(relpos);
3884 340 : forward = relpos > 0 ? 1 : -1;
3885 340 : myisout = false;
3886 340 : got_datum = false;
3887 340 : datum = 0;
3888 :
3889 : /*
3890 : * IGNORE NULLS + WINDOW_SEEK_CURRENT + relpos > 0 case, we would fetch
3891 : * beyond the current row + relpos to find out the target row. If we mark
3892 : * at abs_pos, next call to WinGetFuncArgInPartition or
3893 : * WinGetFuncArgInFrame (in case when a window function have multiple
3894 : * args) could fail with "cannot fetch row before WindowObject's mark
3895 : * position". So keep the mark position at currentpos.
3896 : */
3897 340 : if (seektype == WINDOW_SEEK_CURRENT && relpos > 0)
3898 180 : mark_pos = winstate->currentpos;
3899 : else
3900 : {
3901 : /*
3902 : * For other cases we have no idea what position of row callers would
3903 : * fetch next time. Also for relpos < 0 case (we go backward), we
3904 : * cannot set mark either. For those cases we always set mark at 0.
3905 : */
3906 160 : mark_pos = 0;
3907 : }
3908 :
3909 : /*
3910 : * Get the next nonnull value in the partition, moving forward or backward
3911 : * until we find a value or reach the partition's end. We cache the
3912 : * nullness status because we may repeat this process many times.
3913 : */
3914 : do
3915 : {
3916 : int nn_info; /* NOT NULL status */
3917 :
3918 504 : abs_pos += forward;
3919 504 : if (abs_pos < 0) /* clearly out of partition */
3920 52 : break;
3921 :
3922 : /* check NOT NULL cached info */
3923 452 : nn_info = get_notnull_info(winobj, abs_pos, argno);
3924 452 : if (nn_info == NN_NOTNULL) /* this row is known to be NOT NULL */
3925 60 : notnull_offset++;
3926 392 : else if (nn_info == NN_NULL) /* this row is known to be NULL */
3927 36 : continue; /* keep on moving forward or backward */
3928 : else /* need to check NULL or not */
3929 : {
3930 : /*
3931 : * NOT NULL info does not exist yet. Get tuple and evaluate func
3932 : * arg in partition. Keep the return value in case this row is the
3933 : * target; re-evaluating a volatile argument could give a
3934 : * different nullness status.
3935 : */
3936 356 : datum = gettuple_eval_partition(winobj, argno,
3937 : abs_pos, isnull, &myisout);
3938 356 : if (myisout) /* out of partition? */
3939 48 : break;
3940 308 : if (!*isnull)
3941 : {
3942 180 : notnull_offset++;
3943 180 : if (notnull_offset >= notnull_relpos)
3944 180 : got_datum = true;
3945 : }
3946 : /* record the row status */
3947 308 : put_notnull_info(winobj, abs_pos, argno, *isnull);
3948 : }
3949 404 : } while (notnull_offset < notnull_relpos);
3950 :
3951 : /* get tuple and evaluate func arg in partition */
3952 340 : if (!got_datum)
3953 160 : datum = gettuple_eval_partition(winobj, argno,
3954 : abs_pos, isnull, &myisout);
3955 340 : if (!myisout && set_mark)
3956 240 : WinSetMarkPosition(winobj, mark_pos);
3957 340 : if (isout)
3958 340 : *isout = myisout;
3959 :
3960 340 : return datum;
3961 : }
3962 :
3963 : /*
3964 : * WinGetFuncArgInFrame
3965 : * Evaluate a window function's argument expression on a specified
3966 : * row of the window frame. The row is identified in lseek(2) style,
3967 : * i.e. relative to the first or last row of the frame. (We do not
3968 : * support WINDOW_SEEK_CURRENT here, because it's not very clear what
3969 : * that should mean if the current row isn't part of the frame.)
3970 : *
3971 : * argno: argument number to evaluate (counted from 0)
3972 : * relpos: signed rowcount offset from the seek position
3973 : * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3974 : * set_mark: If the row is found/in frame and set_mark is true, the mark is
3975 : * moved to the row as a side-effect.
3976 : * isnull: output argument, receives isnull status of result
3977 : * isout: output argument, set to indicate whether target row position
3978 : * is out of frame (can pass NULL if caller doesn't care about this)
3979 : *
3980 : * Specifying a nonexistent or not-in-frame row is not an error, it just
3981 : * causes a null result (plus setting *isout true, if isout isn't NULL).
3982 : *
3983 : * Note that some exclusion-clause options lead to situations where the
3984 : * rows that are in-frame are not consecutive in the partition. But we
3985 : * count only in-frame rows when measuring relpos.
3986 : *
3987 : * The set_mark flag is interpreted as meaning that the caller will specify
3988 : * a constant (or, perhaps, monotonically increasing) relpos in successive
3989 : * calls, so that *if there is no exclusion clause* there will be no need
3990 : * to fetch a row before the previously fetched row. But we do not expect
3991 : * the caller to know how to account for exclusion clauses. Therefore,
3992 : * if there is an exclusion clause we take responsibility for adjusting the
3993 : * mark request to something that will be safe given the above assumption
3994 : * about relpos.
3995 : */
3996 : Datum
3997 6712 : WinGetFuncArgInFrame(WindowObject winobj, int argno,
3998 : int relpos, int seektype, bool set_mark,
3999 : bool *isnull, bool *isout)
4000 : {
4001 : WindowAggState *winstate;
4002 : ExprContext *econtext;
4003 : TupleTableSlot *slot;
4004 : int64 abs_pos;
4005 : int64 mark_pos;
4006 :
4007 : Assert(WindowObjectIsValid(winobj));
4008 6712 : winstate = winobj->winstate;
4009 6712 : econtext = winstate->ss.ps.ps_ExprContext;
4010 6712 : slot = winstate->temp_slot_1;
4011 :
4012 6712 : if (winobj->ignore_nulls == IGNORE_NULLS)
4013 680 : return ignorenulls_getfuncarginframe(winobj, argno, relpos, seektype,
4014 : set_mark, isnull, isout);
4015 :
4016 6032 : switch (seektype)
4017 : {
4018 0 : case WINDOW_SEEK_CURRENT:
4019 0 : elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
4020 : abs_pos = mark_pos = 0; /* keep compiler quiet */
4021 : break;
4022 2964 : case WINDOW_SEEK_HEAD:
4023 : /* rejecting relpos < 0 is easy and simplifies code below */
4024 2964 : if (relpos < 0)
4025 0 : goto out_of_frame;
4026 2964 : update_frameheadpos(winstate);
4027 2936 : abs_pos = winstate->frameheadpos + relpos;
4028 2936 : mark_pos = abs_pos;
4029 :
4030 : /*
4031 : * Account for exclusion option if one is active, but advance only
4032 : * abs_pos not mark_pos. This prevents changes of the current
4033 : * row's peer group from resulting in trying to fetch a row before
4034 : * some previous mark position.
4035 : *
4036 : * Note that in some corner cases such as current row being
4037 : * outside frame, these calculations are theoretically too simple,
4038 : * but it doesn't matter because we'll end up deciding the row is
4039 : * out of frame. We do not attempt to avoid fetching rows past
4040 : * end of frame; that would happen in some cases anyway.
4041 : */
4042 2936 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
4043 : {
4044 2496 : case 0:
4045 : /* no adjustment needed */
4046 2496 : break;
4047 160 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
4048 160 : if (abs_pos >= winstate->currentpos &&
4049 124 : winstate->currentpos >= winstate->frameheadpos)
4050 44 : abs_pos++;
4051 160 : break;
4052 80 : case FRAMEOPTION_EXCLUDE_GROUP:
4053 80 : update_grouptailpos(winstate);
4054 80 : if (abs_pos >= winstate->groupheadpos &&
4055 48 : winstate->grouptailpos > winstate->frameheadpos)
4056 : {
4057 48 : int64 overlapstart = Max(winstate->groupheadpos,
4058 : winstate->frameheadpos);
4059 :
4060 48 : abs_pos += winstate->grouptailpos - overlapstart;
4061 : }
4062 80 : break;
4063 200 : case FRAMEOPTION_EXCLUDE_TIES:
4064 200 : update_grouptailpos(winstate);
4065 200 : if (abs_pos >= winstate->groupheadpos &&
4066 136 : winstate->grouptailpos > winstate->frameheadpos)
4067 : {
4068 56 : int64 overlapstart = Max(winstate->groupheadpos,
4069 : winstate->frameheadpos);
4070 :
4071 56 : if (abs_pos == overlapstart)
4072 56 : abs_pos = winstate->currentpos;
4073 : else
4074 0 : abs_pos += winstate->grouptailpos - overlapstart - 1;
4075 : }
4076 200 : break;
4077 0 : default:
4078 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
4079 : winstate->frameOptions);
4080 : break;
4081 : }
4082 2936 : break;
4083 3068 : case WINDOW_SEEK_TAIL:
4084 : /* rejecting relpos > 0 is easy and simplifies code below */
4085 3068 : if (relpos > 0)
4086 0 : goto out_of_frame;
4087 3068 : update_frametailpos(winstate);
4088 3064 : abs_pos = winstate->frametailpos - 1 + relpos;
4089 :
4090 : /*
4091 : * Account for exclusion option if one is active. If there is no
4092 : * exclusion, we can safely set the mark at the accessed row. But
4093 : * if there is, we can only mark the frame start, because we can't
4094 : * be sure how far back in the frame the exclusion might cause us
4095 : * to fetch in future. Furthermore, we have to actually check
4096 : * against frameheadpos here, since it's unsafe to try to fetch a
4097 : * row before frame start if the mark might be there already.
4098 : */
4099 3064 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
4100 : {
4101 2704 : case 0:
4102 : /* no adjustment needed */
4103 2704 : mark_pos = abs_pos;
4104 2704 : break;
4105 120 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
4106 120 : if (abs_pos <= winstate->currentpos &&
4107 12 : winstate->currentpos < winstate->frametailpos)
4108 12 : abs_pos--;
4109 120 : update_frameheadpos(winstate);
4110 120 : if (abs_pos < winstate->frameheadpos)
4111 8 : goto out_of_frame;
4112 112 : mark_pos = winstate->frameheadpos;
4113 112 : break;
4114 160 : case FRAMEOPTION_EXCLUDE_GROUP:
4115 160 : update_grouptailpos(winstate);
4116 160 : if (abs_pos < winstate->grouptailpos &&
4117 36 : winstate->groupheadpos < winstate->frametailpos)
4118 : {
4119 36 : int64 overlapend = Min(winstate->grouptailpos,
4120 : winstate->frametailpos);
4121 :
4122 36 : abs_pos -= overlapend - winstate->groupheadpos;
4123 : }
4124 160 : update_frameheadpos(winstate);
4125 160 : if (abs_pos < winstate->frameheadpos)
4126 36 : goto out_of_frame;
4127 124 : mark_pos = winstate->frameheadpos;
4128 124 : break;
4129 80 : case FRAMEOPTION_EXCLUDE_TIES:
4130 80 : update_grouptailpos(winstate);
4131 80 : if (abs_pos < winstate->grouptailpos &&
4132 24 : winstate->groupheadpos < winstate->frametailpos)
4133 : {
4134 24 : int64 overlapend = Min(winstate->grouptailpos,
4135 : winstate->frametailpos);
4136 :
4137 24 : if (abs_pos == overlapend - 1)
4138 24 : abs_pos = winstate->currentpos;
4139 : else
4140 0 : abs_pos -= overlapend - 1 - winstate->groupheadpos;
4141 : }
4142 80 : update_frameheadpos(winstate);
4143 80 : if (abs_pos < winstate->frameheadpos)
4144 0 : goto out_of_frame;
4145 80 : mark_pos = winstate->frameheadpos;
4146 80 : break;
4147 0 : default:
4148 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
4149 : winstate->frameOptions);
4150 : mark_pos = 0; /* keep compiler quiet */
4151 : break;
4152 : }
4153 3020 : break;
4154 0 : default:
4155 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
4156 : abs_pos = mark_pos = 0; /* keep compiler quiet */
4157 : break;
4158 : }
4159 :
4160 5956 : if (!window_gettupleslot(winobj, abs_pos, slot))
4161 264 : goto out_of_frame;
4162 :
4163 : /* The code above does not detect all out-of-frame cases, so check */
4164 5692 : if (row_is_in_frame(winobj, abs_pos, slot, false) <= 0)
4165 200 : goto out_of_frame;
4166 :
4167 5472 : if (isout)
4168 0 : *isout = false;
4169 5472 : if (set_mark)
4170 5444 : WinSetMarkPosition(winobj, mark_pos);
4171 5472 : econtext->ecxt_outertuple = slot;
4172 5472 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
4173 : econtext, isnull);
4174 :
4175 508 : out_of_frame:
4176 508 : if (isout)
4177 0 : *isout = true;
4178 508 : *isnull = true;
4179 508 : return (Datum) 0;
4180 : }
4181 :
4182 : /*
4183 : * WinGetFuncArgCurrent
4184 : * Evaluate a window function's argument expression on the current row.
4185 : *
4186 : * argno: argument number to evaluate (counted from 0)
4187 : * isnull: output argument, receives isnull status of result
4188 : *
4189 : * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
4190 : * WinGetFuncArgInFrame targeting the current row, because it will succeed
4191 : * even if the WindowObject's mark has been set beyond the current row.
4192 : * This should generally be used for "ordinary" arguments of a window
4193 : * function, such as the offset argument of lead() or lag().
4194 : */
4195 : Datum
4196 1360 : WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
4197 : {
4198 : WindowAggState *winstate;
4199 : ExprContext *econtext;
4200 :
4201 : Assert(WindowObjectIsValid(winobj));
4202 1360 : winstate = winobj->winstate;
4203 :
4204 1360 : econtext = winstate->ss.ps.ps_ExprContext;
4205 :
4206 1360 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
4207 1360 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
4208 : econtext, isnull);
4209 : }
|