Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeWindowAgg.c
4 : * routines to handle WindowAgg nodes.
5 : *
6 : * A WindowAgg node evaluates "window functions" across suitable partitions
7 : * of the input tuple set. Any one WindowAgg works for just a single window
8 : * specification, though it can evaluate multiple window functions sharing
9 : * identical window specifications. The input tuples are required to be
10 : * delivered in sorted order, with the PARTITION BY columns (if any) as
11 : * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 : * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 : * as needed, if a query involves more than one window specification.)
14 : *
15 : * Since window functions can require access to any or all of the rows in
16 : * the current partition, we accumulate rows of the partition into a
17 : * tuplestore. The window functions are called using the WindowObject API
18 : * so that they can access those rows as needed.
19 : *
20 : * We also support using plain aggregate functions as window functions.
21 : * For these, the regular Agg-node environment is emulated for each partition.
22 : * As required by the SQL spec, the output represents the value of the
23 : * aggregate function over all rows in the current row's window frame.
24 : *
25 : *
26 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
27 : * Portions Copyright (c) 1994, Regents of the University of California
28 : *
29 : * IDENTIFICATION
30 : * src/backend/executor/nodeWindowAgg.c
31 : *
32 : *-------------------------------------------------------------------------
33 : */
34 : #include "postgres.h"
35 :
36 : #include "access/htup_details.h"
37 : #include "catalog/objectaccess.h"
38 : #include "catalog/pg_aggregate.h"
39 : #include "catalog/pg_proc.h"
40 : #include "executor/executor.h"
41 : #include "executor/instrument.h"
42 : #include "executor/nodeWindowAgg.h"
43 : #include "miscadmin.h"
44 : #include "nodes/nodeFuncs.h"
45 : #include "optimizer/clauses.h"
46 : #include "optimizer/optimizer.h"
47 : #include "parser/parse_agg.h"
48 : #include "parser/parse_coerce.h"
49 : #include "utils/acl.h"
50 : #include "utils/builtins.h"
51 : #include "utils/datum.h"
52 : #include "utils/expandeddatum.h"
53 : #include "utils/lsyscache.h"
54 : #include "utils/memutils.h"
55 : #include "utils/regproc.h"
56 : #include "utils/syscache.h"
57 : #include "utils/tuplestore.h"
58 : #include "windowapi.h"
59 :
60 : /*
61 : * All the window function APIs are called with this object, which is passed
62 : * to window functions as fcinfo->context.
63 : */
64 : typedef struct WindowObjectData
65 : {
66 : NodeTag type;
67 : WindowAggState *winstate; /* parent WindowAggState */
68 : List *argstates; /* ExprState trees for fn's arguments */
69 : void *localmem; /* WinGetPartitionLocalMemory's chunk */
70 : int markptr; /* tuplestore mark pointer for this fn */
71 : int readptr; /* tuplestore read pointer for this fn */
72 : int64 markpos; /* row that markptr is positioned on */
73 : int64 seekpos; /* row that readptr is positioned on */
74 : uint8 **notnull_info; /* not null info for each func args */
75 : int64 *num_notnull_info; /* track size (number of tuples in
76 : * partition) of the notnull_info array
77 : * for each func args */
78 :
79 : /*
80 : * Null treatment options. One of: NO_NULLTREATMENT, PARSER_IGNORE_NULLS,
81 : * PARSER_RESPECT_NULLS or IGNORE_NULLS.
82 : */
83 : int ignore_nulls;
84 : } WindowObjectData;
85 :
86 : /*
87 : * We have one WindowStatePerFunc struct for each window function and
88 : * window aggregate handled by this node.
89 : */
90 : typedef struct WindowStatePerFuncData
91 : {
92 : /* Links to WindowFunc expr and state nodes this working state is for */
93 : WindowFuncExprState *wfuncstate;
94 : WindowFunc *wfunc;
95 :
96 : int numArguments; /* number of arguments */
97 :
98 : FmgrInfo flinfo; /* fmgr lookup data for window function */
99 :
100 : Oid winCollation; /* collation derived for window function */
101 :
102 : /*
103 : * We need the len and byval info for the result of each function in order
104 : * to know how to copy/delete values.
105 : */
106 : int16 resulttypeLen;
107 : bool resulttypeByVal;
108 :
109 : bool plain_agg; /* is it just a plain aggregate function? */
110 : int aggno; /* if so, index of its WindowStatePerAggData */
111 : uint8 ignore_nulls; /* ignore nulls */
112 :
113 : WindowObject winobj; /* object used in window function API */
114 : } WindowStatePerFuncData;
115 :
116 : /*
117 : * For plain aggregate window functions, we also have one of these.
118 : */
119 : typedef struct WindowStatePerAggData
120 : {
121 : /* Oids of transition functions */
122 : Oid transfn_oid;
123 : Oid invtransfn_oid; /* may be InvalidOid */
124 : Oid finalfn_oid; /* may be InvalidOid */
125 :
126 : /*
127 : * fmgr lookup data for transition functions --- only valid when
128 : * corresponding oid is not InvalidOid. Note in particular that fn_strict
129 : * flags are kept here.
130 : */
131 : FmgrInfo transfn;
132 : FmgrInfo invtransfn;
133 : FmgrInfo finalfn;
134 :
135 : int numFinalArgs; /* number of arguments to pass to finalfn */
136 :
137 : /*
138 : * initial value from pg_aggregate entry
139 : */
140 : Datum initValue;
141 : bool initValueIsNull;
142 :
143 : /*
144 : * cached value for current frame boundaries
145 : */
146 : Datum resultValue;
147 : bool resultValueIsNull;
148 :
149 : /*
150 : * We need the len and byval info for the agg's input, result, and
151 : * transition data types in order to know how to copy/delete values.
152 : */
153 : int16 inputtypeLen,
154 : resulttypeLen,
155 : transtypeLen;
156 : bool inputtypeByVal,
157 : resulttypeByVal,
158 : transtypeByVal;
159 :
160 : int wfuncno; /* index of associated WindowStatePerFuncData */
161 :
162 : /* Context holding transition value and possibly other subsidiary data */
163 : MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
164 :
165 : /* Current transition value */
166 : Datum transValue; /* current transition value */
167 : bool transValueIsNull;
168 :
169 : int64 transValueCount; /* number of currently-aggregated rows */
170 :
171 : /* Data local to eval_windowaggregates() */
172 : bool restart; /* need to restart this agg in this cycle? */
173 : } WindowStatePerAggData;
174 :
175 : static void initialize_windowaggregate(WindowAggState *winstate,
176 : WindowStatePerFunc perfuncstate,
177 : WindowStatePerAgg peraggstate);
178 : static void advance_windowaggregate(WindowAggState *winstate,
179 : WindowStatePerFunc perfuncstate,
180 : WindowStatePerAgg peraggstate);
181 : static bool advance_windowaggregate_base(WindowAggState *winstate,
182 : WindowStatePerFunc perfuncstate,
183 : WindowStatePerAgg peraggstate);
184 : static void finalize_windowaggregate(WindowAggState *winstate,
185 : WindowStatePerFunc perfuncstate,
186 : WindowStatePerAgg peraggstate,
187 : Datum *result, bool *isnull);
188 :
189 : static void eval_windowaggregates(WindowAggState *winstate);
190 : static void eval_windowfunction(WindowAggState *winstate,
191 : WindowStatePerFunc perfuncstate,
192 : Datum *result, bool *isnull);
193 :
194 : static void begin_partition(WindowAggState *winstate);
195 : static void spool_tuples(WindowAggState *winstate, int64 pos);
196 : static void release_partition(WindowAggState *winstate);
197 :
198 : static int row_is_in_frame(WindowObject winobj, int64 pos,
199 : TupleTableSlot *slot, bool fetch_tuple);
200 : static void update_frameheadpos(WindowAggState *winstate);
201 : static void update_frametailpos(WindowAggState *winstate);
202 : static void update_grouptailpos(WindowAggState *winstate);
203 :
204 : static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
205 : WindowFunc *wfunc,
206 : WindowStatePerAgg peraggstate);
207 : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
208 :
209 : static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
210 : TupleTableSlot *slot2);
211 : static bool window_gettupleslot(WindowObject winobj, int64 pos,
212 : TupleTableSlot *slot);
213 :
214 : static Datum ignorenulls_getfuncarginframe(WindowObject winobj, int argno,
215 : int relpos, int seektype,
216 : bool set_mark, bool *isnull,
217 : bool *isout);
218 : static Datum gettuple_eval_partition(WindowObject winobj, int argno,
219 : int64 abs_pos, bool *isnull,
220 : bool *isout);
221 : static void init_notnull_info(WindowObject winobj,
222 : WindowStatePerFunc perfuncstate);
223 : static void grow_notnull_info(WindowObject winobj,
224 : int64 pos, int argno);
225 : static uint8 get_notnull_info(WindowObject winobj,
226 : int64 pos, int argno);
227 : static void put_notnull_info(WindowObject winobj,
228 : int64 pos, int argno, bool isnull);
229 :
/*
 * Not null info bit array consists of 2-bit items.
 *
 * Each item records, for one map position, whether the value is known to be
 * NULL, known to be NOT NULL, or has not been calculated yet.  Several items
 * are packed into each byte of the array.
 */
#define NN_UNKNOWN	0x00		/* value not calculated yet */
#define NN_NULL		0x01		/* NULL */
#define NN_NOTNULL	0x02		/* NOT NULL */
#define NN_MASK		0x03		/* mask for NOT NULL MAP */
#define NN_BITS_PER_MEMBER	2	/* number of bits in not null map */
/* number of items per variable */
#define NN_ITEM_PER_VAR		(BITS_PER_BYTE / NN_BITS_PER_MEMBER)
/* convert map position to byte offset */
#define NN_POS_TO_BYTES(pos)	((pos) / NN_ITEM_PER_VAR)
/* bytes offset to map position */
#define NN_BYTES_TO_POS(bytes)	((bytes) * NN_ITEM_PER_VAR)
/*
 * calculate shift bits
 *
 * The whole expansion is parenthesized so that the macro behaves as a single
 * operand regardless of the operators surrounding it at the call site.
 */
