Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/execdesc.h"
23 : #include "executor/executor.h"
24 : #include "executor/tstoreReceiver.h"
25 : #include "miscadmin.h"
26 : #include "pg_trace.h"
27 : #include "tcop/pquery.h"
28 : #include "tcop/utility.h"
29 : #include "utils/memutils.h"
30 : #include "utils/snapmgr.h"
31 :
32 :
33 : /*
34 : * ActivePortal is the currently executing Portal (the most closely nested,
35 : * if there are several).
36 : */
37 : Portal ActivePortal = NULL;
38 :
39 :
40 : static void ProcessQuery(PlannedStmt *plan,
41 : CachedPlan *cplan,
42 : CachedPlanSource *plansource,
43 : int query_index,
44 : const char *sourceText,
45 : ParamListInfo params,
46 : QueryEnvironment *queryEnv,
47 : DestReceiver *dest,
48 : QueryCompletion *qc);
49 : static void FillPortalStore(Portal portal, bool isTopLevel);
50 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
51 : DestReceiver *dest);
52 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
53 : DestReceiver *dest);
54 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
55 : bool isTopLevel, bool setHoldSnapshot,
56 : DestReceiver *dest, QueryCompletion *qc);
57 : static void PortalRunMulti(Portal portal,
58 : bool isTopLevel, bool setHoldSnapshot,
59 : DestReceiver *dest, DestReceiver *altdest,
60 : QueryCompletion *qc);
61 : static uint64 DoPortalRunFetch(Portal portal,
62 : FetchDirection fdirection,
63 : long count,
64 : DestReceiver *dest);
65 : static void DoPortalRewind(Portal portal);
66 :
67 :
68 : /*
69 : * CreateQueryDesc
70 : */
71 : QueryDesc *
72 670516 : CreateQueryDesc(PlannedStmt *plannedstmt,
73 : CachedPlan *cplan,
74 : const char *sourceText,
75 : Snapshot snapshot,
76 : Snapshot crosscheck_snapshot,
77 : DestReceiver *dest,
78 : ParamListInfo params,
79 : QueryEnvironment *queryEnv,
80 : int instrument_options)
81 : {
82 670516 : QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
83 :
84 670516 : qd->operation = plannedstmt->commandType; /* operation */
85 670516 : qd->plannedstmt = plannedstmt; /* plan */
86 670516 : qd->cplan = cplan; /* CachedPlan supplying the plannedstmt */
87 670516 : qd->sourceText = sourceText; /* query text */
88 670516 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
89 : /* RI check snapshot */
90 670516 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
91 670516 : qd->dest = dest; /* output dest */
92 670516 : qd->params = params; /* parameter values passed into query */
93 670516 : qd->queryEnv = queryEnv;
94 670516 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
95 :
96 : /* null these fields until set by ExecutorStart */
97 670516 : qd->tupDesc = NULL;
98 670516 : qd->estate = NULL;
99 670516 : qd->planstate = NULL;
100 670516 : qd->totaltime = NULL;
101 :
102 : /* not yet executed */
103 670516 : qd->already_executed = false;
104 :
105 670516 : return qd;
106 : }
107 :
108 : /*
109 : * FreeQueryDesc
110 : */
111 : void
112 642070 : FreeQueryDesc(QueryDesc *qdesc)
113 : {
114 : /* Can't be a live query */
115 : Assert(qdesc->estate == NULL);
116 :
117 : /* forget our snapshots */
118 642070 : UnregisterSnapshot(qdesc->snapshot);
119 642070 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
120 :
121 : /* Only the QueryDesc itself need be freed */
122 642070 : pfree(qdesc);
123 642070 : }
124 :
125 :
126 : /*
127 : * ProcessQuery
128 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
129 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
130 : *
131 : * plan: the plan tree for the query
132 : * cplan: CachedPlan supplying the plan
133 : * plansource: CachedPlanSource supplying the cplan
134 : * query_index: index of the query in plansource->query_list
135 : * sourceText: the source text of the query
136 : * params: any parameters needed
137 : * dest: where to send results
138 : * qc: where to store the command completion status data.
139 : *
140 : * qc may be NULL if caller doesn't want a status string.
141 : *
142 : * Must be called in a memory context that will be reset or deleted on
143 : * error; otherwise the executor's memory usage will be leaked.
144 : */
145 : static void
146 90544 : ProcessQuery(PlannedStmt *plan,
147 : CachedPlan *cplan,
148 : CachedPlanSource *plansource,
149 : int query_index,
150 : const char *sourceText,
151 : ParamListInfo params,
152 : QueryEnvironment *queryEnv,
153 : DestReceiver *dest,
154 : QueryCompletion *qc)
155 : {
156 : QueryDesc *queryDesc;
157 :
158 : /*
159 : * Create the QueryDesc object
160 : */
161 90544 : queryDesc = CreateQueryDesc(plan, cplan, sourceText,
162 : GetActiveSnapshot(), InvalidSnapshot,
163 : dest, params, queryEnv, 0);
164 :
165 : /*
166 : * Prepare the plan for execution
167 : */
168 90544 : if (queryDesc->cplan)
169 : {
170 10082 : ExecutorStartCachedPlan(queryDesc, 0, plansource, query_index);
171 : Assert(queryDesc->planstate);
172 : }
173 : else
174 : {
175 80462 : if (!ExecutorStart(queryDesc, 0))
176 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
177 : }
178 :
179 : /*
180 : * Run the plan to completion.
181 : */
182 89266 : ExecutorRun(queryDesc, ForwardScanDirection, 0);
183 :
184 : /*
185 : * Build command completion status data, if caller wants one.
186 : */
187 86064 : if (qc)
188 : {
189 85440 : switch (queryDesc->operation)
190 : {
191 116 : case CMD_SELECT:
192 116 : SetQueryCompletion(qc, CMDTAG_SELECT, queryDesc->estate->es_processed);
193 116 : break;
194 69954 : case CMD_INSERT:
195 69954 : SetQueryCompletion(qc, CMDTAG_INSERT, queryDesc->estate->es_processed);
196 69954 : break;
197 10864 : case CMD_UPDATE:
198 10864 : SetQueryCompletion(qc, CMDTAG_UPDATE, queryDesc->estate->es_processed);
199 10864 : break;
200 3456 : case CMD_DELETE:
201 3456 : SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed);
202 3456 : break;
203 1050 : case CMD_MERGE:
204 1050 : SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed);
205 1050 : break;
206 0 : default:
207 0 : SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed);
208 0 : break;
209 : }
210 624 : }
211 :
212 : /*
213 : * Now, we close down all the scans and free allocated resources.
214 : */
215 86064 : ExecutorFinish(queryDesc);
216 85036 : ExecutorEnd(queryDesc);
217 :
218 85036 : FreeQueryDesc(queryDesc);
219 85036 : }
220 :
221 : /*
222 : * ChoosePortalStrategy
223 : * Select portal execution strategy given the intended statement list.
224 : *
225 : * The list elements can be Querys or PlannedStmts.
226 : * That's more general than portals need, but plancache.c uses this too.
227 : *
228 : * See the comments in portal.h.
229 : */
230 : PortalStrategy
231 820278 : ChoosePortalStrategy(List *stmts)
232 : {
233 : int nSetTag;
234 : ListCell *lc;
235 :
236 : /*
237 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
238 : * single-statement case, since there are no rewrite rules that can add
239 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
240 : * likewise allows only one top-level statement.
241 : */
242 820278 : if (list_length(stmts) == 1)
243 : {
244 819948 : Node *stmt = (Node *) linitial(stmts);
245 :
246 819948 : if (IsA(stmt, Query))
247 : {
248 81374 : Query *query = (Query *) stmt;
249 :
250 81374 : if (query->canSetTag)
251 : {
252 81374 : if (query->commandType == CMD_SELECT)
253 : {
254 53232 : if (query->hasModifyingCTE)
255 0 : return PORTAL_ONE_MOD_WITH;
256 : else
257 53232 : return PORTAL_ONE_SELECT;
258 : }
259 28142 : if (query->commandType == CMD_UTILITY)
260 : {
261 20174 : if (UtilityReturnsTuples(query->utilityStmt))
262 10044 : return PORTAL_UTIL_SELECT;
263 : /* it can't be ONE_RETURNING, so give up */
264 10130 : return PORTAL_MULTI_QUERY;
265 : }
266 : }
267 : }
268 738574 : else if (IsA(stmt, PlannedStmt))
269 : {
270 738574 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
271 :
272 738574 : if (pstmt->canSetTag)
273 : {
274 738538 : if (pstmt->commandType == CMD_SELECT)
275 : {
276 291578 : if (pstmt->hasModifyingCTE)
277 128 : return PORTAL_ONE_MOD_WITH;
278 : else
279 291450 : return PORTAL_ONE_SELECT;
280 : }
281 446960 : if (pstmt->commandType == CMD_UTILITY)
282 : {
283 357492 : if (UtilityReturnsTuples(pstmt->utilityStmt))
284 57430 : return PORTAL_UTIL_SELECT;
285 : /* it can't be ONE_RETURNING, so give up */
286 300062 : return PORTAL_MULTI_QUERY;
287 : }
288 : }
289 : }
290 : else
291 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
292 : }
293 :
294 : /*
295 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
296 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
297 : * it has a RETURNING list.
298 : */
299 97802 : nSetTag = 0;
300 101190 : foreach(lc, stmts)
301 : {
302 97964 : Node *stmt = (Node *) lfirst(lc);
303 :
304 97964 : if (IsA(stmt, Query))
305 : {
306 7968 : Query *query = (Query *) stmt;
307 :
308 7968 : if (query->canSetTag)
309 : {
310 7968 : if (++nSetTag > 1)
311 94576 : return PORTAL_MULTI_QUERY; /* no need to look further */
312 7968 : if (query->commandType == CMD_UTILITY ||
313 7968 : query->returningList == NIL)
314 7736 : return PORTAL_MULTI_QUERY; /* no need to look further */
315 : }
316 : }
317 89996 : else if (IsA(stmt, PlannedStmt))
318 : {
319 89996 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
320 :
321 89996 : if (pstmt->canSetTag)
322 : {
323 89786 : if (++nSetTag > 1)
324 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
325 89786 : if (pstmt->commandType == CMD_UTILITY ||
326 89786 : !pstmt->hasReturning)
327 86840 : return PORTAL_MULTI_QUERY; /* no need to look further */
328 : }
329 : }
330 : else
331 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
332 : }
333 3226 : if (nSetTag == 1)
334 3178 : return PORTAL_ONE_RETURNING;
335 :
336 : /* Else, it's the general case... */
337 48 : return PORTAL_MULTI_QUERY;
338 : }
339 :
340 : /*
341 : * FetchPortalTargetList
342 : * Given a portal that returns tuples, extract the query targetlist.
343 : * Returns NIL if the portal doesn't have a determinable targetlist.
344 : *
345 : * Note: do not modify the result.
346 : */
347 : List *
348 312288 : FetchPortalTargetList(Portal portal)
349 : {
350 : /* no point in looking if we determined it doesn't return tuples */
351 312288 : if (portal->strategy == PORTAL_MULTI_QUERY)
352 30 : return NIL;
353 : /* get the primary statement and find out what it returns */
354 312258 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
355 : }
356 :
357 : /*
358 : * FetchStatementTargetList
359 : * Given a statement that returns tuples, extract the query targetlist.
360 : * Returns NIL if the statement doesn't have a determinable targetlist.
361 : *
362 : * This can be applied to a Query or a PlannedStmt.
363 : * That's more general than portals need, but plancache.c uses this too.
364 : *
365 : * Note: do not modify the result.
366 : *
367 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
368 : */
369 : List *
370 339754 : FetchStatementTargetList(Node *stmt)
371 : {
372 339754 : if (stmt == NULL)
373 0 : return NIL;
374 339754 : if (IsA(stmt, Query))
375 : {
376 27496 : Query *query = (Query *) stmt;
377 :
378 27496 : if (query->commandType == CMD_UTILITY)
379 : {
380 : /* transfer attention to utility statement */
381 12 : stmt = query->utilityStmt;
382 : }
383 : else
384 : {
385 27484 : if (query->commandType == CMD_SELECT)
386 27472 : return query->targetList;
387 12 : if (query->returningList)
388 12 : return query->returningList;
389 0 : return NIL;
390 : }
391 : }
392 312270 : if (IsA(stmt, PlannedStmt))
393 : {
394 312258 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
395 :
396 312258 : if (pstmt->commandType == CMD_UTILITY)
397 : {
398 : /* transfer attention to utility statement */
399 49080 : stmt = pstmt->utilityStmt;
400 : }
401 : else
402 : {
403 263178 : if (pstmt->commandType == CMD_SELECT)
404 260364 : return pstmt->planTree->targetlist;
405 2814 : if (pstmt->hasReturning)
406 2814 : return pstmt->planTree->targetlist;
407 0 : return NIL;
408 : }
409 : }
410 49092 : if (IsA(stmt, FetchStmt))
411 : {
412 5614 : FetchStmt *fstmt = (FetchStmt *) stmt;
413 : Portal subportal;
414 :
415 : Assert(!fstmt->ismove);
416 5614 : subportal = GetPortalByName(fstmt->portalname);
417 : Assert(PortalIsValid(subportal));
418 5614 : return FetchPortalTargetList(subportal);
419 : }
420 43478 : if (IsA(stmt, ExecuteStmt))
421 : {
422 27368 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
423 : PreparedStatement *entry;
424 :
425 27368 : entry = FetchPreparedStatement(estmt->name, true);
426 27368 : return FetchPreparedStatementTargetList(entry);
427 : }
428 16110 : return NIL;
429 : }
430 :
431 : /*
432 : * PortalStart
433 : * Prepare a portal for execution.
434 : *
435 : * Caller must already have created the portal, done PortalDefineQuery(),
436 : * and adjusted portal options if needed.
437 : *
438 : * If parameters are needed by the query, they must be passed in "params"
439 : * (caller is responsible for giving them appropriate lifetime).
440 : *
441 : * The caller can also provide an initial set of "eflags" to be passed to
442 : * ExecutorStart (but note these can be modified internally, and they are
443 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
444 : * should simply pass zero.
445 : *
446 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
447 : * for the normal behavior of setting a new snapshot. This parameter is
448 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
449 : * to be used for cursors).
450 : *
451 : * On return, portal is ready to accept PortalRun() calls, and the result
452 : * tupdesc (if any) is known.
453 : */
454 : void
455 738898 : PortalStart(Portal portal, ParamListInfo params,
456 : int eflags, Snapshot snapshot)
457 : {
458 : Portal saveActivePortal;
459 : ResourceOwner saveResourceOwner;
460 : MemoryContext savePortalContext;
461 : MemoryContext oldContext;
462 : QueryDesc *queryDesc;
463 : int myeflags;
464 :
465 : Assert(PortalIsValid(portal));
466 : Assert(portal->status == PORTAL_DEFINED);
467 :
468 : /*
469 : * Set up global portal context pointers.
470 : */
471 738898 : saveActivePortal = ActivePortal;
472 738898 : saveResourceOwner = CurrentResourceOwner;
473 738898 : savePortalContext = PortalContext;
474 738898 : PG_TRY();
475 : {
476 738898 : ActivePortal = portal;
477 738898 : if (portal->resowner)
478 738898 : CurrentResourceOwner = portal->resowner;
479 738898 : PortalContext = portal->portalContext;
480 :
481 738898 : oldContext = MemoryContextSwitchTo(PortalContext);
482 :
483 : /* Must remember portal param list, if any */
484 738898 : portal->portalParams = params;
485 :
486 : /*
487 : * Determine the portal execution strategy
488 : */
489 738898 : portal->strategy = ChoosePortalStrategy(portal->stmts);
490 :
491 : /*
492 : * Fire her up according to the strategy
493 : */
494 738898 : switch (portal->strategy)
495 : {
496 291450 : case PORTAL_ONE_SELECT:
497 :
498 : /* Must set snapshot before starting executor. */
499 291450 : if (snapshot)
500 35988 : PushActiveSnapshot(snapshot);
501 : else
502 255462 : PushActiveSnapshot(GetTransactionSnapshot());
503 :
504 : /*
505 : * We could remember the snapshot in portal->portalSnapshot,
506 : * but presently there seems no need to, as this code path
507 : * cannot be used for non-atomic execution. Hence there can't
508 : * be any commit/abort that might destroy the snapshot. Since
509 : * we don't do that, there's also no need to force a
510 : * non-default nesting level for the snapshot.
511 : */
512 :
513 : /*
514 : * Create QueryDesc in portal's context; for the moment, set
515 : * the destination to DestNone.
516 : */
517 291450 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
518 : portal->cplan,
519 : portal->sourceText,
520 : GetActiveSnapshot(),
521 : InvalidSnapshot,
522 : None_Receiver,
523 : params,
524 : portal->queryEnv,
525 : 0);
526 :
527 : /*
528 : * If it's a scrollable cursor, executor needs to support
529 : * REWIND and backwards scan, as well as whatever the caller
530 : * might've asked for.
531 : */
532 291450 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
533 3924 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
534 : else
535 287526 : myeflags = eflags;
536 :
537 : /*
538 : * Prepare the plan for execution.
539 : */
540 291450 : if (portal->cplan)
541 : {
542 37748 : ExecutorStartCachedPlan(queryDesc, myeflags,
543 : portal->plansource, 0);
544 : Assert(queryDesc->planstate);
545 : }
546 : else
547 : {
548 253702 : if (!ExecutorStart(queryDesc, myeflags))
549 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
550 : }
551 :
552 : /*
553 : * This tells PortalCleanup to shut down the executor
554 : */
555 290792 : portal->queryDesc = queryDesc;
556 :
557 : /*
558 : * Remember tuple descriptor (computed by ExecutorStart)
559 : */
560 290792 : portal->tupDesc = queryDesc->tupDesc;
561 :
562 : /*
563 : * Reset cursor position data to "start of query"
564 : */
565 290792 : portal->atStart = true;
566 290792 : portal->atEnd = false; /* allow fetches */
567 290792 : portal->portalPos = 0;
568 :
569 290792 : PopActiveSnapshot();
570 290792 : break;
571 :
572 3074 : case PORTAL_ONE_RETURNING:
573 : case PORTAL_ONE_MOD_WITH:
574 :
575 : /*
576 : * We don't start the executor until we are told to run the
577 : * portal. We do need to set up the result tupdesc.
578 : */
579 : {
580 : PlannedStmt *pstmt;
581 :
582 3074 : pstmt = PortalGetPrimaryStmt(portal);
583 3074 : portal->tupDesc =
584 3074 : ExecCleanTypeFromTL(pstmt->planTree->targetlist);
585 : }
586 :
587 : /*
588 : * Reset cursor position data to "start of query"
589 : */
590 3074 : portal->atStart = true;
591 3074 : portal->atEnd = false; /* allow fetches */
592 3074 : portal->portalPos = 0;
593 3074 : break;
594 :
595 57430 : case PORTAL_UTIL_SELECT:
596 :
597 : /*
598 : * We don't set snapshot here, because PortalRunUtility will
599 : * take care of it if needed.
600 : */
601 : {
602 57430 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
603 :
604 : Assert(pstmt->commandType == CMD_UTILITY);
605 57430 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
606 : }
607 :
608 : /*
609 : * Reset cursor position data to "start of query"
610 : */
611 57402 : portal->atStart = true;
612 57402 : portal->atEnd = false; /* allow fetches */
613 57402 : portal->portalPos = 0;
614 57402 : break;
615 :
616 386944 : case PORTAL_MULTI_QUERY:
617 : /* Need do nothing now */
618 386944 : portal->tupDesc = NULL;
619 386944 : break;
620 : }
621 738212 : }
622 686 : PG_CATCH();
623 : {
624 : /* Uncaught error while executing portal: mark it dead */
625 686 : MarkPortalFailed(portal);
626 :
627 : /* Restore global vars and propagate error */
628 686 : ActivePortal = saveActivePortal;
629 686 : CurrentResourceOwner = saveResourceOwner;
630 686 : PortalContext = savePortalContext;
631 :
632 686 : PG_RE_THROW();
633 : }
634 738212 : PG_END_TRY();
635 :
636 738212 : MemoryContextSwitchTo(oldContext);
637 :
638 738212 : ActivePortal = saveActivePortal;
639 738212 : CurrentResourceOwner = saveResourceOwner;
640 738212 : PortalContext = savePortalContext;
641 :
642 738212 : portal->status = PORTAL_READY;
643 738212 : }
644 :
645 : /*
646 : * PortalSetResultFormat
647 : * Select the format codes for a portal's output.
648 : *
649 : * This must be run after PortalStart for a portal that will be read by
650 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
651 : * for other destination types.
652 : *
653 : * formats[] is the client format request, as per Bind message conventions.
654 : */
655 : void
656 694008 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
657 : {
658 : int natts;
659 : int i;
660 :
661 : /* Do nothing if portal won't return tuples */
662 694008 : if (portal->tupDesc == NULL)
663 386824 : return;
664 307184 : natts = portal->tupDesc->natts;
665 307184 : portal->formats = (int16 *)
666 307184 : MemoryContextAlloc(portal->portalContext,
667 : natts * sizeof(int16));
668 307184 : if (nFormats > 1)
669 : {
670 : /* format specified for each column */
671 0 : if (nFormats != natts)
672 0 : ereport(ERROR,
673 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
674 : errmsg("bind message has %d result formats but query has %d columns",
675 : nFormats, natts)));
676 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
677 : }
678 307184 : else if (nFormats > 0)
679 : {
680 : /* single format specified, use for all columns */
681 307184 : int16 format1 = formats[0];
682 :
683 1429308 : for (i = 0; i < natts; i++)
684 1122124 : portal->formats[i] = format1;
685 : }
686 : else
687 : {
688 : /* use default format for all columns */
689 0 : for (i = 0; i < natts; i++)
690 0 : portal->formats[i] = 0;
691 : }
692 : }
693 :
694 : /*
695 : * PortalRun
696 : * Run a portal's query or queries.
697 : *
698 : * count <= 0 is interpreted as a no-op: the destination gets started up
699 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
700 : * interpreted as "all rows". Note that count is ignored in multi-query
701 : * situations, where we always run the portal to completion.
702 : *
703 : * isTopLevel: true if query is being executed at backend "top level"
704 : * (that is, directly from a client command message)
705 : *
706 : * dest: where to send output of primary (canSetTag) query
707 : *
708 : * altdest: where to send output of non-primary queries
709 : *
710 : * qc: where to store command completion status data.
711 : * May be NULL if caller doesn't want status data.
712 : *
713 : * Returns true if the portal's execution is complete, false if it was
714 : * suspended due to exhaustion of the count parameter.
715 : */
716 : bool
717 721544 : PortalRun(Portal portal, long count, bool isTopLevel,
718 : DestReceiver *dest, DestReceiver *altdest,
719 : QueryCompletion *qc)
720 : {
721 : bool result;
722 : uint64 nprocessed;
723 : ResourceOwner saveTopTransactionResourceOwner;
724 : MemoryContext saveTopTransactionContext;
725 : Portal saveActivePortal;
726 : ResourceOwner saveResourceOwner;
727 : MemoryContext savePortalContext;
728 : MemoryContext saveMemoryContext;
729 :
730 : Assert(PortalIsValid(portal));
731 :
732 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
733 :
734 : /* Initialize empty completion data */
735 721544 : if (qc)
736 721544 : InitializeQueryCompletion(qc);
737 :
738 721544 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
739 : {
740 0 : elog(DEBUG3, "PortalRun");
741 : /* PORTAL_MULTI_QUERY logs its own stats per query */
742 0 : ResetUsage();
743 : }
744 :
745 : /*
746 : * Check for improper portal use, and mark portal active.
747 : */
748 721544 : MarkPortalActive(portal);
749 :
750 : /*
751 : * Set up global portal context pointers.
752 : *
753 : * We have to play a special game here to support utility commands like
754 : * VACUUM and CLUSTER, which internally start and commit transactions.
755 : * When we are called to execute such a command, CurrentResourceOwner will
756 : * be pointing to the TopTransactionResourceOwner --- which will be
757 : * destroyed and replaced in the course of the internal commit and
758 : * restart. So we need to be prepared to restore it as pointing to the
759 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
760 : * internally starting whole new transactions is not good.)
761 : * CurrentMemoryContext has a similar problem, but the other pointers we
762 : * save here will be NULL or pointing to longer-lived objects.
763 : */
764 721544 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
765 721544 : saveTopTransactionContext = TopTransactionContext;
766 721544 : saveActivePortal = ActivePortal;
767 721544 : saveResourceOwner = CurrentResourceOwner;
768 721544 : savePortalContext = PortalContext;
769 721544 : saveMemoryContext = CurrentMemoryContext;
770 721544 : PG_TRY();
771 : {
772 721544 : ActivePortal = portal;
773 721544 : if (portal->resowner)
774 721544 : CurrentResourceOwner = portal->resowner;
775 721544 : PortalContext = portal->portalContext;
776 :
777 721544 : MemoryContextSwitchTo(PortalContext);
778 :
779 721544 : switch (portal->strategy)
780 : {
781 334600 : case PORTAL_ONE_SELECT:
782 : case PORTAL_ONE_RETURNING:
783 : case PORTAL_ONE_MOD_WITH:
784 : case PORTAL_UTIL_SELECT:
785 :
786 : /*
787 : * If we have not yet run the command, do so, storing its
788 : * results in the portal's tuplestore. But we don't do that
789 : * for the PORTAL_ONE_SELECT case.
790 : */
791 334600 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
792 52398 : FillPortalStore(portal, isTopLevel);
793 :
794 : /*
795 : * Now fetch desired portion of results.
796 : */
797 334220 : nprocessed = PortalRunSelect(portal, true, count, dest);
798 :
799 : /*
800 : * If the portal result contains a command tag and the caller
801 : * gave us a pointer to store it, copy it and update the
802 : * rowcount.
803 : */
804 327216 : if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
805 : {
806 327216 : CopyQueryCompletion(qc, &portal->qc);
807 327216 : qc->nprocessed = nprocessed;
808 : }
809 :
810 : /* Mark portal not active */
811 327216 : portal->status = PORTAL_READY;
812 :
813 : /*
814 : * Since it's a forward fetch, say DONE iff atEnd is now true.
815 : */
816 327216 : result = portal->atEnd;
817 327216 : break;
818 :
819 386944 : case PORTAL_MULTI_QUERY:
820 386944 : PortalRunMulti(portal, isTopLevel, false,
821 : dest, altdest, qc);
822 :
823 : /* Prevent portal's commands from being re-executed */
824 366518 : MarkPortalDone(portal);
825 :
826 : /* Always complete at end of RunMulti */
827 366518 : result = true;
828 366518 : break;
829 :
830 0 : default:
831 0 : elog(ERROR, "unrecognized portal strategy: %d",
832 : (int) portal->strategy);
833 : result = false; /* keep compiler quiet */
834 : break;
835 : }
836 : }
837 27802 : PG_CATCH();
838 : {
839 : /* Uncaught error while executing portal: mark it dead */
840 27802 : MarkPortalFailed(portal);
841 :
842 : /* Restore global vars and propagate error */
843 27802 : if (saveMemoryContext == saveTopTransactionContext)
844 27454 : MemoryContextSwitchTo(TopTransactionContext);
845 : else
846 348 : MemoryContextSwitchTo(saveMemoryContext);
847 27802 : ActivePortal = saveActivePortal;
848 27802 : if (saveResourceOwner == saveTopTransactionResourceOwner)
849 27534 : CurrentResourceOwner = TopTransactionResourceOwner;
850 : else
851 268 : CurrentResourceOwner = saveResourceOwner;
852 27802 : PortalContext = savePortalContext;
853 :
854 27802 : PG_RE_THROW();
855 : }
856 693734 : PG_END_TRY();
857 :
858 693734 : if (saveMemoryContext == saveTopTransactionContext)
859 638712 : MemoryContextSwitchTo(TopTransactionContext);
860 : else
861 55022 : MemoryContextSwitchTo(saveMemoryContext);
862 693734 : ActivePortal = saveActivePortal;
863 693734 : if (saveResourceOwner == saveTopTransactionResourceOwner)
864 660106 : CurrentResourceOwner = TopTransactionResourceOwner;
865 : else
866 33628 : CurrentResourceOwner = saveResourceOwner;
867 693734 : PortalContext = savePortalContext;
868 :
869 693734 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
870 0 : ShowUsage("EXECUTOR STATISTICS");
871 :
872 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
873 :
874 693734 : return result;
875 : }
876 :
877 : /*
878 : * PortalRunSelect
879 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
880 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
881 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
882 : *
883 : * This handles simple N-rows-forward-or-backward cases. For more complex
884 : * nonsequential access to a portal, see PortalRunFetch.
885 : *
886 : * count <= 0 is interpreted as a no-op: the destination gets started up
887 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
888 : * interpreted as "all rows". (cf FetchStmt.howMany)
889 : *
890 : * Caller must already have validated the Portal and done appropriate
891 : * setup (cf. PortalRun).
892 : *
893 : * Returns number of rows processed (suitable for use in result tag)
894 : */
895 : static uint64
896 386092 : PortalRunSelect(Portal portal,
897 : bool forward,
898 : long count,
899 : DestReceiver *dest)
900 : {
901 : QueryDesc *queryDesc;
902 : ScanDirection direction;
903 : uint64 nprocessed;
904 :
905 : /*
906 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
907 : * completed utility query; can't use it in that path.
908 : */
909 386092 : queryDesc = portal->queryDesc;
910 :
911 : /* Caller messed up if we have neither a ready query nor held data. */
912 : Assert(queryDesc || portal->holdStore);
913 :
914 : /*
915 : * Force the queryDesc destination to the right thing. This supports
916 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
917 : * change as long as we do it on every fetch. (The Executor must not
918 : * assume that dest never changes.)
919 : */
920 386092 : if (queryDesc)
921 302018 : queryDesc->dest = dest;
922 :
923 : /*
924 : * Determine which direction to go in, and check to see if we're already
925 : * at the end of the available tuples in that direction. If so, set the
926 : * direction to NoMovement to avoid trying to fetch any tuples. (This
927 : * check exists because not all plan node types are robust about being
928 : * called again if they've already returned NULL once.) Then call the
929 : * executor (we must not skip this, because the destination needs to see a
930 : * setup and shutdown even if no tuples are available). Finally, update
931 : * the portal position state depending on the number of tuples that were
932 : * retrieved.
933 : */
934 386092 : if (forward)
935 : {
936 385488 : if (portal->atEnd || count <= 0)
937 : {
938 3692 : direction = NoMovementScanDirection;
939 3692 : count = 0; /* don't pass negative count to executor */
940 : }
941 : else
942 381796 : direction = ForwardScanDirection;
943 :
944 : /* In the executor, zero count processes all rows */
945 385488 : if (count == FETCH_ALL)
946 334542 : count = 0;
947 :
948 385488 : if (portal->holdStore)
949 84056 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
950 : else
951 : {
952 301432 : PushActiveSnapshot(queryDesc->snapshot);
953 301432 : ExecutorRun(queryDesc, direction, (uint64) count);
954 294400 : nprocessed = queryDesc->estate->es_processed;
955 294400 : PopActiveSnapshot();
956 : }
957 :
958 378456 : if (!ScanDirectionIsNoMovement(direction))
959 : {
960 374764 : if (nprocessed > 0)
961 302924 : portal->atStart = false; /* OK to go backward now */
962 374764 : if (count == 0 || nprocessed < (uint64) count)
963 341588 : portal->atEnd = true; /* we retrieved 'em all */
964 374764 : portal->portalPos += nprocessed;
965 : }
966 : }
967 : else
968 : {
969 604 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
970 24 : ereport(ERROR,
971 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
972 : errmsg("cursor can only scan forward"),
973 : errhint("Declare it with SCROLL option to enable backward scan.")));
974 :
975 580 : if (portal->atStart || count <= 0)
976 : {
977 72 : direction = NoMovementScanDirection;
978 72 : count = 0; /* don't pass negative count to executor */
979 : }
980 : else
981 508 : direction = BackwardScanDirection;
982 :
983 : /* In the executor, zero count processes all rows */
984 580 : if (count == FETCH_ALL)
985 60 : count = 0;
986 :
987 580 : if (portal->holdStore)
988 12 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
989 : else
990 : {
991 568 : PushActiveSnapshot(queryDesc->snapshot);
992 568 : ExecutorRun(queryDesc, direction, (uint64) count);
993 568 : nprocessed = queryDesc->estate->es_processed;
994 568 : PopActiveSnapshot();
995 : }
996 :
997 580 : if (!ScanDirectionIsNoMovement(direction))
998 : {
999 508 : if (nprocessed > 0 && portal->atEnd)
1000 : {
1001 154 : portal->atEnd = false; /* OK to go forward now */
1002 154 : portal->portalPos++; /* adjust for endpoint case */
1003 : }
1004 508 : if (count == 0 || nprocessed < (uint64) count)
1005 : {
1006 186 : portal->atStart = true; /* we retrieved 'em all */
1007 186 : portal->portalPos = 0;
1008 : }
1009 : else
1010 : {
1011 322 : portal->portalPos -= nprocessed;
1012 : }
1013 : }
1014 : }
1015 :
1016 379036 : return nprocessed;
1017 : }
1018 :
1019 : /*
1020 : * FillPortalStore
1021 : * Run the query and load result tuples into the portal's tuple store.
1022 : *
1023 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
1024 : * PORTAL_UTIL_SELECT cases only.
1025 : */
1026 : static void
1027 60476 : FillPortalStore(Portal portal, bool isTopLevel)
1028 : {
1029 : DestReceiver *treceiver;
1030 : QueryCompletion qc;
1031 :
1032 60476 : InitializeQueryCompletion(&qc);
1033 60476 : PortalCreateHoldStore(portal);
1034 60476 : treceiver = CreateDestReceiver(DestTuplestore);
1035 60476 : SetTuplestoreDestReceiverParams(treceiver,
1036 : portal->holdStore,
1037 : portal->holdContext,
1038 : false,
1039 : NULL,
1040 : NULL);
1041 :
1042 60476 : switch (portal->strategy)
1043 : {
1044 3074 : case PORTAL_ONE_RETURNING:
1045 : case PORTAL_ONE_MOD_WITH:
1046 :
1047 : /*
1048 : * Run the portal to completion just as for the default
1049 : * PORTAL_MULTI_QUERY case, but send the primary query's output to
1050 : * the tuplestore. Auxiliary query outputs are discarded. Set the
1051 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1052 : */
1053 3074 : PortalRunMulti(portal, isTopLevel, true,
1054 : treceiver, None_Receiver, &qc);
1055 2940 : break;
1056 :
1057 57402 : case PORTAL_UTIL_SELECT:
1058 57402 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1059 : isTopLevel, true, treceiver, &qc);
1060 57150 : break;
1061 :
1062 0 : default:
1063 0 : elog(ERROR, "unsupported portal strategy: %d",
1064 : (int) portal->strategy);
1065 : break;
1066 : }
1067 :
1068 : /* Override portal completion data with actual command results */
1069 60090 : if (qc.commandTag != CMDTAG_UNKNOWN)
1070 35916 : CopyQueryCompletion(&portal->qc, &qc);
1071 :
1072 60090 : treceiver->rDestroy(treceiver);
1073 60090 : }
1074 :
1075 : /*
1076 : * RunFromStore
1077 : * Fetch tuples from the portal's tuple store.
1078 : *
1079 : * Calling conventions are similar to ExecutorRun, except that we
1080 : * do not depend on having a queryDesc or estate. Therefore we return the
1081 : * number of tuples processed as the result, not in estate->es_processed.
1082 : *
1083 : * One difference from ExecutorRun is that the destination receiver functions
1084 : * are run in the caller's memory context (since we have no estate). Watch
1085 : * out for memory leaks.
1086 : */
1087 : static uint64
1088 84068 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1089 : DestReceiver *dest)
1090 : {
1091 84068 : uint64 current_tuple_count = 0;
1092 : TupleTableSlot *slot;
1093 :
1094 84068 : slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);
1095 :
1096 84068 : dest->rStartup(dest, CMD_SELECT, portal->tupDesc);
1097 :
1098 84068 : if (ScanDirectionIsNoMovement(direction))
1099 : {
1100 : /* do nothing except start/stop the destination */
1101 : }
1102 : else
1103 : {
1104 81500 : bool forward = ScanDirectionIsForward(direction);
1105 :
1106 : for (;;)
1107 419148 : {
1108 : MemoryContext oldcontext;
1109 : bool ok;
1110 :
1111 500648 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1112 :
1113 500648 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1114 : slot);
1115 :
1116 500648 : MemoryContextSwitchTo(oldcontext);
1117 :
1118 500648 : if (!ok)
1119 60144 : break;
1120 :
1121 : /*
1122 : * If we are not able to send the tuple, we assume the destination
1123 : * has closed and no more tuples can be sent. If that's the case,
1124 : * end the loop.
1125 : */
1126 440504 : if (!dest->receiveSlot(slot, dest))
1127 0 : break;
1128 :
1129 440504 : ExecClearTuple(slot);
1130 :
1131 : /*
1132 : * check our tuple count.. if we've processed the proper number
1133 : * then quit, else loop again and process more tuples. Zero count
1134 : * means no limit.
1135 : */
1136 440504 : current_tuple_count++;
1137 440504 : if (count && count == current_tuple_count)
1138 21356 : break;
1139 : }
1140 : }
1141 :
1142 84068 : dest->rShutdown(dest);
1143 :
1144 84068 : ExecDropSingleTupleTableSlot(slot);
1145 :
1146 84068 : return current_tuple_count;
1147 : }
1148 :
1149 : /*
1150 : * PortalRunUtility
1151 : * Execute a utility statement inside a portal.
1152 : */
1153 : static void
1154 357464 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1155 : bool isTopLevel, bool setHoldSnapshot,
1156 : DestReceiver *dest, QueryCompletion *qc)
1157 : {
1158 : /*
1159 : * Set snapshot if utility stmt needs one.
1160 : */
1161 357464 : if (PlannedStmtRequiresSnapshot(pstmt))
1162 : {
1163 287770 : Snapshot snapshot = GetTransactionSnapshot();
1164 :
1165 : /* If told to, register the snapshot we're using and save in portal */
1166 287770 : if (setHoldSnapshot)
1167 : {
1168 50910 : snapshot = RegisterSnapshot(snapshot);
1169 50910 : portal->holdSnapshot = snapshot;
1170 : }
1171 :
1172 : /*
1173 : * In any case, make the snapshot active and remember it in portal.
1174 : * Because the portal now references the snapshot, we must tell
1175 : * snapmgr.c that the snapshot belongs to the portal's transaction
1176 : * level, else we risk portalSnapshot becoming a dangling pointer.
1177 : */
1178 287770 : PushActiveSnapshotWithLevel(snapshot, portal->createLevel);
1179 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1180 287770 : portal->portalSnapshot = GetActiveSnapshot();
1181 : }
1182 : else
1183 69694 : portal->portalSnapshot = NULL;
1184 :
1185 357464 : ProcessUtility(pstmt,
1186 : portal->sourceText,
1187 357464 : (portal->cplan != NULL), /* protect tree if in plancache */
1188 357464 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1189 : portal->portalParams,
1190 : portal->queryEnv,
1191 : dest,
1192 : qc);
1193 :
1194 : /* Some utility statements may change context on us */
1195 342160 : MemoryContextSwitchTo(portal->portalContext);
1196 :
1197 : /*
1198 : * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
1199 : * under us, so don't complain if it's now empty. Otherwise, our snapshot
1200 : * should be the top one; pop it. Note that this could be a different
1201 : * snapshot from the one we made above; see EnsurePortalSnapshotExists.
1202 : */
1203 342160 : if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
1204 : {
1205 : Assert(portal->portalSnapshot == GetActiveSnapshot());
1206 262326 : PopActiveSnapshot();
1207 : }
1208 342160 : portal->portalSnapshot = NULL;
1209 342160 : }
1210 :
1211 : /*
1212 : * PortalRunMulti
1213 : * Execute a portal's queries in the general case (multi queries
1214 : * or non-SELECT-like queries)
1215 : */
1216 : static void
1217 390018 : PortalRunMulti(Portal portal,
1218 : bool isTopLevel, bool setHoldSnapshot,
1219 : DestReceiver *dest, DestReceiver *altdest,
1220 : QueryCompletion *qc)
1221 : {
1222 390018 : bool active_snapshot_set = false;
1223 : ListCell *stmtlist_item;
1224 390018 : int query_index = 0;
1225 :
1226 : /*
1227 : * If the destination is DestRemoteExecute, change to DestNone. The
1228 : * reason is that the client won't be expecting any tuples, and indeed has
1229 : * no way to know what they are, since there is no provision for Describe
1230 : * to send a RowDescription message when this portal execution strategy is
1231 : * in effect. This presently will only affect SELECT commands added to
1232 : * non-SELECT queries by rewrite rules: such commands will be executed,
1233 : * but the results will be discarded unless you use "simple Query"
1234 : * protocol.
1235 : */
1236 390018 : if (dest->mydest == DestRemoteExecute)
1237 12670 : dest = None_Receiver;
1238 390018 : if (altdest->mydest == DestRemoteExecute)
1239 12670 : altdest = None_Receiver;
1240 :
1241 : /*
1242 : * Loop to handle the individual queries generated from a single parsetree
1243 : * by analysis and rewrite.
1244 : */
1245 760064 : foreach(stmtlist_item, portal->stmts)
1246 : {
1247 390606 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1248 :
1249 : /*
1250 : * If we got a cancel signal in prior command, quit
1251 : */
1252 390606 : CHECK_FOR_INTERRUPTS();
1253 :
1254 390606 : if (pstmt->utilityStmt == NULL)
1255 : {
1256 : /*
1257 : * process a plannable query.
1258 : */
1259 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1260 :
1261 90544 : if (log_executor_stats)
1262 0 : ResetUsage();
1263 :
1264 : /*
1265 : * Must always have a snapshot for plannable queries. First time
1266 : * through, take a new snapshot; for subsequent queries in the
1267 : * same portal, just update the snapshot's copy of the command
1268 : * counter.
1269 : */
1270 90544 : if (!active_snapshot_set)
1271 : {
1272 89956 : Snapshot snapshot = GetTransactionSnapshot();
1273 :
1274 : /* If told to, register the snapshot and save in portal */
1275 89956 : if (setHoldSnapshot)
1276 : {
1277 3074 : snapshot = RegisterSnapshot(snapshot);
1278 3074 : portal->holdSnapshot = snapshot;
1279 : }
1280 :
1281 : /*
1282 : * We can't have the holdSnapshot also be the active one,
1283 : * because UpdateActiveSnapshotCommandId would complain. So
1284 : * force an extra snapshot copy. Plain PushActiveSnapshot
1285 : * would have copied the transaction snapshot anyway, so this
1286 : * only adds a copy step when setHoldSnapshot is true. (It's
1287 : * okay for the command ID of the active snapshot to diverge
1288 : * from what holdSnapshot has.)
1289 : */
1290 89956 : PushCopiedSnapshot(snapshot);
1291 :
1292 : /*
1293 : * As for PORTAL_ONE_SELECT portals, it does not seem
1294 : * necessary to maintain portal->portalSnapshot here.
1295 : */
1296 :
1297 89956 : active_snapshot_set = true;
1298 : }
1299 : else
1300 588 : UpdateActiveSnapshotCommandId();
1301 :
1302 90544 : if (pstmt->canSetTag)
1303 : {
1304 : /* statement can set tag string */
1305 89914 : ProcessQuery(pstmt,
1306 : portal->cplan,
1307 : portal->plansource,
1308 : query_index,
1309 : portal->sourceText,
1310 : portal->portalParams,
1311 : portal->queryEnv,
1312 : dest, qc);
1313 : }
1314 : else
1315 : {
1316 : /* stmt added by rewrite cannot set tag */
1317 630 : ProcessQuery(pstmt,
1318 : portal->cplan,
1319 : portal->plansource,
1320 : query_index,
1321 : portal->sourceText,
1322 : portal->portalParams,
1323 : portal->queryEnv,
1324 : altdest, NULL);
1325 : }
1326 :
1327 85036 : if (log_executor_stats)
1328 0 : ShowUsage("EXECUTOR STATISTICS");
1329 :
1330 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1331 : }
1332 : else
1333 : {
1334 : /*
1335 : * process utility functions (create, destroy, etc..)
1336 : *
1337 : * We must not set a snapshot here for utility commands (if one is
1338 : * needed, PortalRunUtility will do it). If a utility command is
1339 : * alone in a portal then everything's fine. The only case where
1340 : * a utility command can be part of a longer list is that rules
1341 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1342 : * whether it has a snapshot or not, so we just leave the current
1343 : * snapshot alone if we have one.
1344 : */
1345 300062 : if (pstmt->canSetTag)
1346 : {
1347 : Assert(!active_snapshot_set);
1348 : /* statement can set tag string */
1349 300062 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1350 : dest, qc);
1351 : }
1352 : else
1353 : {
1354 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1355 : /* stmt added by rewrite cannot set tag */
1356 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1357 : altdest, NULL);
1358 : }
1359 : }
1360 :
1361 : /*
1362 : * Clear subsidiary contexts to recover temporary memory.
1363 : */
1364 : Assert(portal->portalContext == CurrentMemoryContext);
1365 :
1366 370046 : MemoryContextDeleteChildren(portal->portalContext);
1367 :
1368 : /*
1369 : * Avoid crashing if portal->stmts has been reset. This can only
1370 : * occur if a CALL or DO utility statement executed an internal
1371 : * COMMIT/ROLLBACK (cf PortalReleaseCachedPlan). The CALL or DO must
1372 : * have been the only statement in the portal, so there's nothing left
1373 : * for us to do; but we don't want to dereference a now-dangling list
1374 : * pointer.
1375 : */
1376 370046 : if (portal->stmts == NIL)
1377 0 : break;
1378 :
1379 : /*
1380 : * Increment command counter between queries, but not after the last
1381 : * one.
1382 : */
1383 370046 : if (lnext(portal->stmts, stmtlist_item) != NULL)
1384 588 : CommandCounterIncrement();
1385 :
1386 370046 : query_index++;
1387 : }
1388 :
1389 : /* Pop the snapshot if we pushed one. */
1390 369458 : if (active_snapshot_set)
1391 84448 : PopActiveSnapshot();
1392 :
1393 : /*
1394 : * If a query completion data was supplied, use it. Otherwise use the
1395 : * portal's query completion data.
1396 : *
1397 : * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
1398 : * fake them with zeros. This can happen with DO INSTEAD rules if there
1399 : * is no replacement query of the same type as the original. We print "0
1400 : * 0" here because technically there is no query of the matching tag type,
1401 : * and printing a non-zero count for a different query type seems wrong,
1402 : * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
1403 : * one row was updated. See QueryRewrite(), step 3, for details.
1404 : */
1405 369458 : if (qc && qc->commandTag == CMDTAG_UNKNOWN)
1406 : {
1407 272746 : if (portal->qc.commandTag != CMDTAG_UNKNOWN)
1408 272746 : CopyQueryCompletion(qc, &portal->qc);
1409 : /* If the caller supplied a qc, we should have set it by now. */
1410 : Assert(qc->commandTag != CMDTAG_UNKNOWN);
1411 : }
1412 369458 : }
1413 :
1414 : /*
1415 : * PortalRunFetch
1416 : * Variant form of PortalRun that supports SQL FETCH directions.
1417 : *
1418 : * Note: we presently assume that no callers of this want isTopLevel = true.
1419 : *
1420 : * count <= 0 is interpreted as a no-op: the destination gets started up
1421 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1422 : * interpreted as "all rows". (cf FetchStmt.howMany)
1423 : *
1424 : * Returns number of rows processed (suitable for use in result tag)
1425 : */
1426 : uint64
1427 51800 : PortalRunFetch(Portal portal,
1428 : FetchDirection fdirection,
1429 : long count,
1430 : DestReceiver *dest)
1431 : {
1432 : uint64 result;
1433 : Portal saveActivePortal;
1434 : ResourceOwner saveResourceOwner;
1435 : MemoryContext savePortalContext;
1436 : MemoryContext oldContext;
1437 :
1438 : Assert(PortalIsValid(portal));
1439 :
1440 : /*
1441 : * Check for improper portal use, and mark portal active.
1442 : */
1443 51800 : MarkPortalActive(portal);
1444 :
1445 : /*
1446 : * Set up global portal context pointers.
1447 : */
1448 51782 : saveActivePortal = ActivePortal;
1449 51782 : saveResourceOwner = CurrentResourceOwner;
1450 51782 : savePortalContext = PortalContext;
1451 51782 : PG_TRY();
1452 : {
1453 51782 : ActivePortal = portal;
1454 51782 : if (portal->resowner)
1455 51590 : CurrentResourceOwner = portal->resowner;
1456 51782 : PortalContext = portal->portalContext;
1457 :
1458 51782 : oldContext = MemoryContextSwitchTo(PortalContext);
1459 :
1460 51782 : switch (portal->strategy)
1461 : {
1462 19912 : case PORTAL_ONE_SELECT:
1463 19912 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1464 19854 : break;
1465 :
1466 31870 : case PORTAL_ONE_RETURNING:
1467 : case PORTAL_ONE_MOD_WITH:
1468 : case PORTAL_UTIL_SELECT:
1469 :
1470 : /*
1471 : * If we have not yet run the command, do so, storing its
1472 : * results in the portal's tuplestore.
1473 : */
1474 31870 : if (!portal->holdStore)
1475 8078 : FillPortalStore(portal, false /* isTopLevel */ );
1476 :
1477 : /*
1478 : * Now fetch desired portion of results.
1479 : */
1480 31864 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1481 31864 : break;
1482 :
1483 0 : default:
1484 0 : elog(ERROR, "unsupported portal strategy");
1485 : result = 0; /* keep compiler quiet */
1486 : break;
1487 : }
1488 : }
1489 64 : PG_CATCH();
1490 : {
1491 : /* Uncaught error while executing portal: mark it dead */
1492 64 : MarkPortalFailed(portal);
1493 :
1494 : /* Restore global vars and propagate error */
1495 64 : ActivePortal = saveActivePortal;
1496 64 : CurrentResourceOwner = saveResourceOwner;
1497 64 : PortalContext = savePortalContext;
1498 :
1499 64 : PG_RE_THROW();
1500 : }
1501 51718 : PG_END_TRY();
1502 :
1503 51718 : MemoryContextSwitchTo(oldContext);
1504 :
1505 : /* Mark portal not active */
1506 51718 : portal->status = PORTAL_READY;
1507 :
1508 51718 : ActivePortal = saveActivePortal;
1509 51718 : CurrentResourceOwner = saveResourceOwner;
1510 51718 : PortalContext = savePortalContext;
1511 :
1512 51718 : return result;
1513 : }
1514 :
1515 : /*
1516 : * DoPortalRunFetch
1517 : * Guts of PortalRunFetch --- the portal context is already set up
1518 : *
1519 : * Here, count < 0 typically reverses the direction. Also, count == FETCH_ALL
1520 : * is interpreted as "all rows". (cf FetchStmt.howMany)
1521 : *
1522 : * Returns number of rows processed (suitable for use in result tag)
1523 : */
1524 : static uint64
1525 51776 : DoPortalRunFetch(Portal portal,
1526 : FetchDirection fdirection,
1527 : long count,
1528 : DestReceiver *dest)
1529 : {
1530 : bool forward;
1531 :
1532 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1533 : portal->strategy == PORTAL_ONE_RETURNING ||
1534 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1535 : portal->strategy == PORTAL_UTIL_SELECT);
1536 :
1537 : /*
1538 : * Note: we disallow backwards fetch (including re-fetch of current row)
1539 : * for NO SCROLL cursors, but we interpret that very loosely: you can use
1540 : * any of the FetchDirection options, so long as the end result is to move
1541 : * forwards by at least one row. Currently it's sufficient to check for
1542 : * NO SCROLL in DoPortalRewind() and in the forward == false path in
1543 : * PortalRunSelect(); but someday we might prefer to account for that
1544 : * restriction explicitly here.
1545 : */
1546 51776 : switch (fdirection)
1547 : {
1548 51036 : case FETCH_FORWARD:
1549 51036 : if (count < 0)
1550 : {
1551 0 : fdirection = FETCH_BACKWARD;
1552 0 : count = -count;
1553 : }
1554 : /* fall out of switch to share code with FETCH_BACKWARD */
1555 51036 : break;
1556 496 : case FETCH_BACKWARD:
1557 496 : if (count < 0)
1558 : {
1559 0 : fdirection = FETCH_FORWARD;
1560 0 : count = -count;
1561 : }
1562 : /* fall out of switch to share code with FETCH_FORWARD */
1563 496 : break;
1564 160 : case FETCH_ABSOLUTE:
1565 160 : if (count > 0)
1566 : {
1567 : /*
1568 : * Definition: Rewind to start, advance count-1 rows, return
1569 : * next row (if any).
1570 : *
1571 : * In practice, if the goal is less than halfway back to the
1572 : * start, it's better to scan from where we are.
1573 : *
1574 : * Also, if current portalPos is outside the range of "long",
1575 : * do it the hard way to avoid possible overflow of the count
1576 : * argument to PortalRunSelect. We must exclude exactly
1577 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1578 : *
1579 : * In any case, we arrange to fetch the target row going
1580 : * forwards.
1581 : */
1582 90 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1583 36 : portal->portalPos >= (uint64) LONG_MAX)
1584 : {
1585 54 : DoPortalRewind(portal);
1586 48 : if (count > 1)
1587 0 : PortalRunSelect(portal, true, count - 1,
1588 : None_Receiver);
1589 : }
1590 : else
1591 : {
1592 36 : long pos = (long) portal->portalPos;
1593 :
1594 36 : if (portal->atEnd)
1595 0 : pos++; /* need one extra fetch if off end */
1596 36 : if (count <= pos)
1597 12 : PortalRunSelect(portal, false, pos - count + 1,
1598 : None_Receiver);
1599 24 : else if (count > pos + 1)
1600 12 : PortalRunSelect(portal, true, count - pos - 1,
1601 : None_Receiver);
1602 : }
1603 78 : return PortalRunSelect(portal, true, 1L, dest);
1604 : }
1605 70 : else if (count < 0)
1606 : {
1607 : /*
1608 : * Definition: Advance to end, back up abs(count)-1 rows,
1609 : * return prior row (if any). We could optimize this if we
1610 : * knew in advance where the end was, but typically we won't.
1611 : * (Is it worth considering case where count > half of size of
1612 : * query? We could rewind once we know the size ...)
1613 : */
1614 54 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1615 54 : if (count < -1)
1616 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1617 54 : return PortalRunSelect(portal, false, 1L, dest);
1618 : }
1619 : else
1620 : {
1621 : /* count == 0 */
1622 : /* Rewind to start, return zero rows */
1623 16 : DoPortalRewind(portal);
1624 16 : return PortalRunSelect(portal, true, 0L, dest);
1625 : }
1626 : break;
1627 84 : case FETCH_RELATIVE:
1628 84 : if (count > 0)
1629 : {
1630 : /*
1631 : * Definition: advance count-1 rows, return next row (if any).
1632 : */
1633 36 : if (count > 1)
1634 24 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1635 36 : return PortalRunSelect(portal, true, 1L, dest);
1636 : }
1637 48 : else if (count < 0)
1638 : {
1639 : /*
1640 : * Definition: back up abs(count)-1 rows, return prior row (if
1641 : * any).
1642 : */
1643 30 : if (count < -1)
1644 18 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1645 30 : return PortalRunSelect(portal, false, 1L, dest);
1646 : }
1647 : else
1648 : {
1649 : /* count == 0 */
1650 : /* Same as FETCH FORWARD 0, so fall out of switch */
1651 18 : fdirection = FETCH_FORWARD;
1652 : }
1653 18 : break;
1654 0 : default:
1655 0 : elog(ERROR, "bogus direction");
1656 : break;
1657 : }
1658 :
1659 : /*
1660 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1661 : * >= 0.
1662 : */
1663 51550 : forward = (fdirection == FETCH_FORWARD);
1664 :
1665 : /*
1666 : * Zero count means to re-fetch the current row, if any (per SQL)
1667 : */
1668 51550 : if (count == 0)
1669 : {
1670 : bool on_row;
1671 :
1672 : /* Are we sitting on a row? */
1673 18 : on_row = (!portal->atStart && !portal->atEnd);
1674 :
1675 18 : if (dest->mydest == DestNone)
1676 : {
1677 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1678 0 : return on_row ? 1 : 0;
1679 : }
1680 : else
1681 : {
1682 : /*
1683 : * If we are sitting on a row, back up one so we can re-fetch it.
1684 : * If we are not sitting on a row, we still have to start up and
1685 : * shut down the executor so that the destination is initialized
1686 : * and shut down correctly; so keep going. To PortalRunSelect,
1687 : * count == 0 means we will retrieve no row.
1688 : */
1689 18 : if (on_row)
1690 : {
1691 18 : PortalRunSelect(portal, false, 1L, None_Receiver);
1692 : /* Set up to fetch one row forward */
1693 12 : count = 1;
1694 12 : forward = true;
1695 : }
1696 : }
1697 : }
1698 :
1699 : /*
1700 : * Optimize MOVE BACKWARD ALL into a Rewind.
1701 : */
1702 51544 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1703 : {
1704 24 : uint64 result = portal->portalPos;
1705 :
1706 24 : if (result > 0 && !portal->atEnd)
1707 0 : result--;
1708 24 : DoPortalRewind(portal);
1709 24 : return result;
1710 : }
1711 :
1712 51520 : return PortalRunSelect(portal, forward, count, dest);
1713 : }
1714 :
1715 : /*
1716 : * DoPortalRewind - rewind a Portal to starting point
1717 : */
1718 : static void
1719 94 : DoPortalRewind(Portal portal)
1720 : {
1721 : QueryDesc *queryDesc;
1722 :
1723 : /*
1724 : * No work is needed if we've not advanced nor attempted to advance the
1725 : * cursor (and we don't want to throw a NO SCROLL error in this case).
1726 : */
1727 94 : if (portal->atStart && !portal->atEnd)
1728 18 : return;
1729 :
1730 : /* Otherwise, cursor must allow scrolling */
1731 76 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
1732 6 : ereport(ERROR,
1733 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1734 : errmsg("cursor can only scan forward"),
1735 : errhint("Declare it with SCROLL option to enable backward scan.")));
1736 :
1737 : /* Rewind holdStore, if we have one */
1738 70 : if (portal->holdStore)
1739 : {
1740 : MemoryContext oldcontext;
1741 :
1742 6 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1743 6 : tuplestore_rescan(portal->holdStore);
1744 6 : MemoryContextSwitchTo(oldcontext);
1745 : }
1746 :
1747 : /* Rewind executor, if active */
1748 70 : queryDesc = portal->queryDesc;
1749 70 : if (queryDesc)
1750 : {
1751 64 : PushActiveSnapshot(queryDesc->snapshot);
1752 64 : ExecutorRewind(queryDesc);
1753 64 : PopActiveSnapshot();
1754 : }
1755 :
1756 70 : portal->atStart = true;
1757 70 : portal->atEnd = false;
1758 70 : portal->portalPos = 0;
1759 : }
1760 :
1761 : /*
1762 : * PlannedStmtRequiresSnapshot - what it says on the tin
1763 : */
1764 : bool
1765 470236 : PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
1766 : {
1767 470236 : Node *utilityStmt = pstmt->utilityStmt;
1768 :
1769 : /* If it's not a utility statement, it definitely needs a snapshot */
1770 470236 : if (utilityStmt == NULL)
1771 84262 : return true;
1772 :
1773 : /*
1774 : * Most utility statements need a snapshot, and the default presumption
1775 : * about new ones should be that they do too. Hence, enumerate those that
1776 : * do not need one.
1777 : *
1778 : * Transaction control, LOCK, and SET must *not* set a snapshot, since
1779 : * they need to be executable at the start of a transaction-snapshot-mode
1780 : * transaction without freezing a snapshot. By extension we allow SHOW
1781 : * not to set a snapshot. The other stmts listed are just efficiency
1782 : * hacks. Beware of listing anything that can modify the database --- if,
1783 : * say, it has to update an index with expressions that invoke
1784 : * user-defined functions, then it had better have a snapshot.
1785 : */
1786 385974 : if (IsA(utilityStmt, TransactionStmt) ||
1787 350654 : IsA(utilityStmt, LockStmt) ||
1788 349514 : IsA(utilityStmt, VariableSetStmt) ||
1789 312632 : IsA(utilityStmt, VariableShowStmt) ||
1790 311810 : IsA(utilityStmt, ConstraintsSetStmt) ||
1791 : /* efficiency hacks from here down */
1792 311708 : IsA(utilityStmt, FetchStmt) ||
1793 304202 : IsA(utilityStmt, ListenStmt) ||
1794 304128 : IsA(utilityStmt, NotifyStmt) ||
1795 304040 : IsA(utilityStmt, UnlistenStmt) ||
1796 304002 : IsA(utilityStmt, CheckPointStmt))
1797 82186 : return false;
1798 :
1799 303788 : return true;
1800 : }
1801 :
1802 : /*
1803 : * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
1804 : *
1805 : * Generally, we will have an active snapshot whenever we are executing
1806 : * inside a Portal, unless the Portal's query is one of the utility
1807 : * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
1808 : * However, procedures and DO blocks can commit or abort the transaction,
1809 : * and thereby destroy all snapshots. This function can be called to
1810 : * re-establish the Portal-level snapshot when none exists.
1811 : */
1812 : void
1813 418010 : EnsurePortalSnapshotExists(void)
1814 : {
1815 : Portal portal;
1816 :
1817 : /*
1818 : * Nothing to do if a snapshot is set. (We take it on faith that the
1819 : * outermost active snapshot belongs to some Portal; or if there is no
1820 : * Portal, it's somebody else's responsibility to manage things.)
1821 : */
1822 418010 : if (ActiveSnapshotSet())
1823 413560 : return;
1824 :
1825 : /* Otherwise, we'd better have an active Portal */
1826 4450 : portal = ActivePortal;
1827 4450 : if (unlikely(portal == NULL))
1828 0 : elog(ERROR, "cannot execute SQL without an outer snapshot or portal");
1829 : Assert(portal->portalSnapshot == NULL);
1830 :
1831 : /*
1832 : * Create a new snapshot, make it active, and remember it in portal.
1833 : * Because the portal now references the snapshot, we must tell snapmgr.c
1834 : * that the snapshot belongs to the portal's transaction level, else we
1835 : * risk portalSnapshot becoming a dangling pointer.
1836 : */
1837 4450 : PushActiveSnapshotWithLevel(GetTransactionSnapshot(), portal->createLevel);
1838 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1839 4450 : portal->portalSnapshot = GetActiveSnapshot();
1840 : }
|