Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/executor.h"
23 : #include "executor/tstoreReceiver.h"
24 : #include "miscadmin.h"
25 : #include "pg_trace.h"
26 : #include "tcop/pquery.h"
27 : #include "tcop/utility.h"
28 : #include "utils/memutils.h"
29 : #include "utils/snapmgr.h"
30 :
31 :
32 : /*
33 : * ActivePortal is the currently executing Portal (the most closely nested,
34 : * if there are several).
35 : */
36 : Portal ActivePortal = NULL;
37 :
38 :
39 : static void ProcessQuery(PlannedStmt *plan,
40 : const char *sourceText,
41 : ParamListInfo params,
42 : QueryEnvironment *queryEnv,
43 : DestReceiver *dest,
44 : QueryCompletion *qc);
45 : static void FillPortalStore(Portal portal, bool isTopLevel);
46 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
47 : DestReceiver *dest);
48 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
49 : DestReceiver *dest);
50 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
51 : bool isTopLevel, bool setHoldSnapshot,
52 : DestReceiver *dest, QueryCompletion *qc);
53 : static void PortalRunMulti(Portal portal,
54 : bool isTopLevel, bool setHoldSnapshot,
55 : DestReceiver *dest, DestReceiver *altdest,
56 : QueryCompletion *qc);
57 : static uint64 DoPortalRunFetch(Portal portal,
58 : FetchDirection fdirection,
59 : long count,
60 : DestReceiver *dest);
61 : static void DoPortalRewind(Portal portal);
62 :
63 :
64 : /*
65 : * CreateQueryDesc
66 : */
67 : QueryDesc *
68 363735 : CreateQueryDesc(PlannedStmt *plannedstmt,
69 : const char *sourceText,
70 : Snapshot snapshot,
71 : Snapshot crosscheck_snapshot,
72 : DestReceiver *dest,
73 : ParamListInfo params,
74 : QueryEnvironment *queryEnv,
75 : int instrument_options)
76 : {
77 363735 : QueryDesc *qd = palloc_object(QueryDesc);
78 :
79 363735 : qd->operation = plannedstmt->commandType; /* operation */
80 363735 : qd->plannedstmt = plannedstmt; /* plan */
81 363735 : qd->sourceText = sourceText; /* query text */
82 363735 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
83 : /* RI check snapshot */
84 363735 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
85 363735 : qd->dest = dest; /* output dest */
86 363735 : qd->params = params; /* parameter values passed into query */
87 363735 : qd->queryEnv = queryEnv;
88 363735 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
89 363735 : qd->query_instr_options = 0;
90 :
91 : /* null these fields until set by ExecutorStart */
92 363735 : qd->tupDesc = NULL;
93 363735 : qd->estate = NULL;
94 363735 : qd->planstate = NULL;
95 363735 : qd->query_instr = NULL;
96 :
97 : /* not yet executed */
98 363735 : qd->already_executed = false;
99 :
100 363735 : return qd;
101 : }
102 :
103 : /*
104 : * FreeQueryDesc
105 : */
106 : void
107 344790 : FreeQueryDesc(QueryDesc *qdesc)
108 : {
109 : /* Can't be a live query */
110 : Assert(qdesc->estate == NULL);
111 :
112 : /* forget our snapshots */
113 344790 : UnregisterSnapshot(qdesc->snapshot);
114 344790 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
115 :
116 : /* Only the QueryDesc itself need be freed */
117 344790 : pfree(qdesc);
118 344790 : }
119 :
120 :
121 : /*
122 : * ProcessQuery
123 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
124 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
125 : *
126 : * plan: the plan tree for the query
127 : * sourceText: the source text of the query
128 : * params: any parameters needed
129 : * dest: where to send results
130 : * qc: where to store the command completion status data.
131 : *
132 : * qc may be NULL if caller doesn't want a status string.
133 : *
134 : * Must be called in a memory context that will be reset or deleted on
135 : * error; otherwise the executor's memory usage will be leaked.
136 : */
137 : static void
138 53265 : ProcessQuery(PlannedStmt *plan,
139 : const char *sourceText,
140 : ParamListInfo params,
141 : QueryEnvironment *queryEnv,
142 : DestReceiver *dest,
143 : QueryCompletion *qc)
144 : {
145 : QueryDesc *queryDesc;
146 :
147 : /*
148 : * Create the QueryDesc object
149 : */
150 53265 : queryDesc = CreateQueryDesc(plan, sourceText,
151 : GetActiveSnapshot(), InvalidSnapshot,
152 : dest, params, queryEnv, 0);
153 :
154 : /*
155 : * Call ExecutorStart to prepare the plan for execution
156 : */
157 53265 : ExecutorStart(queryDesc, 0);
158 :
159 : /*
160 : * Run the plan to completion.
161 : */
162 52551 : ExecutorRun(queryDesc, ForwardScanDirection, 0);
163 :
164 : /*
165 : * Build command completion status data, if caller wants one.
166 : */
167 50308 : if (qc)
168 : {
169 : CommandTag tag;
170 :
171 49844 : if (queryDesc->operation == CMD_SELECT)
172 83 : tag = CMDTAG_SELECT;
173 49761 : else if (queryDesc->operation == CMD_INSERT)
174 39243 : tag = CMDTAG_INSERT;
175 10518 : else if (queryDesc->operation == CMD_UPDATE)
176 7239 : tag = CMDTAG_UPDATE;
177 3279 : else if (queryDesc->operation == CMD_DELETE)
178 2539 : tag = CMDTAG_DELETE;
179 740 : else if (queryDesc->operation == CMD_MERGE)
180 740 : tag = CMDTAG_MERGE;
181 : else
182 0 : tag = CMDTAG_UNKNOWN;
183 :
184 49844 : SetQueryCompletion(qc, tag, queryDesc->estate->es_processed);
185 : }
186 :
187 : /*
188 : * Now, we close down all the scans and free allocated resources.
189 : */
190 50308 : ExecutorFinish(queryDesc);
191 49526 : ExecutorEnd(queryDesc);
192 :
193 49526 : FreeQueryDesc(queryDesc);
194 49526 : }
195 :
196 : /*
197 : * ChoosePortalStrategy
198 : * Select portal execution strategy given the intended statement list.
199 : *
200 : * The list elements can be Querys or PlannedStmts.
201 : * That's more general than portals need, but plancache.c uses this too.
202 : *
203 : * See the comments in portal.h.
204 : */
205 : PortalStrategy
206 524568 : ChoosePortalStrategy(List *stmts)
207 : {
208 : int nSetTag;
209 : ListCell *lc;
210 :
211 : /*
212 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
213 : * single-statement case, since there are no rewrite rules that can add
214 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
215 : * likewise allows only one top-level statement.
216 : */
217 524568 : if (list_length(stmts) == 1)
218 : {
219 524312 : Node *stmt = (Node *) linitial(stmts);
220 :
221 524312 : if (IsA(stmt, Query))
222 : {
223 52121 : Query *query = (Query *) stmt;
224 :
225 52121 : if (query->canSetTag)
226 : {
227 52121 : if (query->commandType == CMD_SELECT)
228 : {
229 34899 : if (query->hasModifyingCTE)
230 0 : return PORTAL_ONE_MOD_WITH;
231 : else
232 34899 : return PORTAL_ONE_SELECT;
233 : }
234 17222 : if (query->commandType == CMD_UTILITY)
235 : {
236 12860 : if (UtilityReturnsTuples(query->utilityStmt))
237 6734 : return PORTAL_UTIL_SELECT;
238 : /* it can't be ONE_RETURNING, so give up */
239 6126 : return PORTAL_MULTI_QUERY;
240 : }
241 : }
242 : }
243 472191 : else if (IsA(stmt, PlannedStmt))
244 : {
245 472191 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
246 :
247 472191 : if (pstmt->canSetTag)
248 : {
249 472155 : if (pstmt->commandType == CMD_SELECT)
250 : {
251 188807 : if (pstmt->hasModifyingCTE)
252 91 : return PORTAL_ONE_MOD_WITH;
253 : else
254 188716 : return PORTAL_ONE_SELECT;
255 : }
256 283348 : if (pstmt->commandType == CMD_UTILITY)
257 : {
258 230882 : if (UtilityReturnsTuples(pstmt->utilityStmt))
259 29005 : return PORTAL_UTIL_SELECT;
260 : /* it can't be ONE_RETURNING, so give up */
261 201877 : return PORTAL_MULTI_QUERY;
262 : }
263 : }
264 : }
265 : else
266 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
267 : }
268 :
269 : /*
270 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
271 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
272 : * it has a RETURNING list.
273 : */
274 57120 : nSetTag = 0;
275 59518 : foreach(lc, stmts)
276 : {
277 57256 : Node *stmt = (Node *) lfirst(lc);
278 :
279 57256 : if (IsA(stmt, Query))
280 : {
281 4370 : Query *query = (Query *) stmt;
282 :
283 4370 : if (query->canSetTag)
284 : {
285 4366 : if (++nSetTag > 1)
286 54858 : return PORTAL_MULTI_QUERY; /* no need to look further */
287 4366 : if (query->commandType == CMD_UTILITY ||
288 4366 : query->returningList == NIL)
289 4183 : return PORTAL_MULTI_QUERY; /* no need to look further */
290 : }
291 : }
292 52886 : else if (IsA(stmt, PlannedStmt))
293 : {
294 52886 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
295 :
296 52886 : if (pstmt->canSetTag)
297 : {
298 52706 : if (++nSetTag > 1)
299 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
300 52706 : if (pstmt->commandType == CMD_UTILITY ||
301 52706 : !pstmt->hasReturning)
302 50675 : return PORTAL_MULTI_QUERY; /* no need to look further */
303 : }
304 : }
305 : else
306 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
307 : }
308 2262 : if (nSetTag == 1)
309 2214 : return PORTAL_ONE_RETURNING;
310 :
311 : /* Else, it's the general case... */
312 48 : return PORTAL_MULTI_QUERY;
313 : }
314 :
315 : /*
316 : * FetchPortalTargetList
317 : * Given a portal that returns tuples, extract the query targetlist.
318 : * Returns NIL if the portal doesn't have a determinable targetlist.
319 : *
320 : * Note: do not modify the result.
321 : */
322 : List *
323 202578 : FetchPortalTargetList(Portal portal)
324 : {
325 : /* no point in looking if we determined it doesn't return tuples */
326 202578 : if (portal->strategy == PORTAL_MULTI_QUERY)
327 20 : return NIL;
328 : /* get the primary statement and find out what it returns */
329 202558 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
330 : }
331 :
332 : /*
333 : * FetchStatementTargetList
334 : * Given a statement that returns tuples, extract the query targetlist.
335 : * Returns NIL if the statement doesn't have a determinable targetlist.
336 : *
337 : * This can be applied to a Query or a PlannedStmt.
338 : * That's more general than portals need, but plancache.c uses this too.
339 : *
340 : * Note: do not modify the result.
341 : *
342 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
343 : */
344 : List *
345 211172 : FetchStatementTargetList(Node *stmt)
346 : {
347 211172 : if (stmt == NULL)
348 0 : return NIL;
349 211172 : if (IsA(stmt, Query))
350 : {
351 8614 : Query *query = (Query *) stmt;
352 :
353 8614 : if (query->commandType == CMD_UTILITY)
354 : {
355 : /* transfer attention to utility statement */
356 8 : stmt = query->utilityStmt;
357 : }
358 : else
359 : {
360 8606 : if (query->commandType == CMD_SELECT)
361 8598 : return query->targetList;
362 8 : if (query->returningList)
363 8 : return query->returningList;
364 0 : return NIL;
365 : }
366 : }
367 202566 : if (IsA(stmt, PlannedStmt))
368 : {
369 202558 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
370 :
371 202558 : if (pstmt->commandType == CMD_UTILITY)
372 : {
373 : /* transfer attention to utility statement */
374 23331 : stmt = pstmt->utilityStmt;
375 : }
376 : else
377 : {
378 179227 : if (pstmt->commandType == CMD_SELECT)
379 177323 : return pstmt->planTree->targetlist;
380 1904 : if (pstmt->hasReturning)
381 1904 : return pstmt->planTree->targetlist;
382 0 : return NIL;
383 : }
384 : }
385 23339 : if (IsA(stmt, FetchStmt))
386 : {
387 3172 : FetchStmt *fstmt = (FetchStmt *) stmt;
388 : Portal subportal;
389 :
390 : Assert(!fstmt->ismove);
391 3172 : subportal = GetPortalByName(fstmt->portalname);
392 : Assert(PortalIsValid(subportal));
393 3172 : return FetchPortalTargetList(subportal);
394 : }
395 20167 : if (IsA(stmt, ExecuteStmt))
396 : {
397 8543 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
398 : PreparedStatement *entry;
399 :
400 8543 : entry = FetchPreparedStatement(estmt->name, true);
401 8543 : return FetchPreparedStatementTargetList(entry);
402 : }
403 11624 : return NIL;
404 : }
405 :
406 : /*
407 : * PortalStart
408 : * Prepare a portal for execution.
409 : *
410 : * Caller must already have created the portal, done PortalDefineQuery(),
411 : * and adjusted portal options if needed.
412 : *
413 : * If parameters are needed by the query, they must be passed in "params"
414 : * (caller is responsible for giving them appropriate lifetime).
415 : *
416 : * The caller can also provide an initial set of "eflags" to be passed to
417 : * ExecutorStart (but note these can be modified internally, and they are
418 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
419 : * should simply pass zero.
420 : *
421 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
422 : * for the normal behavior of setting a new snapshot. This parameter is
423 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
424 : * to be used for cursors).
425 : *
426 : * On return, portal is ready to accept PortalRun() calls, and the result
427 : * tupdesc (if any) is known.
428 : */
429 : void
430 472439 : PortalStart(Portal portal, ParamListInfo params,
431 : int eflags, Snapshot snapshot)
432 : {
433 : Portal saveActivePortal;
434 : ResourceOwner saveResourceOwner;
435 : MemoryContext savePortalContext;
436 : MemoryContext oldContext;
437 : QueryDesc *queryDesc;
438 : int myeflags;
439 :
440 : Assert(PortalIsValid(portal));
441 : Assert(portal->status == PORTAL_DEFINED);
442 :
443 : /*
444 : * Set up global portal context pointers.
445 : */
446 472439 : saveActivePortal = ActivePortal;
447 472439 : saveResourceOwner = CurrentResourceOwner;
448 472439 : savePortalContext = PortalContext;
449 472439 : PG_TRY();
450 : {
451 472439 : ActivePortal = portal;
452 472439 : if (portal->resowner)
453 472439 : CurrentResourceOwner = portal->resowner;
454 472439 : PortalContext = portal->portalContext;
455 :
456 472439 : oldContext = MemoryContextSwitchTo(PortalContext);
457 :
458 : /* Must remember portal param list, if any */
459 472439 : portal->portalParams = params;
460 :
461 : /*
462 : * Determine the portal execution strategy
463 : */
464 472439 : portal->strategy = ChoosePortalStrategy(portal->stmts);
465 :
466 : /*
467 : * Fire her up according to the strategy
468 : */
469 472439 : switch (portal->strategy)
470 : {
471 188716 : case PORTAL_ONE_SELECT:
472 :
473 : /* Must set snapshot before starting executor. */
474 188716 : if (snapshot)
475 14101 : PushActiveSnapshot(snapshot);
476 : else
477 174615 : PushActiveSnapshot(GetTransactionSnapshot());
478 :
479 : /*
480 : * We could remember the snapshot in portal->portalSnapshot,
481 : * but presently there seems no need to, as this code path
482 : * cannot be used for non-atomic execution. Hence there can't
483 : * be any commit/abort that might destroy the snapshot. Since
484 : * we don't do that, there's also no need to force a
485 : * non-default nesting level for the snapshot.
486 : */
487 :
488 : /*
489 : * Create QueryDesc in portal's context; for the moment, set
490 : * the destination to DestNone.
491 : */
492 188716 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
493 : portal->sourceText,
494 : GetActiveSnapshot(),
495 : InvalidSnapshot,
496 : None_Receiver,
497 : params,
498 : portal->queryEnv,
499 : 0);
500 :
501 : /*
502 : * If it's a scrollable cursor, executor needs to support
503 : * REWIND and backwards scan, as well as whatever the caller
504 : * might've asked for.
505 : */
506 188716 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
507 2442 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
508 : else
509 186274 : myeflags = eflags;
510 :
511 : /*
512 : * Call ExecutorStart to prepare the plan for execution
513 : */
514 188716 : ExecutorStart(queryDesc, myeflags);
515 :
516 : /*
517 : * This tells PortalCleanup to shut down the executor
518 : */
519 188288 : portal->queryDesc = queryDesc;
520 :
521 : /*
522 : * Remember tuple descriptor (computed by ExecutorStart)
523 : */
524 188288 : portal->tupDesc = queryDesc->tupDesc;
525 :
526 : /*
527 : * Reset cursor position data to "start of query"
528 : */
529 188288 : portal->atStart = true;
530 188288 : portal->atEnd = false; /* allow fetches */
531 188288 : portal->portalPos = 0;
532 :
533 188288 : PopActiveSnapshot();
534 188288 : break;
535 :
536 2122 : case PORTAL_ONE_RETURNING:
537 : case PORTAL_ONE_MOD_WITH:
538 :
539 : /*
540 : * We don't start the executor until we are told to run the
541 : * portal. We do need to set up the result tupdesc.
542 : */
543 : {
544 : PlannedStmt *pstmt;
545 :
546 2122 : pstmt = PortalGetPrimaryStmt(portal);
547 2122 : portal->tupDesc =
548 2122 : ExecCleanTypeFromTL(pstmt->planTree->targetlist);
549 : }
550 :
551 : /*
552 : * Reset cursor position data to "start of query"
553 : */
554 2122 : portal->atStart = true;
555 2122 : portal->atEnd = false; /* allow fetches */
556 2122 : portal->portalPos = 0;
557 2122 : break;
558 :
559 29005 : case PORTAL_UTIL_SELECT:
560 :
561 : /*
562 : * We don't set snapshot here, because PortalRunUtility will
563 : * take care of it if needed.
564 : */
565 : {
566 29005 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
567 :
568 : Assert(pstmt->commandType == CMD_UTILITY);
569 29005 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
570 : }
571 :
572 : /*
573 : * Reset cursor position data to "start of query"
574 : */
575 28987 : portal->atStart = true;
576 28987 : portal->atEnd = false; /* allow fetches */
577 28987 : portal->portalPos = 0;
578 28987 : break;
579 :
580 252596 : case PORTAL_MULTI_QUERY:
581 : /* Need do nothing now */
582 252596 : portal->tupDesc = NULL;
583 252596 : break;
584 : }
585 : }
586 446 : PG_CATCH();
587 : {
588 : /* Uncaught error while executing portal: mark it dead */
589 446 : MarkPortalFailed(portal);
590 :
591 : /* Restore global vars and propagate error */
592 446 : ActivePortal = saveActivePortal;
593 446 : CurrentResourceOwner = saveResourceOwner;
594 446 : PortalContext = savePortalContext;
595 :
596 446 : PG_RE_THROW();
597 : }
598 471993 : PG_END_TRY();
599 :
600 471993 : MemoryContextSwitchTo(oldContext);
601 :
602 471993 : ActivePortal = saveActivePortal;
603 471993 : CurrentResourceOwner = saveResourceOwner;
604 471993 : PortalContext = savePortalContext;
605 :
606 471993 : portal->status = PORTAL_READY;
607 471993 : }
608 :
609 : /*
610 : * PortalSetResultFormat
611 : * Select the format codes for a portal's output.
612 : *
613 : * This must be run after PortalStart for a portal that will be read by
614 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
615 : * for other destination types.
616 : *
617 : * formats[] is the client format request, as per Bind message conventions.
618 : */
619 : void
620 452370 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
621 : {
622 : int natts;
623 : int i;
624 :
625 : /* Do nothing if portal won't return tuples */
626 452370 : if (portal->tupDesc == NULL)
627 252525 : return;
628 199845 : natts = portal->tupDesc->natts;
629 199845 : portal->formats = (int16 *)
630 199845 : MemoryContextAlloc(portal->portalContext,
631 : natts * sizeof(int16));
632 199845 : if (nFormats > 1)
633 : {
634 : /* format specified for each column */
635 0 : if (nFormats != natts)
636 0 : ereport(ERROR,
637 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
638 : errmsg("bind message has %d result formats but query has %d columns",
639 : nFormats, natts)));
640 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
641 : }
642 199845 : else if (nFormats > 0)
643 : {
644 : /* single format specified, use for all columns */
645 199845 : int16 format1 = formats[0];
646 :
647 831080 : for (i = 0; i < natts; i++)
648 631235 : portal->formats[i] = format1;
649 : }
650 : else
651 : {
652 : /* use default format for all columns */
653 0 : for (i = 0; i < natts; i++)
654 0 : portal->formats[i] = 0;
655 : }
656 : }
657 :
658 : /*
659 : * PortalRun
660 : * Run a portal's query or queries.
661 : *
662 : * count <= 0 is interpreted as a no-op: the destination gets started up
663 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
664 : * interpreted as "all rows". Note that count is ignored in multi-query
665 : * situations, where we always run the portal to completion.
666 : *
667 : * isTopLevel: true if query is being executed at backend "top level"
668 : * (that is, directly from a client command message)
669 : *
670 : * dest: where to send output of primary (canSetTag) query
671 : *
672 : * altdest: where to send output of non-primary queries
673 : *
674 : * qc: where to store command completion status data.
675 : * May be NULL if caller doesn't want status data.
676 : *
677 : * Returns true if the portal's execution is complete, false if it was
678 : * suspended due to exhaustion of the count parameter.
679 : */
680 : bool
681 461015 : PortalRun(Portal portal, long count, bool isTopLevel,
682 : DestReceiver *dest, DestReceiver *altdest,
683 : QueryCompletion *qc)
684 : {
685 : bool result;
686 : uint64 nprocessed;
687 : ResourceOwner saveTopTransactionResourceOwner;
688 : MemoryContext saveTopTransactionContext;
689 : Portal saveActivePortal;
690 : ResourceOwner saveResourceOwner;
691 : MemoryContext savePortalContext;
692 : MemoryContext saveMemoryContext;
693 :
694 : Assert(PortalIsValid(portal));
695 :
696 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
697 :
698 : /* Initialize empty completion data */
699 461015 : if (qc)
700 461015 : InitializeQueryCompletion(qc);
701 :
702 461015 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
703 : {
704 0 : elog(DEBUG3, "PortalRun");
705 : /* PORTAL_MULTI_QUERY logs its own stats per query */
706 0 : ResetUsage();
707 : }
708 :
709 : /*
710 : * Check for improper portal use, and mark portal active.
711 : */
712 461015 : MarkPortalActive(portal);
713 :
714 : /*
715 : * Set up global portal context pointers.
716 : *
717 : * We have to play a special game here to support utility commands like
718 : * VACUUM and CLUSTER, which internally start and commit transactions.
719 : * When we are called to execute such a command, CurrentResourceOwner will
720 : * be pointing to the TopTransactionResourceOwner --- which will be
721 : * destroyed and replaced in the course of the internal commit and
722 : * restart. So we need to be prepared to restore it as pointing to the
723 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
724 : * internally starting whole new transactions is not good.)
725 : * CurrentMemoryContext has a similar problem, but the other pointers we
726 : * save here will be NULL or pointing to longer-lived objects.
727 : */
728 461015 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
729 461015 : saveTopTransactionContext = TopTransactionContext;
730 461015 : saveActivePortal = ActivePortal;
731 461015 : saveResourceOwner = CurrentResourceOwner;
732 461015 : savePortalContext = PortalContext;
733 461015 : saveMemoryContext = CurrentMemoryContext;
734 461015 : PG_TRY();
735 : {
736 461015 : ActivePortal = portal;
737 461015 : if (portal->resowner)
738 461015 : CurrentResourceOwner = portal->resowner;
739 461015 : PortalContext = portal->portalContext;
740 :
741 461015 : MemoryContextSwitchTo(PortalContext);
742 :
743 461015 : switch (portal->strategy)
744 : {
745 208419 : case PORTAL_ONE_SELECT:
746 : case PORTAL_ONE_RETURNING:
747 : case PORTAL_ONE_MOD_WITH:
748 : case PORTAL_UTIL_SELECT:
749 :
750 : /*
751 : * If we have not yet run the command, do so, storing its
752 : * results in the portal's tuplestore. But we don't do that
753 : * for the PORTAL_ONE_SELECT case.
754 : */
755 208419 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
756 25670 : FillPortalStore(portal, isTopLevel);
757 :
758 : /*
759 : * Now fetch desired portion of results.
760 : */
761 208073 : nprocessed = PortalRunSelect(portal, true, count, dest);
762 :
763 : /*
764 : * If the portal result contains a command tag and the caller
765 : * gave us a pointer to store it, copy it and update the
766 : * rowcount.
767 : */
768 203081 : if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
769 : {
770 203081 : CopyQueryCompletion(qc, &portal->qc);
771 203081 : qc->nprocessed = nprocessed;
772 : }
773 :
774 : /* Mark portal not active */
775 203081 : portal->status = PORTAL_READY;
776 :
777 : /*
778 : * Since it's a forward fetch, say DONE iff atEnd is now true.
779 : */
780 203081 : result = portal->atEnd;
781 203081 : break;
782 :
783 252596 : case PORTAL_MULTI_QUERY:
784 252596 : PortalRunMulti(portal, isTopLevel, false,
785 : dest, altdest, qc);
786 :
787 : /* Prevent portal's commands from being re-executed */
788 237787 : MarkPortalDone(portal);
789 :
790 : /* Always complete at end of RunMulti */
791 237787 : result = true;
792 237787 : break;
793 :
794 0 : default:
795 0 : elog(ERROR, "unrecognized portal strategy: %d",
796 : (int) portal->strategy);
797 : result = false; /* keep compiler quiet */
798 : break;
799 : }
800 : }
801 20143 : PG_CATCH();
802 : {
803 : /* Uncaught error while executing portal: mark it dead */
804 20143 : MarkPortalFailed(portal);
805 :
806 : /* Restore global vars and propagate error */
807 20143 : if (saveMemoryContext == saveTopTransactionContext)
808 19899 : MemoryContextSwitchTo(TopTransactionContext);
809 : else
810 244 : MemoryContextSwitchTo(saveMemoryContext);
811 20143 : ActivePortal = saveActivePortal;
812 20143 : if (saveResourceOwner == saveTopTransactionResourceOwner)
813 19952 : CurrentResourceOwner = TopTransactionResourceOwner;
814 : else
815 191 : CurrentResourceOwner = saveResourceOwner;
816 20143 : PortalContext = savePortalContext;
817 :
818 20143 : PG_RE_THROW();
819 : }
820 440868 : PG_END_TRY();
821 :
822 440868 : if (saveMemoryContext == saveTopTransactionContext)
823 417321 : MemoryContextSwitchTo(TopTransactionContext);
824 : else
825 23547 : MemoryContextSwitchTo(saveMemoryContext);
826 440868 : ActivePortal = saveActivePortal;
827 440868 : if (saveResourceOwner == saveTopTransactionResourceOwner)
828 428323 : CurrentResourceOwner = TopTransactionResourceOwner;
829 : else
830 12545 : CurrentResourceOwner = saveResourceOwner;
831 440868 : PortalContext = savePortalContext;
832 :
833 440868 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
834 0 : ShowUsage("EXECUTOR STATISTICS");
835 :
836 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
837 :
838 440868 : return result;
839 : }
840 :
841 : /*
842 : * PortalRunSelect
843 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
844 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
845 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
846 : *
847 : * This handles simple N-rows-forward-or-backward cases. For more complex
848 : * nonsequential access to a portal, see PortalRunFetch.
849 : *
850 : * count <= 0 is interpreted as a no-op: the destination gets started up
851 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
852 : * interpreted as "all rows". (cf FetchStmt.howMany)
853 : *
854 : * Caller must already have validated the Portal and done appropriate
855 : * setup (cf. PortalRun).
856 : *
857 : * Returns number of rows processed (suitable for use in result tag)
858 : */
859 : static uint64
860 242411 : PortalRunSelect(Portal portal,
861 : bool forward,
862 : long count,
863 : DestReceiver *dest)
864 : {
865 : QueryDesc *queryDesc;
866 : ScanDirection direction;
867 : uint64 nprocessed;
868 :
869 : /*
870 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
871 : * completed utility query; can't use it in that path.
872 : */
873 242411 : queryDesc = portal->queryDesc;
874 :
875 : /* Caller messed up if we have neither a ready query nor held data. */
876 : Assert(queryDesc || portal->holdStore);
877 :
878 : /*
879 : * Force the queryDesc destination to the right thing. This supports
880 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
881 : * change as long as we do it on every fetch. (The Executor must not
882 : * assume that dest never changes.)
883 : */
884 242411 : if (queryDesc)
885 195626 : queryDesc->dest = dest;
886 :
887 : /*
888 : * Determine which direction to go in, and check to see if we're already
889 : * at the end of the available tuples in that direction. If so, set the
890 : * direction to NoMovement to avoid trying to fetch any tuples. (This
891 : * check exists because not all plan node types are robust about being
892 : * called again if they've already returned NULL once.) Then call the
893 : * executor (we must not skip this, because the destination needs to see a
894 : * setup and shutdown even if no tuples are available). Finally, update
895 : * the portal position state depending on the number of tuples that were
896 : * retrieved.
897 : */
898 242411 : if (forward)
899 : {
900 241999 : if (portal->atEnd || count <= 0)
901 : {
902 2504 : direction = NoMovementScanDirection;
903 2504 : count = 0; /* don't pass negative count to executor */
904 : }
905 : else
906 239495 : direction = ForwardScanDirection;
907 :
908 : /* In the executor, zero count processes all rows */
909 241999 : if (count == FETCH_ALL)
910 208288 : count = 0;
911 :
912 241999 : if (portal->holdStore)
913 46773 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
914 : else
915 : {
916 195226 : PushActiveSnapshot(queryDesc->snapshot);
917 195226 : ExecutorRun(queryDesc, direction, (uint64) count);
918 190207 : nprocessed = queryDesc->estate->es_processed;
919 190207 : PopActiveSnapshot();
920 : }
921 :
922 236980 : if (!ScanDirectionIsNoMovement(direction))
923 : {
924 234476 : if (nprocessed > 0)
925 194159 : portal->atStart = false; /* OK to go backward now */
926 234476 : if (count == 0 || nprocessed < (uint64) count)
927 212554 : portal->atEnd = true; /* we retrieved 'em all */
928 234476 : portal->portalPos += nprocessed;
929 : }
930 : }
931 : else
932 : {
933 412 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
934 16 : ereport(ERROR,
935 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
936 : errmsg("cursor can only scan forward"),
937 : errhint("Declare it with SCROLL option to enable backward scan.")));
938 :
939 396 : if (portal->atStart || count <= 0)
940 : {
941 48 : direction = NoMovementScanDirection;
942 48 : count = 0; /* don't pass negative count to executor */
943 : }
944 : else
945 348 : direction = BackwardScanDirection;
946 :
947 : /* In the executor, zero count processes all rows */
948 396 : if (count == FETCH_ALL)
949 41 : count = 0;
950 :
951 396 : if (portal->holdStore)
952 8 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
953 : else
954 : {
955 388 : PushActiveSnapshot(queryDesc->snapshot);
956 388 : ExecutorRun(queryDesc, direction, (uint64) count);
957 388 : nprocessed = queryDesc->estate->es_processed;
958 388 : PopActiveSnapshot();
959 : }
960 :
961 396 : if (!ScanDirectionIsNoMovement(direction))
962 : {
963 348 : if (nprocessed > 0 && portal->atEnd)
964 : {
965 106 : portal->atEnd = false; /* OK to go forward now */
966 106 : portal->portalPos++; /* adjust for endpoint case */
967 : }
968 348 : if (count == 0 || nprocessed < (uint64) count)
969 : {
970 125 : portal->atStart = true; /* we retrieved 'em all */
971 125 : portal->portalPos = 0;
972 : }
973 : else
974 : {
975 223 : portal->portalPos -= nprocessed;
976 : }
977 : }
978 : }
979 :
980 237376 : return nprocessed;
981 : }
982 :
983 : /*
984 : * FillPortalStore
985 : * Run the query and load result tuples into the portal's tuple store.
986 : *
987 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
988 : * PORTAL_UTIL_SELECT cases only.
989 : */
990 : static void
991 31109 : FillPortalStore(Portal portal, bool isTopLevel)
992 : {
993 : DestReceiver *treceiver;
994 : QueryCompletion qc;
995 :
996 31109 : InitializeQueryCompletion(&qc);
997 31109 : PortalCreateHoldStore(portal);
998 31109 : treceiver = CreateDestReceiver(DestTuplestore);
999 31109 : SetTuplestoreDestReceiverParams(treceiver,
1000 : portal->holdStore,
1001 : portal->holdContext,
1002 : false,
1003 : NULL,
1004 : NULL);
1005 :
1006 31109 : switch (portal->strategy)
1007 : {
1008 2122 : case PORTAL_ONE_RETURNING:
1009 : case PORTAL_ONE_MOD_WITH:
1010 :
1011 : /*
1012 : * Run the portal to completion just as for the default
1013 : * PORTAL_MULTI_QUERY case, but send the primary query's output to
1014 : * the tuplestore. Auxiliary query outputs are discarded. Set the
1015 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1016 : */
1017 2122 : PortalRunMulti(portal, isTopLevel, true,
1018 : treceiver, None_Receiver, &qc);
1019 1994 : break;
1020 :
1021 28987 : case PORTAL_UTIL_SELECT:
1022 28987 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1023 : isTopLevel, true, treceiver, &qc);
1024 28765 : break;
1025 :
1026 0 : default:
1027 0 : elog(ERROR, "unsupported portal strategy: %d",
1028 : (int) portal->strategy);
1029 : break;
1030 : }
1031 :
1032 : /* Override portal completion data with actual command results */
1033 30759 : if (qc.commandTag != CMDTAG_UNKNOWN)
1034 13705 : CopyQueryCompletion(&portal->qc, &qc);
1035 :
1036 30759 : treceiver->rDestroy(treceiver);
1037 30759 : }
1038 :
1039 : /*
1040 : * RunFromStore
1041 : * Fetch tuples from the portal's tuple store.
1042 : *
1043 : * Calling conventions are similar to ExecutorRun, except that we
1044 : * do not depend on having a queryDesc or estate. Therefore we return the
1045 : * number of tuples processed as the result, not in estate->es_processed.
1046 : *
1047 : * One difference from ExecutorRun is that the destination receiver functions
1048 : * are run in the caller's memory context (since we have no estate). Watch
1049 : * out for memory leaks.
1050 : */
1051 : static uint64
1052 46781 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1053 : DestReceiver *dest)
1054 : {
1055 46781 : uint64 current_tuple_count = 0;
1056 : TupleTableSlot *slot;
1057 :
1058 46781 : slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);
1059 :
1060 46781 : dest->rStartup(dest, CMD_SELECT, portal->tupDesc);
1061 :
1062 46781 : if (ScanDirectionIsNoMovement(direction))
1063 : {
1064 : /* do nothing except start/stop the destination */
1065 : }
1066 : else
1067 : {
1068 45024 : bool forward = ScanDirectionIsForward(direction);
1069 :
1070 : for (;;)
1071 260683 : {
1072 : MemoryContext oldcontext;
1073 : bool ok;
1074 :
1075 305707 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1076 :
1077 305707 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1078 : slot);
1079 :
1080 305707 : MemoryContextSwitchTo(oldcontext);
1081 :
1082 305707 : if (!ok)
1083 30791 : break;
1084 :
1085 : /*
1086 : * If we are not able to send the tuple, we assume the destination
1087 : * has closed and no more tuples can be sent. If that's the case,
1088 : * end the loop.
1089 : */
1090 274916 : if (!dest->receiveSlot(slot, dest))
1091 0 : break;
1092 :
1093 274916 : ExecClearTuple(slot);
1094 :
1095 : /*
1096 : * check our tuple count.. if we've processed the proper number
1097 : * then quit, else loop again and process more tuples. Zero count
1098 : * means no limit.
1099 : */
1100 274916 : current_tuple_count++;
1101 274916 : if (count && count == current_tuple_count)
1102 14233 : break;
1103 : }
1104 : }
1105 :
1106 46781 : dest->rShutdown(dest);
1107 :
1108 46781 : ExecDropSingleTupleTableSlot(slot);
1109 :
1110 46781 : return current_tuple_count;
1111 : }
1112 :
1113 : /*
1114 : * PortalRunUtility
1115 : * Execute a utility statement inside a portal.
1116 : */
1117 : static void
1118 230864 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1119 : bool isTopLevel, bool setHoldSnapshot,
1120 : DestReceiver *dest, QueryCompletion *qc)
1121 : {
1122 : /*
1123 : * Set snapshot if utility stmt needs one.
1124 : */
1125 230864 : if (PlannedStmtRequiresSnapshot(pstmt))
1126 : {
1127 179635 : Snapshot snapshot = GetTransactionSnapshot();
1128 :
1129 : /* If told to, register the snapshot we're using and save in portal */
1130 179635 : if (setHoldSnapshot)
1131 : {
1132 24989 : snapshot = RegisterSnapshot(snapshot);
1133 24989 : portal->holdSnapshot = snapshot;
1134 : }
1135 :
1136 : /*
1137 : * In any case, make the snapshot active and remember it in portal.
1138 : * Because the portal now references the snapshot, we must tell
1139 : * snapmgr.c that the snapshot belongs to the portal's transaction
1140 : * level, else we risk portalSnapshot becoming a dangling pointer.
1141 : */
1142 179635 : PushActiveSnapshotWithLevel(snapshot, portal->createLevel);
1143 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1144 179635 : portal->portalSnapshot = GetActiveSnapshot();
1145 : }
1146 : else
1147 51229 : portal->portalSnapshot = NULL;
1148 :
1149 230864 : ProcessUtility(pstmt,
1150 : portal->sourceText,
1151 230864 : (portal->cplan != NULL), /* protect tree if in plancache */
1152 230864 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1153 : portal->portalParams,
1154 : portal->queryEnv,
1155 : dest,
1156 : qc);
1157 :
1158 : /* Some utility statements may change context on us */
1159 219444 : MemoryContextSwitchTo(portal->portalContext);
1160 :
1161 : /*
1162 : * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot
1163 : * stack from under us, so don't complain if it's now empty. Otherwise,
1164 : * our snapshot should be the top one; pop it. Note that this could be a
1165 : * different snapshot from the one we made above; see
1166 : * EnsurePortalSnapshotExists.
1167 : */
1168 219444 : if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
1169 : {
1170 : Assert(portal->portalSnapshot == GetActiveSnapshot());
1171 161807 : PopActiveSnapshot();
1172 : }
1173 219444 : portal->portalSnapshot = NULL;
1174 219444 : }
1175 :
1176 : /*
1177 : * PortalRunMulti
1178 : * Execute a portal's queries in the general case (multi queries
1179 : * or non-SELECT-like queries)
1180 : */
1181 : static void
1182 254718 : PortalRunMulti(Portal portal,
1183 : bool isTopLevel, bool setHoldSnapshot,
1184 : DestReceiver *dest, DestReceiver *altdest,
1185 : QueryCompletion *qc)
1186 : {
1187 254718 : bool active_snapshot_set = false;
1188 : ListCell *stmtlist_item;
1189 :
1190 : /*
1191 : * If the destination is DestRemoteExecute, change to DestNone. The
1192 : * reason is that the client won't be expecting any tuples, and indeed has
1193 : * no way to know what they are, since there is no provision for Describe
1194 : * to send a RowDescription message when this portal execution strategy is
1195 : * in effect. This presently will only affect SELECT commands added to
1196 : * non-SELECT queries by rewrite rules: such commands will be executed,
1197 : * but the results will be discarded unless you use "simple Query"
1198 : * protocol.
1199 : */
1200 254718 : if (dest->mydest == DestRemoteExecute)
1201 6372 : dest = None_Receiver;
1202 254718 : if (altdest->mydest == DestRemoteExecute)
1203 6372 : altdest = None_Receiver;
1204 :
1205 : /*
1206 : * Loop to handle the individual queries generated from a single parsetree
1207 : * by analysis and rewrite.
1208 : */
1209 494923 : foreach(stmtlist_item, portal->stmts)
1210 : {
1211 255142 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1212 :
1213 : /*
1214 : * If we got a cancel signal in prior command, quit
1215 : */
1216 255142 : CHECK_FOR_INTERRUPTS();
1217 :
1218 255142 : if (pstmt->utilityStmt == NULL)
1219 : {
1220 : /*
1221 : * process a plannable query.
1222 : */
1223 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1224 :
1225 53265 : if (log_executor_stats)
1226 0 : ResetUsage();
1227 :
1228 : /*
1229 : * Must always have a snapshot for plannable queries. First time
1230 : * through, take a new snapshot; for subsequent queries in the
1231 : * same portal, just update the snapshot's copy of the command
1232 : * counter.
1233 : */
1234 53265 : if (!active_snapshot_set)
1235 : {
1236 52841 : Snapshot snapshot = GetTransactionSnapshot();
1237 :
1238 : /* If told to, register the snapshot and save in portal */
1239 52841 : if (setHoldSnapshot)
1240 : {
1241 2122 : snapshot = RegisterSnapshot(snapshot);
1242 2122 : portal->holdSnapshot = snapshot;
1243 : }
1244 :
1245 : /*
1246 : * We can't have the holdSnapshot also be the active one,
1247 : * because UpdateActiveSnapshotCommandId would complain. So
1248 : * force an extra snapshot copy. Plain PushActiveSnapshot
1249 : * would have copied the transaction snapshot anyway, so this
1250 : * only adds a copy step when setHoldSnapshot is true. (It's
1251 : * okay for the command ID of the active snapshot to diverge
1252 : * from what holdSnapshot has.)
1253 : */
1254 52841 : PushCopiedSnapshot(snapshot);
1255 :
1256 : /*
1257 : * As for PORTAL_ONE_SELECT portals, it does not seem
1258 : * necessary to maintain portal->portalSnapshot here.
1259 : */
1260 :
1261 52841 : active_snapshot_set = true;
1262 : }
1263 : else
1264 424 : UpdateActiveSnapshotCommandId();
1265 :
1266 53265 : if (pstmt->canSetTag)
1267 : {
1268 : /* statement can set tag string */
1269 52797 : ProcessQuery(pstmt,
1270 : portal->sourceText,
1271 : portal->portalParams,
1272 : portal->queryEnv,
1273 : dest, qc);
1274 : }
1275 : else
1276 : {
1277 : /* stmt added by rewrite cannot set tag */
1278 468 : ProcessQuery(pstmt,
1279 : portal->sourceText,
1280 : portal->portalParams,
1281 : portal->queryEnv,
1282 : altdest, NULL);
1283 : }
1284 :
1285 49526 : if (log_executor_stats)
1286 0 : ShowUsage("EXECUTOR STATISTICS");
1287 :
1288 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1289 : }
1290 : else
1291 : {
1292 : /*
1293 : * process utility functions (create, destroy, etc..)
1294 : *
1295 : * We must not set a snapshot here for utility commands (if one is
1296 : * needed, PortalRunUtility will do it). If a utility command is
1297 : * alone in a portal then everything's fine. The only case where
1298 : * a utility command can be part of a longer list is that rules
1299 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1300 : * whether it has a snapshot or not, so we just leave the current
1301 : * snapshot alone if we have one.
1302 : */
1303 201877 : if (pstmt->canSetTag)
1304 : {
1305 : Assert(!active_snapshot_set);
1306 : /* statement can set tag string */
1307 201877 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1308 : dest, qc);
1309 : }
1310 : else
1311 : {
1312 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1313 : /* stmt added by rewrite cannot set tag */
1314 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1315 : altdest, NULL);
1316 : }
1317 : }
1318 :
1319 : /*
1320 : * Clear subsidiary contexts to recover temporary memory.
1321 : */
1322 : Assert(portal->portalContext == CurrentMemoryContext);
1323 :
1324 240205 : MemoryContextDeleteChildren(portal->portalContext);
1325 :
1326 : /*
1327 : * Avoid crashing if portal->stmts has been reset. This can only
1328 : * occur if a CALL or DO utility statement executed an internal
1329 : * COMMIT/ROLLBACK (cf PortalReleaseCachedPlan). The CALL or DO must
1330 : * have been the only statement in the portal, so there's nothing left
1331 : * for us to do; but we don't want to dereference a now-dangling list
1332 : * pointer.
1333 : */
1334 240205 : if (portal->stmts == NIL)
1335 0 : break;
1336 :
1337 : /*
1338 : * Increment command counter between queries, but not after the last
1339 : * one.
1340 : */
1341 240205 : if (lnext(portal->stmts, stmtlist_item) != NULL)
1342 424 : CommandCounterIncrement();
1343 : }
1344 :
1345 : /* Pop the snapshot if we pushed one. */
1346 239781 : if (active_snapshot_set)
1347 49102 : PopActiveSnapshot();
1348 :
1349 : /*
1350 : * If a command tag was requested and we did not fill in a run-time-
1351 : * determined tag above, copy the parse-time tag from the Portal. (There
1352 : * might not be any tag there either, in edge cases such as empty prepared
1353 : * statements. That's OK.)
1354 : */
1355 239781 : if (qc &&
1356 239781 : qc->commandTag == CMDTAG_UNKNOWN &&
1357 183066 : portal->qc.commandTag != CMDTAG_UNKNOWN)
1358 183066 : CopyQueryCompletion(qc, &portal->qc);
1359 239781 : }
1360 :
1361 : /*
1362 : * PortalRunFetch
1363 : * Variant form of PortalRun that supports SQL FETCH directions.
1364 : *
1365 : * Note: we presently assume that no callers of this want isTopLevel = true.
1366 : *
1367 : * count <= 0 is interpreted as a no-op: the destination gets started up
1368 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1369 : * interpreted as "all rows". (cf FetchStmt.howMany)
1370 : *
1371 : * Returns number of rows processed (suitable for use in result tag)
1372 : */
1373 : uint64
1374 34287 : PortalRunFetch(Portal portal,
1375 : FetchDirection fdirection,
1376 : long count,
1377 : DestReceiver *dest)
1378 : {
1379 : uint64 result;
1380 : Portal saveActivePortal;
1381 : ResourceOwner saveResourceOwner;
1382 : MemoryContext savePortalContext;
1383 : MemoryContext oldContext;
1384 :
1385 : Assert(PortalIsValid(portal));
1386 :
1387 : /*
1388 : * Check for improper portal use, and mark portal active.
1389 : */
1390 34287 : MarkPortalActive(portal);
1391 :
1392 : /*
1393 : * Set up global portal context pointers.
1394 : */
1395 34275 : saveActivePortal = ActivePortal;
1396 34275 : saveResourceOwner = CurrentResourceOwner;
1397 34275 : savePortalContext = PortalContext;
1398 34275 : PG_TRY();
1399 : {
1400 34275 : ActivePortal = portal;
1401 34275 : if (portal->resowner)
1402 34168 : CurrentResourceOwner = portal->resowner;
1403 34275 : PortalContext = portal->portalContext;
1404 :
1405 34275 : oldContext = MemoryContextSwitchTo(PortalContext);
1406 :
1407 34275 : switch (portal->strategy)
1408 : {
1409 12917 : case PORTAL_ONE_SELECT:
1410 12917 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1411 12870 : break;
1412 :
1413 21358 : case PORTAL_ONE_RETURNING:
1414 : case PORTAL_ONE_MOD_WITH:
1415 : case PORTAL_UTIL_SELECT:
1416 :
1417 : /*
1418 : * If we have not yet run the command, do so, storing its
1419 : * results in the portal's tuplestore.
1420 : */
1421 21358 : if (!portal->holdStore)
1422 5439 : FillPortalStore(portal, false /* isTopLevel */ );
1423 :
1424 : /*
1425 : * Now fetch desired portion of results.
1426 : */
1427 21354 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1428 21354 : break;
1429 :
1430 0 : default:
1431 0 : elog(ERROR, "unsupported portal strategy");
1432 : result = 0; /* keep compiler quiet */
1433 : break;
1434 : }
1435 : }
1436 51 : PG_CATCH();
1437 : {
1438 : /* Uncaught error while executing portal: mark it dead */
1439 51 : MarkPortalFailed(portal);
1440 :
1441 : /* Restore global vars and propagate error */
1442 51 : ActivePortal = saveActivePortal;
1443 51 : CurrentResourceOwner = saveResourceOwner;
1444 51 : PortalContext = savePortalContext;
1445 :
1446 51 : PG_RE_THROW();
1447 : }
1448 34224 : PG_END_TRY();
1449 :
1450 34224 : MemoryContextSwitchTo(oldContext);
1451 :
1452 : /* Mark portal not active */
1453 34224 : portal->status = PORTAL_READY;
1454 :
1455 34224 : ActivePortal = saveActivePortal;
1456 34224 : CurrentResourceOwner = saveResourceOwner;
1457 34224 : PortalContext = savePortalContext;
1458 :
1459 34224 : return result;
1460 : }
1461 :
1462 : /*
1463 : * DoPortalRunFetch
1464 : * Guts of PortalRunFetch --- the portal context is already set up
1465 : *
1466 : * Here, count < 0 typically reverses the direction. Also, count == FETCH_ALL
1467 : * is interpreted as "all rows". (cf FetchStmt.howMany)
1468 : *
1469 : * Returns number of rows processed (suitable for use in result tag)
1470 : */
1471 : static uint64
1472 34271 : DoPortalRunFetch(Portal portal,
1473 : FetchDirection fdirection,
1474 : long count,
1475 : DestReceiver *dest)
1476 : {
1477 : bool forward;
1478 :
1479 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1480 : portal->strategy == PORTAL_ONE_RETURNING ||
1481 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1482 : portal->strategy == PORTAL_UTIL_SELECT);
1483 :
1484 : /*
1485 : * Note: we disallow backwards fetch (including re-fetch of current row)
1486 : * for NO SCROLL cursors, but we interpret that very loosely: you can use
1487 : * any of the FetchDirection options, so long as the end result is to move
1488 : * forwards by at least one row. Currently it's sufficient to check for
1489 : * NO SCROLL in DoPortalRewind() and in the forward == false path in
1490 : * PortalRunSelect(); but someday we might prefer to account for that
1491 : * restriction explicitly here.
1492 : */
1493 34271 : switch (fdirection)
1494 : {
1495 33767 : case FETCH_FORWARD:
1496 33767 : if (count < 0)
1497 : {
1498 2 : fdirection = FETCH_BACKWARD;
1499 2 : count = -count;
1500 : }
1501 : /* fall out of switch to share code with FETCH_BACKWARD */
1502 33767 : break;
1503 336 : case FETCH_BACKWARD:
1504 336 : if (count < 0)
1505 : {
1506 1 : fdirection = FETCH_FORWARD;
1507 1 : count = -count;
1508 : }
1509 : /* fall out of switch to share code with FETCH_FORWARD */
1510 336 : break;
1511 109 : case FETCH_ABSOLUTE:
1512 109 : if (count > 0)
1513 : {
1514 : /*
1515 : * Definition: Rewind to start, advance count-1 rows, return
1516 : * next row (if any).
1517 : *
1518 : * In practice, if the goal is less than halfway back to the
1519 : * start, it's better to scan from where we are.
1520 : *
1521 : * Also, if current portalPos is outside the range of "long",
1522 : * do it the hard way to avoid possible overflow of the count
1523 : * argument to PortalRunSelect. We must exclude exactly
1524 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1525 : *
1526 : * In any case, we arrange to fetch the target row going
1527 : * forwards.
1528 : */
1529 63 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1530 25 : portal->portalPos >= (uint64) LONG_MAX)
1531 : {
1532 38 : DoPortalRewind(portal);
1533 34 : if (count > 1)
1534 0 : PortalRunSelect(portal, true, count - 1,
1535 : None_Receiver);
1536 : }
1537 : else
1538 : {
1539 25 : long pos = (long) portal->portalPos;
1540 :
1541 25 : if (portal->atEnd)
1542 0 : pos++; /* need one extra fetch if off end */
1543 25 : if (count <= pos)
1544 8 : PortalRunSelect(portal, false, pos - count + 1,
1545 : None_Receiver);
1546 17 : else if (count > pos + 1)
1547 8 : PortalRunSelect(portal, true, count - pos - 1,
1548 : None_Receiver);
1549 : }
1550 55 : return PortalRunSelect(portal, true, 1L, dest);
1551 : }
1552 46 : else if (count < 0)
1553 : {
1554 : /*
1555 : * Definition: Advance to end, back up abs(count)-1 rows,
1556 : * return prior row (if any). We could optimize this if we
1557 : * knew in advance where the end was, but typically we won't.
1558 : * (Is it worth considering case where count > half of size of
1559 : * query? We could rewind once we know the size ...)
1560 : */
1561 38 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1562 38 : if (count < -1)
1563 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1564 38 : return PortalRunSelect(portal, false, 1L, dest);
1565 : }
1566 : else
1567 : {
1568 : /* count == 0 */
1569 : /* Rewind to start, return zero rows */
1570 8 : DoPortalRewind(portal);
1571 8 : return PortalRunSelect(portal, true, 0L, dest);
1572 : }
1573 : break;
1574 59 : case FETCH_RELATIVE:
1575 59 : if (count > 0)
1576 : {
1577 : /*
1578 : * Definition: advance count-1 rows, return next row (if any).
1579 : */
1580 26 : if (count > 1)
1581 17 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1582 26 : return PortalRunSelect(portal, true, 1L, dest);
1583 : }
1584 33 : else if (count < 0)
1585 : {
1586 : /*
1587 : * Definition: back up abs(count)-1 rows, return prior row (if
1588 : * any).
1589 : */
1590 21 : if (count < -1)
1591 12 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1592 21 : return PortalRunSelect(portal, false, 1L, dest);
1593 : }
1594 : else
1595 : {
1596 : /* count == 0 */
1597 : /* Same as FETCH FORWARD 0, so fall out of switch */
1598 12 : fdirection = FETCH_FORWARD;
1599 : }
1600 12 : break;
1601 0 : default:
1602 0 : elog(ERROR, "bogus direction");
1603 : break;
1604 : }
1605 :
1606 : /*
1607 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1608 : * >= 0.
1609 : */
1610 34115 : forward = (fdirection == FETCH_FORWARD);
1611 :
1612 : /*
1613 : * Zero count means to re-fetch the current row, if any (per SQL)
1614 : */
1615 34115 : if (count == 0)
1616 : {
1617 : bool on_row;
1618 :
1619 : /* Are we sitting on a row? */
1620 12 : on_row = (!portal->atStart && !portal->atEnd);
1621 :
1622 12 : if (dest->mydest == DestNone)
1623 : {
1624 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1625 0 : return on_row ? 1 : 0;
1626 : }
1627 : else
1628 : {
1629 : /*
1630 : * If we are sitting on a row, back up one so we can re-fetch it.
1631 : * If we are not sitting on a row, we still have to start up and
1632 : * shut down the executor so that the destination is initialized
1633 : * and shut down correctly; so keep going. To PortalRunSelect,
1634 : * count == 0 means we will retrieve no row.
1635 : */
1636 12 : if (on_row)
1637 : {
1638 12 : PortalRunSelect(portal, false, 1L, None_Receiver);
1639 : /* Set up to fetch one row forward */
1640 8 : count = 1;
1641 8 : forward = true;
1642 : }
1643 : }
1644 : }
1645 :
1646 : /*
1647 : * Optimize MOVE BACKWARD ALL into a Rewind.
1648 : */
1649 34111 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1650 : {
1651 16 : uint64 result = portal->portalPos;
1652 :
1653 16 : if (result > 0 && !portal->atEnd)
1654 0 : result--;
1655 16 : DoPortalRewind(portal);
1656 16 : return result;
1657 : }
1658 :
1659 34095 : return PortalRunSelect(portal, forward, count, dest);
1660 : }
1661 :
1662 : /*
1663 : * DoPortalRewind - rewind a Portal to starting point
1664 : */
1665 : static void
1666 62 : DoPortalRewind(Portal portal)
1667 : {
1668 : QueryDesc *queryDesc;
1669 :
1670 : /*
1671 : * No work is needed if we've not advanced nor attempted to advance the
1672 : * cursor (and we don't want to throw a NO SCROLL error in this case).
1673 : */
1674 62 : if (portal->atStart && !portal->atEnd)
1675 12 : return;
1676 :
1677 : /* Otherwise, cursor must allow scrolling */
1678 50 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
1679 4 : ereport(ERROR,
1680 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 : errmsg("cursor can only scan forward"),
1682 : errhint("Declare it with SCROLL option to enable backward scan.")));
1683 :
1684 : /* Rewind holdStore, if we have one */
1685 46 : if (portal->holdStore)
1686 : {
1687 : MemoryContext oldcontext;
1688 :
1689 4 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1690 4 : tuplestore_rescan(portal->holdStore);
1691 4 : MemoryContextSwitchTo(oldcontext);
1692 : }
1693 :
1694 : /* Rewind executor, if active */
1695 46 : queryDesc = portal->queryDesc;
1696 46 : if (queryDesc)
1697 : {
1698 42 : PushActiveSnapshot(queryDesc->snapshot);
1699 42 : ExecutorRewind(queryDesc);
1700 42 : PopActiveSnapshot();
1701 : }
1702 :
1703 46 : portal->atStart = true;
1704 46 : portal->atEnd = false;
1705 46 : portal->portalPos = 0;
1706 : }
1707 :
1708 : /*
1709 : * PlannedStmtRequiresSnapshot - what it says on the tin
1710 : */
1711 : bool
1712 296837 : PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
1713 : {
1714 296837 : Node *utilityStmt = pstmt->utilityStmt;
1715 :
1716 : /* If it's not a utility statement, it definitely needs a snapshot */
1717 296837 : if (utilityStmt == NULL)
1718 48252 : return true;
1719 :
1720 : /*
1721 : * Most utility statements need a snapshot, and the default presumption
1722 : * about new ones should be that they do too. Hence, enumerate those that
1723 : * do not need one.
1724 : *
1725 : * Transaction control, LOCK, and SET must *not* set a snapshot, since
1726 : * they need to be executable at the start of a transaction-snapshot-mode
1727 : * transaction without freezing a snapshot. By extension we allow SHOW
1728 : * not to set a snapshot. The other stmts listed are just efficiency
1729 : * hacks. Beware of listing anything that can modify the database --- if,
1730 : * say, it has to update an index with expressions that invoke
1731 : * user-defined functions, then it had better have a snapshot.
1732 : */
1733 248585 : if (IsA(utilityStmt, TransactionStmt) ||
1734 221255 : IsA(utilityStmt, LockStmt) ||
1735 220609 : IsA(utilityStmt, VariableSetStmt) ||
1736 194648 : IsA(utilityStmt, VariableShowStmt) ||
1737 194093 : IsA(utilityStmt, ConstraintsSetStmt) ||
1738 : /* efficiency hacks from here down */
1739 194022 : IsA(utilityStmt, FetchStmt) ||
1740 189591 : IsA(utilityStmt, ListenStmt) ||
1741 189531 : IsA(utilityStmt, NotifyStmt) ||
1742 189469 : IsA(utilityStmt, UnlistenStmt) ||
1743 189389 : IsA(utilityStmt, CheckPointStmt) ||
1744 188922 : IsA(utilityStmt, WaitStmt))
1745 59891 : return false;
1746 :
1747 188694 : return true;
1748 : }
1749 :
1750 : /*
1751 : * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
1752 : *
1753 : * Generally, we will have an active snapshot whenever we are executing
1754 : * inside a Portal, unless the Portal's query is one of the utility
1755 : * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
1756 : * However, procedures and DO blocks can commit or abort the transaction,
1757 : * and thereby destroy all snapshots. This function can be called to
1758 : * re-establish the Portal-level snapshot when none exists.
1759 : */
1760 : void
1761 271527 : EnsurePortalSnapshotExists(void)
1762 : {
1763 : Portal portal;
1764 :
1765 : /*
1766 : * Nothing to do if a snapshot is set. (We take it on faith that the
1767 : * outermost active snapshot belongs to some Portal; or if there is no
1768 : * Portal, it's somebody else's responsibility to manage things.)
1769 : */
1770 271527 : if (ActiveSnapshotSet())
1771 269282 : return;
1772 :
1773 : /* Otherwise, we'd better have an active Portal */
1774 2245 : portal = ActivePortal;
1775 2245 : if (unlikely(portal == NULL))
1776 0 : elog(ERROR, "cannot execute SQL without an outer snapshot or portal");
1777 : Assert(portal->portalSnapshot == NULL);
1778 :
1779 : /*
1780 : * Create a new snapshot, make it active, and remember it in portal.
1781 : * Because the portal now references the snapshot, we must tell snapmgr.c
1782 : * that the snapshot belongs to the portal's transaction level, else we
1783 : * risk portalSnapshot becoming a dangling pointer.
1784 : */
1785 2245 : PushActiveSnapshotWithLevel(GetTransactionSnapshot(), portal->createLevel);
1786 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1787 2245 : portal->portalSnapshot = GetActiveSnapshot();
1788 : }
|