#define NN_SHIFT(pos)		(((pos) % NN_ITEM_PER_VAR) * NN_BITS_PER_MEMBER)
246 :
247 : /*
248 : * initialize_windowaggregate
249 : * parallel to initialize_aggregates in nodeAgg.c
250 : */
251 : static void
252 2702 : initialize_windowaggregate(WindowAggState *winstate,
253 : WindowStatePerFunc perfuncstate,
254 : WindowStatePerAgg peraggstate)
255 : {
256 : MemoryContext oldContext;
257 :
258 : /*
259 : * If we're using a private aggcontext, we may reset it here. But if the
260 : * context is shared, we don't know which other aggregates may still need
261 : * it, so we must leave it to the caller to reset at an appropriate time.
262 : */
263 2702 : if (peraggstate->aggcontext != winstate->aggcontext)
264 1939 : MemoryContextReset(peraggstate->aggcontext);
265 :
266 2702 : if (peraggstate->initValueIsNull)
267 1040 : peraggstate->transValue = peraggstate->initValue;
268 : else
269 : {
270 1662 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
271 3324 : peraggstate->transValue = datumCopy(peraggstate->initValue,
272 1662 : peraggstate->transtypeByVal,
273 1662 : peraggstate->transtypeLen);
274 1662 : MemoryContextSwitchTo(oldContext);
275 : }
276 2702 : peraggstate->transValueIsNull = peraggstate->initValueIsNull;
277 2702 : peraggstate->transValueCount = 0;
278 2702 : peraggstate->resultValue = (Datum) 0;
279 2702 : peraggstate->resultValueIsNull = true;
280 2702 : }
281 :
282 : /*
283 : * advance_windowaggregate
284 : * parallel to advance_aggregates in nodeAgg.c
285 : */
286 : static void
287 118275 : advance_windowaggregate(WindowAggState *winstate,
288 : WindowStatePerFunc perfuncstate,
289 : WindowStatePerAgg peraggstate)
290 : {
291 118275 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
292 118275 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
293 118275 : int numArguments = perfuncstate->numArguments;
294 : Datum newVal;
295 : ListCell *arg;
296 : int i;
297 : MemoryContext oldContext;
298 118275 : ExprContext *econtext = winstate->tmpcontext;
299 118275 : ExprState *filter = wfuncstate->aggfilter;
300 :
301 118275 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
302 :
303 : /* Skip anything FILTERed out */
304 118275 : if (filter)
305 : {
306 : bool isnull;
307 228 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
308 :
309 228 : if (isnull || !DatumGetBool(res))
310 : {
311 108 : MemoryContextSwitchTo(oldContext);
312 108 : return;
313 : }
314 : }
315 :
316 : /* We start from 1, since the 0th arg will be the transition value */
317 118167 : i = 1;
318 196114 : foreach(arg, wfuncstate->args)
319 : {
320 77947 : ExprState *argstate = (ExprState *) lfirst(arg);
321 :
322 77947 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
323 : &fcinfo->args[i].isnull);
324 77947 : i++;
325 : }
326 :
327 118167 : if (peraggstate->transfn.fn_strict)
328 : {
329 : /*
330 : * For a strict transfn, nothing happens when there's a NULL input; we
331 : * just keep the prior transValue. Note transValueCount doesn't
332 : * change either.
333 : */
334 67910 : for (i = 1; i <= numArguments; i++)
335 : {
336 13847 : if (fcinfo->args[i].isnull)
337 : {
338 132 : MemoryContextSwitchTo(oldContext);
339 132 : return;
340 : }
341 : }
342 :
343 : /*
344 : * For strict transition functions with initial value NULL we use the
345 : * first non-NULL input as the initial state. (We already checked
346 : * that the agg's input type is binary-compatible with its transtype,
347 : * so straight copy here is OK.)
348 : *
349 : * We must copy the datum into aggcontext if it is pass-by-ref. We do
350 : * not need to pfree the old transValue, since it's NULL.
351 : */
352 54063 : if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
353 : {
354 300 : MemoryContextSwitchTo(peraggstate->aggcontext);
355 600 : peraggstate->transValue = datumCopy(fcinfo->args[1].value,
356 300 : peraggstate->transtypeByVal,
357 300 : peraggstate->transtypeLen);
358 300 : peraggstate->transValueIsNull = false;
359 300 : peraggstate->transValueCount = 1;
360 300 : MemoryContextSwitchTo(oldContext);
361 300 : return;
362 : }
363 :
364 53763 : if (peraggstate->transValueIsNull)
365 : {
366 : /*
367 : * Don't call a strict function with NULL inputs. Note it is
368 : * possible to get here despite the above tests, if the transfn is
369 : * strict *and* returned a NULL on a prior cycle. If that happens
370 : * we will propagate the NULL all the way to the end. That can
371 : * only happen if there's no inverse transition function, though,
372 : * since we disallow transitions back to NULL when there is one.
373 : */
374 0 : MemoryContextSwitchTo(oldContext);
375 : Assert(!OidIsValid(peraggstate->invtransfn_oid));
376 0 : return;
377 : }
378 : }
379 :
380 : /*
381 : * OK to call the transition function. Set winstate->curaggcontext while
382 : * calling it, for possible use by AggCheckCallContext.
383 : */
384 117735 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
385 : numArguments + 1,
386 : perfuncstate->winCollation,
387 : (Node *) winstate, NULL);
388 117735 : fcinfo->args[0].value = peraggstate->transValue;
389 117735 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
390 117735 : winstate->curaggcontext = peraggstate->aggcontext;
391 117735 : newVal = FunctionCallInvoke(fcinfo);
392 117727 : winstate->curaggcontext = NULL;
393 :
394 : /*
395 : * Moving-aggregate transition functions must not return null, see
396 : * advance_windowaggregate_base().
397 : */
398 117727 : if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
399 0 : ereport(ERROR,
400 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
401 : errmsg("moving-aggregate transition function must not return null")));
402 :
403 : /*
404 : * We must track the number of rows included in transValue, since to
405 : * remove the last input, advance_windowaggregate_base() mustn't call the
406 : * inverse transition function, but simply reset transValue back to its
407 : * initial value.
408 : */
409 117727 : peraggstate->transValueCount++;
410 :
411 : /*
412 : * If pass-by-ref datatype, must copy the new value into aggcontext and
413 : * free the prior transValue. But if transfn returned a pointer to its
414 : * first input, we don't need to do anything. Also, if transfn returned a
415 : * pointer to a R/W expanded object that is already a child of the
416 : * aggcontext, assume we can adopt that value without copying it. (See
417 : * comments for ExecAggCopyTransValue, which this code duplicates.)
418 : */
419 123299 : if (!peraggstate->transtypeByVal &&
420 5572 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
421 : {
422 640 : if (!fcinfo->isnull)
423 : {
424 640 : MemoryContextSwitchTo(peraggstate->aggcontext);
425 640 : if (DatumIsReadWriteExpandedObject(newVal,
426 : false,
427 644 : peraggstate->transtypeLen) &&
428 4 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
429 : /* do nothing */ ;
430 : else
431 636 : newVal = datumCopy(newVal,
432 636 : peraggstate->transtypeByVal,
433 636 : peraggstate->transtypeLen);
434 : }
435 640 : if (!peraggstate->transValueIsNull)
436 : {
437 600 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
438 : false,
439 : peraggstate->transtypeLen))
440 0 : DeleteExpandedObject(peraggstate->transValue);
441 : else
442 600 : pfree(DatumGetPointer(peraggstate->transValue));
443 : }
444 : }
445 :
446 117727 : MemoryContextSwitchTo(oldContext);
447 117727 : peraggstate->transValue = newVal;
448 117727 : peraggstate->transValueIsNull = fcinfo->isnull;
449 : }
450 :
451 : /*
452 : * advance_windowaggregate_base
453 : * Remove the oldest tuple from an aggregation.
454 : *
455 : * This is very much like advance_windowaggregate, except that we will call
456 : * the inverse transition function (which caller must have checked is
457 : * available).
458 : *
459 : * Returns true if we successfully removed the current row from this
460 : * aggregate, false if not (in the latter case, caller is responsible
461 : * for cleaning up by restarting the aggregation).
462 : */
463 : static bool
464 3084 : advance_windowaggregate_base(WindowAggState *winstate,
465 : WindowStatePerFunc perfuncstate,
466 : WindowStatePerAgg peraggstate)
467 : {
468 3084 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
469 3084 : WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
470 3084 : int numArguments = perfuncstate->numArguments;
471 : Datum newVal;
472 : ListCell *arg;
473 : int i;
474 : MemoryContext oldContext;
475 3084 : ExprContext *econtext = winstate->tmpcontext;
476 3084 : ExprState *filter = wfuncstate->aggfilter;
477 :
478 3084 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
479 :
480 : /* Skip anything FILTERed out */
481 3084 : if (filter)
482 : {
483 : bool isnull;
484 68 : Datum res = ExecEvalExpr(filter, econtext, &isnull);
485 :
486 68 : if (isnull || !DatumGetBool(res))
487 : {
488 32 : MemoryContextSwitchTo(oldContext);
489 32 : return true;
490 : }
491 : }
492 :
493 : /* We start from 1, since the 0th arg will be the transition value */
494 3052 : i = 1;
495 6092 : foreach(arg, wfuncstate->args)
496 : {
497 3040 : ExprState *argstate = (ExprState *) lfirst(arg);
498 :
499 3040 : fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
500 : &fcinfo->args[i].isnull);
501 3040 : i++;
502 : }
503 :
504 3052 : if (peraggstate->invtransfn.fn_strict)
505 : {
506 : /*
507 : * For a strict (inv)transfn, nothing happens when there's a NULL
508 : * input; we just keep the prior transValue. Note transValueCount
509 : * doesn't change either.
510 : */
511 3736 : for (i = 1; i <= numArguments; i++)
512 : {
513 1888 : if (fcinfo->args[i].isnull)
514 : {
515 52 : MemoryContextSwitchTo(oldContext);
516 52 : return true;
517 : }
518 : }
519 : }
520 :
521 : /* There should still be an added but not yet removed value */
522 : Assert(peraggstate->transValueCount > 0);
523 :
524 : /*
525 : * In moving-aggregate mode, the state must never be NULL, except possibly
526 : * before any rows have been aggregated (which is surely not the case at
527 : * this point). This restriction allows us to interpret a NULL result
528 : * from the inverse function as meaning "sorry, can't do an inverse
529 : * transition in this case". We already checked this in
530 : * advance_windowaggregate, but just for safety, check again.
531 : */
532 3000 : if (peraggstate->transValueIsNull)
533 0 : elog(ERROR, "aggregate transition value is NULL before inverse transition");
534 :
535 : /*
536 : * We mustn't use the inverse transition function to remove the last
537 : * input. Doing so would yield a non-NULL state, whereas we should be in
538 : * the initial state afterwards which may very well be NULL. So instead,
539 : * we simply re-initialize the aggregate in this case.
540 : */
541 3000 : if (peraggstate->transValueCount == 1)
542 : {
543 60 : MemoryContextSwitchTo(oldContext);
544 60 : initialize_windowaggregate(winstate,
545 60 : &winstate->perfunc[peraggstate->wfuncno],
546 : peraggstate);
547 60 : return true;
548 : }
549 :
550 : /*
551 : * OK to call the inverse transition function. Set
552 : * winstate->curaggcontext while calling it, for possible use by
553 : * AggCheckCallContext.
554 : */
555 2940 : InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
556 : numArguments + 1,
557 : perfuncstate->winCollation,
558 : (Node *) winstate, NULL);
559 2940 : fcinfo->args[0].value = peraggstate->transValue;
560 2940 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
561 2940 : winstate->curaggcontext = peraggstate->aggcontext;
562 2940 : newVal = FunctionCallInvoke(fcinfo);
563 2940 : winstate->curaggcontext = NULL;
564 :
565 : /*
566 : * If the function returns NULL, report failure, forcing a restart.
567 : */
568 2940 : if (fcinfo->isnull)
569 : {
570 159 : MemoryContextSwitchTo(oldContext);
571 159 : return false;
572 : }
573 :
574 : /* Update number of rows included in transValue */
575 2781 : peraggstate->transValueCount--;
576 :
577 : /*
578 : * If pass-by-ref datatype, must copy the new value into aggcontext and
579 : * free the prior transValue. But if invtransfn returned a pointer to its
580 : * first input, we don't need to do anything. Also, if invtransfn
581 : * returned a pointer to a R/W expanded object that is already a child of
582 : * the aggcontext, assume we can adopt that value without copying it. (See
583 : * comments for ExecAggCopyTransValue, which this code duplicates.)
584 : *
585 : * Note: the checks for null values here will never fire, but it seems
586 : * best to have this stanza look just like advance_windowaggregate.
587 : */
588 4201 : if (!peraggstate->transtypeByVal &&
589 1420 : DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
590 : {
591 444 : if (!fcinfo->isnull)
592 : {
593 444 : MemoryContextSwitchTo(peraggstate->aggcontext);
594 444 : if (DatumIsReadWriteExpandedObject(newVal,
595 : false,
596 444 : peraggstate->transtypeLen) &&
597 0 : MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
598 : /* do nothing */ ;
599 : else
600 444 : newVal = datumCopy(newVal,
601 444 : peraggstate->transtypeByVal,
602 444 : peraggstate->transtypeLen);
603 : }
604 444 : if (!peraggstate->transValueIsNull)
605 : {
606 444 : if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
607 : false,
608 : peraggstate->transtypeLen))
609 0 : DeleteExpandedObject(peraggstate->transValue);
610 : else
611 444 : pfree(DatumGetPointer(peraggstate->transValue));
612 : }
613 : }
614 :
615 2781 : MemoryContextSwitchTo(oldContext);
616 2781 : peraggstate->transValue = newVal;
617 2781 : peraggstate->transValueIsNull = fcinfo->isnull;
618 :
619 2781 : return true;
620 : }
621 :
622 : /*
623 : * finalize_windowaggregate
624 : * parallel to finalize_aggregate in nodeAgg.c
625 : */
626 : static void
627 7163 : finalize_windowaggregate(WindowAggState *winstate,
628 : WindowStatePerFunc perfuncstate,
629 : WindowStatePerAgg peraggstate,
630 : Datum *result, bool *isnull)
631 : {
632 : MemoryContext oldContext;
633 :
634 7163 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
635 :
636 : /*
637 : * Apply the agg's finalfn if one is provided, else return transValue.
638 : */
639 7163 : if (OidIsValid(peraggstate->finalfn_oid))
640 : {
641 3996 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
642 3996 : int numFinalArgs = peraggstate->numFinalArgs;
643 : bool anynull;
644 : int i;
645 :
646 3996 : InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
647 : numFinalArgs,
648 : perfuncstate->winCollation,
649 : (Node *) winstate, NULL);
650 3996 : fcinfo->args[0].value =
651 3996 : MakeExpandedObjectReadOnly(peraggstate->transValue,
652 : peraggstate->transValueIsNull,
653 : peraggstate->transtypeLen);
654 3996 : fcinfo->args[0].isnull = peraggstate->transValueIsNull;
655 3996 : anynull = peraggstate->transValueIsNull;
656 :
657 : /* Fill any remaining argument positions with nulls */
658 4056 : for (i = 1; i < numFinalArgs; i++)
659 : {
660 60 : fcinfo->args[i].value = (Datum) 0;
661 60 : fcinfo->args[i].isnull = true;
662 60 : anynull = true;
663 : }
664 :
665 3996 : if (fcinfo->flinfo->fn_strict && anynull)
666 : {
667 : /* don't call a strict function with NULL inputs */
668 0 : *result = (Datum) 0;
669 0 : *isnull = true;
670 : }
671 : else
672 : {
673 : Datum res;
674 :
675 3996 : winstate->curaggcontext = peraggstate->aggcontext;
676 3996 : res = FunctionCallInvoke(fcinfo);
677 3988 : winstate->curaggcontext = NULL;
678 3988 : *isnull = fcinfo->isnull;
679 3988 : *result = MakeExpandedObjectReadOnly(res,
680 : fcinfo->isnull,
681 : peraggstate->resulttypeLen);
682 : }
683 : }
684 : else
685 : {
686 3167 : *result =
687 3167 : MakeExpandedObjectReadOnly(peraggstate->transValue,
688 : peraggstate->transValueIsNull,
689 : peraggstate->transtypeLen);
690 3167 : *isnull = peraggstate->transValueIsNull;
691 : }
692 :
693 7155 : MemoryContextSwitchTo(oldContext);
694 7155 : }
695 :
696 : /*
697 : * eval_windowaggregates
698 : * evaluate plain aggregates being used as window functions
699 : *
700 : * This differs from nodeAgg.c in two ways. First, if the window's frame
701 : * start position moves, we use the inverse transition function (if it exists)
702 : * to remove rows from the transition value. And second, we expect to be
703 : * able to call aggregate final functions repeatedly after aggregating more
704 : * data onto the same transition value. This is not a behavior required by
705 : * nodeAgg.c.
706 : */
707 : static void
708 106905 : eval_windowaggregates(WindowAggState *winstate)
709 : {
710 : WindowStatePerAgg peraggstate;
711 : int wfuncno,
712 : numaggs,
713 : numaggs_restart,
714 : i;
715 : int64 aggregatedupto_nonrestarted;
716 : MemoryContext oldContext;
717 : ExprContext *econtext;
718 : WindowObject agg_winobj;
719 : TupleTableSlot *agg_row_slot;
720 : TupleTableSlot *temp_slot;
721 :
722 106905 : numaggs = winstate->numaggs;
723 106905 : if (numaggs == 0)
724 0 : return; /* nothing to do */
725 :
726 : /* final output execution is in ps_ExprContext */
727 106905 : econtext = winstate->ss.ps.ps_ExprContext;
728 106905 : agg_winobj = winstate->agg_winobj;
729 106905 : agg_row_slot = winstate->agg_row_slot;
730 106905 : temp_slot = winstate->temp_slot_1;
731 :
732 : /*
733 : * If the window's frame start clause is UNBOUNDED_PRECEDING and no
734 : * exclusion clause is specified, then the window frame consists of a
735 : * contiguous group of rows extending forward from the start of the
736 : * partition, and rows only enter the frame, never exit it, as the current
737 : * row advances forward. This makes it possible to use an incremental
738 : * strategy for evaluating aggregates: we run the transition function for
739 : * each row added to the frame, and run the final function whenever we
740 : * need the current aggregate value. This is considerably more efficient
741 : * than the naive approach of re-running the entire aggregate calculation
742 : * for each current row. It does assume that the final function doesn't
743 : * damage the running transition value, but we have the same assumption in
744 : * nodeAgg.c too (when it rescans an existing hash table).
745 : *
746 : * If the frame start does sometimes move, we can still optimize as above
747 : * whenever successive rows share the same frame head, but if the frame
748 : * head moves beyond the previous head we try to remove those rows using
749 : * the aggregate's inverse transition function. This function restores
750 : * the aggregate's current state to what it would be if the removed row
751 : * had never been aggregated in the first place. Inverse transition
752 : * functions may optionally return NULL, indicating that the function was
753 : * unable to remove the tuple from aggregation. If this happens, or if
754 : * the aggregate doesn't have an inverse transition function at all, we
755 : * must perform the aggregation all over again for all tuples within the
756 : * new frame boundaries.
757 : *
758 : * If there's any exclusion clause, then we may have to aggregate over a
759 : * non-contiguous set of rows, so we punt and recalculate for every row.
760 : * (For some frame end choices, it might be that the frame is always
761 : * contiguous anyway, but that's an optimization to investigate later.)
762 : *
763 : * In many common cases, multiple rows share the same frame and hence the
764 : * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
765 : * window, then all rows are peers and so they all have window frame equal
766 : * to the whole partition.) We optimize such cases by calculating the
767 : * aggregate value once when we reach the first row of a peer group, and
768 : * then returning the saved value for all subsequent rows.
769 : *
770 : * 'aggregatedupto' keeps track of the first row that has not yet been
771 : * accumulated into the aggregate transition values. Whenever we start a
772 : * new peer group, we accumulate forward to the end of the peer group.
773 : */
774 :
775 : /*
776 : * First, update the frame head position.
777 : *
778 : * The frame head should never move backwards, and the code below wouldn't
779 : * cope if it did, so for safety we complain if it does.
780 : */
781 106905 : update_frameheadpos(winstate);
782 106901 : if (winstate->frameheadpos < winstate->aggregatedbase)
783 0 : elog(ERROR, "window frame head moved backward");
784 :
785 : /*
786 : * If the frame didn't change compared to the previous row, we can re-use
787 : * the result values that were previously saved at the bottom of this
788 : * function. Since we don't know the current frame's end yet, this is not
789 : * possible to check for fully. But if the frame end mode is UNBOUNDED
790 : * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
791 : * current row lies within the previous row's frame, then the two frames'
792 : * ends must coincide. Note that on the first row aggregatedbase ==
793 : * aggregatedupto, meaning this test must fail, so we don't need to check
794 : * the "there was no previous row" case explicitly here.
795 : */
796 106901 : if (winstate->aggregatedbase == winstate->frameheadpos &&
797 104385 : (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
798 103105 : FRAMEOPTION_END_CURRENT_ROW)) &&
799 103105 : !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
800 102985 : winstate->aggregatedbase <= winstate->currentpos &&
801 102961 : winstate->aggregatedupto > winstate->currentpos)
802 : {
803 201828 : for (i = 0; i < numaggs; i++)
804 : {
805 100918 : peraggstate = &winstate->peragg[i];
806 100918 : wfuncno = peraggstate->wfuncno;
807 100918 : econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
808 100918 : econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
809 : }
810 100910 : return;
811 : }
812 :
813 : /*----------
814 : * Initialize restart flags.
815 : *
816 : * We restart the aggregation:
817 : * - if we're processing the first row in the partition, or
818 : * - if the frame's head moved and we cannot use an inverse
819 : * transition function, or
820 : * - we have an EXCLUSION clause, or
821 : * - if the new frame doesn't overlap the old one
822 : *
823 : * Note that we don't strictly need to restart in the last case, but if
824 : * we're going to remove all rows from the aggregation anyway, a restart
825 : * surely is faster.
826 : *----------
827 : */
828 5991 : numaggs_restart = 0;
829 13170 : for (i = 0; i < numaggs; i++)
830 : {
831 7179 : peraggstate = &winstate->peragg[i];
832 7179 : if (winstate->currentpos == 0 ||
833 5800 : (winstate->aggregatedbase != winstate->frameheadpos &&
834 3468 : !OidIsValid(peraggstate->invtransfn_oid)) ||
835 5752 : (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
836 5008 : winstate->aggregatedupto <= winstate->frameheadpos)
837 : {
838 2483 : peraggstate->restart = true;
839 2483 : numaggs_restart++;
840 : }
841 : else
842 4696 : peraggstate->restart = false;
843 : }
844 :
845 : /*
846 : * If we have any possibly-moving aggregates, attempt to advance
847 : * aggregatedbase to match the frame's head by removing input rows that
848 : * fell off the top of the frame from the aggregations. This can fail,
849 : * i.e. advance_windowaggregate_base() can return false, in which case
850 : * we'll restart that aggregate below.
851 : */
852 8103 : while (numaggs_restart < numaggs &&
853 5756 : winstate->aggregatedbase < winstate->frameheadpos)
854 : {
855 : /*
856 : * Fetch the next tuple of those being removed. This should never fail
857 : * as we should have been here before.
858 : */
859 2112 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
860 : temp_slot))
861 0 : elog(ERROR, "could not re-fetch previously fetched frame row");
862 :
863 : /* Set tuple context for evaluation of aggregate arguments */
864 2112 : winstate->tmpcontext->ecxt_outertuple = temp_slot;
865 :
866 : /*
867 : * Perform the inverse transition for each aggregate function in the
868 : * window, unless it has already been marked as needing a restart.
869 : */
870 5204 : for (i = 0; i < numaggs; i++)
871 : {
872 : bool ok;
873 :
874 3092 : peraggstate = &winstate->peragg[i];
875 3092 : if (peraggstate->restart)
876 8 : continue;
877 :
878 3084 : wfuncno = peraggstate->wfuncno;
879 3084 : ok = advance_windowaggregate_base(winstate,
880 3084 : &winstate->perfunc[wfuncno],
881 : peraggstate);
882 3084 : if (!ok)
883 : {
884 : /* Inverse transition function has failed, must restart */
885 159 : peraggstate->restart = true;
886 159 : numaggs_restart++;
887 : }
888 : }
889 :
890 : /* Reset per-input-tuple context after each tuple */
891 2112 : ResetExprContext(winstate->tmpcontext);
892 :
893 : /* And advance the aggregated-row state */
894 2112 : winstate->aggregatedbase++;
895 2112 : ExecClearTuple(temp_slot);
896 : }
897 :
898 : /*
899 : * If we successfully advanced the base rows of all the aggregates,
900 : * aggregatedbase now equals frameheadpos; but if we failed for any, we
901 : * must forcibly update aggregatedbase.
902 : */
903 5991 : winstate->aggregatedbase = winstate->frameheadpos;
904 :
905 : /*
906 : * If we created a mark pointer for aggregates, keep it pushed up to frame
907 : * head, so that tuplestore can discard unnecessary rows.
908 : */
909 5991 : if (agg_winobj->markptr >= 0)
910 4146 : WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
911 :
912 : /*
913 : * Now restart the aggregates that require it.
914 : *
915 : * We assume that aggregates using the shared context always restart if
916 : * *any* aggregate restarts, and we may thus clean up the shared
917 : * aggcontext if that is the case. Private aggcontexts are reset by
918 : * initialize_windowaggregate() if their owning aggregate restarts. If we
919 : * aren't restarting an aggregate, we need to free any previously saved
920 : * result for it, else we'll leak memory.
921 : */
922 5991 : if (numaggs_restart > 0)
923 2491 : MemoryContextReset(winstate->aggcontext);
924 13170 : for (i = 0; i < numaggs; i++)
925 : {
926 7179 : peraggstate = &winstate->peragg[i];
927 :
928 : /* Aggregates using the shared ctx must restart if *any* agg does */
929 : Assert(peraggstate->aggcontext != winstate->aggcontext ||
930 : numaggs_restart == 0 ||
931 : peraggstate->restart);
932 :
933 7179 : if (peraggstate->restart)
934 : {
935 2642 : wfuncno = peraggstate->wfuncno;
936 2642 : initialize_windowaggregate(winstate,
937 2642 : &winstate->perfunc[wfuncno],
938 : peraggstate);
939 : }
940 4537 : else if (!peraggstate->resultValueIsNull)
941 : {
942 4381 : if (!peraggstate->resulttypeByVal)
943 1504 : pfree(DatumGetPointer(peraggstate->resultValue));
944 4381 : peraggstate->resultValue = (Datum) 0;
945 4381 : peraggstate->resultValueIsNull = true;
946 : }
947 : }
948 :
949 : /*
950 : * Non-restarted aggregates now contain the rows between aggregatedbase
951 : * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
952 : * contain no rows. If there are any restarted aggregates, we must thus
953 : * begin aggregating anew at frameheadpos, otherwise we may simply
954 : * continue at aggregatedupto. We must remember the old value of
955 : * aggregatedupto to know how long to skip advancing non-restarted
956 : * aggregates. If we modify aggregatedupto, we must also clear
957 : * agg_row_slot, per the loop invariant below.
958 : */
959 5991 : aggregatedupto_nonrestarted = winstate->aggregatedupto;
960 5991 : if (numaggs_restart > 0 &&
961 2491 : winstate->aggregatedupto != winstate->frameheadpos)
962 : {
963 932 : winstate->aggregatedupto = winstate->frameheadpos;
964 932 : ExecClearTuple(agg_row_slot);
965 : }
966 :
967 : /*
968 : * Advance until we reach a row not in frame (or end of partition).
969 : *
970 : * Note the loop invariant: agg_row_slot is either empty or holds the row
971 : * at position aggregatedupto. We advance aggregatedupto after processing
972 : * a row.
973 : */
974 : for (;;)
975 117576 : {
976 : int ret;
977 :
978 : /* Fetch next row if we didn't already */
979 123567 : if (TupIsNull(agg_row_slot))
980 : {
981 120923 : if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
982 : agg_row_slot))
983 2771 : break; /* must be end of partition */
984 : }
985 :
986 : /*
987 : * Exit loop if no more rows can be in frame. Skip aggregation if
988 : * current row is not in frame but there might be more in the frame.
989 : */
990 120796 : ret = row_is_in_frame(agg_winobj, winstate->aggregatedupto,
991 : agg_row_slot, false);
992 120788 : if (ret < 0)
993 3204 : break;
994 117584 : if (ret == 0)
995 1264 : goto next_tuple;
996 :
997 : /* Set tuple context for evaluation of aggregate arguments */
998 116320 : winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
999 :
1000 : /* Accumulate row into the aggregates */
1001 247378 : for (i = 0; i < numaggs; i++)
1002 : {
1003 131066 : peraggstate = &winstate->peragg[i];
1004 :
1005 : /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
1006 131066 : if (!peraggstate->restart &&
1007 79295 : winstate->aggregatedupto < aggregatedupto_nonrestarted)
1008 12791 : continue;
1009 :
1010 118275 : wfuncno = peraggstate->wfuncno;
1011 118275 : advance_windowaggregate(winstate,
1012 118275 : &winstate->perfunc[wfuncno],
1013 : peraggstate);
1014 : }
1015 :
1016 116312 : next_tuple:
1017 : /* Reset per-input-tuple context after each tuple */
1018 117576 : ResetExprContext(winstate->tmpcontext);
1019 :
1020 : /* And advance the aggregated-row state */
1021 117576 : winstate->aggregatedupto++;
1022 117576 : ExecClearTuple(agg_row_slot);
1023 : }
1024 :
1025 : /* The frame's end is not supposed to move backwards, ever */
1026 : Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
1027 :
1028 : /*
1029 : * finalize aggregates and fill result/isnull fields.
1030 : */
1031 13130 : for (i = 0; i < numaggs; i++)
1032 : {
1033 : Datum *result;
1034 : bool *isnull;
1035 :
1036 7163 : peraggstate = &winstate->peragg[i];
1037 7163 : wfuncno = peraggstate->wfuncno;
1038 7163 : result = &econtext->ecxt_aggvalues[wfuncno];
1039 7163 : isnull = &econtext->ecxt_aggnulls[wfuncno];
1040 7163 : finalize_windowaggregate(winstate,
1041 7163 : &winstate->perfunc[wfuncno],
1042 : peraggstate,
1043 : result, isnull);
1044 :
1045 : /*
1046 : * save the result in case next row shares the same frame.
1047 : *
1048 : * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1049 : * advance that the next row can't possibly share the same frame. Is
1050 : * it worth detecting that and skipping this code?
1051 : */
1052 7155 : if (!peraggstate->resulttypeByVal && !*isnull)
1053 : {
1054 1908 : oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1055 1908 : peraggstate->resultValue =
1056 1908 : datumCopy(*result,
1057 1908 : peraggstate->resulttypeByVal,
1058 1908 : peraggstate->resulttypeLen);
1059 1908 : MemoryContextSwitchTo(oldContext);
1060 : }
1061 : else
1062 : {
1063 5247 : peraggstate->resultValue = *result;
1064 : }
1065 7155 : peraggstate->resultValueIsNull = *isnull;
1066 : }
1067 : }
1068 :
1069 : /*
1070 : * eval_windowfunction
1071 : *
1072 : * Arguments of window functions are not evaluated here, because a window
1073 : * function can need random access to arbitrary rows in the partition.
1074 : * The window function uses the special WinGetFuncArgInPartition and
1075 : * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1076 : * it wants.
1077 : */
1078 : static void
1079 580615 : eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1080 : Datum *result, bool *isnull)
1081 : {
1082 580615 : LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1083 : MemoryContext oldContext;
1084 :
1085 580615 : oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1086 :
1087 : /*
1088 : * We don't pass any normal arguments to a window function, but we do pass
1089 : * it the number of arguments, in order to permit window function
1090 : * implementations to support varying numbers of arguments. The real info
1091 : * goes through the WindowObject, which is passed via fcinfo->context.
1092 : */
1093 580615 : InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1094 : perfuncstate->numArguments,
1095 : perfuncstate->winCollation,
1096 : (Node *) perfuncstate->winobj, NULL);
1097 : /* Just in case, make all the regular argument slots be null */
1098 746819 : for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1099 166204 : fcinfo->args[argno].isnull = true;
1100 : /* Window functions don't have a current aggregate context, either */
1101 580615 : winstate->curaggcontext = NULL;
1102 :
1103 580615 : *result = FunctionCallInvoke(fcinfo);
1104 580507 : *isnull = fcinfo->isnull;
1105 :
1106 : /*
1107 : * The window function might have returned a pass-by-ref result that's
1108 : * just a pointer into one of the WindowObject's temporary slots. That's
1109 : * not a problem if it's the only window function using the WindowObject;
1110 : * but if there's more than one function, we'd better copy the result to
1111 : * ensure it's not clobbered by later window functions.
1112 : */
1113 580507 : if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1114 680 : winstate->numfuncs > 1)
1115 72 : *result = datumCopy(*result,
1116 72 : perfuncstate->resulttypeByVal,
1117 72 : perfuncstate->resulttypeLen);
1118 :
1119 580507 : MemoryContextSwitchTo(oldContext);
1120 580507 : }
1121 :
1122 : /*
1123 : * prepare_tuplestore
1124 : * Prepare the tuplestore and all of the required read pointers for the
1125 : * WindowAggState's frameOptions.
1126 : *
1127 : * Note: We use pg_noinline to avoid bloating the calling function with code
1128 : * which is only called once.
1129 : */
1130 : static pg_noinline void
1131 1533 : prepare_tuplestore(WindowAggState *winstate)
1132 : {
1133 1533 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1134 1533 : int frameOptions = winstate->frameOptions;
1135 1533 : int numfuncs = winstate->numfuncs;
1136 :
1137 : /* we shouldn't be called if this was done already */
1138 : Assert(winstate->buffer == NULL);
1139 :
1140 : /* Create new tuplestore */
1141 1533 : winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1142 :
1143 : /*
1144 : * Set up read pointers for the tuplestore. The current pointer doesn't
1145 : * need BACKWARD capability, but the per-window-function read pointers do,
1146 : * and the aggregate pointer does if we might need to restart aggregation.
1147 : */
1148 1533 : winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1149 :
1150 : /* reset default REWIND capability bit for current ptr */
1151 1533 : tuplestore_set_eflags(winstate->buffer, 0);
1152 :
1153 : /* create read pointers for aggregates, if needed */
1154 1533 : if (winstate->numaggs > 0)
1155 : {
1156 759 : WindowObject agg_winobj = winstate->agg_winobj;
1157 759 : int readptr_flags = 0;
1158 :
1159 : /*
1160 : * If the frame head is potentially movable, or we have an EXCLUSION
1161 : * clause, we might need to restart aggregation ...
1162 : */
1163 759 : if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1164 270 : (frameOptions & FRAMEOPTION_EXCLUSION))
1165 : {
1166 : /* ... so create a mark pointer to track the frame head */
1167 501 : agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1168 : /* and the read pointer will need BACKWARD capability */
1169 501 : readptr_flags |= EXEC_FLAG_BACKWARD;
1170 : }
1171 :
1172 759 : agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1173 : readptr_flags);
1174 : }
1175 :
1176 : /* create mark and read pointers for each real window function */
1177 3574 : for (int i = 0; i < numfuncs; i++)
1178 : {
1179 2041 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1180 :
1181 2041 : if (!perfuncstate->plain_agg)
1182 : {
1183 1218 : WindowObject winobj = perfuncstate->winobj;
1184 :
1185 1218 : winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1186 : 0);
1187 1218 : winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1188 : EXEC_FLAG_BACKWARD);
1189 : }
1190 : }
1191 :
1192 : /*
1193 : * If we are in RANGE or GROUPS mode, then determining frame boundaries
1194 : * requires physical access to the frame endpoint rows, except in certain
1195 : * degenerate cases. We create read pointers to point to those rows, to
1196 : * simplify access and ensure that the tuplestore doesn't discard the
1197 : * endpoint rows prematurely. (Must create pointers in exactly the same
1198 : * cases that update_frameheadpos and update_frametailpos need them.)
1199 : */
1200 1533 : winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1201 :
1202 1533 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1203 : {
1204 851 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1205 45 : node->ordNumCols != 0) ||
1206 806 : (frameOptions & FRAMEOPTION_START_OFFSET))
1207 481 : winstate->framehead_ptr =
1208 481 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1209 851 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1210 338 : node->ordNumCols != 0) ||
1211 626 : (frameOptions & FRAMEOPTION_END_OFFSET))
1212 701 : winstate->frametail_ptr =
1213 701 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1214 : }
1215 :
1216 : /*
1217 : * If we have an exclusion clause that requires knowing the boundaries of
1218 : * the current row's peer group, we create a read pointer to track the
1219 : * tail position of the peer group (i.e., first row of the next peer
1220 : * group). The head position does not require its own pointer because we
1221 : * maintain that as a side effect of advancing the current row.
1222 : */
1223 1533 : winstate->grouptail_ptr = -1;
1224 :
1225 1533 : if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1226 120 : FRAMEOPTION_EXCLUDE_TIES)) &&
1227 120 : node->ordNumCols != 0)
1228 : {
1229 112 : winstate->grouptail_ptr =
1230 112 : tuplestore_alloc_read_pointer(winstate->buffer, 0);
1231 : }
1232 1533 : }
1233 :
1234 : /*
1235 : * begin_partition
1236 : * Start buffering rows of the next partition.
1237 : */
1238 : static void
1239 2411 : begin_partition(WindowAggState *winstate)
1240 : {
1241 2411 : PlanState *outerPlan = outerPlanState(winstate);
1242 2411 : int numfuncs = winstate->numfuncs;
1243 :
1244 2411 : winstate->partition_spooled = false;
1245 2411 : winstate->framehead_valid = false;
1246 2411 : winstate->frametail_valid = false;
1247 2411 : winstate->grouptail_valid = false;
1248 2411 : winstate->spooled_rows = 0;
1249 2411 : winstate->currentpos = 0;
1250 2411 : winstate->frameheadpos = 0;
1251 2411 : winstate->frametailpos = 0;
1252 2411 : winstate->currentgroup = 0;
1253 2411 : winstate->frameheadgroup = 0;
1254 2411 : winstate->frametailgroup = 0;
1255 2411 : winstate->groupheadpos = 0;
1256 2411 : winstate->grouptailpos = -1; /* see update_grouptailpos */
1257 2411 : ExecClearTuple(winstate->agg_row_slot);
1258 2411 : if (winstate->framehead_slot)
1259 682 : ExecClearTuple(winstate->framehead_slot);
1260 2411 : if (winstate->frametail_slot)
1261 1150 : ExecClearTuple(winstate->frametail_slot);
1262 :
1263 : /*
1264 : * If this is the very first partition, we need to fetch the first input
1265 : * row to store in first_part_slot.
1266 : */
1267 2411 : if (TupIsNull(winstate->first_part_slot))
1268 : {
1269 1585 : TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1270 :
1271 1585 : if (!TupIsNull(outerslot))
1272 1573 : ExecCopySlot(winstate->first_part_slot, outerslot);
1273 : else
1274 : {
1275 : /* outer plan is empty, so we have nothing to do */
1276 12 : winstate->partition_spooled = true;
1277 12 : winstate->more_partitions = false;
1278 12 : return;
1279 : }
1280 : }
1281 :
1282 : /* Create new tuplestore if not done already. */
1283 2399 : if (unlikely(winstate->buffer == NULL))
1284 1533 : prepare_tuplestore(winstate);
1285 :
1286 2399 : winstate->next_partition = false;
1287 :
1288 2399 : if (winstate->numaggs > 0)
1289 : {
1290 1251 : WindowObject agg_winobj = winstate->agg_winobj;
1291 :
1292 : /* reset mark and see positions for aggregate functions */
1293 1251 : agg_winobj->markpos = -1;
1294 1251 : agg_winobj->seekpos = -1;
1295 :
1296 : /* Also reset the row counters for aggregates */
1297 1251 : winstate->aggregatedbase = 0;
1298 1251 : winstate->aggregatedupto = 0;
1299 : }
1300 :
1301 : /* reset mark and seek positions for each real window function */
1302 5510 : for (int i = 0; i < numfuncs; i++)
1303 : {
1304 3111 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1305 :
1306 3111 : if (!perfuncstate->plain_agg)
1307 : {
1308 1728 : WindowObject winobj = perfuncstate->winobj;
1309 :
1310 1728 : winobj->markpos = -1;
1311 1728 : winobj->seekpos = -1;
1312 :
1313 : /* reset null map */
1314 1728 : if (winobj->ignore_nulls == IGNORE_NULLS ||
1315 1708 : winobj->ignore_nulls == PARSER_IGNORE_NULLS)
1316 : {
1317 144 : int numargs = perfuncstate->numArguments;
1318 :
1319 332 : for (int j = 0; j < numargs; j++)
1320 : {
1321 188 : int n = winobj->num_notnull_info[j];
1322 :
1323 188 : if (n > 0)
1324 20 : memset(winobj->notnull_info[j], 0,
1325 20 : NN_POS_TO_BYTES(n));
1326 : }
1327 : }
1328 : }
1329 : }
1330 :
1331 : /*
1332 : * Store the first tuple into the tuplestore (it's always available now;
1333 : * we either read it above, or saved it at the end of previous partition)
1334 : */
1335 2399 : tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1336 2399 : winstate->spooled_rows++;
1337 : }
1338 :
1339 : /*
1340 : * Read tuples from the outer node, up to and including position 'pos', and
1341 : * store them into the tuplestore. If pos is -1, reads the whole partition.
1342 : */
1343 : static void
1344 1240180 : spool_tuples(WindowAggState *winstate, int64 pos)
1345 : {
1346 1240180 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1347 : PlanState *outerPlan;
1348 : TupleTableSlot *outerslot;
1349 : MemoryContext oldcontext;
1350 :
1351 1240180 : if (!winstate->buffer)
1352 4 : return; /* just a safety check */
1353 1240176 : if (winstate->partition_spooled)
1354 82566 : return; /* whole partition done already */
1355 :
1356 : /*
1357 : * When in pass-through mode we can just exhaust all tuples in the current
1358 : * partition. We don't need these tuples for any further window function
1359 : * evaluation, however, we do need to keep them around if we're not the
1360 : * top-level window as another WindowAgg node above must see these.
1361 : */
1362 1157610 : if (winstate->status != WINDOWAGG_RUN)
1363 : {
1364 : Assert(winstate->status == WINDOWAGG_PASSTHROUGH ||
1365 : winstate->status == WINDOWAGG_PASSTHROUGH_STRICT);
1366 :
1367 20 : pos = -1;
1368 : }
1369 :
1370 : /*
1371 : * If the tuplestore has spilled to disk, alternate reading and writing
1372 : * becomes quite expensive due to frequent buffer flushes. It's cheaper
1373 : * to force the entire partition to get spooled in one go.
1374 : *
1375 : * XXX this is a horrid kluge --- it'd be better to fix the performance
1376 : * problem inside tuplestore. FIXME
1377 : */
1378 1157590 : else if (!tuplestore_in_memory(winstate->buffer))
1379 8 : pos = -1;
1380 :
1381 1157610 : outerPlan = outerPlanState(winstate);
1382 :
1383 : /* Must be in query context to call outerplan */
1384 1157610 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1385 :
1386 2996545 : while (winstate->spooled_rows <= pos || pos == -1)
1387 : {
1388 683592 : outerslot = ExecProcNode(outerPlan);
1389 683592 : if (TupIsNull(outerslot))
1390 : {
1391 : /* reached the end of the last partition */
1392 1441 : winstate->partition_spooled = true;
1393 1441 : winstate->more_partitions = false;
1394 1441 : break;
1395 : }
1396 :
1397 682151 : if (node->partNumCols > 0)
1398 : {
1399 92455 : ExprContext *econtext = winstate->tmpcontext;
1400 :
1401 92455 : econtext->ecxt_innertuple = winstate->first_part_slot;
1402 92455 : econtext->ecxt_outertuple = outerslot;
1403 :
1404 : /* Check if this tuple still belongs to the current partition */
1405 92455 : if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1406 : {
1407 : /*
1408 : * end of partition; copy the tuple for the next cycle.
1409 : */
1410 826 : ExecCopySlot(winstate->first_part_slot, outerslot);
1411 826 : winstate->partition_spooled = true;
1412 826 : winstate->more_partitions = true;
1413 826 : break;
1414 : }
1415 : }
1416 :
1417 : /*
1418 : * Remember the tuple unless we're the top-level window and we're in
1419 : * pass-through mode.
1420 : */
1421 681325 : if (winstate->status != WINDOWAGG_PASSTHROUGH_STRICT)
1422 : {
1423 : /* Still in partition, so save it into the tuplestore */
1424 681317 : tuplestore_puttupleslot(winstate->buffer, outerslot);
1425 681317 : winstate->spooled_rows++;
1426 : }
1427 : }
1428 :
1429 1157610 : MemoryContextSwitchTo(oldcontext);
1430 : }
1431 :
1432 : /*
1433 : * release_partition
1434 : * clear information kept within a partition, including
1435 : * tuplestore and aggregate results.
1436 : */
1437 : static void
1438 4049 : release_partition(WindowAggState *winstate)
1439 : {
1440 : int i;
1441 :
1442 9262 : for (i = 0; i < winstate->numfuncs; i++)
1443 : {
1444 5213 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1445 :
1446 : /* Release any partition-local state of this window function */
1447 5213 : if (perfuncstate->winobj)
1448 2747 : perfuncstate->winobj->localmem = NULL;
1449 : }
1450 :
1451 : /*
1452 : * Release all partition-local memory (in particular, any partition-local
1453 : * state that we might have trashed our pointers to in the above loop, and
1454 : * any aggregate temp data). We don't rely on retail pfree because some
1455 : * aggregates might have allocated data we don't have direct pointers to.
1456 : */
1457 4049 : MemoryContextReset(winstate->partcontext);
1458 4049 : MemoryContextReset(winstate->aggcontext);
1459 6515 : for (i = 0; i < winstate->numaggs; i++)
1460 : {
1461 2466 : if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1462 1288 : MemoryContextReset(winstate->peragg[i].aggcontext);
1463 : }
1464 :
1465 4049 : if (winstate->buffer)
1466 2287 : tuplestore_clear(winstate->buffer);
1467 4049 : winstate->partition_spooled = false;
1468 4049 : winstate->next_partition = true;
1469 4049 : }
1470 :
1471 : /*
1472 : * row_is_in_frame
1473 : * Determine whether a row is in the current row's window frame according
1474 : * to our window framing rule
1475 : *
1476 : * The caller must have already determined that the row is in the partition
1477 : * and fetched it into a slot if fetch_tuple is false.
1478 : * This function just encapsulates the framing rules.
1479 : *
1480 : * Returns:
1481 : * -1, if the row is out of frame and no succeeding rows can be in frame
1482 : * 0, if the row is out of frame but succeeding rows might be in frame
1483 : * 1, if the row is in frame
1484 : *
1485 : * May clobber winstate->temp_slot_2.
1486 : */
1487 : static int
1488 127972 : row_is_in_frame(WindowObject winobj, int64 pos, TupleTableSlot *slot,
1489 : bool fetch_tuple)
1490 : {
1491 127972 : WindowAggState *winstate = winobj->winstate;
1492 127972 : int frameOptions = winstate->frameOptions;
1493 :
1494 : Assert(pos >= 0); /* else caller error */
1495 :
1496 : /*
1497 : * First, check frame starting conditions. We might as well delegate this
1498 : * to update_frameheadpos always; it doesn't add any notable cost.
1499 : */
1500 127972 : update_frameheadpos(winstate);
1501 127972 : if (pos < winstate->frameheadpos)
1502 96 : return 0;
1503 :
1504 : /*
1505 : * Okay so far, now check frame ending conditions. Here, we avoid calling
1506 : * update_frametailpos in simple cases, so as not to spool tuples further
1507 : * ahead than necessary.
1508 : */
1509 127876 : if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1510 : {
1511 105563 : if (frameOptions & FRAMEOPTION_ROWS)
1512 : {
1513 : /* rows after current row are out of frame */
1514 1472 : if (pos > winstate->currentpos)
1515 648 : return -1;
1516 : }
1517 104091 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1518 : {
1519 : /* following row that is not peer is out of frame */
1520 104091 : if (pos > winstate->currentpos)
1521 : {
1522 101766 : if (fetch_tuple) /* need to fetch tuple? */
1523 0 : if (!window_gettupleslot(winobj, pos, slot))
1524 0 : return -1;
1525 101766 : if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1526 912 : return -1;
1527 : }
1528 : }
1529 : else
1530 : Assert(false);
1531 : }
1532 22313 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1533 : {
1534 13276 : if (frameOptions & FRAMEOPTION_ROWS)
1535 : {
1536 3952 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1537 :
1538 : /* rows after current row + offset are out of frame */
1539 3952 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1540 76 : offset = -offset;
1541 :
1542 3952 : if (pos > winstate->currentpos + offset)
1543 804 : return -1;
1544 : }
1545 9324 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1546 : {
1547 : /* hard cases, so delegate to update_frametailpos */
1548 9324 : update_frametailpos(winstate);
1549 9296 : if (pos >= winstate->frametailpos)
1550 980 : return -1;
1551 : }
1552 : else
1553 : Assert(false);
1554 : }
1555 :
1556 : /* Check exclusion clause */
1557 124504 : if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1558 : {
1559 1964 : if (pos == winstate->currentpos)
1560 332 : return 0;
1561 : }
1562 122540 : else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1563 120632 : ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1564 1980 : pos != winstate->currentpos))
1565 : {
1566 3528 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1567 :
1568 : /* If no ORDER BY, all rows are peers with each other */
1569 3528 : if (node->ordNumCols == 0)
1570 312 : return 0;
1571 : /* Otherwise, check the group boundaries */
1572 3216 : if (pos >= winstate->groupheadpos)
1573 : {
1574 1728 : update_grouptailpos(winstate);
1575 1728 : if (pos < winstate->grouptailpos)
1576 672 : return 0;
1577 : }
1578 : }
1579 :
1580 : /* If we get here, it's in frame */
1581 123188 : return 1;
1582 : }
1583 :
1584 : /*
1585 : * update_frameheadpos
1586 : * make frameheadpos valid for the current row
1587 : *
1588 : * Note that frameheadpos is computed without regard for any window exclusion
1589 : * clause; the current row and/or its peers are considered part of the frame
1590 : * for this purpose even if they must be excluded later.
1591 : *
1592 : * May clobber winstate->temp_slot_2.
1593 : */
1594 : static void
1595 242755 : update_frameheadpos(WindowAggState *winstate)
1596 : {
1597 242755 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1598 242755 : int frameOptions = winstate->frameOptions;
1599 : MemoryContext oldcontext;
1600 :
1601 242755 : if (winstate->framehead_valid)
1602 132294 : return; /* already known for current row */
1603 :
1604 : /* We may be called in a short-lived context */
1605 110461 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1606 :
1607 110461 : if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1608 : {
1609 : /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1610 103331 : winstate->frameheadpos = 0;
1611 103331 : winstate->framehead_valid = true;
1612 : }
1613 7130 : else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1614 : {
1615 1866 : if (frameOptions & FRAMEOPTION_ROWS)
1616 : {
1617 : /* In ROWS mode, frame head is the same as current */
1618 1584 : winstate->frameheadpos = winstate->currentpos;
1619 1584 : winstate->framehead_valid = true;
1620 : }
1621 282 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1622 : {
1623 : /* If no ORDER BY, all rows are peers with each other */
1624 282 : if (node->ordNumCols == 0)
1625 : {
1626 0 : winstate->frameheadpos = 0;
1627 0 : winstate->framehead_valid = true;
1628 0 : MemoryContextSwitchTo(oldcontext);
1629 0 : return;
1630 : }
1631 :
1632 : /*
1633 : * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1634 : * first row that is a peer of current row. We keep a copy of the
1635 : * last-known frame head row in framehead_slot, and advance as
1636 : * necessary. Note that if we reach end of partition, we will
1637 : * leave frameheadpos = end+1 and framehead_slot empty.
1638 : */
1639 282 : tuplestore_select_read_pointer(winstate->buffer,
1640 : winstate->framehead_ptr);
1641 282 : if (winstate->frameheadpos == 0 &&
1642 140 : TupIsNull(winstate->framehead_slot))
1643 : {
1644 : /* fetch first row into framehead_slot, if we didn't already */
1645 54 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1646 : winstate->framehead_slot))
1647 0 : elog(ERROR, "unexpected end of tuplestore");
1648 : }
1649 :
1650 490 : while (!TupIsNull(winstate->framehead_slot))
1651 : {
1652 490 : if (are_peers(winstate, winstate->framehead_slot,
1653 : winstate->ss.ss_ScanTupleSlot))
1654 282 : break; /* this row is the correct frame head */
1655 : /* Note we advance frameheadpos even if the fetch fails */
1656 208 : winstate->frameheadpos++;
1657 208 : spool_tuples(winstate, winstate->frameheadpos);
1658 208 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1659 : winstate->framehead_slot))
1660 0 : break; /* end of partition */
1661 : }
1662 282 : winstate->framehead_valid = true;
1663 : }
1664 : else
1665 : Assert(false);
1666 : }
1667 5264 : else if (frameOptions & FRAMEOPTION_START_OFFSET)
1668 : {
1669 5264 : if (frameOptions & FRAMEOPTION_ROWS)
1670 : {
1671 : /* In ROWS mode, bound is physically n before/after current */
1672 1328 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1673 :
1674 1328 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1675 1288 : offset = -offset;
1676 :
1677 1328 : winstate->frameheadpos = winstate->currentpos + offset;
1678 : /* frame head can't go before first row */
1679 1328 : if (winstate->frameheadpos < 0)
1680 224 : winstate->frameheadpos = 0;
1681 1104 : else if (winstate->frameheadpos > winstate->currentpos + 1)
1682 : {
1683 : /* make sure frameheadpos is not past end of partition */
1684 0 : spool_tuples(winstate, winstate->frameheadpos - 1);
1685 0 : if (winstate->frameheadpos > winstate->spooled_rows)
1686 0 : winstate->frameheadpos = winstate->spooled_rows;
1687 : }
1688 1328 : winstate->framehead_valid = true;
1689 : }
1690 3936 : else if (frameOptions & FRAMEOPTION_RANGE)
1691 : {
1692 : /*
1693 : * In RANGE START_OFFSET mode, frame head is the first row that
1694 : * satisfies the in_range constraint relative to the current row.
1695 : * We keep a copy of the last-known frame head row in
1696 : * framehead_slot, and advance as necessary. Note that if we
1697 : * reach end of partition, we will leave frameheadpos = end+1 and
1698 : * framehead_slot empty.
1699 : */
1700 3016 : int sortCol = node->ordColIdx[0];
1701 : bool sub,
1702 : less;
1703 :
1704 : /* We must have an ordering column */
1705 : Assert(node->ordNumCols == 1);
1706 :
1707 : /* Precompute flags for in_range checks */
1708 3016 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1709 2468 : sub = true; /* subtract startOffset from current row */
1710 : else
1711 548 : sub = false; /* add it */
1712 3016 : less = false; /* normally, we want frame head >= sum */
1713 : /* If sort order is descending, flip both flags */
1714 3016 : if (!winstate->inRangeAsc)
1715 : {
1716 436 : sub = !sub;
1717 436 : less = true;
1718 : }
1719 :
1720 3016 : tuplestore_select_read_pointer(winstate->buffer,
1721 : winstate->framehead_ptr);
1722 3016 : if (winstate->frameheadpos == 0 &&
1723 1668 : TupIsNull(winstate->framehead_slot))
1724 : {
1725 : /* fetch first row into framehead_slot, if we didn't already */
1726 380 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1727 : winstate->framehead_slot))
1728 0 : elog(ERROR, "unexpected end of tuplestore");
1729 : }
1730 :
1731 4844 : while (!TupIsNull(winstate->framehead_slot))
1732 : {
1733 : Datum headval,
1734 : currval;
1735 : bool headisnull,
1736 : currisnull;
1737 :
1738 4708 : headval = slot_getattr(winstate->framehead_slot, sortCol,
1739 : &headisnull);
1740 4708 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1741 : &currisnull);
1742 4708 : if (headisnull || currisnull)
1743 : {
1744 : /* order of the rows depends only on nulls_first */
1745 72 : if (winstate->inRangeNullsFirst)
1746 : {
1747 : /* advance head if head is null and curr is not */
1748 32 : if (!headisnull || currisnull)
1749 : break;
1750 : }
1751 : else
1752 : {
1753 : /* advance head if head is not null and curr is null */
1754 40 : if (headisnull || !currisnull)
1755 : break;
1756 : }
1757 : }
1758 : else
1759 : {
1760 4636 : if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1761 : winstate->inRangeColl,
1762 : headval,
1763 : currval,
1764 : winstate->startOffsetValue,
1765 : BoolGetDatum(sub),
1766 : BoolGetDatum(less))))
1767 2780 : break; /* this row is the correct frame head */
1768 : }
1769 : /* Note we advance frameheadpos even if the fetch fails */
1770 1864 : winstate->frameheadpos++;
1771 1864 : spool_tuples(winstate, winstate->frameheadpos);
1772 1864 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1773 : winstate->framehead_slot))
1774 36 : break; /* end of partition */
1775 : }
1776 2984 : winstate->framehead_valid = true;
1777 : }
1778 920 : else if (frameOptions & FRAMEOPTION_GROUPS)
1779 : {
1780 : /*
1781 : * In GROUPS START_OFFSET mode, frame head is the first row of the
1782 : * first peer group whose number satisfies the offset constraint.
1783 : * We keep a copy of the last-known frame head row in
1784 : * framehead_slot, and advance as necessary. Note that if we
1785 : * reach end of partition, we will leave frameheadpos = end+1 and
1786 : * framehead_slot empty.
1787 : */
1788 920 : int64 offset = DatumGetInt64(winstate->startOffsetValue);
1789 : int64 minheadgroup;
1790 :
1791 920 : if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1792 752 : minheadgroup = winstate->currentgroup - offset;
1793 : else
1794 168 : minheadgroup = winstate->currentgroup + offset;
1795 :
1796 920 : tuplestore_select_read_pointer(winstate->buffer,
1797 : winstate->framehead_ptr);
1798 920 : if (winstate->frameheadpos == 0 &&
1799 500 : TupIsNull(winstate->framehead_slot))
1800 : {
1801 : /* fetch first row into framehead_slot, if we didn't already */
1802 248 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1803 : winstate->framehead_slot))
1804 0 : elog(ERROR, "unexpected end of tuplestore");
1805 : }
1806 :
1807 2348 : while (!TupIsNull(winstate->framehead_slot))
1808 : {
1809 1412 : if (winstate->frameheadgroup >= minheadgroup)
1810 880 : break; /* this row is the correct frame head */
1811 532 : ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1812 : /* Note we advance frameheadpos even if the fetch fails */
1813 532 : winstate->frameheadpos++;
1814 532 : spool_tuples(winstate, winstate->frameheadpos);
1815 532 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1816 : winstate->framehead_slot))
1817 24 : break; /* end of partition */
1818 508 : if (!are_peers(winstate, winstate->temp_slot_2,
1819 : winstate->framehead_slot))
1820 348 : winstate->frameheadgroup++;
1821 : }
1822 920 : ExecClearTuple(winstate->temp_slot_2);
1823 920 : winstate->framehead_valid = true;
1824 : }
1825 : else
1826 : Assert(false);
1827 : }
1828 : else
1829 : Assert(false);
1830 :
1831 110429 : MemoryContextSwitchTo(oldcontext);
1832 : }
1833 :
1834 : /*
1835 : * update_frametailpos
1836 : * make frametailpos valid for the current row
1837 : *
1838 : * Note that frametailpos is computed without regard for any window exclusion
1839 : * clause; the current row and/or its peers are considered part of the frame
1840 : * for this purpose even if they must be excluded later.
1841 : *
1842 : * May clobber winstate->temp_slot_2.
1843 : */
1844 : static void
1845 135238 : update_frametailpos(WindowAggState *winstate)
1846 : {
1847 135238 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1848 135238 : int frameOptions = winstate->frameOptions;
1849 : MemoryContext oldcontext;
1850 :
1851 135238 : if (winstate->frametail_valid)
1852 11960 : return; /* already known for current row */
1853 :
1854 : /* We may be called in a short-lived context */
1855 123278 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1856 :
1857 123278 : if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1858 : {
1859 : /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1860 160 : spool_tuples(winstate, -1);
1861 160 : winstate->frametailpos = winstate->spooled_rows;
1862 160 : winstate->frametail_valid = true;
1863 : }
1864 123118 : else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1865 : {
1866 118702 : if (frameOptions & FRAMEOPTION_ROWS)
1867 : {
1868 : /* In ROWS mode, exactly the rows up to current are in frame */
1869 80 : winstate->frametailpos = winstate->currentpos + 1;
1870 80 : winstate->frametail_valid = true;
1871 : }
1872 118622 : else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1873 : {
1874 : /* If no ORDER BY, all rows are peers with each other */
1875 118622 : if (node->ordNumCols == 0)
1876 : {
1877 40 : spool_tuples(winstate, -1);
1878 40 : winstate->frametailpos = winstate->spooled_rows;
1879 40 : winstate->frametail_valid = true;
1880 40 : MemoryContextSwitchTo(oldcontext);
1881 40 : return;
1882 : }
1883 :
1884 : /*
1885 : * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1886 : * row that is a peer of current row, frame tail is the row after
1887 : * that (if any). We keep a copy of the last-known frame tail row
1888 : * in frametail_slot, and advance as necessary. Note that if we
1889 : * reach end of partition, we will leave frametailpos = end+1 and
1890 : * frametail_slot empty.
1891 : */
1892 118582 : tuplestore_select_read_pointer(winstate->buffer,
1893 : winstate->frametail_ptr);
1894 118582 : if (winstate->frametailpos == 0 &&
1895 470 : TupIsNull(winstate->frametail_slot))
1896 : {
1897 : /* fetch first row into frametail_slot, if we didn't already */
1898 470 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1899 : winstate->frametail_slot))
1900 0 : elog(ERROR, "unexpected end of tuplestore");
1901 : }
1902 :
1903 236702 : while (!TupIsNull(winstate->frametail_slot))
1904 : {
1905 220622 : if (winstate->frametailpos > winstate->currentpos &&
1906 182280 : !are_peers(winstate, winstate->frametail_slot,
1907 : winstate->ss.ss_ScanTupleSlot))
1908 102040 : break; /* this row is the frame tail */
1909 : /* Note we advance frametailpos even if the fetch fails */
1910 118582 : winstate->frametailpos++;
1911 118582 : spool_tuples(winstate, winstate->frametailpos);
1912 118582 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1913 : winstate->frametail_slot))
1914 462 : break; /* end of partition */
1915 : }
1916 118582 : winstate->frametail_valid = true;
1917 : }
1918 : else
1919 : Assert(false);
1920 : }
1921 4416 : else if (frameOptions & FRAMEOPTION_END_OFFSET)
1922 : {
1923 4416 : if (frameOptions & FRAMEOPTION_ROWS)
1924 : {
1925 : /* In ROWS mode, bound is physically n before/after current */
1926 280 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
1927 :
1928 280 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1929 0 : offset = -offset;
1930 :
1931 280 : winstate->frametailpos = winstate->currentpos + offset + 1;
1932 : /* smallest allowable value of frametailpos is 0 */
1933 280 : if (winstate->frametailpos < 0)
1934 0 : winstate->frametailpos = 0;
1935 280 : else if (winstate->frametailpos > winstate->currentpos + 1)
1936 : {
1937 : /* make sure frametailpos is not past end of partition */
1938 280 : spool_tuples(winstate, winstate->frametailpos - 1);
1939 280 : if (winstate->frametailpos > winstate->spooled_rows)
1940 64 : winstate->frametailpos = winstate->spooled_rows;
1941 : }
1942 280 : winstate->frametail_valid = true;
1943 : }
1944 4136 : else if (frameOptions & FRAMEOPTION_RANGE)
1945 : {
1946 : /*
1947 : * In RANGE END_OFFSET mode, frame end is the last row that
1948 : * satisfies the in_range constraint relative to the current row,
1949 : * frame tail is the row after that (if any). We keep a copy of
1950 : * the last-known frame tail row in frametail_slot, and advance as
1951 : * necessary. Note that if we reach end of partition, we will
1952 : * leave frametailpos = end+1 and frametail_slot empty.
1953 : */
1954 3256 : int sortCol = node->ordColIdx[0];
1955 : bool sub,
1956 : less;
1957 :
1958 : /* We must have an ordering column */
1959 : Assert(node->ordNumCols == 1);
1960 :
1961 : /* Precompute flags for in_range checks */
1962 3256 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1963 608 : sub = true; /* subtract endOffset from current row */
1964 : else
1965 2648 : sub = false; /* add it */
1966 3256 : less = true; /* normally, we want frame tail <= sum */
1967 : /* If sort order is descending, flip both flags */
1968 3256 : if (!winstate->inRangeAsc)
1969 : {
1970 460 : sub = !sub;
1971 460 : less = false;
1972 : }
1973 :
1974 3256 : tuplestore_select_read_pointer(winstate->buffer,
1975 : winstate->frametail_ptr);
1976 3256 : if (winstate->frametailpos == 0 &&
1977 548 : TupIsNull(winstate->frametail_slot))
1978 : {
1979 : /* fetch first row into frametail_slot, if we didn't already */
1980 392 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1981 : winstate->frametail_slot))
1982 0 : elog(ERROR, "unexpected end of tuplestore");
1983 : }
1984 :
1985 6004 : while (!TupIsNull(winstate->frametail_slot))
1986 : {
1987 : Datum tailval,
1988 : currval;
1989 : bool tailisnull,
1990 : currisnull;
1991 :
1992 4960 : tailval = slot_getattr(winstate->frametail_slot, sortCol,
1993 : &tailisnull);
1994 4960 : currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1995 : &currisnull);
1996 4960 : if (tailisnull || currisnull)
1997 : {
1998 : /* order of the rows depends only on nulls_first */
1999 72 : if (winstate->inRangeNullsFirst)
2000 : {
2001 : /* advance tail if tail is null or curr is not */
2002 32 : if (!tailisnull)
2003 2180 : break;
2004 : }
2005 : else
2006 : {
2007 : /* advance tail if tail is not null or curr is null */
2008 40 : if (!currisnull)
2009 24 : break;
2010 : }
2011 : }
2012 : else
2013 : {
2014 4888 : if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
2015 : winstate->inRangeColl,
2016 : tailval,
2017 : currval,
2018 : winstate->endOffsetValue,
2019 : BoolGetDatum(sub),
2020 : BoolGetDatum(less))))
2021 1820 : break; /* this row is the correct frame tail */
2022 : }
2023 : /* Note we advance frametailpos even if the fetch fails */
2024 3068 : winstate->frametailpos++;
2025 3068 : spool_tuples(winstate, winstate->frametailpos);
2026 3068 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2027 : winstate->frametail_slot))
2028 320 : break; /* end of partition */
2029 : }
2030 3224 : winstate->frametail_valid = true;
2031 : }
2032 880 : else if (frameOptions & FRAMEOPTION_GROUPS)
2033 : {
2034 : /*
2035 : * In GROUPS END_OFFSET mode, frame end is the last row of the
2036 : * last peer group whose number satisfies the offset constraint,
2037 : * and frame tail is the row after that (if any). We keep a copy
2038 : * of the last-known frame tail row in frametail_slot, and advance
2039 : * as necessary. Note that if we reach end of partition, we will
2040 : * leave frametailpos = end+1 and frametail_slot empty.
2041 : */
2042 880 : int64 offset = DatumGetInt64(winstate->endOffsetValue);
2043 : int64 maxtailgroup;
2044 :
2045 880 : if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
2046 48 : maxtailgroup = winstate->currentgroup - offset;
2047 : else
2048 832 : maxtailgroup = winstate->currentgroup + offset;
2049 :
2050 880 : tuplestore_select_read_pointer(winstate->buffer,
2051 : winstate->frametail_ptr);
2052 880 : if (winstate->frametailpos == 0 &&
2053 256 : TupIsNull(winstate->frametail_slot))
2054 : {
2055 : /* fetch first row into frametail_slot, if we didn't already */
2056 244 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2057 : winstate->frametail_slot))
2058 0 : elog(ERROR, "unexpected end of tuplestore");
2059 : }
2060 :
2061 2392 : while (!TupIsNull(winstate->frametail_slot))
2062 : {
2063 1360 : if (winstate->frametailgroup > maxtailgroup)
2064 496 : break; /* this row is the correct frame tail */
2065 864 : ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
2066 : /* Note we advance frametailpos even if the fetch fails */
2067 864 : winstate->frametailpos++;
2068 864 : spool_tuples(winstate, winstate->frametailpos);
2069 864 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2070 : winstate->frametail_slot))
2071 232 : break; /* end of partition */
2072 632 : if (!are_peers(winstate, winstate->temp_slot_2,
2073 : winstate->frametail_slot))
2074 400 : winstate->frametailgroup++;
2075 : }
2076 880 : ExecClearTuple(winstate->temp_slot_2);
2077 880 : winstate->frametail_valid = true;
2078 : }
2079 : else
2080 : Assert(false);
2081 : }
2082 : else
2083 : Assert(false);
2084 :
2085 123206 : MemoryContextSwitchTo(oldcontext);
2086 : }
2087 :
2088 : /*
2089 : * update_grouptailpos
2090 : * make grouptailpos valid for the current row
2091 : *
2092 : * May clobber winstate->temp_slot_2.
2093 : */
2094 : static void
2095 3248 : update_grouptailpos(WindowAggState *winstate)
2096 : {
2097 3248 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2098 : MemoryContext oldcontext;
2099 :
2100 3248 : if (winstate->grouptail_valid)
2101 2636 : return; /* already known for current row */
2102 :
2103 : /* We may be called in a short-lived context */
2104 612 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2105 :
2106 : /* If no ORDER BY, all rows are peers with each other */
2107 612 : if (node->ordNumCols == 0)
2108 : {
2109 0 : spool_tuples(winstate, -1);
2110 0 : winstate->grouptailpos = winstate->spooled_rows;
2111 0 : winstate->grouptail_valid = true;
2112 0 : MemoryContextSwitchTo(oldcontext);
2113 0 : return;
2114 : }
2115 :
2116 : /*
2117 : * Because grouptail_valid is reset only when current row advances into a
2118 : * new peer group, we always reach here knowing that grouptailpos needs to
2119 : * be advanced by at least one row. Hence, unlike the otherwise similar
2120 : * case for frame tail tracking, we do not need persistent storage of the
2121 : * group tail row.
2122 : */
2123 : Assert(winstate->grouptailpos <= winstate->currentpos);
2124 612 : tuplestore_select_read_pointer(winstate->buffer,
2125 : winstate->grouptail_ptr);
2126 : for (;;)
2127 : {
2128 : /* Note we advance grouptailpos even if the fetch fails */
2129 1172 : winstate->grouptailpos++;
2130 1172 : spool_tuples(winstate, winstate->grouptailpos);
2131 1172 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2132 : winstate->temp_slot_2))
2133 172 : break; /* end of partition */
2134 1000 : if (winstate->grouptailpos > winstate->currentpos &&
2135 828 : !are_peers(winstate, winstate->temp_slot_2,
2136 : winstate->ss.ss_ScanTupleSlot))
2137 440 : break; /* this row is the group tail */
2138 : }
2139 612 : ExecClearTuple(winstate->temp_slot_2);
2140 612 : winstate->grouptail_valid = true;
2141 :
2142 612 : MemoryContextSwitchTo(oldcontext);
2143 : }
2144 :
2145 : /*
2146 : * calculate_frame_offsets
2147 : * Determine the startOffsetValue and endOffsetValue values for the
2148 : * WindowAgg's frame options.
2149 : */
2150 : static pg_noinline void
2151 1585 : calculate_frame_offsets(PlanState *pstate)
2152 : {
2153 1585 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2154 : ExprContext *econtext;
2155 1585 : int frameOptions = winstate->frameOptions;
2156 : Datum value;
2157 : bool isnull;
2158 : int16 len;
2159 : bool byval;
2160 :
2161 : /* Ensure we've not been called before for this scan */
2162 : Assert(winstate->all_first);
2163 :
2164 1585 : econtext = winstate->ss.ps.ps_ExprContext;
2165 :
2166 1585 : if (frameOptions & FRAMEOPTION_START_OFFSET)
2167 : {
2168 : Assert(winstate->startOffset != NULL);
2169 576 : value = ExecEvalExprSwitchContext(winstate->startOffset,
2170 : econtext,
2171 : &isnull);
2172 576 : if (isnull)
2173 0 : ereport(ERROR,
2174 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2175 : errmsg("frame starting offset must not be null")));
2176 : /* copy value into query-lifespan context */
2177 576 : get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2178 : &len,
2179 : &byval);
2180 576 : winstate->startOffsetValue = datumCopy(value, byval, len);
2181 576 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2182 : {
2183 : /* value is known to be int8 */
2184 232 : int64 offset = DatumGetInt64(value);
2185 :
2186 232 : if (offset < 0)
2187 0 : ereport(ERROR,
2188 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2189 : errmsg("frame starting offset must not be negative")));
2190 : }
2191 : }
2192 :
2193 1585 : if (frameOptions & FRAMEOPTION_END_OFFSET)
2194 : {
2195 : Assert(winstate->endOffset != NULL);
2196 640 : value = ExecEvalExprSwitchContext(winstate->endOffset,
2197 : econtext,
2198 : &isnull);
2199 640 : if (isnull)
2200 0 : ereport(ERROR,
2201 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2202 : errmsg("frame ending offset must not be null")));
2203 : /* copy value into query-lifespan context */
2204 640 : get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2205 : &len,
2206 : &byval);
2207 640 : winstate->endOffsetValue = datumCopy(value, byval, len);
2208 640 : if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2209 : {
2210 : /* value is known to be int8 */
2211 252 : int64 offset = DatumGetInt64(value);
2212 :
2213 252 : if (offset < 0)
2214 0 : ereport(ERROR,
2215 : (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2216 : errmsg("frame ending offset must not be negative")));
2217 : }
2218 : }
2219 1585 : winstate->all_first = false;
2220 1585 : }
2221 :
2222 : /* -----------------
2223 : * ExecWindowAgg
2224 : *
2225 : * ExecWindowAgg receives tuples from its outer subplan and
2226 : * stores them into a tuplestore, then processes window functions.
2227 : * This node doesn't reduce nor qualify any row so the number of
2228 : * returned rows is exactly the same as its outer subplan's result.
2229 : * -----------------
2230 : */
2231 : static TupleTableSlot *
2232 604837 : ExecWindowAgg(PlanState *pstate)
2233 : {
2234 604837 : WindowAggState *winstate = castNode(WindowAggState, pstate);
2235 : TupleTableSlot *slot;
2236 : ExprContext *econtext;
2237 : int i;
2238 : int numfuncs;
2239 :
2240 604837 : CHECK_FOR_INTERRUPTS();
2241 :
2242 604837 : if (winstate->status == WINDOWAGG_DONE)
2243 0 : return NULL;
2244 :
2245 : /*
2246 : * Compute frame offset values, if any, during first call (or after a
2247 : * rescan). These are assumed to hold constant throughout the scan; if
2248 : * user gives us a volatile expression, we'll only use its initial value.
2249 : */
2250 604837 : if (unlikely(winstate->all_first))
2251 1585 : calculate_frame_offsets(pstate);
2252 :
2253 : /* We need to loop as the runCondition or qual may filter out tuples */
2254 : for (;;)
2255 : {
2256 604925 : if (winstate->next_partition)
2257 : {
2258 : /* Initialize for first partition and set current row = 0 */
2259 1585 : begin_partition(winstate);
2260 : /* If there are no input rows, we'll detect that and exit below */
2261 : }
2262 : else
2263 : {
2264 : /* Advance current row within partition */
2265 603340 : winstate->currentpos++;
2266 : /* This might mean that the frame moves, too */
2267 603340 : winstate->framehead_valid = false;
2268 603340 : winstate->frametail_valid = false;
2269 : /* we don't need to invalidate grouptail here; see below */
2270 : }
2271 :
2272 : /*
2273 : * Spool all tuples up to and including the current row, if we haven't
2274 : * already
2275 : */
2276 604925 : spool_tuples(winstate, winstate->currentpos);
2277 :
2278 : /* Move to the next partition if we reached the end of this partition */
2279 604925 : if (winstate->partition_spooled &&
2280 42013 : winstate->currentpos >= winstate->spooled_rows)
2281 : {
2282 2243 : release_partition(winstate);
2283 :
2284 2243 : if (winstate->more_partitions)
2285 : {
2286 826 : begin_partition(winstate);
2287 : Assert(winstate->spooled_rows > 0);
2288 :
2289 : /* Come out of pass-through mode when changing partition */
2290 826 : winstate->status = WINDOWAGG_RUN;
2291 : }
2292 : else
2293 : {
2294 : /* No further partitions? We're done */
2295 1417 : winstate->status = WINDOWAGG_DONE;
2296 1417 : return NULL;
2297 : }
2298 : }
2299 :
2300 : /* final output execution is in ps_ExprContext */
2301 603508 : econtext = winstate->ss.ps.ps_ExprContext;
2302 :
2303 : /* Clear the per-output-tuple context for current row */
2304 603508 : ResetExprContext(econtext);
2305 :
2306 : /*
2307 : * Read the current row from the tuplestore, and save in
2308 : * ScanTupleSlot. (We can't rely on the outerplan's output slot
2309 : * because we may have to read beyond the current row. Also, we have
2310 : * to actually copy the row out of the tuplestore, since window
2311 : * function evaluation might cause the tuplestore to dump its state to
2312 : * disk.)
2313 : *
2314 : * In GROUPS mode, or when tracking a group-oriented exclusion clause,
2315 : * we must also detect entering a new peer group and update associated
2316 : * state when that happens. We use temp_slot_2 to temporarily hold
2317 : * the previous row for this purpose.
2318 : *
2319 : * Current row must be in the tuplestore, since we spooled it above.
2320 : */
2321 603508 : tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2322 603508 : if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2323 : FRAMEOPTION_EXCLUDE_GROUP |
2324 1932 : FRAMEOPTION_EXCLUDE_TIES)) &&
2325 1932 : winstate->currentpos > 0)
2326 : {
2327 1572 : ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2328 1572 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2329 : winstate->ss.ss_ScanTupleSlot))
2330 0 : elog(ERROR, "unexpected end of tuplestore");
2331 1572 : if (!are_peers(winstate, winstate->temp_slot_2,
2332 : winstate->ss.ss_ScanTupleSlot))
2333 : {
2334 828 : winstate->currentgroup++;
2335 828 : winstate->groupheadpos = winstate->currentpos;
2336 828 : winstate->grouptail_valid = false;
2337 : }
2338 1572 : ExecClearTuple(winstate->temp_slot_2);
2339 : }
2340 : else
2341 : {
2342 601936 : if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2343 : winstate->ss.ss_ScanTupleSlot))
2344 0 : elog(ERROR, "unexpected end of tuplestore");
2345 : }
2346 :
2347 : /* don't evaluate the window functions when we're in pass-through mode */
2348 603508 : if (winstate->status == WINDOWAGG_RUN)
2349 : {
2350 : /*
2351 : * Evaluate true window functions
2352 : */
2353 603464 : numfuncs = winstate->numfuncs;
2354 1292072 : for (i = 0; i < numfuncs; i++)
2355 : {
2356 688716 : WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2357 :
2358 688716 : if (perfuncstate->plain_agg)
2359 108101 : continue;
2360 580615 : eval_windowfunction(winstate, perfuncstate,
2361 580615 : &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2362 580615 : &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2363 : }
2364 :
2365 : /*
2366 : * Evaluate aggregates
2367 : */
2368 603356 : if (winstate->numaggs > 0)
2369 106905 : eval_windowaggregates(winstate);
2370 : }
2371 :
2372 : /*
2373 : * If we have created auxiliary read pointers for the frame or group
2374 : * boundaries, force them to be kept up-to-date, because we don't know
2375 : * whether the window function(s) will do anything that requires that.
2376 : * Failing to advance the pointers would result in being unable to
2377 : * trim data from the tuplestore, which is bad. (If we could know in
2378 : * advance whether the window functions will use frame boundary info,
2379 : * we could skip creating these pointers in the first place ... but
2380 : * unfortunately the window function API doesn't require that.)
2381 : */
2382 603372 : if (winstate->framehead_ptr >= 0)
2383 4154 : update_frameheadpos(winstate);
2384 603372 : if (winstate->frametail_ptr >= 0)
2385 122686 : update_frametailpos(winstate);
2386 603372 : if (winstate->grouptail_ptr >= 0)
2387 1000 : update_grouptailpos(winstate);
2388 :
2389 : /*
2390 : * Truncate any no-longer-needed rows from the tuplestore.
2391 : */
2392 603372 : tuplestore_trim(winstate->buffer);
2393 :
2394 : /*
2395 : * Form and return a projection tuple using the windowfunc results and
2396 : * the current row. Setting ecxt_outertuple arranges that any Vars
2397 : * will be evaluated with respect to that row.
2398 : */
2399 603372 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2400 :
2401 603372 : slot = ExecProject(winstate->ss.ps.ps_ProjInfo);
2402 :
2403 603372 : if (winstate->status == WINDOWAGG_RUN)
2404 : {
2405 603328 : econtext->ecxt_scantuple = slot;
2406 :
2407 : /*
2408 : * Now evaluate the run condition to see if we need to go into
2409 : * pass-through mode, or maybe stop completely.
2410 : */
2411 603328 : if (!ExecQual(winstate->runcondition, econtext))
2412 : {
2413 : /*
2414 : * Determine which mode to move into. If there is no
2415 : * PARTITION BY clause and we're the top-level WindowAgg then
2416 : * we're done. This tuple and any future tuples cannot
2417 : * possibly match the runcondition. However, when there is a
2418 : * PARTITION BY clause or we're not the top-level window we
2419 : * can't just stop as we need to either process other
2420 : * partitions or ensure WindowAgg nodes above us receive all
2421 : * of the tuples they need to process their WindowFuncs.
2422 : */
2423 88 : if (winstate->use_pass_through)
2424 : {
2425 : /*
2426 : * When switching into a pass-through mode, we'd better
2427 : * NULLify the aggregate results as these are no longer
2428 : * updated and NULLifying them avoids the old stale
2429 : * results lingering. Some of these might be byref types
2430 : * so we can't have them pointing to free'd memory. The
2431 : * planner insisted that quals used in the runcondition
2432 : * are strict, so the top-level WindowAgg will always
2433 : * filter these NULLs out in the filter clause.
2434 : */
2435 60 : numfuncs = winstate->numfuncs;
2436 176 : for (i = 0; i < numfuncs; i++)
2437 : {
2438 116 : econtext->ecxt_aggvalues[i] = (Datum) 0;
2439 116 : econtext->ecxt_aggnulls[i] = true;
2440 : }
2441 :
2442 : /*
2443 : * STRICT pass-through mode is required for the top window
2444 : * when there is a PARTITION BY clause. Otherwise we must
2445 : * ensure we store tuples that don't match the
2446 : * runcondition so they're available to WindowAggs above.
2447 : */
2448 60 : if (winstate->top_window)
2449 : {
2450 48 : winstate->status = WINDOWAGG_PASSTHROUGH_STRICT;
2451 48 : continue;
2452 : }
2453 : else
2454 : {
2455 12 : winstate->status = WINDOWAGG_PASSTHROUGH;
2456 : }
2457 : }
2458 : else
2459 : {
2460 : /*
2461 : * Pass-through not required. We can just return NULL.
2462 : * Nothing else will match the runcondition.
2463 : */
2464 28 : winstate->status = WINDOWAGG_DONE;
2465 28 : return NULL;
2466 : }
2467 : }
2468 :
2469 : /*
2470 : * Filter out any tuples we don't need in the top-level WindowAgg.
2471 : */
2472 603252 : if (!ExecQual(winstate->ss.ps.qual, econtext))
2473 : {
2474 12 : InstrCountFiltered1(winstate, 1);
2475 12 : continue;
2476 : }
2477 :
2478 603240 : break;
2479 : }
2480 :
2481 : /*
2482 : * When not in WINDOWAGG_RUN mode, we must still return this tuple if
2483 : * we're anything apart from the top window.
2484 : */
2485 44 : else if (!winstate->top_window)
2486 16 : break;
2487 : }
2488 :
2489 603256 : return slot;
2490 : }
2491 :
2492 : /* -----------------
2493 : * ExecInitWindowAgg
2494 : *
2495 : * Creates the run-time information for the WindowAgg node produced by the
2496 : * planner and initializes its outer subtree
2497 : * -----------------
2498 : */
2499 : WindowAggState *
2500 1890 : ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2501 : {
2502 : WindowAggState *winstate;
2503 : Plan *outerPlan;
2504 : ExprContext *econtext;
2505 : ExprContext *tmpcontext;
2506 : WindowStatePerFunc perfunc;
2507 : WindowStatePerAgg peragg;
2508 1890 : int frameOptions = node->frameOptions;
2509 : int numfuncs,
2510 : wfuncno,
2511 : numaggs,
2512 : aggno;
2513 : TupleDesc scanDesc;
2514 : ListCell *l;
2515 :
2516 : /* check for unsupported flags */
2517 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2518 :
2519 : /*
2520 : * create state structure
2521 : */
2522 1890 : winstate = makeNode(WindowAggState);
2523 1890 : winstate->ss.ps.plan = (Plan *) node;
2524 1890 : winstate->ss.ps.state = estate;
2525 1890 : winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2526 :
2527 : /* copy frame options to state node for easy access */
2528 1890 : winstate->frameOptions = frameOptions;
2529 :
2530 : /*
2531 : * Create expression contexts. We need two, one for per-input-tuple
2532 : * processing and one for per-output-tuple processing. We cheat a little
2533 : * by using ExecAssignExprContext() to build both.
2534 : */
2535 1890 : ExecAssignExprContext(estate, &winstate->ss.ps);
2536 1890 : tmpcontext = winstate->ss.ps.ps_ExprContext;
2537 1890 : winstate->tmpcontext = tmpcontext;
2538 1890 : ExecAssignExprContext(estate, &winstate->ss.ps);
2539 :
2540 : /* Create long-lived context for storage of partition-local memory etc */
2541 1890 : winstate->partcontext =
2542 1890 : AllocSetContextCreate(CurrentMemoryContext,
2543 : "WindowAgg Partition",
2544 : ALLOCSET_DEFAULT_SIZES);
2545 :
2546 : /*
2547 : * Create mid-lived context for aggregate trans values etc.
2548 : *
2549 : * Note that moving aggregates each use their own private context, not
2550 : * this one.
2551 : */
2552 1890 : winstate->aggcontext =
2553 1890 : AllocSetContextCreate(CurrentMemoryContext,
2554 : "WindowAgg Aggregates",
2555 : ALLOCSET_DEFAULT_SIZES);
2556 :
2557 : /* Only the top-level WindowAgg may have a qual */
2558 : Assert(node->plan.qual == NIL || node->topWindow);
2559 :
2560 : /* Initialize the qual */
2561 1890 : winstate->ss.ps.qual = ExecInitQual(node->plan.qual,
2562 : (PlanState *) winstate);
2563 :
2564 : /*
2565 : * Setup the run condition, if we received one from the query planner.
2566 : * When set, this may allow us to move into pass-through mode so that we
2567 : * don't have to perform any further evaluation of WindowFuncs in the
2568 : * current partition or possibly stop returning tuples altogether when all
2569 : * tuples are in the same partition.
2570 : */
2571 1890 : winstate->runcondition = ExecInitQual(node->runCondition,
2572 : (PlanState *) winstate);
2573 :
2574 : /*
2575 : * When we're not the top-level WindowAgg node or we are but have a
2576 : * PARTITION BY clause we must move into one of the WINDOWAGG_PASSTHROUGH*
2577 : * modes when the runCondition becomes false.
2578 : */
2579 1890 : winstate->use_pass_through = !node->topWindow || node->partNumCols > 0;
2580 :
2581 : /* remember if we're the top-window or we are below the top-window */
2582 1890 : winstate->top_window = node->topWindow;
2583 :
2584 : /*
2585 : * initialize child nodes
2586 : */
2587 1890 : outerPlan = outerPlan(node);
2588 1890 : outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2589 :
2590 : /*
2591 : * initialize source tuple type (which is also the tuple type that we'll
2592 : * store in the tuplestore and use in all our working slots).
2593 : */
2594 1890 : ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2595 1890 : scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2596 :
2597 : /* the outer tuple isn't the child's tuple, but always a minimal tuple */
2598 1890 : winstate->ss.ps.outeropsset = true;
2599 1890 : winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2600 1890 : winstate->ss.ps.outeropsfixed = true;
2601 :
2602 : /*
2603 : * tuple table initialization
2604 : */
2605 1890 : winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2606 : &TTSOpsMinimalTuple);
2607 1890 : winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2608 : &TTSOpsMinimalTuple);
2609 1890 : winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2610 : &TTSOpsMinimalTuple);
2611 1890 : winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2612 : &TTSOpsMinimalTuple);
2613 :
2614 : /*
2615 : * create frame head and tail slots only if needed (must create slots in
2616 : * exactly the same cases that update_frameheadpos and update_frametailpos
2617 : * need them)
2618 : */
2619 1890 : winstate->framehead_slot = winstate->frametail_slot = NULL;
2620 :
2621 1890 : if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2622 : {
2623 1091 : if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2624 50 : node->ordNumCols != 0) ||
2625 1041 : (frameOptions & FRAMEOPTION_START_OFFSET))
2626 494 : winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2627 : &TTSOpsMinimalTuple);
2628 1091 : if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2629 569 : node->ordNumCols != 0) ||
2630 761 : (frameOptions & FRAMEOPTION_END_OFFSET))
2631 814 : winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2632 : &TTSOpsMinimalTuple);
2633 : }
2634 :
2635 : /*
2636 : * Initialize result slot, type and projection.
2637 : */
2638 1890 : ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2639 1890 : ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2640 :
2641 : /* Set up data for comparing tuples */
2642 1890 : if (node->partNumCols > 0)
2643 452 : winstate->partEqfunction =
2644 452 : execTuplesMatchPrepare(scanDesc,
2645 : node->partNumCols,
2646 452 : node->partColIdx,
2647 452 : node->partOperators,
2648 452 : node->partCollations,
2649 : &winstate->ss.ps);
2650 :
2651 1890 : if (node->ordNumCols > 0)
2652 1469 : winstate->ordEqfunction =
2653 1469 : execTuplesMatchPrepare(scanDesc,
2654 : node->ordNumCols,
2655 1469 : node->ordColIdx,
2656 1469 : node->ordOperators,
2657 1469 : node->ordCollations,
2658 : &winstate->ss.ps);
2659 :
2660 : /*
2661 : * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2662 : */
2663 1890 : numfuncs = winstate->numfuncs;
2664 1890 : numaggs = winstate->numaggs;
2665 1890 : econtext = winstate->ss.ps.ps_ExprContext;
2666 1890 : econtext->ecxt_aggvalues = palloc0_array(Datum, numfuncs);
2667 1890 : econtext->ecxt_aggnulls = palloc0_array(bool, numfuncs);
2668 :
2669 : /*
2670 : * allocate per-wfunc/per-agg state information.
2671 : */
2672 1890 : perfunc = palloc0_array(WindowStatePerFuncData, numfuncs);
2673 1890 : peragg = palloc0_array(WindowStatePerAggData, numaggs);
2674 1890 : winstate->perfunc = perfunc;
2675 1890 : winstate->peragg = peragg;
2676 :
2677 1890 : wfuncno = -1;
2678 1890 : aggno = -1;
2679 4344 : foreach(l, winstate->funcs)
2680 : {
2681 2454 : WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2682 2454 : WindowFunc *wfunc = wfuncstate->wfunc;
2683 : WindowStatePerFunc perfuncstate;
2684 : AclResult aclresult;
2685 : int i;
2686 :
2687 2454 : if (wfunc->winref != node->winref) /* planner screwed up? */
2688 0 : elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2689 : wfunc->winref, node->winref);
2690 :
2691 : /*
2692 : * Look for a previous duplicate window function, which needs the same
2693 : * ignore_nulls value
2694 : */
2695 3190 : for (i = 0; i <= wfuncno; i++)
2696 : {
2697 744 : if (equal(wfunc, perfunc[i].wfunc) &&
2698 8 : !contain_volatile_functions((Node *) wfunc))
2699 8 : break;
2700 : }
2701 2454 : if (i <= wfuncno && wfunc->ignore_nulls == perfunc[i].ignore_nulls)
2702 : {
2703 : /* Found a match to an existing entry, so just mark it */
2704 8 : wfuncstate->wfuncno = i;
2705 8 : continue;
2706 : }
2707 :
2708 : /* Nope, so assign a new PerAgg record */
2709 2446 : perfuncstate = &perfunc[++wfuncno];
2710 :
2711 : /* Mark WindowFunc state node with assigned index in the result array */
2712 2446 : wfuncstate->wfuncno = wfuncno;
2713 :
2714 : /* Check permission to call window function */
2715 2446 : aclresult = object_aclcheck(ProcedureRelationId, wfunc->winfnoid, GetUserId(),
2716 : ACL_EXECUTE);
2717 2446 : if (aclresult != ACLCHECK_OK)
2718 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2719 0 : get_func_name(wfunc->winfnoid));
2720 2446 : InvokeFunctionExecuteHook(wfunc->winfnoid);
2721 :
2722 : /* Fill in the perfuncstate data */
2723 2446 : perfuncstate->wfuncstate = wfuncstate;
2724 2446 : perfuncstate->wfunc = wfunc;
2725 2446 : perfuncstate->numArguments = list_length(wfuncstate->args);
2726 2446 : perfuncstate->winCollation = wfunc->inputcollid;
2727 :
2728 2446 : get_typlenbyval(wfunc->wintype,
2729 : &perfuncstate->resulttypeLen,
2730 : &perfuncstate->resulttypeByVal);
2731 :
2732 : /*
2733 : * If it's really just a plain aggregate function, we'll emulate the
2734 : * Agg environment for it.
2735 : */
2736 2446 : perfuncstate->plain_agg = wfunc->winagg;
2737 2446 : if (wfunc->winagg)
2738 : {
2739 : WindowStatePerAgg peraggstate;
2740 :
2741 1087 : perfuncstate->aggno = ++aggno;
2742 1087 : peraggstate = &winstate->peragg[aggno];
2743 1087 : initialize_peragg(winstate, wfunc, peraggstate);
2744 1087 : peraggstate->wfuncno = wfuncno;
2745 : }
2746 : else
2747 : {
2748 1359 : WindowObject winobj = makeNode(WindowObjectData);
2749 :
2750 1359 : winobj->winstate = winstate;
2751 1359 : winobj->argstates = wfuncstate->args;
2752 1359 : winobj->localmem = NULL;
2753 1359 : perfuncstate->winobj = winobj;
2754 1359 : winobj->ignore_nulls = wfunc->ignore_nulls;
2755 1359 : init_notnull_info(winobj, perfuncstate);
2756 :
2757 : /* It's a real window function, so set up to call it. */
2758 1359 : fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2759 : econtext->ecxt_per_query_memory);
2760 1359 : fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2761 : }
2762 : }
2763 :
2764 : /* Update numfuncs, numaggs to match number of unique functions found */
2765 1890 : winstate->numfuncs = wfuncno + 1;
2766 1890 : winstate->numaggs = aggno + 1;
2767 :
2768 : /* Set up WindowObject for aggregates, if needed */
2769 1890 : if (winstate->numaggs > 0)
2770 : {
2771 1011 : WindowObject agg_winobj = makeNode(WindowObjectData);
2772 :
2773 1011 : agg_winobj->winstate = winstate;
2774 1011 : agg_winobj->argstates = NIL;
2775 1011 : agg_winobj->localmem = NULL;
2776 : /* make sure markptr = -1 to invalidate. It may not get used */
2777 1011 : agg_winobj->markptr = -1;
2778 1011 : agg_winobj->readptr = -1;
2779 1011 : winstate->agg_winobj = agg_winobj;
2780 : }
2781 :
2782 : /* Set the status to running */
2783 1890 : winstate->status = WINDOWAGG_RUN;
2784 :
2785 : /* initialize frame bound offset expressions */
2786 1890 : winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2787 : (PlanState *) winstate);
2788 1890 : winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2789 : (PlanState *) winstate);
2790 :
2791 : /* Lookup in_range support functions if needed */
2792 1890 : if (OidIsValid(node->startInRangeFunc))
2793 348 : fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2794 1890 : if (OidIsValid(node->endInRangeFunc))
2795 392 : fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2796 1890 : winstate->inRangeColl = node->inRangeColl;
2797 1890 : winstate->inRangeAsc = node->inRangeAsc;
2798 1890 : winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2799 :
2800 1890 : winstate->all_first = true;
2801 1890 : winstate->partition_spooled = false;
2802 1890 : winstate->more_partitions = false;
2803 1890 : winstate->next_partition = true;
2804 :
2805 1890 : return winstate;
2806 : }
2807 :
2808 : /* -----------------
2809 : * ExecEndWindowAgg
2810 : * -----------------
2811 : */
2812 : void
2813 1754 : ExecEndWindowAgg(WindowAggState *node)
2814 : {
2815 : PlanState *outerPlan;
2816 : int i;
2817 :
2818 1754 : if (node->buffer != NULL)
2819 : {
2820 1397 : tuplestore_end(node->buffer);
2821 :
2822 : /* nullify so that release_partition skips the tuplestore_clear() */
2823 1397 : node->buffer = NULL;
2824 : }
2825 :
2826 1754 : release_partition(node);
2827 :
2828 2813 : for (i = 0; i < node->numaggs; i++)
2829 : {
2830 1059 : if (node->peragg[i].aggcontext != node->aggcontext)
2831 524 : MemoryContextDelete(node->peragg[i].aggcontext);
2832 : }
2833 1754 : MemoryContextDelete(node->partcontext);
2834 1754 : MemoryContextDelete(node->aggcontext);
2835 :
2836 1754 : pfree(node->perfunc);
2837 1754 : pfree(node->peragg);
2838 :
2839 1754 : outerPlan = outerPlanState(node);
2840 1754 : ExecEndNode(outerPlan);
2841 1754 : }
2842 :
2843 : /* -----------------
2844 : * ExecReScanWindowAgg
2845 : * -----------------
2846 : */
2847 : void
2848 52 : ExecReScanWindowAgg(WindowAggState *node)
2849 : {
2850 52 : PlanState *outerPlan = outerPlanState(node);
2851 52 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
2852 :
2853 52 : node->status = WINDOWAGG_RUN;
2854 52 : node->all_first = true;
2855 :
2856 : /* release tuplestore et al */
2857 52 : release_partition(node);
2858 :
2859 : /* release all temp tuples, but especially first_part_slot */
2860 52 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
2861 52 : ExecClearTuple(node->first_part_slot);
2862 52 : ExecClearTuple(node->agg_row_slot);
2863 52 : ExecClearTuple(node->temp_slot_1);
2864 52 : ExecClearTuple(node->temp_slot_2);
2865 52 : if (node->framehead_slot)
2866 0 : ExecClearTuple(node->framehead_slot);
2867 52 : if (node->frametail_slot)
2868 4 : ExecClearTuple(node->frametail_slot);
2869 :
2870 : /* Forget current wfunc values */
2871 104 : MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2872 52 : MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2873 :
2874 : /*
2875 : * if chgParam of subnode is not null then plan will be re-scanned by
2876 : * first ExecProcNode.
2877 : */
2878 52 : if (outerPlan->chgParam == NULL)
2879 4 : ExecReScan(outerPlan);
2880 52 : }
2881 :
2882 : /*
2883 : * initialize_peragg
2884 : *
2885 : * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2886 : */
2887 : static WindowStatePerAggData *
2888 1087 : initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2889 : WindowStatePerAgg peraggstate)
2890 : {
2891 : Oid inputTypes[FUNC_MAX_ARGS];
2892 : int numArguments;
2893 : HeapTuple aggTuple;
2894 : Form_pg_aggregate aggform;
2895 : Oid aggtranstype;
2896 : AttrNumber initvalAttNo;
2897 : AclResult aclresult;
2898 : bool use_ma_code;
2899 : Oid transfn_oid,
2900 : invtransfn_oid,
2901 : finalfn_oid;
2902 : bool finalextra;
2903 : char finalmodify;
2904 : Expr *transfnexpr,
2905 : *invtransfnexpr,
2906 : *finalfnexpr;
2907 : Datum textInitVal;
2908 : int i;
2909 : ListCell *lc;
2910 :
2911 1087 : numArguments = list_length(wfunc->args);
2912 :
2913 1087 : i = 0;
2914 2098 : foreach(lc, wfunc->args)
2915 : {
2916 1011 : inputTypes[i++] = exprType((Node *) lfirst(lc));
2917 : }
2918 :
2919 1087 : aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2920 1087 : if (!HeapTupleIsValid(aggTuple))
2921 0 : elog(ERROR, "cache lookup failed for aggregate %u",
2922 : wfunc->winfnoid);
2923 1087 : aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2924 :
2925 : /*
2926 : * Figure out whether we want to use the moving-aggregate implementation,
2927 : * and collect the right set of fields from the pg_aggregate entry.
2928 : *
2929 : * It's possible that an aggregate would supply a safe moving-aggregate
2930 : * implementation and an unsafe normal one, in which case our hand is
2931 : * forced. Otherwise, if the frame head can't move, we don't need
2932 : * moving-aggregate code. Even if we'd like to use it, don't do so if the
2933 : * aggregate's arguments (and FILTER clause if any) contain any calls to
2934 : * volatile functions. Otherwise, the difference between restarting and
2935 : * not restarting the aggregation would be user-visible.
2936 : *
2937 : * We also don't risk using moving aggregates when there are subplans in
2938 : * the arguments or FILTER clause. This is partly because
2939 : * contain_volatile_functions() doesn't look inside subplans; but there
2940 : * are other reasons why a subplan's output might be volatile. For
2941 : * example, syncscan mode can render the results nonrepeatable.
2942 : */
2943 1087 : if (!OidIsValid(aggform->aggminvtransfn))
2944 189 : use_ma_code = false; /* sine qua non */
2945 898 : else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2946 898 : aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2947 0 : use_ma_code = true; /* decision forced by safety */
2948 898 : else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2949 358 : use_ma_code = false; /* non-moving frame head */
2950 540 : else if (contain_volatile_functions((Node *) wfunc))
2951 8 : use_ma_code = false; /* avoid possible behavioral change */
2952 532 : else if (contain_subplans((Node *) wfunc))
2953 0 : use_ma_code = false; /* subplans might contain volatile functions */
2954 : else
2955 532 : use_ma_code = true; /* yes, let's use it */
2956 1087 : if (use_ma_code)
2957 : {
2958 532 : peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2959 532 : peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2960 532 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2961 532 : finalextra = aggform->aggmfinalextra;
2962 532 : finalmodify = aggform->aggmfinalmodify;
2963 532 : aggtranstype = aggform->aggmtranstype;
2964 532 : initvalAttNo = Anum_pg_aggregate_aggminitval;
2965 : }
2966 : else
2967 : {
2968 555 : peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2969 555 : peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2970 555 : peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2971 555 : finalextra = aggform->aggfinalextra;
2972 555 : finalmodify = aggform->aggfinalmodify;
2973 555 : aggtranstype = aggform->aggtranstype;
2974 555 : initvalAttNo = Anum_pg_aggregate_agginitval;
2975 : }
2976 :
2977 : /*
2978 : * ExecInitWindowAgg already checked permission to call aggregate function
2979 : * ... but we still need to check the component functions
2980 : */
2981 :
2982 : /* Check that aggregate owner has permission to call component fns */
2983 : {
2984 : HeapTuple procTuple;
2985 : Oid aggOwner;
2986 :
2987 1087 : procTuple = SearchSysCache1(PROCOID,
2988 : ObjectIdGetDatum(wfunc->winfnoid));
2989 1087 : if (!HeapTupleIsValid(procTuple))
2990 0 : elog(ERROR, "cache lookup failed for function %u",
2991 : wfunc->winfnoid);
2992 1087 : aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2993 1087 : ReleaseSysCache(procTuple);
2994 :
2995 1087 : aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner,
2996 : ACL_EXECUTE);
2997 1087 : if (aclresult != ACLCHECK_OK)
2998 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
2999 0 : get_func_name(transfn_oid));
3000 1087 : InvokeFunctionExecuteHook(transfn_oid);
3001 :
3002 1087 : if (OidIsValid(invtransfn_oid))
3003 : {
3004 532 : aclresult = object_aclcheck(ProcedureRelationId, invtransfn_oid, aggOwner,
3005 : ACL_EXECUTE);
3006 532 : if (aclresult != ACLCHECK_OK)
3007 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
3008 0 : get_func_name(invtransfn_oid));
3009 532 : InvokeFunctionExecuteHook(invtransfn_oid);
3010 : }
3011 :
3012 1087 : if (OidIsValid(finalfn_oid))
3013 : {
3014 568 : aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
3015 : ACL_EXECUTE);
3016 568 : if (aclresult != ACLCHECK_OK)
3017 0 : aclcheck_error(aclresult, OBJECT_FUNCTION,
3018 0 : get_func_name(finalfn_oid));
3019 568 : InvokeFunctionExecuteHook(finalfn_oid);
3020 : }
3021 : }
3022 :
3023 : /*
3024 : * If the selected finalfn isn't read-only, we can't run this aggregate as
3025 : * a window function. This is a user-facing error, so we take a bit more
3026 : * care with the error message than elsewhere in this function.
3027 : */
3028 1087 : if (finalmodify != AGGMODIFY_READ_ONLY)
3029 0 : ereport(ERROR,
3030 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3031 : errmsg("aggregate function %s does not support use as a window function",
3032 : format_procedure(wfunc->winfnoid))));
3033 :
3034 : /* Detect how many arguments to pass to the finalfn */
3035 1087 : if (finalextra)
3036 16 : peraggstate->numFinalArgs = numArguments + 1;
3037 : else
3038 1071 : peraggstate->numFinalArgs = 1;
3039 :
3040 : /* resolve actual type of transition state, if polymorphic */
3041 1087 : aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
3042 : aggtranstype,
3043 : inputTypes,
3044 : numArguments);
3045 :
3046 : /* build expression trees using actual argument & result types */
3047 1087 : build_aggregate_transfn_expr(inputTypes,
3048 : numArguments,
3049 : 0, /* no ordered-set window functions yet */
3050 : false, /* no variadic window functions yet */
3051 : aggtranstype,
3052 : wfunc->inputcollid,
3053 : transfn_oid,
3054 : invtransfn_oid,
3055 : &transfnexpr,
3056 : &invtransfnexpr);
3057 :
3058 : /* set up infrastructure for calling the transfn(s) and finalfn */
3059 1087 : fmgr_info(transfn_oid, &peraggstate->transfn);
3060 1087 : fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
3061 :
3062 1087 : if (OidIsValid(invtransfn_oid))
3063 : {
3064 532 : fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
3065 532 : fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
3066 : }
3067 :
3068 1087 : if (OidIsValid(finalfn_oid))
3069 : {
3070 568 : build_aggregate_finalfn_expr(inputTypes,
3071 : peraggstate->numFinalArgs,
3072 : aggtranstype,
3073 : wfunc->wintype,
3074 : wfunc->inputcollid,
3075 : finalfn_oid,
3076 : &finalfnexpr);
3077 568 : fmgr_info(finalfn_oid, &peraggstate->finalfn);
3078 568 : fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
3079 : }
3080 :
3081 : /* get info about relevant datatypes */
3082 1087 : get_typlenbyval(wfunc->wintype,
3083 : &peraggstate->resulttypeLen,
3084 : &peraggstate->resulttypeByVal);
3085 1087 : get_typlenbyval(aggtranstype,
3086 : &peraggstate->transtypeLen,
3087 : &peraggstate->transtypeByVal);
3088 :
3089 : /*
3090 : * initval is potentially null, so don't try to access it as a struct
3091 : * field. Must do it the hard way with SysCacheGetAttr.
3092 : */
3093 1087 : textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
3094 : &peraggstate->initValueIsNull);
3095 :
3096 1087 : if (peraggstate->initValueIsNull)
3097 617 : peraggstate->initValue = (Datum) 0;
3098 : else
3099 470 : peraggstate->initValue = GetAggInitVal(textInitVal,
3100 : aggtranstype);
3101 :
3102 : /*
3103 : * If the transfn is strict and the initval is NULL, make sure input type
3104 : * and transtype are the same (or at least binary-compatible), so that
3105 : * it's OK to use the first input value as the initial transValue. This
3106 : * should have been checked at agg definition time, but we must check
3107 : * again in case the transfn's strictness property has been changed.
3108 : */
3109 1087 : if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
3110 : {
3111 161 : if (numArguments < 1 ||
3112 161 : !IsBinaryCoercible(inputTypes[0], aggtranstype))
3113 0 : ereport(ERROR,
3114 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3115 : errmsg("aggregate %u needs to have compatible input type and transition type",
3116 : wfunc->winfnoid)));
3117 : }
3118 :
3119 : /*
3120 : * Insist that forward and inverse transition functions have the same
3121 : * strictness setting. Allowing them to differ would require handling
3122 : * more special cases in advance_windowaggregate and
3123 : * advance_windowaggregate_base, for no discernible benefit. This should
3124 : * have been checked at agg definition time, but we must check again in
3125 : * case either function's strictness property has been changed.
3126 : */
3127 1087 : if (OidIsValid(invtransfn_oid) &&
3128 532 : peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
3129 0 : ereport(ERROR,
3130 : (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3131 : errmsg("strictness of aggregate's forward and inverse transition functions must match")));
3132 :
3133 : /*
3134 : * Moving aggregates use their own aggcontext.
3135 : *
3136 : * This is necessary because they might restart at different times, so we
3137 : * might never be able to reset the shared context otherwise. We can't
3138 : * make it the aggregates' responsibility to clean up after themselves,
3139 : * because strict aggregates must be restarted whenever we remove their
3140 : * last non-NULL input, which the aggregate won't be aware is happening.
3141 : * Also, just pfree()ing the transValue upon restarting wouldn't help,
3142 : * since we'd miss any indirectly referenced data. We could, in theory,
3143 : * make the memory allocation rules for moving aggregates different than
3144 : * they have historically been for plain aggregates, but that seems grotty
3145 : * and likely to lead to memory leaks.
3146 : */
3147 1087 : if (OidIsValid(invtransfn_oid))
3148 532 : peraggstate->aggcontext =
3149 532 : AllocSetContextCreate(CurrentMemoryContext,
3150 : "WindowAgg Per Aggregate",
3151 : ALLOCSET_DEFAULT_SIZES);
3152 : else
3153 555 : peraggstate->aggcontext = winstate->aggcontext;
3154 :
3155 1087 : ReleaseSysCache(aggTuple);
3156 :
3157 1087 : return peraggstate;
3158 : }
3159 :
3160 : static Datum
3161 470 : GetAggInitVal(Datum textInitVal, Oid transtype)
3162 : {
3163 : Oid typinput,
3164 : typioparam;
3165 : char *strInitVal;
3166 : Datum initVal;
3167 :
3168 470 : getTypeInputInfo(transtype, &typinput, &typioparam);
3169 470 : strInitVal = TextDatumGetCString(textInitVal);
3170 470 : initVal = OidInputFunctionCall(typinput, strInitVal,
3171 : typioparam, -1);
3172 470 : pfree(strInitVal);
3173 470 : return initVal;
3174 : }
3175 :
3176 : /*
3177 : * are_peers
3178 : * compare two rows to see if they are equal according to the ORDER BY clause
3179 : *
3180 : * NB: this does not consider the window frame mode.
3181 : */
3182 : static bool
3183 398271 : are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
3184 : TupleTableSlot *slot2)
3185 : {
3186 398271 : WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
3187 398271 : ExprContext *econtext = winstate->tmpcontext;
3188 :
3189 : /* If no ORDER BY, all rows are peers with each other */
3190 398271 : if (node->ordNumCols == 0)
3191 20702 : return true;
3192 :
3193 377569 : econtext->ecxt_outertuple = slot1;
3194 377569 : econtext->ecxt_innertuple = slot2;
3195 377569 : return ExecQualAndReset(winstate->ordEqfunction, econtext);
3196 : }
3197 :
3198 : /*
3199 : * window_gettupleslot
3200 : * Fetch the pos'th tuple of the current partition into the slot,
3201 : * using the winobj's read pointer
3202 : *
3203 : * Returns true if successful, false if no such row
3204 : */
3205 : static bool
3206 508549 : window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
3207 : {
3208 508549 : WindowAggState *winstate = winobj->winstate;
3209 : MemoryContext oldcontext;
3210 :
3211 : /* often called repeatedly in a row */
3212 508549 : CHECK_FOR_INTERRUPTS();
3213 :
3214 : /* Don't allow passing -1 to spool_tuples here */
3215 508549 : if (pos < 0)
3216 272 : return false;
3217 :
3218 : /* If necessary, fetch the tuple into the spool */
3219 508277 : spool_tuples(winstate, pos);
3220 :
3221 508277 : if (pos >= winstate->spooled_rows)
3222 3171 : return false;
3223 :
3224 505106 : if (pos < winobj->markpos)
3225 0 : elog(ERROR, "cannot fetch row before WindowObject's mark position");
3226 :
3227 505106 : oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3228 :
3229 505106 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3230 :
3231 : /*
3232 : * Advance or rewind until we are within one tuple of the one we want.
3233 : */
3234 505106 : if (winobj->seekpos < pos - 1)
3235 : {
3236 1660 : if (!tuplestore_skiptuples(winstate->buffer,
3237 1660 : pos - 1 - winobj->seekpos,
3238 : true))
3239 0 : elog(ERROR, "unexpected end of tuplestore");
3240 1660 : winobj->seekpos = pos - 1;
3241 : }
3242 503446 : else if (winobj->seekpos > pos + 1)
3243 : {
3244 1834 : if (!tuplestore_skiptuples(winstate->buffer,
3245 1834 : winobj->seekpos - (pos + 1),
3246 : false))
3247 0 : elog(ERROR, "unexpected end of tuplestore");
3248 1834 : winobj->seekpos = pos + 1;
3249 : }
3250 501612 : else if (winobj->seekpos == pos)
3251 : {
3252 : /*
3253 : * There's no API to refetch the tuple at the current position. We
3254 : * have to move one tuple forward, and then one backward. (We don't
3255 : * do it the other way because we might try to fetch the row before
3256 : * our mark, which isn't allowed.) XXX this case could stand to be
3257 : * optimized.
3258 : */
3259 115412 : tuplestore_advance(winstate->buffer, true);
3260 115412 : winobj->seekpos++;
3261 : }
3262 :
3263 : /*
3264 : * Now we should be on the tuple immediately before or after the one we
3265 : * want, so just fetch forwards or backwards as appropriate.
3266 : *
3267 : * Notice that we tell tuplestore_gettupleslot to make a physical copy of
3268 : * the fetched tuple. This ensures that the slot's contents remain valid
3269 : * through manipulations of the tuplestore, which some callers depend on.
3270 : */
3271 505106 : if (winobj->seekpos > pos)
3272 : {
3273 117429 : if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
3274 0 : elog(ERROR, "unexpected end of tuplestore");
3275 117429 : winobj->seekpos--;
3276 : }
3277 : else
3278 : {
3279 387677 : if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
3280 0 : elog(ERROR, "unexpected end of tuplestore");
3281 387677 : winobj->seekpos++;
3282 : }
3283 :
3284 : Assert(winobj->seekpos == pos);
3285 :
3286 505106 : MemoryContextSwitchTo(oldcontext);
3287 :
3288 505106 : return true;
3289 : }
3290 :
3291 : /* gettuple_eval_partition
3292 : * get tuple in a partition and evaluate the window function's argument
3293 : * expression on it.
3294 : */
3295 : static Datum
3296 158360 : gettuple_eval_partition(WindowObject winobj, int argno,
3297 : int64 abs_pos, bool *isnull, bool *isout)
3298 : {
3299 : WindowAggState *winstate;
3300 : ExprContext *econtext;
3301 : TupleTableSlot *slot;
3302 :
3303 158360 : winstate = winobj->winstate;
3304 158360 : slot = winstate->temp_slot_1;
3305 158360 : if (!window_gettupleslot(winobj, abs_pos, slot))
3306 : {
3307 : /* out of partition */
3308 388 : if (isout)
3309 388 : *isout = true;
3310 388 : *isnull = true;
3311 388 : return (Datum) 0;
3312 : }
3313 :
3314 157972 : if (isout)
3315 157972 : *isout = false;
3316 157972 : econtext = winstate->ss.ps.ps_ExprContext;
3317 157972 : econtext->ecxt_outertuple = slot;
3318 157972 : return ExecEvalExpr((ExprState *) list_nth
3319 157972 : (winobj->argstates, argno),
3320 : econtext, isnull);
3321 : }
3322 :
3323 : /*
3324 : * ignorenulls_getfuncarginframe
3325 : * For IGNORE NULLS, get the next nonnull value in the frame, moving forward
3326 : * or backward until we find a value or reach the frame's end.
3327 : */
3328 : static Datum
3329 640 : ignorenulls_getfuncarginframe(WindowObject winobj, int argno,
3330 : int relpos, int seektype, bool set_mark,
3331 : bool *isnull, bool *isout)
3332 : {
3333 : WindowAggState *winstate;
3334 : ExprContext *econtext;
3335 : TupleTableSlot *slot;
3336 : Datum datum;
3337 : int64 abs_pos;
3338 : int64 mark_pos;
3339 : int notnull_offset;
3340 : int notnull_relpos;
3341 : int forward;
3342 :
3343 : Assert(WindowObjectIsValid(winobj));
3344 640 : winstate = winobj->winstate;
3345 640 : econtext = winstate->ss.ps.ps_ExprContext;
3346 640 : slot = winstate->temp_slot_1;
3347 640 : datum = (Datum) 0;
3348 640 : notnull_offset = 0;
3349 640 : notnull_relpos = abs(relpos);
3350 :
3351 640 : switch (seektype)
3352 : {
3353 0 : case WINDOW_SEEK_CURRENT:
3354 0 : elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3355 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3356 : break;
3357 440 : case WINDOW_SEEK_HEAD:
3358 : /* rejecting relpos < 0 is easy and simplifies code below */
3359 440 : if (relpos < 0)
3360 0 : goto out_of_frame;
3361 440 : update_frameheadpos(winstate);
3362 440 : abs_pos = winstate->frameheadpos;
3363 440 : mark_pos = winstate->frameheadpos;
3364 440 : forward = 1;
3365 440 : break;
3366 200 : case WINDOW_SEEK_TAIL:
3367 : /* rejecting relpos > 0 is easy and simplifies code below */
3368 200 : if (relpos > 0)
3369 0 : goto out_of_frame;
3370 200 : update_frametailpos(winstate);
3371 200 : abs_pos = winstate->frametailpos - 1;
3372 200 : mark_pos = 0; /* keep compiler quiet */
3373 200 : forward = -1;
3374 200 : break;
3375 0 : default:
3376 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3377 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3378 : break;
3379 : }
3380 :
3381 : /*
3382 : * Get the next nonnull value in the frame, moving forward or backward
3383 : * until we find a value or reach the frame's end.
3384 : */
3385 : do
3386 : {
3387 : int inframe;
3388 : int v;
3389 :
3390 : /*
3391 : * Check apparent out of frame case. We need to do this because we
3392 : * may not call window_gettupleslot before row_is_in_frame, which
3393 : * supposes abs_pos is never negative.
3394 : */
3395 1528 : if (abs_pos < 0)
3396 8 : goto out_of_frame;
3397 :
3398 : /* check whether row is in frame */
3399 1520 : inframe = row_is_in_frame(winobj, abs_pos, slot, true);
3400 1520 : if (inframe == -1)
3401 36 : goto out_of_frame;
3402 1484 : else if (inframe == 0)
3403 52 : goto advance;
3404 :
3405 1432 : if (isout)
3406 0 : *isout = false;
3407 :
3408 1432 : v = get_notnull_info(winobj, abs_pos, argno);
3409 1432 : if (v == NN_NULL) /* this row is known to be NULL */
3410 400 : goto advance;
3411 :
3412 1032 : else if (v == NN_UNKNOWN) /* need to check NULL or not */
3413 : {
3414 524 : if (!window_gettupleslot(winobj, abs_pos, slot))
3415 20 : goto out_of_frame;
3416 :
3417 504 : econtext->ecxt_outertuple = slot;
3418 504 : datum = ExecEvalExpr(
3419 504 : (ExprState *) list_nth(winobj->argstates,
3420 : argno), econtext,
3421 : isnull);
3422 504 : if (!*isnull)
3423 300 : notnull_offset++;
3424 :
3425 : /* record the row status */
3426 504 : put_notnull_info(winobj, abs_pos, argno, *isnull);
3427 : }
3428 : else /* this row is known to be NOT NULL */
3429 : {
3430 508 : notnull_offset++;
3431 508 : if (notnull_offset > notnull_relpos)
3432 : {
3433 : /* to prepare exiting this loop, datum needs to be set */
3434 320 : if (!window_gettupleslot(winobj, abs_pos, slot))
3435 0 : goto out_of_frame;
3436 :
3437 320 : econtext->ecxt_outertuple = slot;
3438 320 : datum = ExecEvalExpr(
3439 320 : (ExprState *) list_nth
3440 320 : (winobj->argstates, argno),
3441 : econtext, isnull);
3442 : }
3443 : }
3444 188 : advance:
3445 1464 : abs_pos += forward;
3446 1464 : } while (notnull_offset <= notnull_relpos);
3447 :
3448 576 : if (set_mark)
3449 576 : WinSetMarkPosition(winobj, mark_pos);
3450 :
3451 576 : return datum;
3452 :
3453 64 : out_of_frame:
3454 64 : if (isout)
3455 0 : *isout = true;
3456 64 : *isnull = true;
3457 64 : return (Datum) 0;
3458 : }
3459 :
3460 :
3461 : /*
3462 : * init_notnull_info
3463 : * Initialize non null map.
3464 : */
3465 : static void
3466 1359 : init_notnull_info(WindowObject winobj, WindowStatePerFunc perfuncstate)
3467 : {
3468 1359 : int numargs = perfuncstate->numArguments;
3469 :
3470 1359 : if (winobj->ignore_nulls == PARSER_IGNORE_NULLS)
3471 : {
3472 124 : winobj->notnull_info = palloc0_array(uint8 *, numargs);
3473 124 : winobj->num_notnull_info = palloc0_array(int64, numargs);
3474 : }
3475 1359 : }
3476 :
3477 : /*
3478 : * grow_notnull_info
3479 : * expand notnull_info if necessary.
3480 : * pos: not null info position
3481 : * argno: argument number
3482 : */
3483 : static void
3484 2632 : grow_notnull_info(WindowObject winobj, int64 pos, int argno)
3485 : {
3486 : /* initial number of notnull info members */
3487 : #define INIT_NOT_NULL_INFO_NUM 128
3488 :
3489 2632 : if (pos >= winobj->num_notnull_info[argno])
3490 : {
3491 : /* We may be called in a short-lived context */
3492 100 : MemoryContext oldcontext = MemoryContextSwitchTo
3493 100 : (winobj->winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3494 :
3495 : for (;;)
3496 0 : {
3497 100 : Size oldsize = NN_POS_TO_BYTES
3498 : (winobj->num_notnull_info[argno]);
3499 : Size newsize;
3500 :
3501 100 : if (oldsize == 0) /* memory has not been allocated yet for this
3502 : * arg */
3503 : {
3504 100 : newsize = NN_POS_TO_BYTES(INIT_NOT_NULL_INFO_NUM);
3505 100 : winobj->notnull_info[argno] = palloc0(newsize);
3506 : }
3507 : else
3508 : {
3509 0 : newsize = oldsize * 2;
3510 0 : winobj->notnull_info[argno] =
3511 0 : repalloc0(winobj->notnull_info[argno], oldsize, newsize);
3512 : }
3513 100 : winobj->num_notnull_info[argno] = NN_BYTES_TO_POS(newsize);
3514 100 : if (winobj->num_notnull_info[argno] > pos)
3515 100 : break;
3516 : }
3517 100 : MemoryContextSwitchTo(oldcontext);
3518 : }
3519 2632 : }
3520 :
3521 : /*
3522 : * get_notnull_info
3523 : * retrieve a map
3524 : * pos: map position
3525 : * argno: argument number
3526 : */
3527 : static uint8
3528 1848 : get_notnull_info(WindowObject winobj, int64 pos, int argno)
3529 : {
3530 : uint8 *mbp;
3531 : uint8 mb;
3532 : int64 bpos;
3533 :
3534 1848 : grow_notnull_info(winobj, pos, argno);
3535 1848 : bpos = NN_POS_TO_BYTES(pos);
3536 1848 : mbp = winobj->notnull_info[argno];
3537 1848 : mb = mbp[bpos];
3538 1848 : return (mb >> (NN_SHIFT(pos))) & NN_MASK;
3539 : }
3540 :
3541 : /*
3542 : * put_notnull_info
3543 : * update map
3544 : * pos: map position
3545 : * argno: argument number
3546 : * isnull: indicate NULL or NOT
3547 : */
3548 : static void
3549 784 : put_notnull_info(WindowObject winobj, int64 pos, int argno, bool isnull)
3550 : {
3551 : uint8 *mbp;
3552 : uint8 mb;
3553 : int64 bpos;
3554 784 : uint8 val = isnull ? NN_NULL : NN_NOTNULL;
3555 : int shift;
3556 :
3557 784 : grow_notnull_info(winobj, pos, argno);
3558 784 : bpos = NN_POS_TO_BYTES(pos);
3559 784 : mbp = winobj->notnull_info[argno];
3560 784 : mb = mbp[bpos];
3561 784 : shift = NN_SHIFT(pos);
3562 784 : mb &= ~(NN_MASK << shift); /* clear map */
3563 784 : mb |= (val << shift); /* update map */
3564 784 : mbp[bpos] = mb;
3565 784 : }
3566 :
3567 : /***********************************************************************
3568 : * API exposed to window functions
3569 : ***********************************************************************/
3570 :
3571 :
3572 : /*
3573 : * WinCheckAndInitializeNullTreatment
3574 : * Check null treatment clause and sets ignore_nulls
3575 : *
3576 : * Window functions should call this to check if they are being called with
3577 : * a null treatment clause when they don't allow it, or to set ignore_nulls.
3578 : */
3579 : void
3580 580615 : WinCheckAndInitializeNullTreatment(WindowObject winobj,
3581 : bool allowNullTreatment,
3582 : FunctionCallInfo fcinfo)
3583 : {
3584 : Assert(WindowObjectIsValid(winobj));
3585 580615 : if (winobj->ignore_nulls != NO_NULLTREATMENT && !allowNullTreatment)
3586 : {
3587 48 : const char *funcname = get_func_name(fcinfo->flinfo->fn_oid);
3588 :
3589 48 : if (!funcname)
3590 0 : elog(ERROR, "could not get function name");
3591 48 : ereport(ERROR,
3592 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3593 : errmsg("function %s does not allow RESPECT/IGNORE NULLS",
3594 : funcname)));
3595 : }
3596 580567 : else if (winobj->ignore_nulls == PARSER_IGNORE_NULLS)
3597 100 : winobj->ignore_nulls = IGNORE_NULLS;
3598 580567 : }
3599 :
3600 : /*
3601 : * WinGetPartitionLocalMemory
3602 : * Get working memory that lives till end of partition processing
3603 : *
3604 : * On first call within a given partition, this allocates and zeroes the
3605 : * requested amount of space. Subsequent calls just return the same chunk.
3606 : *
3607 : * Memory obtained this way is normally used to hold state that should be
3608 : * automatically reset for each new partition. If a window function wants
3609 : * to hold state across the whole query, fcinfo->fn_extra can be used in the
3610 : * usual way for that.
3611 : */
3612 : void *
3613 221280 : WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3614 : {
3615 : Assert(WindowObjectIsValid(winobj));
3616 221280 : if (winobj->localmem == NULL)
3617 295 : winobj->localmem =
3618 295 : MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3619 221280 : return winobj->localmem;
3620 : }
3621 :
3622 : /*
3623 : * WinGetCurrentPosition
3624 : * Return the current row's position (counting from 0) within the current
3625 : * partition.
3626 : */
3627 : int64
3628 501546 : WinGetCurrentPosition(WindowObject winobj)
3629 : {
3630 : Assert(WindowObjectIsValid(winobj));
3631 501546 : return winobj->winstate->currentpos;
3632 : }
3633 :
3634 : /*
3635 : * WinGetPartitionRowCount
3636 : * Return total number of rows contained in the current partition.
3637 : *
3638 : * Note: this is a relatively expensive operation because it forces the
3639 : * whole partition to be "spooled" into the tuplestore at once. Once
3640 : * executed, however, additional calls within the same partition are cheap.
3641 : */
3642 : int64
3643 208 : WinGetPartitionRowCount(WindowObject winobj)
3644 : {
3645 : Assert(WindowObjectIsValid(winobj));
3646 208 : spool_tuples(winobj->winstate, -1);
3647 208 : return winobj->winstate->spooled_rows;
3648 : }
3649 :
3650 : /*
3651 : * WinSetMarkPosition
3652 : * Set the "mark" position for the window object, which is the oldest row
3653 : * number (counting from 0) it is allowed to fetch during all subsequent
3654 : * operations within the current partition.
3655 : *
3656 : * Window functions do not have to call this, but are encouraged to move the
3657 : * mark forward when possible to keep the tuplestore size down and prevent
3658 : * having to spill rows to disk.
3659 : */
3660 : void
3661 583493 : WinSetMarkPosition(WindowObject winobj, int64 markpos)
3662 : {
3663 : WindowAggState *winstate;
3664 :
3665 : Assert(WindowObjectIsValid(winobj));
3666 583493 : winstate = winobj->winstate;
3667 :
3668 583493 : if (markpos < winobj->markpos)
3669 0 : elog(ERROR, "cannot move WindowObject's mark position backward");
3670 583493 : tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3671 583493 : if (markpos > winobj->markpos)
3672 : {
3673 579281 : tuplestore_skiptuples(winstate->buffer,
3674 579281 : markpos - winobj->markpos,
3675 : true);
3676 579281 : winobj->markpos = markpos;
3677 : }
3678 583493 : tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3679 583493 : if (markpos > winobj->seekpos)
3680 : {
3681 308306 : tuplestore_skiptuples(winstate->buffer,
3682 308306 : markpos - winobj->seekpos,
3683 : true);
3684 308306 : winobj->seekpos = markpos;
3685 : }
3686 583493 : }
3687 :
3688 : /*
3689 : * WinRowsArePeers
3690 : * Compare two rows (specified by absolute position in partition) to see
3691 : * if they are equal according to the ORDER BY clause.
3692 : *
3693 : * NB: this does not consider the window frame mode.
3694 : */
3695 : bool
3696 110375 : WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3697 : {
3698 : WindowAggState *winstate;
3699 : WindowAgg *node;
3700 : TupleTableSlot *slot1;
3701 : TupleTableSlot *slot2;
3702 : bool res;
3703 :
3704 : Assert(WindowObjectIsValid(winobj));
3705 110375 : winstate = winobj->winstate;
3706 110375 : node = (WindowAgg *) winstate->ss.ps.plan;
3707 :
3708 : /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3709 110375 : if (node->ordNumCols == 0)
3710 180 : return true;
3711 :
3712 : /*
3713 : * Note: OK to use temp_slot_2 here because we aren't calling any
3714 : * frame-related functions (those tend to clobber temp_slot_2).
3715 : */
3716 110195 : slot1 = winstate->temp_slot_1;
3717 110195 : slot2 = winstate->temp_slot_2;
3718 :
3719 110195 : if (!window_gettupleslot(winobj, pos1, slot1))
3720 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3721 : pos1);
3722 110195 : if (!window_gettupleslot(winobj, pos2, slot2))
3723 0 : elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3724 : pos2);
3725 :
3726 110195 : res = are_peers(winstate, slot1, slot2);
3727 :
3728 110195 : ExecClearTuple(slot1);
3729 110195 : ExecClearTuple(slot2);
3730 :
3731 110195 : return res;
3732 : }
3733 :
3734 : /*
3735 : * WinGetFuncArgInPartition
3736 : * Evaluate a window function's argument expression on a specified
3737 : * row of the partition. The row is identified in lseek(2) style,
3738 : * i.e. relative to the current, first, or last row.
3739 : *
3740 : * argno: argument number to evaluate (counted from 0)
3741 : * relpos: signed rowcount offset from the seek position
3742 : * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3743 : * set_mark: If the row is found and set_mark is true, the mark is moved to
3744 : * the row as a side-effect.
3745 : * isnull: output argument, receives isnull status of result
3746 : * isout: output argument, set to indicate whether target row position
3747 : * is out of partition (can pass NULL if caller doesn't care about this)
3748 : *
3749 : * Specifying a nonexistent row is not an error, it just causes a null result
3750 : * (plus setting *isout true, if isout isn't NULL).
3751 : */
3752 : Datum
3753 158040 : WinGetFuncArgInPartition(WindowObject winobj, int argno,
3754 : int relpos, int seektype, bool set_mark,
3755 : bool *isnull, bool *isout)
3756 : {
3757 : WindowAggState *winstate;
3758 : int64 abs_pos;
3759 : int64 mark_pos;
3760 : Datum datum;
3761 : bool null_treatment;
3762 : int notnull_offset;
3763 : int notnull_relpos;
3764 : int forward;
3765 : bool myisout;
3766 :
3767 : Assert(WindowObjectIsValid(winobj));
3768 158040 : winstate = winobj->winstate;
3769 :
3770 158040 : null_treatment = (winobj->ignore_nulls == IGNORE_NULLS && relpos != 0);
3771 :
3772 158040 : switch (seektype)
3773 : {
3774 158040 : case WINDOW_SEEK_CURRENT:
3775 158040 : if (null_treatment)
3776 320 : abs_pos = winstate->currentpos;
3777 : else
3778 157720 : abs_pos = winstate->currentpos + relpos;
3779 158040 : break;
3780 0 : case WINDOW_SEEK_HEAD:
3781 0 : if (null_treatment)
3782 0 : abs_pos = 0;
3783 : else
3784 0 : abs_pos = relpos;
3785 0 : break;
3786 0 : case WINDOW_SEEK_TAIL:
3787 0 : spool_tuples(winstate, -1);
3788 0 : abs_pos = winstate->spooled_rows - 1 + relpos;
3789 0 : break;
3790 0 : default:
3791 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
3792 : abs_pos = 0; /* keep compiler quiet */
3793 : break;
3794 : }
3795 :
3796 : /* Easy case if IGNORE NULLS is not specified */
3797 158040 : if (!null_treatment)
3798 : {
3799 : /* get tuple and evaluate in partition */
3800 157720 : datum = gettuple_eval_partition(winobj, argno,
3801 : abs_pos, isnull, &myisout);
3802 157720 : if (!myisout && set_mark)
3803 157360 : WinSetMarkPosition(winobj, abs_pos);
3804 157720 : if (isout)
3805 157720 : *isout = myisout;
3806 157720 : return datum;
3807 : }
3808 :
3809 : /* Prepare for loop */
3810 320 : notnull_offset = 0;
3811 320 : notnull_relpos = abs(relpos);
3812 320 : forward = relpos > 0 ? 1 : -1;
3813 320 : myisout = false;
3814 320 : datum = 0;
3815 :
3816 : /*
3817 : * IGNORE NULLS + WINDOW_SEEK_CURRENT + relpos > 0 case, we would fetch
3818 : * beyond the current row + relpos to find out the target row. If we mark
3819 : * at abs_pos, next call to WinGetFuncArgInPartition or
3820 : * WinGetFuncArgInFrame (in case when a window function have multiple
3821 : * args) could fail with "cannot fetch row before WindowObject's mark
3822 : * position". So keep the mark position at currentpos.
3823 : */
3824 320 : if (seektype == WINDOW_SEEK_CURRENT && relpos > 0)
3825 160 : mark_pos = winstate->currentpos;
3826 : else
3827 : {
3828 : /*
3829 : * For other cases we have no idea what position of row callers would
3830 : * fetch next time. Also for relpos < 0 case (we go backward), we
3831 : * cannot set mark either. For those cases we always set mark at 0.
3832 : */
3833 160 : mark_pos = 0;
3834 : }
3835 :
3836 : /*
3837 : * Get the next nonnull value in the partition, moving forward or backward
3838 : * until we find a value or reach the partition's end. We cache the
3839 : * nullness status because we may repeat this process many times.
3840 : */
3841 : do
3842 : {
3843 : int nn_info; /* NOT NULL status */
3844 :
3845 468 : abs_pos += forward;
3846 468 : if (abs_pos < 0) /* clearly out of partition */
3847 52 : break;
3848 :
3849 : /* check NOT NULL cached info */
3850 416 : nn_info = get_notnull_info(winobj, abs_pos, argno);
3851 416 : if (nn_info == NN_NOTNULL) /* this row is known to be NOT NULL */
3852 60 : notnull_offset++;
3853 356 : else if (nn_info == NN_NULL) /* this row is known to be NULL */
3854 36 : continue; /* keep on moving forward or backward */
3855 : else /* need to check NULL or not */
3856 : {
3857 : /*
3858 : * NOT NULL info does not exist yet. Get tuple and evaluate func
3859 : * arg in partition. We ignore the return value from
3860 : * gettuple_eval_partition because we are just interested in
3861 : * whether we are inside or outside of partition, NULL or NOT
3862 : * NULL.
3863 : */
3864 320 : (void) gettuple_eval_partition(winobj, argno,
3865 : abs_pos, isnull, &myisout);
3866 320 : if (myisout) /* out of partition? */
3867 40 : break;
3868 280 : if (!*isnull)
3869 168 : notnull_offset++;
3870 : /* record the row status */
3871 280 : put_notnull_info(winobj, abs_pos, argno, *isnull);
3872 : }
3873 376 : } while (notnull_offset < notnull_relpos);
3874 :
3875 : /* get tuple and evaluate func arg in partition */
3876 320 : datum = gettuple_eval_partition(winobj, argno,
3877 : abs_pos, isnull, &myisout);
3878 320 : if (!myisout && set_mark)
3879 228 : WinSetMarkPosition(winobj, mark_pos);
3880 320 : if (isout)
3881 320 : *isout = myisout;
3882 :
3883 320 : return datum;
3884 : }
3885 :
3886 : /*
3887 : * WinGetFuncArgInFrame
3888 : * Evaluate a window function's argument expression on a specified
3889 : * row of the window frame. The row is identified in lseek(2) style,
3890 : * i.e. relative to the first or last row of the frame. (We do not
3891 : * support WINDOW_SEEK_CURRENT here, because it's not very clear what
3892 : * that should mean if the current row isn't part of the frame.)
3893 : *
3894 : * argno: argument number to evaluate (counted from 0)
3895 : * relpos: signed rowcount offset from the seek position
3896 : * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3897 : * set_mark: If the row is found/in frame and set_mark is true, the mark is
3898 : * moved to the row as a side-effect.
3899 : * isnull: output argument, receives isnull status of result
3900 : * isout: output argument, set to indicate whether target row position
3901 : * is out of frame (can pass NULL if caller doesn't care about this)
3902 : *
3903 : * Specifying a nonexistent or not-in-frame row is not an error, it just
3904 : * causes a null result (plus setting *isout true, if isout isn't NULL).
3905 : *
3906 : * Note that some exclusion-clause options lead to situations where the
3907 : * rows that are in-frame are not consecutive in the partition. But we
3908 : * count only in-frame rows when measuring relpos.
3909 : *
3910 : * The set_mark flag is interpreted as meaning that the caller will specify
3911 : * a constant (or, perhaps, monotonically increasing) relpos in successive
3912 : * calls, so that *if there is no exclusion clause* there will be no need
3913 : * to fetch a row before the previously fetched row. But we do not expect
3914 : * the caller to know how to account for exclusion clauses. Therefore,
3915 : * if there is an exclusion clause we take responsibility for adjusting the
3916 : * mark request to something that will be safe given the above assumption
3917 : * about relpos.
3918 : */
3919 : Datum
3920 6632 : WinGetFuncArgInFrame(WindowObject winobj, int argno,
3921 : int relpos, int seektype, bool set_mark,
3922 : bool *isnull, bool *isout)
3923 : {
3924 : WindowAggState *winstate;
3925 : ExprContext *econtext;
3926 : TupleTableSlot *slot;
3927 : int64 abs_pos;
3928 : int64 mark_pos;
3929 :
3930 : Assert(WindowObjectIsValid(winobj));
3931 6632 : winstate = winobj->winstate;
3932 6632 : econtext = winstate->ss.ps.ps_ExprContext;
3933 6632 : slot = winstate->temp_slot_1;
3934 :
3935 6632 : if (winobj->ignore_nulls == IGNORE_NULLS)
3936 640 : return ignorenulls_getfuncarginframe(winobj, argno, relpos, seektype,
3937 : set_mark, isnull, isout);
3938 :
3939 5992 : switch (seektype)
3940 : {
3941 0 : case WINDOW_SEEK_CURRENT:
3942 0 : elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3943 : abs_pos = mark_pos = 0; /* keep compiler quiet */
3944 : break;
3945 2964 : case WINDOW_SEEK_HEAD:
3946 : /* rejecting relpos < 0 is easy and simplifies code below */
3947 2964 : if (relpos < 0)
3948 0 : goto out_of_frame;
3949 2964 : update_frameheadpos(winstate);
3950 2936 : abs_pos = winstate->frameheadpos + relpos;
3951 2936 : mark_pos = abs_pos;
3952 :
3953 : /*
3954 : * Account for exclusion option if one is active, but advance only
3955 : * abs_pos not mark_pos. This prevents changes of the current
3956 : * row's peer group from resulting in trying to fetch a row before
3957 : * some previous mark position.
3958 : *
3959 : * Note that in some corner cases such as current row being
3960 : * outside frame, these calculations are theoretically too simple,
3961 : * but it doesn't matter because we'll end up deciding the row is
3962 : * out of frame. We do not attempt to avoid fetching rows past
3963 : * end of frame; that would happen in some cases anyway.
3964 : */
3965 2936 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3966 : {
3967 2496 : case 0:
3968 : /* no adjustment needed */
3969 2496 : break;
3970 160 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3971 160 : if (abs_pos >= winstate->currentpos &&
3972 124 : winstate->currentpos >= winstate->frameheadpos)
3973 44 : abs_pos++;
3974 160 : break;
3975 80 : case FRAMEOPTION_EXCLUDE_GROUP:
3976 80 : update_grouptailpos(winstate);
3977 80 : if (abs_pos >= winstate->groupheadpos &&
3978 48 : winstate->grouptailpos > winstate->frameheadpos)
3979 : {
3980 48 : int64 overlapstart = Max(winstate->groupheadpos,
3981 : winstate->frameheadpos);
3982 :
3983 48 : abs_pos += winstate->grouptailpos - overlapstart;
3984 : }
3985 80 : break;
3986 200 : case FRAMEOPTION_EXCLUDE_TIES:
3987 200 : update_grouptailpos(winstate);
3988 200 : if (abs_pos >= winstate->groupheadpos &&
3989 136 : winstate->grouptailpos > winstate->frameheadpos)
3990 : {
3991 56 : int64 overlapstart = Max(winstate->groupheadpos,
3992 : winstate->frameheadpos);
3993 :
3994 56 : if (abs_pos == overlapstart)
3995 56 : abs_pos = winstate->currentpos;
3996 : else
3997 0 : abs_pos += winstate->grouptailpos - overlapstart - 1;
3998 : }
3999 200 : break;
4000 0 : default:
4001 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
4002 : winstate->frameOptions);
4003 : break;
4004 : }
4005 2936 : break;
4006 3028 : case WINDOW_SEEK_TAIL:
4007 : /* rejecting relpos > 0 is easy and simplifies code below */
4008 3028 : if (relpos > 0)
4009 0 : goto out_of_frame;
4010 3028 : update_frametailpos(winstate);
4011 3024 : abs_pos = winstate->frametailpos - 1 + relpos;
4012 :
4013 : /*
4014 : * Account for exclusion option if one is active. If there is no
4015 : * exclusion, we can safely set the mark at the accessed row. But
4016 : * if there is, we can only mark the frame start, because we can't
4017 : * be sure how far back in the frame the exclusion might cause us
4018 : * to fetch in future. Furthermore, we have to actually check
4019 : * against frameheadpos here, since it's unsafe to try to fetch a
4020 : * row before frame start if the mark might be there already.
4021 : */
4022 3024 : switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
4023 : {
4024 2704 : case 0:
4025 : /* no adjustment needed */
4026 2704 : mark_pos = abs_pos;
4027 2704 : break;
4028 80 : case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
4029 80 : if (abs_pos <= winstate->currentpos &&
4030 8 : winstate->currentpos < winstate->frametailpos)
4031 8 : abs_pos--;
4032 80 : update_frameheadpos(winstate);
4033 80 : if (abs_pos < winstate->frameheadpos)
4034 4 : goto out_of_frame;
4035 76 : mark_pos = winstate->frameheadpos;
4036 76 : break;
4037 160 : case FRAMEOPTION_EXCLUDE_GROUP:
4038 160 : update_grouptailpos(winstate);
4039 160 : if (abs_pos < winstate->grouptailpos &&
4040 36 : winstate->groupheadpos < winstate->frametailpos)
4041 : {
4042 36 : int64 overlapend = Min(winstate->grouptailpos,
4043 : winstate->frametailpos);
4044 :
4045 36 : abs_pos -= overlapend - winstate->groupheadpos;
4046 : }
4047 160 : update_frameheadpos(winstate);
4048 160 : if (abs_pos < winstate->frameheadpos)
4049 36 : goto out_of_frame;
4050 124 : mark_pos = winstate->frameheadpos;
4051 124 : break;
4052 80 : case FRAMEOPTION_EXCLUDE_TIES:
4053 80 : update_grouptailpos(winstate);
4054 80 : if (abs_pos < winstate->grouptailpos &&
4055 24 : winstate->groupheadpos < winstate->frametailpos)
4056 : {
4057 24 : int64 overlapend = Min(winstate->grouptailpos,
4058 : winstate->frametailpos);
4059 :
4060 24 : if (abs_pos == overlapend - 1)
4061 24 : abs_pos = winstate->currentpos;
4062 : else
4063 0 : abs_pos -= overlapend - 1 - winstate->groupheadpos;
4064 : }
4065 80 : update_frameheadpos(winstate);
4066 80 : if (abs_pos < winstate->frameheadpos)
4067 0 : goto out_of_frame;
4068 80 : mark_pos = winstate->frameheadpos;
4069 80 : break;
4070 0 : default:
4071 0 : elog(ERROR, "unrecognized frame option state: 0x%x",
4072 : winstate->frameOptions);
4073 : mark_pos = 0; /* keep compiler quiet */
4074 : break;
4075 : }
4076 2984 : break;
4077 0 : default:
4078 0 : elog(ERROR, "unrecognized window seek type: %d", seektype);
4079 : abs_pos = mark_pos = 0; /* keep compiler quiet */
4080 : break;
4081 : }
4082 :
4083 5920 : if (!window_gettupleslot(winobj, abs_pos, slot))
4084 264 : goto out_of_frame;
4085 :
4086 : /* The code above does not detect all out-of-frame cases, so check */
4087 5656 : if (row_is_in_frame(winobj, abs_pos, slot, false) <= 0)
4088 200 : goto out_of_frame;
4089 :
4090 5436 : if (isout)
4091 0 : *isout = false;
4092 5436 : if (set_mark)
4093 5408 : WinSetMarkPosition(winobj, mark_pos);
4094 5436 : econtext->ecxt_outertuple = slot;
4095 5436 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
4096 : econtext, isnull);
4097 :
4098 504 : out_of_frame:
4099 504 : if (isout)
4100 0 : *isout = true;
4101 504 : *isnull = true;
4102 504 : return (Datum) 0;
4103 : }
4104 :
4105 : /*
4106 : * WinGetFuncArgCurrent
4107 : * Evaluate a window function's argument expression on the current row.
4108 : *
4109 : * argno: argument number to evaluate (counted from 0)
4110 : * isnull: output argument, receives isnull status of result
4111 : *
4112 : * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
4113 : * WinGetFuncArgInFrame targeting the current row, because it will succeed
4114 : * even if the WindowObject's mark has been set beyond the current row.
4115 : * This should generally be used for "ordinary" arguments of a window
4116 : * function, such as the offset argument of lead() or lag().
4117 : */
4118 : Datum
4119 1340 : WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
4120 : {
4121 : WindowAggState *winstate;
4122 : ExprContext *econtext;
4123 :
4124 : Assert(WindowObjectIsValid(winobj));
4125 1340 : winstate = winobj->winstate;
4126 :
4127 1340 : econtext = winstate->ss.ps.ps_ExprContext;
4128 :
4129 1340 : econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
4130 1340 : return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
4131 : econtext, isnull);
4132 : }
|