Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/tstoreReceiver.h"
23 : #include "miscadmin.h"
24 : #include "pg_trace.h"
25 : #include "tcop/pquery.h"
26 : #include "tcop/utility.h"
27 : #include "utils/memutils.h"
28 : #include "utils/snapmgr.h"
29 :
30 :
31 : /*
32 : * ActivePortal is the currently executing Portal (the most closely nested,
33 : * if there are several).
 *
 * PortalStart and PortalRun in this file point it at the portal they are
 * working on, and put the previous value back before returning or before
 * re-throwing an error.
34 : */
35 : Portal ActivePortal = NULL;
36 :
37 :
/*
 * Forward declarations of the routines that are private to this file.
 */
38 : static void ProcessQuery(PlannedStmt *plan,
39 : const char *sourceText,
40 : ParamListInfo params,
41 : QueryEnvironment *queryEnv,
42 : DestReceiver *dest,
43 : QueryCompletion *qc);
44 : static void FillPortalStore(Portal portal, bool isTopLevel);
45 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
46 : DestReceiver *dest);
47 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
48 : DestReceiver *dest);
49 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
50 : bool isTopLevel, bool setHoldSnapshot,
51 : DestReceiver *dest, QueryCompletion *qc);
52 : static void PortalRunMulti(Portal portal,
53 : bool isTopLevel, bool setHoldSnapshot,
54 : DestReceiver *dest, DestReceiver *altdest,
55 : QueryCompletion *qc);
56 : static uint64 DoPortalRunFetch(Portal portal,
57 : FetchDirection fdirection,
58 : long count,
59 : DestReceiver *dest);
60 : static void DoPortalRewind(Portal portal);
61 :
62 :
63 : /*
64 : * CreateQueryDesc
65 : */
66 : QueryDesc *
67 637786 : CreateQueryDesc(PlannedStmt *plannedstmt,
68 : const char *sourceText,
69 : Snapshot snapshot,
70 : Snapshot crosscheck_snapshot,
71 : DestReceiver *dest,
72 : ParamListInfo params,
73 : QueryEnvironment *queryEnv,
74 : int instrument_options)
75 : {
76 637786 : QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
77 :
78 637786 : qd->operation = plannedstmt->commandType; /* operation */
79 637786 : qd->plannedstmt = plannedstmt; /* plan */
80 637786 : qd->sourceText = sourceText; /* query text */
81 637786 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
82 : /* RI check snapshot */
83 637786 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
84 637786 : qd->dest = dest; /* output dest */
85 637786 : qd->params = params; /* parameter values passed into query */
86 637786 : qd->queryEnv = queryEnv;
87 637786 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
88 :
89 : /* null these fields until set by ExecutorStart */
90 637786 : qd->tupDesc = NULL;
91 637786 : qd->estate = NULL;
92 637786 : qd->planstate = NULL;
93 637786 : qd->totaltime = NULL;
94 :
95 : /* not yet executed */
96 637786 : qd->already_executed = false;
97 :
98 637786 : return qd;
99 : }
100 :
101 : /*
102 : * FreeQueryDesc
103 : */
104 : void
105 610638 : FreeQueryDesc(QueryDesc *qdesc)
106 : {
107 : /* Can't be a live query */
108 : Assert(qdesc->estate == NULL);
109 :
110 : /* forget our snapshots */
111 610638 : UnregisterSnapshot(qdesc->snapshot);
112 610638 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
113 :
114 : /* Only the QueryDesc itself need be freed */
115 610638 : pfree(qdesc);
116 610638 : }
117 :
118 :
119 : /*
120 : * ProcessQuery
121 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
122 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
123 : *
124 : * plan: the plan tree for the query
125 : * sourceText: the source text of the query
126 : * params: any parameters needed
127 : * dest: where to send results
128 : * qc: where to store the command completion status data.
129 : *
130 : * qc may be NULL if caller doesn't want a status string.
131 : *
132 : * Must be called in a memory context that will be reset or deleted on
133 : * error; otherwise the executor's memory usage will be leaked.
134 : */
135 : static void
136 89172 : ProcessQuery(PlannedStmt *plan,
137 : const char *sourceText,
138 : ParamListInfo params,
139 : QueryEnvironment *queryEnv,
140 : DestReceiver *dest,
141 : QueryCompletion *qc)
142 : {
143 : QueryDesc *queryDesc;
144 :
145 : /*
146 : * Create the QueryDesc object
147 : */
148 89172 : queryDesc = CreateQueryDesc(plan, sourceText,
149 : GetActiveSnapshot(), InvalidSnapshot,
150 : dest, params, queryEnv, 0);
151 :
152 : /*
153 : * Call ExecutorStart to prepare the plan for execution
154 : */
155 89172 : ExecutorStart(queryDesc, 0);
156 :
157 : /*
158 : * Run the plan to completion.
159 : */
160 87938 : ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
161 :
162 : /*
163 : * Build command completion status data, if caller wants one.
164 : */
165 84850 : if (qc)
166 : {
167 84232 : switch (queryDesc->operation)
168 : {
169 116 : case CMD_SELECT:
170 116 : SetQueryCompletion(qc, CMDTAG_SELECT, queryDesc->estate->es_processed);
171 116 : break;
172 69388 : case CMD_INSERT:
173 69388 : SetQueryCompletion(qc, CMDTAG_INSERT, queryDesc->estate->es_processed);
174 69388 : break;
175 10276 : case CMD_UPDATE:
176 10276 : SetQueryCompletion(qc, CMDTAG_UPDATE, queryDesc->estate->es_processed);
177 10276 : break;
178 3444 : case CMD_DELETE:
179 3444 : SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed);
180 3444 : break;
181 1008 : case CMD_MERGE:
182 1008 : SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed);
183 1008 : break;
184 0 : default:
185 0 : SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed);
186 0 : break;
187 : }
188 618 : }
189 :
190 : /*
191 : * Now, we close down all the scans and free allocated resources.
192 : */
193 84850 : ExecutorFinish(queryDesc);
194 83786 : ExecutorEnd(queryDesc);
195 :
196 83786 : FreeQueryDesc(queryDesc);
197 83786 : }
198 :
199 : /*
200 : * ChoosePortalStrategy
201 : * Select portal execution strategy given the intended statement list.
202 : *
203 : * The list elements can be Querys or PlannedStmts.
204 : * That's more general than portals need, but plancache.c uses this too.
205 : *
206 : * See the comments in portal.h.
207 : */
208 : PortalStrategy
209 757944 : ChoosePortalStrategy(List *stmts)
210 : {
211 : int nSetTag;
212 : ListCell *lc;
213 :
214 : /*
215 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
216 : * single-statement case, since there are no rewrite rules that can add
217 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
218 : * likewise allows only one top-level statement.
219 : */
220 757944 : if (list_length(stmts) == 1)
221 : {
222 757614 : Node *stmt = (Node *) linitial(stmts);
223 :
224 757614 : if (IsA(stmt, Query))
225 : {
226 75208 : Query *query = (Query *) stmt;
227 :
228 75208 : if (query->canSetTag)
229 : {
230 75208 : if (query->commandType == CMD_SELECT)
231 : {
232 51870 : if (query->hasModifyingCTE)
233 0 : return PORTAL_ONE_MOD_WITH;
234 : else
235 51870 : return PORTAL_ONE_SELECT;
236 : }
237 23338 : if (query->commandType == CMD_UTILITY)
238 : {
239 15390 : if (UtilityReturnsTuples(query->utilityStmt))
240 8216 : return PORTAL_UTIL_SELECT;
241 : /* it can't be ONE_RETURNING, so give up */
242 7174 : return PORTAL_MULTI_QUERY;
243 : }
244 : }
245 : }
246 682406 : else if (IsA(stmt, PlannedStmt))
247 : {
248 682406 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
249 :
250 682406 : if (pstmt->canSetTag)
251 : {
252 682376 : if (pstmt->commandType == CMD_SELECT)
253 : {
254 263570 : if (pstmt->hasModifyingCTE)
255 128 : return PORTAL_ONE_MOD_WITH;
256 : else
257 263442 : return PORTAL_ONE_SELECT;
258 : }
259 418806 : if (pstmt->commandType == CMD_UTILITY)
260 : {
261 330704 : if (UtilityReturnsTuples(pstmt->utilityStmt))
262 43582 : return PORTAL_UTIL_SELECT;
263 : /* it can't be ONE_RETURNING, so give up */
264 287122 : return PORTAL_MULTI_QUERY;
265 : }
266 : }
267 : }
268 : else
269 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
270 : }
271 :
272 : /*
273 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
274 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
275 : * it has a RETURNING list.
276 : */
277 96410 : nSetTag = 0;
278 99484 : foreach(lc, stmts)
279 : {
280 96572 : Node *stmt = (Node *) lfirst(lc);
281 :
282 96572 : if (IsA(stmt, Query))
283 : {
284 7948 : Query *query = (Query *) stmt;
285 :
286 7948 : if (query->canSetTag)
287 : {
288 7948 : if (++nSetTag > 1)
289 93498 : return PORTAL_MULTI_QUERY; /* no need to look further */
290 7948 : if (query->commandType == CMD_UTILITY ||
291 7948 : query->returningList == NIL)
292 7736 : return PORTAL_MULTI_QUERY; /* no need to look further */
293 : }
294 : }
295 88624 : else if (IsA(stmt, PlannedStmt))
296 : {
297 88624 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
298 :
299 88624 : if (pstmt->canSetTag)
300 : {
301 88420 : if (++nSetTag > 1)
302 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
303 88420 : if (pstmt->commandType == CMD_UTILITY ||
304 88420 : !pstmt->hasReturning)
305 85762 : return PORTAL_MULTI_QUERY; /* no need to look further */
306 : }
307 : }
308 : else
309 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
310 : }
311 2912 : if (nSetTag == 1)
312 2870 : return PORTAL_ONE_RETURNING;
313 :
314 : /* Else, it's the general case... */
315 42 : return PORTAL_MULTI_QUERY;
316 : }
317 :
318 : /*
319 : * FetchPortalTargetList
320 : * Given a portal that returns tuples, extract the query targetlist.
321 : * Returns NIL if the portal doesn't have a determinable targetlist.
322 : *
323 : * Note: do not modify the result.
324 : */
325 : List *
326 284558 : FetchPortalTargetList(Portal portal)
327 : {
328 : /* no point in looking if we determined it doesn't return tuples */
329 284558 : if (portal->strategy == PORTAL_MULTI_QUERY)
330 24 : return NIL;
331 : /* get the primary statement and find out what it returns */
332 284534 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
333 : }
334 :
335 : /*
336 : * FetchStatementTargetList
337 : * Given a statement that returns tuples, extract the query targetlist.
338 : * Returns NIL if the statement doesn't have a determinable targetlist.
339 : *
340 : * This can be applied to a Query or a PlannedStmt.
341 : * That's more general than portals need, but plancache.c uses this too.
342 : *
343 : * Note: do not modify the result.
344 : *
345 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
346 : */
347 : List *
348 299450 : FetchStatementTargetList(Node *stmt)
349 : {
350 299450 : if (stmt == NULL)
351 0 : return NIL;
352 299450 : if (IsA(stmt, Query))
353 : {
354 14916 : Query *query = (Query *) stmt;
355 :
356 14916 : if (query->commandType == CMD_UTILITY)
357 : {
358 : /* transfer attention to utility statement */
359 12 : stmt = query->utilityStmt;
360 : }
361 : else
362 : {
363 14904 : if (query->commandType == CMD_SELECT)
364 14904 : return query->targetList;
365 0 : if (query->returningList)
366 0 : return query->returningList;
367 0 : return NIL;
368 : }
369 : }
370 284546 : if (IsA(stmt, PlannedStmt))
371 : {
372 284534 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
373 :
374 284534 : if (pstmt->commandType == CMD_UTILITY)
375 : {
376 : /* transfer attention to utility statement */
377 35320 : stmt = pstmt->utilityStmt;
378 : }
379 : else
380 : {
381 249214 : if (pstmt->commandType == CMD_SELECT)
382 246664 : return pstmt->planTree->targetlist;
383 2550 : if (pstmt->hasReturning)
384 2550 : return pstmt->planTree->targetlist;
385 0 : return NIL;
386 : }
387 : }
388 35332 : if (IsA(stmt, FetchStmt))
389 : {
390 5582 : FetchStmt *fstmt = (FetchStmt *) stmt;
391 : Portal subportal;
392 :
393 : Assert(!fstmt->ismove);
394 5582 : subportal = GetPortalByName(fstmt->portalname);
395 : Assert(PortalIsValid(subportal));
396 5582 : return FetchPortalTargetList(subportal);
397 : }
398 29750 : if (IsA(stmt, ExecuteStmt))
399 : {
400 14788 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
401 : PreparedStatement *entry;
402 :
403 14788 : entry = FetchPreparedStatement(estmt->name, true);
404 14788 : return FetchPreparedStatementTargetList(entry);
405 : }
406 14962 : return NIL;
407 : }
408 :
409 : /*
410 : * PortalStart
411 : * Prepare a portal for execution.
412 : *
413 : * Caller must already have created the portal, done PortalDefineQuery(),
414 : * and adjusted portal options if needed.
415 : *
416 : * If parameters are needed by the query, they must be passed in "params"
417 : * (caller is responsible for giving them appropriate lifetime).
418 : *
419 : * The caller can also provide an initial set of "eflags" to be passed to
420 : * ExecutorStart (but note these can be modified internally, and they are
421 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
422 : * should simply pass zero.
423 : *
424 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
425 : * for the normal behavior of setting a new snapshot. This parameter is
426 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
427 : * to be used for cursors).
428 : *
429 : * On return, portal is ready to accept PortalRun() calls, and the result
430 : * tupdesc (if any) is known.
 *
 * What happens per strategy (the strategy is chosen here, by calling
 * ChoosePortalStrategy on portal->stmts):
 *	PORTAL_ONE_SELECT: a QueryDesc is created and ExecutorStart is run
 *		right away; the tupdesc is the one ExecutorStart computed.
 *	PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH: the executor is not
 *		started; the tupdesc is derived from the primary statement's plan
 *		targetlist.
 *	PORTAL_UTIL_SELECT: the tupdesc comes from UtilityTupleDescriptor.
 *	PORTAL_MULTI_QUERY: nothing is prepared and the tupdesc is NULL.
 *
 * While working, ActivePortal, CurrentResourceOwner and PortalContext are
 * pointed at this portal.  If an error is thrown, the portal is marked
 * failed, those three globals get their saved values back, and the error
 * is re-thrown.  On success they are restored as well, the memory context
 * that was current on entry is reinstated, and the portal's status becomes
 * PORTAL_READY.
431 : */
432 : void
433 682730 : PortalStart(Portal portal, ParamListInfo params,
434 : int eflags, Snapshot snapshot)
435 : {
436 : Portal saveActivePortal;
437 : ResourceOwner saveResourceOwner;
438 : MemoryContext savePortalContext;
439 : MemoryContext oldContext;
440 : QueryDesc *queryDesc;
441 : int myeflags;
442 :
443 : Assert(PortalIsValid(portal));
444 : Assert(portal->status == PORTAL_DEFINED);
445 :
446 : /*
447 : * Set up global portal context pointers.
 *
 * The previous values are captured before PG_TRY so that both the error
 * path and the normal exit path can put them back.
448 : */
449 682730 : saveActivePortal = ActivePortal;
450 682730 : saveResourceOwner = CurrentResourceOwner;
451 682730 : savePortalContext = PortalContext;
452 682730 : PG_TRY();
453 : {
454 682730 : ActivePortal = portal;
/* CurrentResourceOwner is left alone if the portal has no resowner */
455 682730 : if (portal->resowner)
456 682730 : CurrentResourceOwner = portal->resowner;
457 682730 : PortalContext = portal->portalContext;
458 :
459 682730 : oldContext = MemoryContextSwitchTo(PortalContext);
460 :
461 : /* Must remember portal param list, if any */
462 682730 : portal->portalParams = params;
463 :
464 : /*
465 : * Determine the portal execution strategy
466 : */
467 682730 : portal->strategy = ChoosePortalStrategy(portal->stmts);
468 :
469 : /*
470 : * Fire her up according to the strategy
 *
 * NOTE(review): this switch has no default case; presumably the
 * four cases below cover every PortalStrategy value -- confirm
 * against portal.h.
471 : */
472 682730 : switch (portal->strategy)
473 : {
474 263442 : case PORTAL_ONE_SELECT:
475 :
476 : /* Must set snapshot before starting executor. */
477 263442 : if (snapshot)
478 21668 : PushActiveSnapshot(snapshot);
479 : else
480 241774 : PushActiveSnapshot(GetTransactionSnapshot());
481 :
482 : /*
483 : * We could remember the snapshot in portal->portalSnapshot,
484 : * but presently there seems no need to, as this code path
485 : * cannot be used for non-atomic execution. Hence there can't
486 : * be any commit/abort that might destroy the snapshot. Since
487 : * we don't do that, there's also no need to force a
488 : * non-default nesting level for the snapshot.
489 : */
490 :
491 : /*
492 : * Create QueryDesc in portal's context; for the moment, set
493 : * the destination to DestNone.
494 : */
495 263442 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
496 : portal->sourceText,
497 : GetActiveSnapshot(),
498 : InvalidSnapshot,
499 : None_Receiver,
500 : params,
501 : portal->queryEnv,
502 : 0);
503 :
504 : /*
505 : * If it's a scrollable cursor, executor needs to support
506 : * REWIND and backwards scan, as well as whatever the caller
507 : * might've asked for.
508 : */
509 263442 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
510 2166 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
511 : else
512 261276 : myeflags = eflags;
513 :
514 : /*
515 : * Call ExecutorStart to prepare the plan for execution
516 : */
517 263442 : ExecutorStart(queryDesc, myeflags);
518 :
519 : /*
520 : * This tells PortalCleanup to shut down the executor
 *
 * Note that the assignment is made only after ExecutorStart has
 * returned, so it is skipped if ExecutorStart throws.
521 : */
522 262804 : portal->queryDesc = queryDesc;
523 :
524 : /*
525 : * Remember tuple descriptor (computed by ExecutorStart)
526 : */
527 262804 : portal->tupDesc = queryDesc->tupDesc;
528 :
529 : /*
530 : * Reset cursor position data to "start of query"
531 : */
532 262804 : portal->atStart = true;
533 262804 : portal->atEnd = false; /* allow fetches */
534 262804 : portal->portalPos = 0;
535 :
/* Balances the PushActiveSnapshot at the top of this case */
536 262804 : PopActiveSnapshot();
537 262804 : break;
538 :
539 2786 : case PORTAL_ONE_RETURNING:
540 : case PORTAL_ONE_MOD_WITH:
541 :
542 : /*
543 : * We don't start the executor until we are told to run the
544 : * portal. We do need to set up the result tupdesc.
545 : */
546 : {
547 : PlannedStmt *pstmt;
548 :
549 2786 : pstmt = PortalGetPrimaryStmt(portal);
550 2786 : portal->tupDesc =
551 2786 : ExecCleanTypeFromTL(pstmt->planTree->targetlist);
552 : }
553 :
554 : /*
555 : * Reset cursor position data to "start of query"
556 : */
557 2786 : portal->atStart = true;
558 2786 : portal->atEnd = false; /* allow fetches */
559 2786 : portal->portalPos = 0;
560 2786 : break;
561 :
562 43582 : case PORTAL_UTIL_SELECT:
563 :
564 : /*
565 : * We don't set snapshot here, because PortalRunUtility will
566 : * take care of it if needed.
567 : */
568 : {
569 43582 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
570 :
571 : Assert(pstmt->commandType == CMD_UTILITY);
572 43582 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
573 : }
574 :
575 : /*
576 : * Reset cursor position data to "start of query"
577 : */
578 43554 : portal->atStart = true;
579 43554 : portal->atEnd = false; /* allow fetches */
580 43554 : portal->portalPos = 0;
581 43554 : break;
582 :
583 372920 : case PORTAL_MULTI_QUERY:
584 : /* Need do nothing now */
585 372920 : portal->tupDesc = NULL;
586 372920 : break;
587 : }
588 682064 : }
589 666 : PG_CATCH();
590 : {
591 : /* Uncaught error while executing portal: mark it dead */
592 666 : MarkPortalFailed(portal);
593 :
594 : /* Restore global vars and propagate error */
/*
 * NOTE(review): unlike the success path below, this path does not switch
 * back to oldContext; presumably whoever handles the re-thrown error
 * resets the current memory context -- confirm.
 */
595 666 : ActivePortal = saveActivePortal;
596 666 : CurrentResourceOwner = saveResourceOwner;
597 666 : PortalContext = savePortalContext;
598 :
599 666 : PG_RE_THROW();
600 : }
601 682064 : PG_END_TRY();
602 :
/* Success: return to the memory context that was current on entry */
603 682064 : MemoryContextSwitchTo(oldContext);
604 :
/* ... and undo the changes made to the global portal pointers */
605 682064 : ActivePortal = saveActivePortal;
606 682064 : CurrentResourceOwner = saveResourceOwner;
607 682064 : PortalContext = savePortalContext;
608 :
/* The portal has moved on from PORTAL_DEFINED and can now be run */
609 682064 : portal->status = PORTAL_READY;
610 682064 : }
611 :
612 : /*
613 : * PortalSetResultFormat
614 : * Select the format codes for a portal's output.
615 : *
616 : * This must be run after PortalStart for a portal that will be read by
617 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
618 : * for other destination types.
619 : *
620 : * formats[] is the client format request, as per Bind message conventions.
621 : */
622 : void
623 652274 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
624 : {
625 : int natts;
626 : int i;
627 :
628 : /* Do nothing if portal won't return tuples */
629 652274 : if (portal->tupDesc == NULL)
630 372800 : return;
631 279474 : natts = portal->tupDesc->natts;
632 279474 : portal->formats = (int16 *)
633 279474 : MemoryContextAlloc(portal->portalContext,
634 : natts * sizeof(int16));
635 279474 : if (nFormats > 1)
636 : {
637 : /* format specified for each column */
638 0 : if (nFormats != natts)
639 0 : ereport(ERROR,
640 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
641 : errmsg("bind message has %d result formats but query has %d columns",
642 : nFormats, natts)));
643 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
644 : }
645 279474 : else if (nFormats > 0)
646 : {
647 : /* single format specified, use for all columns */
648 279474 : int16 format1 = formats[0];
649 :
650 1165864 : for (i = 0; i < natts; i++)
651 886390 : portal->formats[i] = format1;
652 : }
653 : else
654 : {
655 : /* use default format for all columns */
656 0 : for (i = 0; i < natts; i++)
657 0 : portal->formats[i] = 0;
658 : }
659 : }
660 :
661 : /*
662 : * PortalRun
663 : * Run a portal's query or queries.
664 : *
665 : * count <= 0 is interpreted as a no-op: the destination gets started up
666 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
667 : * interpreted as "all rows". Note that count is ignored in multi-query
668 : * situations, where we always run the portal to completion.
669 : *
670 : * isTopLevel: true if query is being executed at backend "top level"
671 : * (that is, directly from a client command message)
672 : *
 * run_once: recorded in portal->run_once; an assertion checks that it is
 * not cleared once it has been set. PortalRunSelect passes the stored
 * value on to ExecutorRun.
 *
673 : * dest: where to send output of primary (canSetTag) query
674 : *
675 : * altdest: where to send output of non-primary queries
676 : *
677 : * qc: where to store command completion status data.
678 : * May be NULL if caller doesn't want status data.
679 : *
680 : * Returns true if the portal's execution is complete, false if it was
681 : * suspended due to exhaustion of the count parameter.
 *
 * If an error is thrown while the portal runs, the portal is marked failed,
 * the global pointers changed here are restored, and the error is
 * re-thrown.
682 : */
683 : bool
684 667224 : PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
685 : DestReceiver *dest, DestReceiver *altdest,
686 : QueryCompletion *qc)
687 : {
688 : bool result;
689 : uint64 nprocessed;
690 : ResourceOwner saveTopTransactionResourceOwner;
691 : MemoryContext saveTopTransactionContext;
692 : Portal saveActivePortal;
693 : ResourceOwner saveResourceOwner;
694 : MemoryContext savePortalContext;
695 : MemoryContext saveMemoryContext;
696 :
697 : Assert(PortalIsValid(portal));
698 :
699 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
700 :
701 : /* Initialize empty completion data */
702 667224 : if (qc)
703 667224 : InitializeQueryCompletion(qc);
704 :
705 667224 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
706 : {
707 0 : elog(DEBUG3, "PortalRun");
708 : /* PORTAL_MULTI_QUERY logs its own stats per query */
709 0 : ResetUsage();
710 : }
711 :
712 : /*
713 : * Check for improper portal use, and mark portal active.
714 : */
715 667224 : MarkPortalActive(portal);
716 :
717 : /* Set run_once flag. Shouldn't be clear if previously set. */
718 : Assert(!portal->run_once || run_once);
719 667224 : portal->run_once = run_once;
720 :
721 : /*
722 : * Set up global portal context pointers.
723 : *
724 : * We have to play a special game here to support utility commands like
725 : * VACUUM and CLUSTER, which internally start and commit transactions.
726 : * When we are called to execute such a command, CurrentResourceOwner will
727 : * be pointing to the TopTransactionResourceOwner --- which will be
728 : * destroyed and replaced in the course of the internal commit and
729 : * restart. So we need to be prepared to restore it as pointing to the
730 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
731 : * internally starting whole new transactions is not good.)
732 : * CurrentMemoryContext has a similar problem, but the other pointers we
733 : * save here will be NULL or pointing to longer-lived objects.
 *
 * That is why the entry-time TopTransactionResourceOwner and
 * TopTransactionContext are saved too: comparing them with the saved
 * current owner/context tells the restore code whether to use the
 * exit-time top-transaction object instead of the saved pointer.
734 : */
735 667224 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
736 667224 : saveTopTransactionContext = TopTransactionContext;
737 667224 : saveActivePortal = ActivePortal;
738 667224 : saveResourceOwner = CurrentResourceOwner;
739 667224 : savePortalContext = PortalContext;
740 667224 : saveMemoryContext = CurrentMemoryContext;
741 667224 : PG_TRY();
742 : {
743 667224 : ActivePortal = portal;
/* CurrentResourceOwner is left alone if the portal has no resowner */
744 667224 : if (portal->resowner)
745 667224 : CurrentResourceOwner = portal->resowner;
746 667224 : PortalContext = portal->portalContext;
747 :
748 667224 : MemoryContextSwitchTo(PortalContext);
749 :
750 667224 : switch (portal->strategy)
751 : {
752 294304 : case PORTAL_ONE_SELECT:
753 : case PORTAL_ONE_RETURNING:
754 : case PORTAL_ONE_MOD_WITH:
755 : case PORTAL_UTIL_SELECT:
756 :
757 : /*
758 : * If we have not yet run the command, do so, storing its
759 : * results in the portal's tuplestore. But we don't do that
760 : * for the PORTAL_ONE_SELECT case.
761 : */
762 294304 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
763 38338 : FillPortalStore(portal, isTopLevel);
764 :
765 : /*
766 : * Now fetch desired portion of results.
767 : */
768 293948 : nprocessed = PortalRunSelect(portal, true, count, dest);
769 :
770 : /*
771 : * If the portal result contains a command tag and the caller
772 : * gave us a pointer to store it, copy it and update the
773 : * rowcount.
 *
 * The row count reported is the number of rows fetched by this
 * call, which overwrites the count copied from portal->qc.
774 : */
775 287088 : if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
776 : {
777 287088 : CopyQueryCompletion(qc, &portal->qc);
778 287088 : qc->nprocessed = nprocessed;
779 : }
780 :
781 : /* Mark portal not active */
782 287088 : portal->status = PORTAL_READY;
783 :
784 : /*
785 : * Since it's a forward fetch, say DONE iff atEnd is now true.
786 : */
787 287088 : result = portal->atEnd;
788 287088 : break;
789 :
790 372920 : case PORTAL_MULTI_QUERY:
/* count is not passed down: the portal always runs to completion */
791 372920 : PortalRunMulti(portal, isTopLevel, false,
792 : dest, altdest, qc);
793 :
794 : /* Prevent portal's commands from being re-executed */
795 353496 : MarkPortalDone(portal);
796 :
797 : /* Always complete at end of RunMulti */
798 353496 : result = true;
799 353496 : break;
800 :
801 0 : default:
802 0 : elog(ERROR, "unrecognized portal strategy: %d",
803 : (int) portal->strategy);
804 : result = false; /* keep compiler quiet */
805 : break;
806 : }
807 : }
808 26632 : PG_CATCH();
809 : {
810 : /* Uncaught error while executing portal: mark it dead */
811 26632 : MarkPortalFailed(portal);
812 :
813 : /* Restore global vars and propagate error */
/*
 * If we were in the top transaction's context (or using its resource
 * owner) on entry, go to the current one rather than the saved pointer.
 */
814 26632 : if (saveMemoryContext == saveTopTransactionContext)
815 26334 : MemoryContextSwitchTo(TopTransactionContext);
816 : else
817 298 : MemoryContextSwitchTo(saveMemoryContext);
818 26632 : ActivePortal = saveActivePortal;
819 26632 : if (saveResourceOwner == saveTopTransactionResourceOwner)
820 26376 : CurrentResourceOwner = TopTransactionResourceOwner;
821 : else
822 256 : CurrentResourceOwner = saveResourceOwner;
823 26632 : PortalContext = savePortalContext;
824 :
825 26632 : PG_RE_THROW();
826 : }
827 640584 : PG_END_TRY();
828 :
/* Normal exit: the same restoration steps as in the error path above */
829 640584 : if (saveMemoryContext == saveTopTransactionContext)
830 598526 : MemoryContextSwitchTo(TopTransactionContext);
831 : else
832 42058 : MemoryContextSwitchTo(saveMemoryContext);
833 640584 : ActivePortal = saveActivePortal;
834 640584 : if (saveResourceOwner == saveTopTransactionResourceOwner)
835 619390 : CurrentResourceOwner = TopTransactionResourceOwner;
836 : else
837 21194 : CurrentResourceOwner = saveResourceOwner;
838 640584 : PortalContext = savePortalContext;
839 :
/* Pairs with the ResetUsage() call made near the top of the function */
840 640584 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
841 0 : ShowUsage("EXECUTOR STATISTICS");
842 :
843 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
844 :
845 640584 : return result;
846 : }
847 :
848 : /*
849 : * PortalRunSelect
850 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
851 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
852 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
853 : *
854 : * This handles simple N-rows-forward-or-backward cases. For more complex
855 : * nonsequential access to a portal, see PortalRunFetch.
856 : *
857 : * count <= 0 is interpreted as a no-op: the destination gets started up
858 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
859 : * interpreted as "all rows". (cf FetchStmt.howMany)
860 : *
861 : * Caller must already have validated the Portal and done appropriate
862 : * setup (cf. PortalRun).
863 : *
864 : * Returns number of rows processed (suitable for use in result tag)
865 : */
866 : static uint64
867 343762 : PortalRunSelect(Portal portal,
868 : bool forward,
869 : long count,
870 : DestReceiver *dest)
871 : {
872 : QueryDesc *queryDesc;
873 : ScanDirection direction;
874 : uint64 nprocessed;
875 :
876 : /*
877 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
878 : * completed utility query; can't use it in that path.
879 : */
880 343762 : queryDesc = portal->queryDesc;
881 :
882 : /* Caller messed up if we have neither a ready query nor held data. */
883 : Assert(queryDesc || portal->holdStore);
884 :
885 : /*
886 : * Force the queryDesc destination to the right thing. This supports
887 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
888 : * change as long as we do it on every fetch. (The Executor must not
889 : * assume that dest never changes.)
890 : */
891 343762 : if (queryDesc)
892 274016 : queryDesc->dest = dest;
893 :
894 : /*
895 : * Determine which direction to go in, and check to see if we're already
896 : * at the end of the available tuples in that direction. If so, set the
897 : * direction to NoMovement to avoid trying to fetch any tuples. (This
898 : * check exists because not all plan node types are robust about being
899 : * called again if they've already returned NULL once.) Then call the
900 : * executor (we must not skip this, because the destination needs to see a
901 : * setup and shutdown even if no tuples are available). Finally, update
902 : * the portal position state depending on the number of tuples that were
903 : * retrieved.
904 : */
905 343762 : if (forward)
906 : {
907 343158 : if (portal->atEnd || count <= 0)
908 : {
909 3946 : direction = NoMovementScanDirection;
910 3946 : count = 0; /* don't pass negative count to executor */
911 : }
912 : else
913 339212 : direction = ForwardScanDirection;
914 :
915 : /* In the executor, zero count processes all rows */
916 343158 : if (count == FETCH_ALL)
917 294270 : count = 0;
918 :
919 343158 : if (portal->holdStore)
920 69728 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
921 : else
922 : {
923 273430 : PushActiveSnapshot(queryDesc->snapshot);
924 273430 : ExecutorRun(queryDesc, direction, (uint64) count,
925 273430 : portal->run_once);
926 266552 : nprocessed = queryDesc->estate->es_processed;
927 266552 : PopActiveSnapshot();
928 : }
929 :
930 336280 : if (!ScanDirectionIsNoMovement(direction))
931 : {
932 332334 : if (nprocessed > 0)
933 284172 : portal->atStart = false; /* OK to go backward now */
934 332334 : if (count == 0 || nprocessed < (uint64) count)
935 301368 : portal->atEnd = true; /* we retrieved 'em all */
936 332334 : portal->portalPos += nprocessed;
937 : }
938 : }
939 : else
940 : {
941 604 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
942 24 : ereport(ERROR,
943 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
944 : errmsg("cursor can only scan forward"),
945 : errhint("Declare it with SCROLL option to enable backward scan.")));
946 :
947 580 : if (portal->atStart || count <= 0)
948 : {
949 72 : direction = NoMovementScanDirection;
950 72 : count = 0; /* don't pass negative count to executor */
951 : }
952 : else
953 508 : direction = BackwardScanDirection;
954 :
955 : /* In the executor, zero count processes all rows */
956 580 : if (count == FETCH_ALL)
957 60 : count = 0;
958 :
959 580 : if (portal->holdStore)
960 12 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
961 : else
962 : {
963 568 : PushActiveSnapshot(queryDesc->snapshot);
964 568 : ExecutorRun(queryDesc, direction, (uint64) count,
965 568 : portal->run_once);
966 568 : nprocessed = queryDesc->estate->es_processed;
967 568 : PopActiveSnapshot();
968 : }
969 :
970 580 : if (!ScanDirectionIsNoMovement(direction))
971 : {
972 508 : if (nprocessed > 0 && portal->atEnd)
973 : {
974 154 : portal->atEnd = false; /* OK to go forward now */
975 154 : portal->portalPos++; /* adjust for endpoint case */
976 : }
977 508 : if (count == 0 || nprocessed < (uint64) count)
978 : {
979 186 : portal->atStart = true; /* we retrieved 'em all */
980 186 : portal->portalPos = 0;
981 : }
982 : else
983 : {
984 322 : portal->portalPos -= nprocessed;
985 : }
986 : }
987 : }
988 :
989 336860 : return nprocessed;
990 : }
991 :
992 : /*
993 : * FillPortalStore
994 : * Run the query and load result tuples into the portal's tuple store.
995 : *
996 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
997 : * PORTAL_UTIL_SELECT cases only.
998 : */
999 : static void
1000 46340 : FillPortalStore(Portal portal, bool isTopLevel)
1001 : {
1002 : DestReceiver *treceiver;
1003 : QueryCompletion qc;
1004 :
1005 46340 : InitializeQueryCompletion(&qc);
1006 46340 : PortalCreateHoldStore(portal);
1007 46340 : treceiver = CreateDestReceiver(DestTuplestore);
1008 46340 : SetTuplestoreDestReceiverParams(treceiver,
1009 : portal->holdStore,
1010 : portal->holdContext,
1011 : false,
1012 : NULL,
1013 : NULL);
1014 :
1015 46340 : switch (portal->strategy)
1016 : {
1017 2786 : case PORTAL_ONE_RETURNING:
1018 : case PORTAL_ONE_MOD_WITH:
1019 :
1020 : /*
1021 : * Run the portal to completion just as for the default
1022 : * PORTAL_MULTI_QUERY case, but send the primary query's output to
1023 : * the tuplestore. Auxiliary query outputs are discarded. Set the
1024 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1025 : */
1026 2786 : PortalRunMulti(portal, isTopLevel, true,
1027 : treceiver, None_Receiver, &qc);
1028 2664 : break;
1029 :
1030 43554 : case PORTAL_UTIL_SELECT:
1031 43554 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1032 : isTopLevel, true, treceiver, &qc);
1033 43314 : break;
1034 :
1035 0 : default:
1036 0 : elog(ERROR, "unsupported portal strategy: %d",
1037 : (int) portal->strategy);
1038 : break;
1039 : }
1040 :
1041 : /* Override portal completion data with actual command results */
1042 45978 : if (qc.commandTag != CMDTAG_UNKNOWN)
1043 23028 : CopyQueryCompletion(&portal->qc, &qc);
1044 :
1045 45978 : treceiver->rDestroy(treceiver);
1046 45978 : }
1047 :
1048 : /*
1049 : * RunFromStore
1050 : * Fetch tuples from the portal's tuple store.
1051 : *
1052 : * Calling conventions are similar to ExecutorRun, except that we
1053 : * do not depend on having a queryDesc or estate. Therefore we return the
1054 : * number of tuples processed as the result, not in estate->es_processed.
1055 : *
1056 : * One difference from ExecutorRun is that the destination receiver functions
1057 : * are run in the caller's memory context (since we have no estate). Watch
1058 : * out for memory leaks.
1059 : */
1060 : static uint64
1061 69740 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1062 : DestReceiver *dest)
1063 : {
1064 69740 : uint64 current_tuple_count = 0;
1065 : TupleTableSlot *slot;
1066 :
1067 69740 : slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);
1068 :
1069 69740 : dest->rStartup(dest, CMD_SELECT, portal->tupDesc);
1070 :
1071 69740 : if (ScanDirectionIsNoMovement(direction))
1072 : {
1073 : /* do nothing except start/stop the destination */
1074 : }
1075 : else
1076 : {
1077 66918 : bool forward = ScanDirectionIsForward(direction);
1078 :
1079 : for (;;)
1080 397246 : {
1081 : MemoryContext oldcontext;
1082 : bool ok;
1083 :
1084 464164 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1085 :
1086 464164 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1087 : slot);
1088 :
1089 464164 : MemoryContextSwitchTo(oldcontext);
1090 :
1091 464164 : if (!ok)
1092 46032 : break;
1093 :
1094 : /*
1095 : * If we are not able to send the tuple, we assume the destination
1096 : * has closed and no more tuples can be sent. If that's the case,
1097 : * end the loop.
1098 : */
1099 418132 : if (!dest->receiveSlot(slot, dest))
1100 0 : break;
1101 :
1102 418132 : ExecClearTuple(slot);
1103 :
1104 : /*
1105 : * check our tuple count.. if we've processed the proper number
1106 : * then quit, else loop again and process more tuples. Zero count
1107 : * means no limit.
1108 : */
1109 418132 : current_tuple_count++;
1110 418132 : if (count && count == current_tuple_count)
1111 20886 : break;
1112 : }
1113 : }
1114 :
1115 69740 : dest->rShutdown(dest);
1116 :
1117 69740 : ExecDropSingleTupleTableSlot(slot);
1118 :
1119 69740 : return current_tuple_count;
1120 : }
1121 :
1122 : /*
1123 : * PortalRunUtility
1124 : * Execute a utility statement inside a portal.
1125 : */
1126 : static void
1127 330676 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1128 : bool isTopLevel, bool setHoldSnapshot,
1129 : DestReceiver *dest, QueryCompletion *qc)
1130 : {
1131 : /*
1132 : * Set snapshot if utility stmt needs one.
1133 : */
1134 330676 : if (PlannedStmtRequiresSnapshot(pstmt))
1135 : {
1136 263514 : Snapshot snapshot = GetTransactionSnapshot();
1137 :
1138 : /* If told to, register the snapshot we're using and save in portal */
1139 263514 : if (setHoldSnapshot)
1140 : {
1141 37116 : snapshot = RegisterSnapshot(snapshot);
1142 37116 : portal->holdSnapshot = snapshot;
1143 : }
1144 :
1145 : /*
1146 : * In any case, make the snapshot active and remember it in portal.
1147 : * Because the portal now references the snapshot, we must tell
1148 : * snapmgr.c that the snapshot belongs to the portal's transaction
1149 : * level, else we risk portalSnapshot becoming a dangling pointer.
1150 : */
1151 263514 : PushActiveSnapshotWithLevel(snapshot, portal->createLevel);
1152 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1153 263514 : portal->portalSnapshot = GetActiveSnapshot();
1154 : }
1155 : else
1156 67162 : portal->portalSnapshot = NULL;
1157 :
1158 330676 : ProcessUtility(pstmt,
1159 : portal->sourceText,
1160 330676 : (portal->cplan != NULL), /* protect tree if in plancache */
1161 330676 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1162 : portal->portalParams,
1163 : portal->queryEnv,
1164 : dest,
1165 : qc);
1166 :
1167 : /* Some utility statements may change context on us */
1168 316276 : MemoryContextSwitchTo(portal->portalContext);
1169 :
1170 : /*
1171 : * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
1172 : * under us, so don't complain if it's now empty. Otherwise, our snapshot
1173 : * should be the top one; pop it. Note that this could be a different
1174 : * snapshot from the one we made above; see EnsurePortalSnapshotExists.
1175 : */
1176 316276 : if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
1177 : {
1178 : Assert(portal->portalSnapshot == GetActiveSnapshot());
1179 242032 : PopActiveSnapshot();
1180 : }
1181 316276 : portal->portalSnapshot = NULL;
1182 316276 : }
1183 :
1184 : /*
1185 : * PortalRunMulti
1186 : * Execute a portal's queries in the general case (multi queries
1187 : * or non-SELECT-like queries)
1188 : */
1189 : static void
1190 375706 : PortalRunMulti(Portal portal,
1191 : bool isTopLevel, bool setHoldSnapshot,
1192 : DestReceiver *dest, DestReceiver *altdest,
1193 : QueryCompletion *qc)
1194 : {
1195 375706 : bool active_snapshot_set = false;
1196 : ListCell *stmtlist_item;
1197 :
1198 : /*
1199 : * If the destination is DestRemoteExecute, change to DestNone. The
1200 : * reason is that the client won't be expecting any tuples, and indeed has
1201 : * no way to know what they are, since there is no provision for Describe
1202 : * to send a RowDescription message when this portal execution strategy is
1203 : * in effect. This presently will only affect SELECT commands added to
1204 : * non-SELECT queries by rewrite rules: such commands will be executed,
1205 : * but the results will be discarded unless you use "simple Query"
1206 : * protocol.
1207 : */
1208 375706 : if (dest->mydest == DestRemoteExecute)
1209 12398 : dest = None_Receiver;
1210 375706 : if (altdest->mydest == DestRemoteExecute)
1211 12398 : altdest = None_Receiver;
1212 :
1213 : /*
1214 : * Loop to handle the individual queries generated from a single parsetree
1215 : * by analysis and rewrite.
1216 : */
1217 732454 : foreach(stmtlist_item, portal->stmts)
1218 : {
1219 376294 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1220 :
1221 : /*
1222 : * If we got a cancel signal in prior command, quit
1223 : */
1224 376294 : CHECK_FOR_INTERRUPTS();
1225 :
1226 376294 : if (pstmt->utilityStmt == NULL)
1227 : {
1228 : /*
1229 : * process a plannable query.
1230 : */
1231 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1232 :
1233 89172 : if (log_executor_stats)
1234 0 : ResetUsage();
1235 :
1236 : /*
1237 : * Must always have a snapshot for plannable queries. First time
1238 : * through, take a new snapshot; for subsequent queries in the
1239 : * same portal, just update the snapshot's copy of the command
1240 : * counter.
1241 : */
1242 89172 : if (!active_snapshot_set)
1243 : {
1244 88584 : Snapshot snapshot = GetTransactionSnapshot();
1245 :
1246 : /* If told to, register the snapshot and save in portal */
1247 88584 : if (setHoldSnapshot)
1248 : {
1249 2786 : snapshot = RegisterSnapshot(snapshot);
1250 2786 : portal->holdSnapshot = snapshot;
1251 : }
1252 :
1253 : /*
1254 : * We can't have the holdSnapshot also be the active one,
1255 : * because UpdateActiveSnapshotCommandId would complain. So
1256 : * force an extra snapshot copy. Plain PushActiveSnapshot
1257 : * would have copied the transaction snapshot anyway, so this
1258 : * only adds a copy step when setHoldSnapshot is true. (It's
1259 : * okay for the command ID of the active snapshot to diverge
1260 : * from what holdSnapshot has.)
1261 : */
1262 88584 : PushCopiedSnapshot(snapshot);
1263 :
1264 : /*
1265 : * As for PORTAL_ONE_SELECT portals, it does not seem
1266 : * necessary to maintain portal->portalSnapshot here.
1267 : */
1268 :
1269 88584 : active_snapshot_set = true;
1270 : }
1271 : else
1272 588 : UpdateActiveSnapshotCommandId();
1273 :
1274 89172 : if (pstmt->canSetTag)
1275 : {
1276 : /* statement can set tag string */
1277 88548 : ProcessQuery(pstmt,
1278 : portal->sourceText,
1279 : portal->portalParams,
1280 : portal->queryEnv,
1281 : dest, qc);
1282 : }
1283 : else
1284 : {
1285 : /* stmt added by rewrite cannot set tag */
1286 624 : ProcessQuery(pstmt,
1287 : portal->sourceText,
1288 : portal->portalParams,
1289 : portal->queryEnv,
1290 : altdest, NULL);
1291 : }
1292 :
1293 83786 : if (log_executor_stats)
1294 0 : ShowUsage("EXECUTOR STATISTICS");
1295 :
1296 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1297 : }
1298 : else
1299 : {
1300 : /*
1301 : * process utility functions (create, destroy, etc..)
1302 : *
1303 : * We must not set a snapshot here for utility commands (if one is
1304 : * needed, PortalRunUtility will do it). If a utility command is
1305 : * alone in a portal then everything's fine. The only case where
1306 : * a utility command can be part of a longer list is that rules
1307 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1308 : * whether it has a snapshot or not, so we just leave the current
1309 : * snapshot alone if we have one.
1310 : */
1311 287122 : if (pstmt->canSetTag)
1312 : {
1313 : Assert(!active_snapshot_set);
1314 : /* statement can set tag string */
1315 287122 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1316 : dest, qc);
1317 : }
1318 : else
1319 : {
1320 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1321 : /* stmt added by rewrite cannot set tag */
1322 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1323 : altdest, NULL);
1324 : }
1325 : }
1326 :
1327 : /*
1328 : * Clear subsidiary contexts to recover temporary memory.
1329 : */
1330 : Assert(portal->portalContext == CurrentMemoryContext);
1331 :
1332 356748 : MemoryContextDeleteChildren(portal->portalContext);
1333 :
1334 : /*
1335 : * Avoid crashing if portal->stmts has been reset. This can only
1336 : * occur if a CALL or DO utility statement executed an internal
1337 : * COMMIT/ROLLBACK (cf PortalReleaseCachedPlan). The CALL or DO must
1338 : * have been the only statement in the portal, so there's nothing left
1339 : * for us to do; but we don't want to dereference a now-dangling list
1340 : * pointer.
1341 : */
1342 356748 : if (portal->stmts == NIL)
1343 0 : break;
1344 :
1345 : /*
1346 : * Increment command counter between queries, but not after the last
1347 : * one.
1348 : */
1349 356748 : if (lnext(portal->stmts, stmtlist_item) != NULL)
1350 588 : CommandCounterIncrement();
1351 : }
1352 :
1353 : /* Pop the snapshot if we pushed one. */
1354 356160 : if (active_snapshot_set)
1355 83198 : PopActiveSnapshot();
1356 :
1357 : /*
1358 : * If a query completion data was supplied, use it. Otherwise use the
1359 : * portal's query completion data.
1360 : *
1361 : * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
1362 : * fake them with zeros. This can happen with DO INSTEAD rules if there
1363 : * is no replacement query of the same type as the original. We print "0
1364 : * 0" here because technically there is no query of the matching tag type,
1365 : * and printing a non-zero count for a different query type seems wrong,
1366 : * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
1367 : * one row was updated. See QueryRewrite(), step 3, for details.
1368 : */
1369 356160 : if (qc && qc->commandTag == CMDTAG_UNKNOWN)
1370 : {
1371 261542 : if (portal->qc.commandTag != CMDTAG_UNKNOWN)
1372 261542 : CopyQueryCompletion(qc, &portal->qc);
1373 : /* If the caller supplied a qc, we should have set it by now. */
1374 : Assert(qc->commandTag != CMDTAG_UNKNOWN);
1375 : }
1376 356160 : }
1377 :
1378 : /*
1379 : * PortalRunFetch
1380 : * Variant form of PortalRun that supports SQL FETCH directions.
1381 : *
1382 : * Note: we presently assume that no callers of this want isTopLevel = true.
1383 : *
1384 : * count <= 0 is interpreted as a no-op: the destination gets started up
1385 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1386 : * interpreted as "all rows". (cf FetchStmt.howMany)
1387 : *
1388 : * Returns number of rows processed (suitable for use in result tag)
1389 : */
1390 : uint64
1391 49742 : PortalRunFetch(Portal portal,
1392 : FetchDirection fdirection,
1393 : long count,
1394 : DestReceiver *dest)
1395 : {
1396 : uint64 result;
1397 : Portal saveActivePortal;
1398 : ResourceOwner saveResourceOwner;
1399 : MemoryContext savePortalContext;
1400 : MemoryContext oldContext;
1401 :
1402 : Assert(PortalIsValid(portal));
1403 :
1404 : /*
1405 : * Check for improper portal use, and mark portal active.
1406 : */
1407 49742 : MarkPortalActive(portal);
1408 :
1409 : /* If supporting FETCH, portal can't be run-once. */
1410 : Assert(!portal->run_once);
1411 :
1412 : /*
1413 : * Set up global portal context pointers.
1414 : */
1415 49724 : saveActivePortal = ActivePortal;
1416 49724 : saveResourceOwner = CurrentResourceOwner;
1417 49724 : savePortalContext = PortalContext;
1418 49724 : PG_TRY();
1419 : {
1420 49724 : ActivePortal = portal;
1421 49724 : if (portal->resowner)
1422 49532 : CurrentResourceOwner = portal->resowner;
1423 49724 : PortalContext = portal->portalContext;
1424 :
1425 49724 : oldContext = MemoryContextSwitchTo(PortalContext);
1426 :
1427 49724 : switch (portal->strategy)
1428 : {
1429 18146 : case PORTAL_ONE_SELECT:
1430 18146 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1431 18098 : break;
1432 :
1433 31578 : case PORTAL_ONE_RETURNING:
1434 : case PORTAL_ONE_MOD_WITH:
1435 : case PORTAL_UTIL_SELECT:
1436 :
1437 : /*
1438 : * If we have not yet run the command, do so, storing its
1439 : * results in the portal's tuplestore.
1440 : */
1441 31578 : if (!portal->holdStore)
1442 8002 : FillPortalStore(portal, false /* isTopLevel */ );
1443 :
1444 : /*
1445 : * Now fetch desired portion of results.
1446 : */
1447 31572 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1448 31572 : break;
1449 :
1450 0 : default:
1451 0 : elog(ERROR, "unsupported portal strategy");
1452 : result = 0; /* keep compiler quiet */
1453 : break;
1454 : }
1455 : }
1456 54 : PG_CATCH();
1457 : {
1458 : /* Uncaught error while executing portal: mark it dead */
1459 54 : MarkPortalFailed(portal);
1460 :
1461 : /* Restore global vars and propagate error */
1462 54 : ActivePortal = saveActivePortal;
1463 54 : CurrentResourceOwner = saveResourceOwner;
1464 54 : PortalContext = savePortalContext;
1465 :
1466 54 : PG_RE_THROW();
1467 : }
1468 49670 : PG_END_TRY();
1469 :
1470 49670 : MemoryContextSwitchTo(oldContext);
1471 :
1472 : /* Mark portal not active */
1473 49670 : portal->status = PORTAL_READY;
1474 :
1475 49670 : ActivePortal = saveActivePortal;
1476 49670 : CurrentResourceOwner = saveResourceOwner;
1477 49670 : PortalContext = savePortalContext;
1478 :
1479 49670 : return result;
1480 : }
1481 :
1482 : /*
1483 : * DoPortalRunFetch
1484 : * Guts of PortalRunFetch --- the portal context is already set up
1485 : *
1486 : * Here, count < 0 typically reverses the direction. Also, count == FETCH_ALL
1487 : * is interpreted as "all rows". (cf FetchStmt.howMany)
1488 : *
1489 : * Returns number of rows processed (suitable for use in result tag)
1490 : */
1491 : static uint64
1492 49718 : DoPortalRunFetch(Portal portal,
1493 : FetchDirection fdirection,
1494 : long count,
1495 : DestReceiver *dest)
1496 : {
1497 : bool forward;
1498 :
1499 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1500 : portal->strategy == PORTAL_ONE_RETURNING ||
1501 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1502 : portal->strategy == PORTAL_UTIL_SELECT);
1503 :
1504 : /*
1505 : * Note: we disallow backwards fetch (including re-fetch of current row)
1506 : * for NO SCROLL cursors, but we interpret that very loosely: you can use
1507 : * any of the FetchDirection options, so long as the end result is to move
1508 : * forwards by at least one row. Currently it's sufficient to check for
1509 : * NO SCROLL in DoPortalRewind() and in the forward == false path in
1510 : * PortalRunSelect(); but someday we might prefer to account for that
1511 : * restriction explicitly here.
1512 : */
1513 49718 : switch (fdirection)
1514 : {
1515 48978 : case FETCH_FORWARD:
1516 48978 : if (count < 0)
1517 : {
1518 0 : fdirection = FETCH_BACKWARD;
1519 0 : count = -count;
1520 : }
1521 : /* fall out of switch to share code with FETCH_BACKWARD */
1522 48978 : break;
1523 496 : case FETCH_BACKWARD:
1524 496 : if (count < 0)
1525 : {
1526 0 : fdirection = FETCH_FORWARD;
1527 0 : count = -count;
1528 : }
1529 : /* fall out of switch to share code with FETCH_FORWARD */
1530 496 : break;
1531 160 : case FETCH_ABSOLUTE:
1532 160 : if (count > 0)
1533 : {
1534 : /*
1535 : * Definition: Rewind to start, advance count-1 rows, return
1536 : * next row (if any).
1537 : *
1538 : * In practice, if the goal is less than halfway back to the
1539 : * start, it's better to scan from where we are.
1540 : *
1541 : * Also, if current portalPos is outside the range of "long",
1542 : * do it the hard way to avoid possible overflow of the count
1543 : * argument to PortalRunSelect. We must exclude exactly
1544 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1545 : *
1546 : * In any case, we arrange to fetch the target row going
1547 : * forwards.
1548 : */
1549 90 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1550 36 : portal->portalPos >= (uint64) LONG_MAX)
1551 : {
1552 54 : DoPortalRewind(portal);
1553 48 : if (count > 1)
1554 0 : PortalRunSelect(portal, true, count - 1,
1555 : None_Receiver);
1556 : }
1557 : else
1558 : {
1559 36 : long pos = (long) portal->portalPos;
1560 :
1561 36 : if (portal->atEnd)
1562 0 : pos++; /* need one extra fetch if off end */
1563 36 : if (count <= pos)
1564 12 : PortalRunSelect(portal, false, pos - count + 1,
1565 : None_Receiver);
1566 24 : else if (count > pos + 1)
1567 12 : PortalRunSelect(portal, true, count - pos - 1,
1568 : None_Receiver);
1569 : }
1570 78 : return PortalRunSelect(portal, true, 1L, dest);
1571 : }
1572 70 : else if (count < 0)
1573 : {
1574 : /*
1575 : * Definition: Advance to end, back up abs(count)-1 rows,
1576 : * return prior row (if any). We could optimize this if we
1577 : * knew in advance where the end was, but typically we won't.
1578 : * (Is it worth considering case where count > half of size of
1579 : * query? We could rewind once we know the size ...)
1580 : */
1581 54 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1582 54 : if (count < -1)
1583 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1584 54 : return PortalRunSelect(portal, false, 1L, dest);
1585 : }
1586 : else
1587 : {
1588 : /* count == 0 */
1589 : /* Rewind to start, return zero rows */
1590 16 : DoPortalRewind(portal);
1591 16 : return PortalRunSelect(portal, true, 0L, dest);
1592 : }
1593 : break;
1594 84 : case FETCH_RELATIVE:
1595 84 : if (count > 0)
1596 : {
1597 : /*
1598 : * Definition: advance count-1 rows, return next row (if any).
1599 : */
1600 36 : if (count > 1)
1601 24 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1602 36 : return PortalRunSelect(portal, true, 1L, dest);
1603 : }
1604 48 : else if (count < 0)
1605 : {
1606 : /*
1607 : * Definition: back up abs(count)-1 rows, return prior row (if
1608 : * any).
1609 : */
1610 30 : if (count < -1)
1611 18 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1612 30 : return PortalRunSelect(portal, false, 1L, dest);
1613 : }
1614 : else
1615 : {
1616 : /* count == 0 */
1617 : /* Same as FETCH FORWARD 0, so fall out of switch */
1618 18 : fdirection = FETCH_FORWARD;
1619 : }
1620 18 : break;
1621 0 : default:
1622 0 : elog(ERROR, "bogus direction");
1623 : break;
1624 : }
1625 :
1626 : /*
1627 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1628 : * >= 0.
1629 : */
1630 49492 : forward = (fdirection == FETCH_FORWARD);
1631 :
1632 : /*
1633 : * Zero count means to re-fetch the current row, if any (per SQL)
1634 : */
1635 49492 : if (count == 0)
1636 : {
1637 : bool on_row;
1638 :
1639 : /* Are we sitting on a row? */
1640 18 : on_row = (!portal->atStart && !portal->atEnd);
1641 :
1642 18 : if (dest->mydest == DestNone)
1643 : {
1644 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1645 0 : return on_row ? 1 : 0;
1646 : }
1647 : else
1648 : {
1649 : /*
1650 : * If we are sitting on a row, back up one so we can re-fetch it.
1651 : * If we are not sitting on a row, we still have to start up and
1652 : * shut down the executor so that the destination is initialized
1653 : * and shut down correctly; so keep going. To PortalRunSelect,
1654 : * count == 0 means we will retrieve no row.
1655 : */
1656 18 : if (on_row)
1657 : {
1658 18 : PortalRunSelect(portal, false, 1L, None_Receiver);
1659 : /* Set up to fetch one row forward */
1660 12 : count = 1;
1661 12 : forward = true;
1662 : }
1663 : }
1664 : }
1665 :
1666 : /*
1667 : * Optimize MOVE BACKWARD ALL into a Rewind.
1668 : */
1669 49486 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1670 : {
1671 24 : uint64 result = portal->portalPos;
1672 :
1673 24 : if (result > 0 && !portal->atEnd)
1674 0 : result--;
1675 24 : DoPortalRewind(portal);
1676 24 : return result;
1677 : }
1678 :
1679 49462 : return PortalRunSelect(portal, forward, count, dest);
1680 : }
1681 :
1682 : /*
1683 : * DoPortalRewind - rewind a Portal to starting point
1684 : */
1685 : static void
1686 94 : DoPortalRewind(Portal portal)
1687 : {
1688 : QueryDesc *queryDesc;
1689 :
1690 : /*
1691 : * No work is needed if we've not advanced nor attempted to advance the
1692 : * cursor (and we don't want to throw a NO SCROLL error in this case).
1693 : */
1694 94 : if (portal->atStart && !portal->atEnd)
1695 18 : return;
1696 :
1697 : /* Otherwise, cursor must allow scrolling */
1698 76 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
1699 6 : ereport(ERROR,
1700 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1701 : errmsg("cursor can only scan forward"),
1702 : errhint("Declare it with SCROLL option to enable backward scan.")));
1703 :
1704 : /* Rewind holdStore, if we have one */
1705 70 : if (portal->holdStore)
1706 : {
1707 : MemoryContext oldcontext;
1708 :
1709 6 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1710 6 : tuplestore_rescan(portal->holdStore);
1711 6 : MemoryContextSwitchTo(oldcontext);
1712 : }
1713 :
1714 : /* Rewind executor, if active */
1715 70 : queryDesc = portal->queryDesc;
1716 70 : if (queryDesc)
1717 : {
1718 64 : PushActiveSnapshot(queryDesc->snapshot);
1719 64 : ExecutorRewind(queryDesc);
1720 64 : PopActiveSnapshot();
1721 : }
1722 :
1723 70 : portal->atStart = true;
1724 70 : portal->atEnd = false;
1725 70 : portal->portalPos = 0;
1726 : }
1727 :
1728 : /*
1729 : * PlannedStmtRequiresSnapshot - what it says on the tin
1730 : */
1731 : bool
1732 436306 : PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
1733 : {
1734 436306 : Node *utilityStmt = pstmt->utilityStmt;
1735 :
1736 : /* If it's not a utility statement, it definitely needs a snapshot */
1737 436306 : if (utilityStmt == NULL)
1738 81718 : return true;
1739 :
1740 : /*
1741 : * Most utility statements need a snapshot, and the default presumption
1742 : * about new ones should be that they do too. Hence, enumerate those that
1743 : * do not need one.
1744 : *
1745 : * Transaction control, LOCK, and SET must *not* set a snapshot, since
1746 : * they need to be executable at the start of a transaction-snapshot-mode
1747 : * transaction without freezing a snapshot. By extension we allow SHOW
1748 : * not to set a snapshot. The other stmts listed are just efficiency
1749 : * hacks. Beware of listing anything that can modify the database --- if,
1750 : * say, it has to update an index with expressions that invoke
1751 : * user-defined functions, then it had better have a snapshot.
1752 : */
1753 354588 : if (IsA(utilityStmt, TransactionStmt) ||
1754 319466 : IsA(utilityStmt, LockStmt) ||
1755 318382 : IsA(utilityStmt, VariableSetStmt) ||
1756 283756 : IsA(utilityStmt, VariableShowStmt) ||
1757 282958 : IsA(utilityStmt, ConstraintsSetStmt) ||
1758 : /* efficiency hacks from here down */
1759 282856 : IsA(utilityStmt, FetchStmt) ||
1760 277114 : IsA(utilityStmt, ListenStmt) ||
1761 277040 : IsA(utilityStmt, NotifyStmt) ||
1762 276952 : IsA(utilityStmt, UnlistenStmt) ||
1763 276914 : IsA(utilityStmt, CheckPointStmt))
1764 77878 : return false;
1765 :
1766 276710 : return true;
1767 : }
1768 :
1769 : /*
1770 : * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
1771 : *
1772 : * Generally, we will have an active snapshot whenever we are executing
1773 : * inside a Portal, unless the Portal's query is one of the utility
1774 : * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
1775 : * However, procedures and DO blocks can commit or abort the transaction,
1776 : * and thereby destroy all snapshots. This function can be called to
1777 : * re-establish the Portal-level snapshot when none exists.
1778 : */
1779 : void
1780 390446 : EnsurePortalSnapshotExists(void)
1781 : {
1782 : Portal portal;
1783 :
1784 : /*
1785 : * Nothing to do if a snapshot is set. (We take it on faith that the
1786 : * outermost active snapshot belongs to some Portal; or if there is no
1787 : * Portal, it's somebody else's responsibility to manage things.)
1788 : */
1789 390446 : if (ActiveSnapshotSet())
1790 386008 : return;
1791 :
1792 : /* Otherwise, we'd better have an active Portal */
1793 4438 : portal = ActivePortal;
1794 4438 : if (unlikely(portal == NULL))
1795 0 : elog(ERROR, "cannot execute SQL without an outer snapshot or portal");
1796 : Assert(portal->portalSnapshot == NULL);
1797 :
1798 : /*
1799 : * Create a new snapshot, make it active, and remember it in portal.
1800 : * Because the portal now references the snapshot, we must tell snapmgr.c
1801 : * that the snapshot belongs to the portal's transaction level, else we
1802 : * risk portalSnapshot becoming a dangling pointer.
1803 : */
1804 4438 : PushActiveSnapshotWithLevel(GetTransactionSnapshot(), portal->createLevel);
1805 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1806 4438 : portal->portalSnapshot = GetActiveSnapshot();
1807 : }
|