Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/execdesc.h"
23 : #include "executor/tstoreReceiver.h"
24 : #include "miscadmin.h"
25 : #include "pg_trace.h"
26 : #include "tcop/pquery.h"
27 : #include "tcop/utility.h"
28 : #include "utils/memutils.h"
29 : #include "utils/snapmgr.h"
30 :
31 :
32 : /*
33 : * ActivePortal is the currently executing Portal (the most closely nested,
34 : * if there are several).
35 : */
36 : Portal ActivePortal = NULL;
37 :
38 :
39 : static void ProcessQuery(PlannedStmt *plan,
40 : CachedPlan *cplan,
41 : CachedPlanSource *plansource,
42 : int query_index,
43 : const char *sourceText,
44 : ParamListInfo params,
45 : QueryEnvironment *queryEnv,
46 : DestReceiver *dest,
47 : QueryCompletion *qc);
48 : static void FillPortalStore(Portal portal, bool isTopLevel);
49 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
50 : DestReceiver *dest);
51 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
52 : DestReceiver *dest);
53 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
54 : bool isTopLevel, bool setHoldSnapshot,
55 : DestReceiver *dest, QueryCompletion *qc);
56 : static void PortalRunMulti(Portal portal,
57 : bool isTopLevel, bool setHoldSnapshot,
58 : DestReceiver *dest, DestReceiver *altdest,
59 : QueryCompletion *qc);
60 : static uint64 DoPortalRunFetch(Portal portal,
61 : FetchDirection fdirection,
62 : long count,
63 : DestReceiver *dest);
64 : static void DoPortalRewind(Portal portal);
65 :
66 :
67 : /*
68 : * CreateQueryDesc
69 : */
70 : QueryDesc *
71 663654 : CreateQueryDesc(PlannedStmt *plannedstmt,
72 : CachedPlan *cplan,
73 : const char *sourceText,
74 : Snapshot snapshot,
75 : Snapshot crosscheck_snapshot,
76 : DestReceiver *dest,
77 : ParamListInfo params,
78 : QueryEnvironment *queryEnv,
79 : int instrument_options)
80 : {
81 663654 : QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
82 :
83 663654 : qd->operation = plannedstmt->commandType; /* operation */
84 663654 : qd->plannedstmt = plannedstmt; /* plan */
85 663654 : qd->cplan = cplan; /* CachedPlan supplying the plannedstmt */
86 663654 : qd->sourceText = sourceText; /* query text */
87 663654 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
88 : /* RI check snapshot */
89 663654 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
90 663654 : qd->dest = dest; /* output dest */
91 663654 : qd->params = params; /* parameter values passed into query */
92 663654 : qd->queryEnv = queryEnv;
93 663654 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
94 :
95 : /* null these fields until set by ExecutorStart */
96 663654 : qd->tupDesc = NULL;
97 663654 : qd->estate = NULL;
98 663654 : qd->planstate = NULL;
99 663654 : qd->totaltime = NULL;
100 :
101 : /* not yet executed */
102 663654 : qd->already_executed = false;
103 :
104 663654 : return qd;
105 : }
106 :
107 : /*
108 : * FreeQueryDesc
109 : */
110 : void
111 636412 : FreeQueryDesc(QueryDesc *qdesc)
112 : {
113 : /* Can't be a live query */
114 : Assert(qdesc->estate == NULL);
115 :
116 : /* forget our snapshots */
117 636412 : UnregisterSnapshot(qdesc->snapshot);
118 636412 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
119 :
120 : /* Only the QueryDesc itself need be freed */
121 636412 : pfree(qdesc);
122 636412 : }
123 :
124 :
125 : /*
126 : * ProcessQuery
127 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
128 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
129 : *
130 : * plan: the plan tree for the query
131 : * cplan: CachedPlan supplying the plan
132 : * plansource: CachedPlanSource supplying the cplan
133 : * query_index: index of the query in plansource->query_list
134 : * sourceText: the source text of the query
135 : * params: any parameters needed
136 : * dest: where to send results
137 : * qc: where to store the command completion status data.
138 : *
139 : * qc may be NULL if caller doesn't want a status string.
140 : *
141 : * Must be called in a memory context that will be reset or deleted on
142 : * error; otherwise the executor's memory usage will be leaked.
143 : */
144 : static void
145 90586 : ProcessQuery(PlannedStmt *plan,
146 : CachedPlan *cplan,
147 : CachedPlanSource *plansource,
148 : int query_index,
149 : const char *sourceText,
150 : ParamListInfo params,
151 : QueryEnvironment *queryEnv,
152 : DestReceiver *dest,
153 : QueryCompletion *qc)
154 : {
155 : QueryDesc *queryDesc;
156 :
157 : /*
158 : * Create the QueryDesc object
159 : */
160 90586 : queryDesc = CreateQueryDesc(plan, cplan, sourceText,
161 : GetActiveSnapshot(), InvalidSnapshot,
162 : dest, params, queryEnv, 0);
163 :
164 : /*
165 : * Prepare the plan for execution
166 : */
167 90586 : if (queryDesc->cplan)
168 : {
169 10082 : ExecutorStartCachedPlan(queryDesc, 0, plansource, query_index);
170 : Assert(queryDesc->planstate);
171 : }
172 : else
173 : {
174 80504 : if (!ExecutorStart(queryDesc, 0))
175 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
176 : }
177 :
178 : /*
179 : * Run the plan to completion.
180 : */
181 89308 : ExecutorRun(queryDesc, ForwardScanDirection, 0);
182 :
183 : /*
184 : * Build command completion status data, if caller wants one.
185 : */
186 86150 : if (qc)
187 : {
188 85526 : switch (queryDesc->operation)
189 : {
190 116 : case CMD_SELECT:
191 116 : SetQueryCompletion(qc, CMDTAG_SELECT, queryDesc->estate->es_processed);
192 116 : break;
193 70090 : case CMD_INSERT:
194 70090 : SetQueryCompletion(qc, CMDTAG_INSERT, queryDesc->estate->es_processed);
195 70090 : break;
196 10754 : case CMD_UPDATE:
197 10754 : SetQueryCompletion(qc, CMDTAG_UPDATE, queryDesc->estate->es_processed);
198 10754 : break;
199 3528 : case CMD_DELETE:
200 3528 : SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed);
201 3528 : break;
202 1038 : case CMD_MERGE:
203 1038 : SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed);
204 1038 : break;
205 0 : default:
206 0 : SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed);
207 0 : break;
208 : }
209 624 : }
210 :
211 : /*
212 : * Now, we close down all the scans and free allocated resources.
213 : */
214 86150 : ExecutorFinish(queryDesc);
215 85050 : ExecutorEnd(queryDesc);
216 :
217 85050 : FreeQueryDesc(queryDesc);
218 85050 : }
219 :
220 : /*
221 : * ChoosePortalStrategy
222 : * Select portal execution strategy given the intended statement list.
223 : *
224 : * The list elements can be Querys or PlannedStmts.
225 : * That's more general than portals need, but plancache.c uses this too.
226 : *
227 : * See the comments in portal.h.
228 : */
229 : PortalStrategy
230 801096 : ChoosePortalStrategy(List *stmts)
231 : {
232 : int nSetTag;
233 : ListCell *lc;
234 :
235 : /*
236 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
237 : * single-statement case, since there are no rewrite rules that can add
238 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
239 : * likewise allows only one top-level statement.
240 : */
241 801096 : if (list_length(stmts) == 1)
242 : {
243 800766 : Node *stmt = (Node *) linitial(stmts);
244 :
245 800766 : if (IsA(stmt, Query))
246 : {
247 78004 : Query *query = (Query *) stmt;
248 :
249 78004 : if (query->canSetTag)
250 : {
251 78004 : if (query->commandType == CMD_SELECT)
252 : {
253 53440 : if (query->hasModifyingCTE)
254 0 : return PORTAL_ONE_MOD_WITH;
255 : else
256 53440 : return PORTAL_ONE_SELECT;
257 : }
258 24564 : if (query->commandType == CMD_UTILITY)
259 : {
260 16576 : if (UtilityReturnsTuples(query->utilityStmt))
261 8270 : return PORTAL_UTIL_SELECT;
262 : /* it can't be ONE_RETURNING, so give up */
263 8306 : return PORTAL_MULTI_QUERY;
264 : }
265 : }
266 : }
267 722762 : else if (IsA(stmt, PlannedStmt))
268 : {
269 722762 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
270 :
271 722762 : if (pstmt->canSetTag)
272 : {
273 722726 : if (pstmt->commandType == CMD_SELECT)
274 : {
275 293852 : if (pstmt->hasModifyingCTE)
276 128 : return PORTAL_ONE_MOD_WITH;
277 : else
278 293724 : return PORTAL_ONE_SELECT;
279 : }
280 428874 : if (pstmt->commandType == CMD_UTILITY)
281 : {
282 339364 : if (UtilityReturnsTuples(pstmt->utilityStmt))
283 44806 : return PORTAL_UTIL_SELECT;
284 : /* it can't be ONE_RETURNING, so give up */
285 294558 : return PORTAL_MULTI_QUERY;
286 : }
287 : }
288 : }
289 : else
290 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
291 : }
292 :
293 : /*
294 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
295 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
296 : * it has a RETURNING list.
297 : */
298 97864 : nSetTag = 0;
299 101220 : foreach(lc, stmts)
300 : {
301 98026 : Node *stmt = (Node *) lfirst(lc);
302 :
303 98026 : if (IsA(stmt, Query))
304 : {
305 7988 : Query *query = (Query *) stmt;
306 :
307 7988 : if (query->canSetTag)
308 : {
309 7988 : if (++nSetTag > 1)
310 94670 : return PORTAL_MULTI_QUERY; /* no need to look further */
311 7988 : if (query->commandType == CMD_UTILITY ||
312 7988 : query->returningList == NIL)
313 7758 : return PORTAL_MULTI_QUERY; /* no need to look further */
314 : }
315 : }
316 90038 : else if (IsA(stmt, PlannedStmt))
317 : {
318 90038 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
319 :
320 90038 : if (pstmt->canSetTag)
321 : {
322 89828 : if (++nSetTag > 1)
323 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
324 89828 : if (pstmt->commandType == CMD_UTILITY ||
325 89828 : !pstmt->hasReturning)
326 86912 : return PORTAL_MULTI_QUERY; /* no need to look further */
327 : }
328 : }
329 : else
330 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
331 : }
332 3194 : if (nSetTag == 1)
333 3146 : return PORTAL_ONE_RETURNING;
334 :
335 : /* Else, it's the general case... */
336 48 : return PORTAL_MULTI_QUERY;
337 : }
338 :
339 : /*
340 : * FetchPortalTargetList
341 : * Given a portal that returns tuples, extract the query targetlist.
342 : * Returns NIL if the portal doesn't have a determinable targetlist.
343 : *
344 : * Note: do not modify the result.
345 : */
346 : List *
347 316030 : FetchPortalTargetList(Portal portal)
348 : {
349 : /* no point in looking if we determined it doesn't return tuples */
350 316030 : if (portal->strategy == PORTAL_MULTI_QUERY)
351 30 : return NIL;
352 : /* get the primary statement and find out what it returns */
353 316000 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
354 : }
355 :
356 : /*
357 : * FetchStatementTargetList
358 : * Given a statement that returns tuples, extract the query targetlist.
359 : * Returns NIL if the statement doesn't have a determinable targetlist.
360 : *
361 : * This can be applied to a Query or a PlannedStmt.
362 : * That's more general than portals need, but plancache.c uses this too.
363 : *
364 : * Note: do not modify the result.
365 : *
366 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
367 : */
368 : List *
369 331128 : FetchStatementTargetList(Node *stmt)
370 : {
371 331128 : if (stmt == NULL)
372 0 : return NIL;
373 331128 : if (IsA(stmt, Query))
374 : {
375 15128 : Query *query = (Query *) stmt;
376 :
377 15128 : if (query->commandType == CMD_UTILITY)
378 : {
379 : /* transfer attention to utility statement */
380 12 : stmt = query->utilityStmt;
381 : }
382 : else
383 : {
384 15116 : if (query->commandType == CMD_SELECT)
385 15104 : return query->targetList;
386 12 : if (query->returningList)
387 12 : return query->returningList;
388 0 : return NIL;
389 : }
390 : }
391 316012 : if (IsA(stmt, PlannedStmt))
392 : {
393 316000 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
394 :
395 316000 : if (pstmt->commandType == CMD_UTILITY)
396 : {
397 : /* transfer attention to utility statement */
398 36486 : stmt = pstmt->utilityStmt;
399 : }
400 : else
401 : {
402 279514 : if (pstmt->commandType == CMD_SELECT)
403 276730 : return pstmt->planTree->targetlist;
404 2784 : if (pstmt->hasReturning)
405 2784 : return pstmt->planTree->targetlist;
406 0 : return NIL;
407 : }
408 : }
409 36498 : if (IsA(stmt, FetchStmt))
410 : {
411 5598 : FetchStmt *fstmt = (FetchStmt *) stmt;
412 : Portal subportal;
413 :
414 : Assert(!fstmt->ismove);
415 5598 : subportal = GetPortalByName(fstmt->portalname);
416 : Assert(PortalIsValid(subportal));
417 5598 : return FetchPortalTargetList(subportal);
418 : }
419 30900 : if (IsA(stmt, ExecuteStmt))
420 : {
421 15000 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
422 : PreparedStatement *entry;
423 :
424 15000 : entry = FetchPreparedStatement(estmt->name, true);
425 15000 : return FetchPreparedStatementTargetList(entry);
426 : }
427 15900 : return NIL;
428 : }
429 :
430 : /*
431 : * PortalStart
432 : * Prepare a portal for execution.
433 : *
434 : * Caller must already have created the portal, done PortalDefineQuery(),
435 : * and adjusted portal options if needed.
436 : *
437 : * If parameters are needed by the query, they must be passed in "params"
438 : * (caller is responsible for giving them appropriate lifetime).
439 : *
440 : * The caller can also provide an initial set of "eflags" to be passed to
441 : * ExecutorStart (but note these can be modified internally, and they are
442 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
443 : * should simply pass zero.
444 : *
445 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
446 : * for the normal behavior of setting a new snapshot. This parameter is
447 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
448 : * to be used for cursors).
449 : *
450 : * On return, portal is ready to accept PortalRun() calls, and the result
451 : * tupdesc (if any) is known.
452 : */
453 : void
454 723086 : PortalStart(Portal portal, ParamListInfo params,
455 : int eflags, Snapshot snapshot)
456 : {
457 : Portal saveActivePortal;
458 : ResourceOwner saveResourceOwner;
459 : MemoryContext savePortalContext;
460 : MemoryContext oldContext;
461 : QueryDesc *queryDesc;
462 : int myeflags;
463 :
464 : Assert(PortalIsValid(portal));
465 : Assert(portal->status == PORTAL_DEFINED);
466 :
467 : /*
468 : * Set up global portal context pointers.
469 : */
470 723086 : saveActivePortal = ActivePortal;
471 723086 : saveResourceOwner = CurrentResourceOwner;
472 723086 : savePortalContext = PortalContext;
473 723086 : PG_TRY();
474 : {
475 723086 : ActivePortal = portal;
476 723086 : if (portal->resowner)
477 723086 : CurrentResourceOwner = portal->resowner;
478 723086 : PortalContext = portal->portalContext;
479 :
480 723086 : oldContext = MemoryContextSwitchTo(PortalContext);
481 :
482 : /* Must remember portal param list, if any */
483 723086 : portal->portalParams = params;
484 :
485 : /*
486 : * Determine the portal execution strategy
487 : */
488 723086 : portal->strategy = ChoosePortalStrategy(portal->stmts);
489 :
490 : /*
491 : * Fire her up according to the strategy
492 : */
493 723086 : switch (portal->strategy)
494 : {
495 293724 : case PORTAL_ONE_SELECT:
496 :
497 : /* Must set snapshot before starting executor. */
498 293724 : if (snapshot)
499 21882 : PushActiveSnapshot(snapshot);
500 : else
501 271842 : PushActiveSnapshot(GetTransactionSnapshot());
502 :
503 : /*
504 : * We could remember the snapshot in portal->portalSnapshot,
505 : * but presently there seems no need to, as this code path
506 : * cannot be used for non-atomic execution. Hence there can't
507 : * be any commit/abort that might destroy the snapshot. Since
508 : * we don't do that, there's also no need to force a
509 : * non-default nesting level for the snapshot.
510 : */
511 :
512 : /*
513 : * Create QueryDesc in portal's context; for the moment, set
514 : * the destination to DestNone.
515 : */
516 293724 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
517 : portal->cplan,
518 : portal->sourceText,
519 : GetActiveSnapshot(),
520 : InvalidSnapshot,
521 : None_Receiver,
522 : params,
523 : portal->queryEnv,
524 : 0);
525 :
526 : /*
527 : * If it's a scrollable cursor, executor needs to support
528 : * REWIND and backwards scan, as well as whatever the caller
529 : * might've asked for.
530 : */
531 293724 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
532 2186 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
533 : else
534 291538 : myeflags = eflags;
535 :
536 : /*
537 : * Prepare the plan for execution.
538 : */
539 293724 : if (portal->cplan)
540 : {
541 25388 : ExecutorStartCachedPlan(queryDesc, myeflags,
542 : portal->plansource, 0);
543 : Assert(queryDesc->planstate);
544 : }
545 : else
546 : {
547 268336 : if (!ExecutorStart(queryDesc, myeflags))
548 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
549 : }
550 :
551 : /*
552 : * This tells PortalCleanup to shut down the executor
553 : */
554 293068 : portal->queryDesc = queryDesc;
555 :
556 : /*
557 : * Remember tuple descriptor (computed by ExecutorStart)
558 : */
559 293068 : portal->tupDesc = queryDesc->tupDesc;
560 :
561 : /*
562 : * Reset cursor position data to "start of query"
563 : */
564 293068 : portal->atStart = true;
565 293068 : portal->atEnd = false; /* allow fetches */
566 293068 : portal->portalPos = 0;
567 :
568 293068 : PopActiveSnapshot();
569 293068 : break;
570 :
571 3044 : case PORTAL_ONE_RETURNING:
572 : case PORTAL_ONE_MOD_WITH:
573 :
574 : /*
575 : * We don't start the executor until we are told to run the
576 : * portal. We do need to set up the result tupdesc.
577 : */
578 : {
579 : PlannedStmt *pstmt;
580 :
581 3044 : pstmt = PortalGetPrimaryStmt(portal);
582 3044 : portal->tupDesc =
583 3044 : ExecCleanTypeFromTL(pstmt->planTree->targetlist);
584 : }
585 :
586 : /*
587 : * Reset cursor position data to "start of query"
588 : */
589 3044 : portal->atStart = true;
590 3044 : portal->atEnd = false; /* allow fetches */
591 3044 : portal->portalPos = 0;
592 3044 : break;
593 :
594 44806 : case PORTAL_UTIL_SELECT:
595 :
596 : /*
597 : * We don't set snapshot here, because PortalRunUtility will
598 : * take care of it if needed.
599 : */
600 : {
601 44806 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
602 :
603 : Assert(pstmt->commandType == CMD_UTILITY);
604 44806 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
605 : }
606 :
607 : /*
608 : * Reset cursor position data to "start of query"
609 : */
610 44778 : portal->atStart = true;
611 44778 : portal->atEnd = false; /* allow fetches */
612 44778 : portal->portalPos = 0;
613 44778 : break;
614 :
615 381512 : case PORTAL_MULTI_QUERY:
616 : /* Need do nothing now */
617 381512 : portal->tupDesc = NULL;
618 381512 : break;
619 : }
620 722402 : }
621 684 : PG_CATCH();
622 : {
623 : /* Uncaught error while executing portal: mark it dead */
624 684 : MarkPortalFailed(portal);
625 :
626 : /* Restore global vars and propagate error */
627 684 : ActivePortal = saveActivePortal;
628 684 : CurrentResourceOwner = saveResourceOwner;
629 684 : PortalContext = savePortalContext;
630 :
631 684 : PG_RE_THROW();
632 : }
633 722402 : PG_END_TRY();
634 :
635 722402 : MemoryContextSwitchTo(oldContext);
636 :
637 722402 : ActivePortal = saveActivePortal;
638 722402 : CurrentResourceOwner = saveResourceOwner;
639 722402 : PortalContext = savePortalContext;
640 :
641 722402 : portal->status = PORTAL_READY;
642 722402 : }
643 :
644 : /*
645 : * PortalSetResultFormat
646 : * Select the format codes for a portal's output.
647 : *
648 : * This must be run after PortalStart for a portal that will be read by
649 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
650 : * for other destination types.
651 : *
652 : * formats[] is the client format request, as per Bind message conventions.
653 : */
654 : void
655 692326 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
656 : {
657 : int natts;
658 : int i;
659 :
660 : /* Do nothing if portal won't return tuples */
661 692326 : if (portal->tupDesc == NULL)
662 381392 : return;
663 310934 : natts = portal->tupDesc->natts;
664 310934 : portal->formats = (int16 *)
665 310934 : MemoryContextAlloc(portal->portalContext,
666 : natts * sizeof(int16));
667 310934 : if (nFormats > 1)
668 : {
669 : /* format specified for each column */
670 0 : if (nFormats != natts)
671 0 : ereport(ERROR,
672 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
673 : errmsg("bind message has %d result formats but query has %d columns",
674 : nFormats, natts)));
675 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
676 : }
677 310934 : else if (nFormats > 0)
678 : {
679 : /* single format specified, use for all columns */
680 310934 : int16 format1 = formats[0];
681 :
682 1486096 : for (i = 0; i < natts; i++)
683 1175162 : portal->formats[i] = format1;
684 : }
685 : else
686 : {
687 : /* use default format for all columns */
688 0 : for (i = 0; i < natts; i++)
689 0 : portal->formats[i] = 0;
690 : }
691 : }
692 :
693 : /*
694 : * PortalRun
695 : * Run a portal's query or queries.
696 : *
697 : * count <= 0 is interpreted as a no-op: the destination gets started up
698 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
699 : * interpreted as "all rows". Note that count is ignored in multi-query
700 : * situations, where we always run the portal to completion.
701 : *
702 : * isTopLevel: true if query is being executed at backend "top level"
703 : * (that is, directly from a client command message)
704 : *
705 : * dest: where to send output of primary (canSetTag) query
706 : *
707 : * altdest: where to send output of non-primary queries
708 : *
709 : * qc: where to store command completion status data.
710 : * May be NULL if caller doesn't want status data.
711 : *
712 : * Returns true if the portal's execution is complete, false if it was
713 : * suspended due to exhaustion of the count parameter.
714 : */
715 : bool
716 707494 : PortalRun(Portal portal, long count, bool isTopLevel,
717 : DestReceiver *dest, DestReceiver *altdest,
718 : QueryCompletion *qc)
719 : {
720 : bool result;
721 : uint64 nprocessed;
722 : ResourceOwner saveTopTransactionResourceOwner;
723 : MemoryContext saveTopTransactionContext;
724 : Portal saveActivePortal;
725 : ResourceOwner saveResourceOwner;
726 : MemoryContext savePortalContext;
727 : MemoryContext saveMemoryContext;
728 :
729 : Assert(PortalIsValid(portal));
730 :
731 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
732 :
733 : /* Initialize empty completion data */
734 707494 : if (qc)
735 707494 : InitializeQueryCompletion(qc);
736 :
737 707494 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
738 : {
739 0 : elog(DEBUG3, "PortalRun");
740 : /* PORTAL_MULTI_QUERY logs its own stats per query */
741 0 : ResetUsage();
742 : }
743 :
744 : /*
745 : * Check for improper portal use, and mark portal active.
746 : */
747 707494 : MarkPortalActive(portal);
748 :
749 : /*
750 : * Set up global portal context pointers.
751 : *
752 : * We have to play a special game here to support utility commands like
753 : * VACUUM and CLUSTER, which internally start and commit transactions.
754 : * When we are called to execute such a command, CurrentResourceOwner will
755 : * be pointing to the TopTransactionResourceOwner --- which will be
756 : * destroyed and replaced in the course of the internal commit and
757 : * restart. So we need to be prepared to restore it as pointing to the
758 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
759 : * internally starting whole new transactions is not good.)
760 : * CurrentMemoryContext has a similar problem, but the other pointers we
761 : * save here will be NULL or pointing to longer-lived objects.
762 : */
763 707494 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
764 707494 : saveTopTransactionContext = TopTransactionContext;
765 707494 : saveActivePortal = ActivePortal;
766 707494 : saveResourceOwner = CurrentResourceOwner;
767 707494 : savePortalContext = PortalContext;
768 707494 : saveMemoryContext = CurrentMemoryContext;
769 707494 : PG_TRY();
770 : {
771 707494 : ActivePortal = portal;
772 707494 : if (portal->resowner)
773 707494 : CurrentResourceOwner = portal->resowner;
774 707494 : PortalContext = portal->portalContext;
775 :
776 707494 : MemoryContextSwitchTo(PortalContext);
777 :
778 707494 : switch (portal->strategy)
779 : {
780 325982 : case PORTAL_ONE_SELECT:
781 : case PORTAL_ONE_RETURNING:
782 : case PORTAL_ONE_MOD_WITH:
783 : case PORTAL_UTIL_SELECT:
784 :
785 : /*
786 : * If we have not yet run the command, do so, storing its
787 : * results in the portal's tuplestore. But we don't do that
788 : * for the PORTAL_ONE_SELECT case.
789 : */
790 325982 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
791 39766 : FillPortalStore(portal, isTopLevel);
792 :
793 : /*
794 : * Now fetch desired portion of results.
795 : */
796 325610 : nprocessed = PortalRunSelect(portal, true, count, dest);
797 :
798 : /*
799 : * If the portal result contains a command tag and the caller
800 : * gave us a pointer to store it, copy it and update the
801 : * rowcount.
802 : */
803 318562 : if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
804 : {
805 318562 : CopyQueryCompletion(qc, &portal->qc);
806 318562 : qc->nprocessed = nprocessed;
807 : }
808 :
809 : /* Mark portal not active */
810 318562 : portal->status = PORTAL_READY;
811 :
812 : /*
813 : * Since it's a forward fetch, say DONE iff atEnd is now true.
814 : */
815 318562 : result = portal->atEnd;
816 318562 : break;
817 :
818 381512 : case PORTAL_MULTI_QUERY:
819 381512 : PortalRunMulti(portal, isTopLevel, false,
820 : dest, altdest, qc);
821 :
822 : /* Prevent portal's commands from being re-executed */
823 361190 : MarkPortalDone(portal);
824 :
825 : /* Always complete at end of RunMulti */
826 361190 : result = true;
827 361190 : break;
828 :
829 0 : default:
830 0 : elog(ERROR, "unrecognized portal strategy: %d",
831 : (int) portal->strategy);
832 : result = false; /* keep compiler quiet */
833 : break;
834 : }
835 : }
836 27734 : PG_CATCH();
837 : {
838 : /* Uncaught error while executing portal: mark it dead */
839 27734 : MarkPortalFailed(portal);
840 :
841 : /* Restore global vars and propagate error */
842 27734 : if (saveMemoryContext == saveTopTransactionContext)
843 27416 : MemoryContextSwitchTo(TopTransactionContext);
844 : else
845 318 : MemoryContextSwitchTo(saveMemoryContext);
846 27734 : ActivePortal = saveActivePortal;
847 27734 : if (saveResourceOwner == saveTopTransactionResourceOwner)
848 27472 : CurrentResourceOwner = TopTransactionResourceOwner;
849 : else
850 262 : CurrentResourceOwner = saveResourceOwner;
851 27734 : PortalContext = savePortalContext;
852 :
853 27734 : PG_RE_THROW();
854 : }
855 679752 : PG_END_TRY();
856 :
857 679752 : if (saveMemoryContext == saveTopTransactionContext)
858 637118 : MemoryContextSwitchTo(TopTransactionContext);
859 : else
860 42634 : MemoryContextSwitchTo(saveMemoryContext);
861 679752 : ActivePortal = saveActivePortal;
862 679752 : if (saveResourceOwner == saveTopTransactionResourceOwner)
863 658402 : CurrentResourceOwner = TopTransactionResourceOwner;
864 : else
865 21350 : CurrentResourceOwner = saveResourceOwner;
866 679752 : PortalContext = savePortalContext;
867 :
868 679752 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
869 0 : ShowUsage("EXECUTOR STATISTICS");
870 :
871 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
872 :
873 679752 : return result;
874 : }
875 :
876 : /*
877 : * PortalRunSelect
878 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
879 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
880 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
881 : *
882 : * This handles simple N-rows-forward-or-backward cases. For more complex
883 : * nonsequential access to a portal, see PortalRunFetch.
884 : *
885 : * count <= 0 is interpreted as a no-op: the destination gets started up
886 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
887 : * interpreted as "all rows". (cf FetchStmt.howMany)
888 : *
889 : * Caller must already have validated the Portal and done appropriate
890 : * setup (cf. PortalRun).
891 : *
892 : * Returns number of rows processed (suitable for use in result tag)
893 : */
894 : static uint64
895 375594 : PortalRunSelect(Portal portal,
896 : bool forward,
897 : long count,
898 : DestReceiver *dest)
899 : {
900 : QueryDesc *queryDesc;
901 : ScanDirection direction;
902 : uint64 nprocessed;
903 :
904 : /*
905 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
906 : * completed utility query; can't use it in that path.
907 : */
908 375594 : queryDesc = portal->queryDesc;
909 :
910 : /* Caller messed up if we have neither a ready query nor held data. */
911 : Assert(queryDesc || portal->holdStore);
912 :
913 : /*
914 : * Force the queryDesc destination to the right thing. This supports
915 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
916 : * change as long as we do it on every fetch. (The Executor must not
917 : * assume that dest never changes.)
918 : */
919 375594 : if (queryDesc)
920 304280 : queryDesc->dest = dest;
921 :
922 : /*
923 : * Determine which direction to go in, and check to see if we're already
924 : * at the end of the available tuples in that direction. If so, set the
925 : * direction to NoMovement to avoid trying to fetch any tuples. (This
926 : * check exists because not all plan node types are robust about being
927 : * called again if they've already returned NULL once.) Then call the
928 : * executor (we must not skip this, because the destination needs to see a
929 : * setup and shutdown even if no tuples are available). Finally, update
930 : * the portal position state depending on the number of tuples that were
931 : * retrieved.
932 : */
933 375594 : if (forward)
934 : {
935 374990 : if (portal->atEnd || count <= 0)
936 : {
937 3760 : direction = NoMovementScanDirection;
938 3760 : count = 0; /* don't pass negative count to executor */
939 : }
940 : else
941 371230 : direction = ForwardScanDirection;
942 :
943 : /* In the executor, zero count processes all rows */
944 374990 : if (count == FETCH_ALL)
945 325932 : count = 0;
946 :
947 374990 : if (portal->holdStore)
948 71296 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
949 : else
950 : {
951 303694 : PushActiveSnapshot(queryDesc->snapshot);
952 303694 : ExecutorRun(queryDesc, direction, (uint64) count);
953 296630 : nprocessed = queryDesc->estate->es_processed;
954 296630 : PopActiveSnapshot();
955 : }
956 :
957 367926 : if (!ScanDirectionIsNoMovement(direction))
958 : {
959 364166 : if (nprocessed > 0)
960 304334 : portal->atStart = false; /* OK to go backward now */
961 364166 : if (count == 0 || nprocessed < (uint64) count)
962 332908 : portal->atEnd = true; /* we retrieved 'em all */
963 364166 : portal->portalPos += nprocessed;
964 : }
965 : }
966 : else
967 : {
968 604 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
969 24 : ereport(ERROR,
970 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
971 : errmsg("cursor can only scan forward"),
972 : errhint("Declare it with SCROLL option to enable backward scan.")));
973 :
974 580 : if (portal->atStart || count <= 0)
975 : {
976 72 : direction = NoMovementScanDirection;
977 72 : count = 0; /* don't pass negative count to executor */
978 : }
979 : else
980 508 : direction = BackwardScanDirection;
981 :
982 : /* In the executor, zero count processes all rows */
983 580 : if (count == FETCH_ALL)
984 60 : count = 0;
985 :
986 580 : if (portal->holdStore)
987 12 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
988 : else
989 : {
990 568 : PushActiveSnapshot(queryDesc->snapshot);
991 568 : ExecutorRun(queryDesc, direction, (uint64) count);
992 568 : nprocessed = queryDesc->estate->es_processed;
993 568 : PopActiveSnapshot();
994 : }
995 :
996 580 : if (!ScanDirectionIsNoMovement(direction))
997 : {
998 508 : if (nprocessed > 0 && portal->atEnd)
999 : {
1000 154 : portal->atEnd = false; /* OK to go forward now */
1001 154 : portal->portalPos++; /* adjust for endpoint case */
1002 : }
1003 508 : if (count == 0 || nprocessed < (uint64) count)
1004 : {
1005 186 : portal->atStart = true; /* we retrieved 'em all */
1006 186 : portal->portalPos = 0;
1007 : }
1008 : else
1009 : {
1010 322 : portal->portalPos -= nprocessed;
1011 : }
1012 : }
1013 : }
1014 :
1015 368506 : return nprocessed;
1016 : }
1017 :
1018 : /*
1019 : * FillPortalStore
1020 : * Run the query and load result tuples into the portal's tuple store.
1021 : *
1022 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
1023 : * PORTAL_UTIL_SELECT cases only.
1024 : */
1025 : static void
1026 47822 : FillPortalStore(Portal portal, bool isTopLevel)
1027 : {
1028 : DestReceiver *treceiver;
1029 : QueryCompletion qc;
1030 :
1031 47822 : InitializeQueryCompletion(&qc);
1032 47822 : PortalCreateHoldStore(portal);
1033 47822 : treceiver = CreateDestReceiver(DestTuplestore);
1034 47822 : SetTuplestoreDestReceiverParams(treceiver,
1035 : portal->holdStore,
1036 : portal->holdContext,
1037 : false,
1038 : NULL,
1039 : NULL);
1040 :
1041 47822 : switch (portal->strategy)
1042 : {
1043 3044 : case PORTAL_ONE_RETURNING:
1044 : case PORTAL_ONE_MOD_WITH:
1045 :
1046 : /*
1047 : * Run the portal to completion just as for the default
1048 : * PORTAL_MULTI_QUERY case, but send the primary query's output to
1049 : * the tuplestore. Auxiliary query outputs are discarded. Set the
1050 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1051 : */
1052 3044 : PortalRunMulti(portal, isTopLevel, true,
1053 : treceiver, None_Receiver, &qc);
1054 2910 : break;
1055 :
1056 44778 : case PORTAL_UTIL_SELECT:
1057 44778 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1058 : isTopLevel, true, treceiver, &qc);
1059 44534 : break;
1060 :
1061 0 : default:
1062 0 : elog(ERROR, "unsupported portal strategy: %d",
1063 : (int) portal->strategy);
1064 : break;
1065 : }
1066 :
1067 : /* Override portal completion data with actual command results */
1068 47444 : if (qc.commandTag != CMDTAG_UNKNOWN)
1069 23502 : CopyQueryCompletion(&portal->qc, &qc);
1070 :
1071 47444 : treceiver->rDestroy(treceiver);
1072 47444 : }
1073 :
1074 : /*
1075 : * RunFromStore
1076 : * Fetch tuples from the portal's tuple store.
1077 : *
1078 : * Calling conventions are similar to ExecutorRun, except that we
1079 : * do not depend on having a queryDesc or estate. Therefore we return the
1080 : * number of tuples processed as the result, not in estate->es_processed.
1081 : *
1082 : * One difference from ExecutorRun is that the destination receiver functions
1083 : * are run in the caller's memory context (since we have no estate). Watch
1084 : * out for memory leaks.
1085 : */
1086 : static uint64
1087 71308 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1088 : DestReceiver *dest)
1089 : {
1090 71308 : uint64 current_tuple_count = 0;
1091 : TupleTableSlot *slot;
1092 :
1093 71308 : slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);
1094 :
1095 71308 : dest->rStartup(dest, CMD_SELECT, portal->tupDesc);
1096 :
1097 71308 : if (ScanDirectionIsNoMovement(direction))
1098 : {
1099 : /* do nothing except start/stop the destination */
1100 : }
1101 : else
1102 : {
1103 68672 : bool forward = ScanDirectionIsForward(direction);
1104 :
1105 : for (;;)
1106 408136 : {
1107 : MemoryContext oldcontext;
1108 : bool ok;
1109 :
1110 476808 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1111 :
1112 476808 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1113 : slot);
1114 :
1115 476808 : MemoryContextSwitchTo(oldcontext);
1116 :
1117 476808 : if (!ok)
1118 47498 : break;
1119 :
1120 : /*
1121 : * If we are not able to send the tuple, we assume the destination
1122 : * has closed and no more tuples can be sent. If that's the case,
1123 : * end the loop.
1124 : */
1125 429310 : if (!dest->receiveSlot(slot, dest))
1126 0 : break;
1127 :
1128 429310 : ExecClearTuple(slot);
1129 :
1130 : /*
1131 : * check our tuple count.. if we've processed the proper number
1132 : * then quit, else loop again and process more tuples. Zero count
1133 : * means no limit.
1134 : */
1135 429310 : current_tuple_count++;
1136 429310 : if (count && count == current_tuple_count)
1137 21174 : break;
1138 : }
1139 : }
1140 :
1141 71308 : dest->rShutdown(dest);
1142 :
1143 71308 : ExecDropSingleTupleTableSlot(slot);
1144 :
1145 71308 : return current_tuple_count;
1146 : }
1147 :
1148 : /*
1149 : * PortalRunUtility
1150 : * Execute a utility statement inside a portal.
1151 : */
1152 : static void
1153 339336 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1154 : bool isTopLevel, bool setHoldSnapshot,
1155 : DestReceiver *dest, QueryCompletion *qc)
1156 : {
1157 : /*
1158 : * Set snapshot if utility stmt needs one.
1159 : */
1160 339336 : if (PlannedStmtRequiresSnapshot(pstmt))
1161 : {
1162 271042 : Snapshot snapshot = GetTransactionSnapshot();
1163 :
1164 : /* If told to, register the snapshot we're using and save in portal */
1165 271042 : if (setHoldSnapshot)
1166 : {
1167 38326 : snapshot = RegisterSnapshot(snapshot);
1168 38326 : portal->holdSnapshot = snapshot;
1169 : }
1170 :
1171 : /*
1172 : * In any case, make the snapshot active and remember it in portal.
1173 : * Because the portal now references the snapshot, we must tell
1174 : * snapmgr.c that the snapshot belongs to the portal's transaction
1175 : * level, else we risk portalSnapshot becoming a dangling pointer.
1176 : */
1177 271042 : PushActiveSnapshotWithLevel(snapshot, portal->createLevel);
1178 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1179 271042 : portal->portalSnapshot = GetActiveSnapshot();
1180 : }
1181 : else
1182 68294 : portal->portalSnapshot = NULL;
1183 :
1184 339336 : ProcessUtility(pstmt,
1185 : portal->sourceText,
1186 339336 : (portal->cplan != NULL), /* protect tree if in plancache */
1187 339336 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1188 : portal->portalParams,
1189 : portal->queryEnv,
1190 : dest,
1191 : qc);
1192 :
1193 : /* Some utility statements may change context on us */
1194 324172 : MemoryContextSwitchTo(portal->portalContext);
1195 :
1196 : /*
1197 : * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
1198 : * under us, so don't complain if it's now empty. Otherwise, our snapshot
1199 : * should be the top one; pop it. Note that this could be a different
1200 : * snapshot from the one we made above; see EnsurePortalSnapshotExists.
1201 : */
1202 324172 : if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
1203 : {
1204 : Assert(portal->portalSnapshot == GetActiveSnapshot());
1205 248188 : PopActiveSnapshot();
1206 : }
1207 324172 : portal->portalSnapshot = NULL;
1208 324172 : }
1209 :
1210 : /*
1211 : * PortalRunMulti
1212 : * Execute a portal's queries in the general case (multi queries
1213 : * or non-SELECT-like queries)
1214 : */
1215 : static void
1216 384556 : PortalRunMulti(Portal portal,
1217 : bool isTopLevel, bool setHoldSnapshot,
1218 : DestReceiver *dest, DestReceiver *altdest,
1219 : QueryCompletion *qc)
1220 : {
1221 384556 : bool active_snapshot_set = false;
1222 : ListCell *stmtlist_item;
1223 384556 : int query_index = 0;
1224 :
1225 : /*
1226 : * If the destination is DestRemoteExecute, change to DestNone. The
1227 : * reason is that the client won't be expecting any tuples, and indeed has
1228 : * no way to know what they are, since there is no provision for Describe
1229 : * to send a RowDescription message when this portal execution strategy is
1230 : * in effect. This presently will only affect SELECT commands added to
1231 : * non-SELECT queries by rewrite rules: such commands will be executed,
1232 : * but the results will be discarded unless you use "simple Query"
1233 : * protocol.
1234 : */
1235 384556 : if (dest->mydest == DestRemoteExecute)
1236 12544 : dest = None_Receiver;
1237 384556 : if (altdest->mydest == DestRemoteExecute)
1238 12544 : altdest = None_Receiver;
1239 :
1240 : /*
1241 : * Loop to handle the individual queries generated from a single parsetree
1242 : * by analysis and rewrite.
1243 : */
1244 749244 : foreach(stmtlist_item, portal->stmts)
1245 : {
1246 385144 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1247 :
1248 : /*
1249 : * If we got a cancel signal in prior command, quit
1250 : */
1251 385144 : CHECK_FOR_INTERRUPTS();
1252 :
1253 385144 : if (pstmt->utilityStmt == NULL)
1254 : {
1255 : /*
1256 : * process a plannable query.
1257 : */
1258 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1259 :
1260 90586 : if (log_executor_stats)
1261 0 : ResetUsage();
1262 :
1263 : /*
1264 : * Must always have a snapshot for plannable queries. First time
1265 : * through, take a new snapshot; for subsequent queries in the
1266 : * same portal, just update the snapshot's copy of the command
1267 : * counter.
1268 : */
1269 90586 : if (!active_snapshot_set)
1270 : {
1271 89998 : Snapshot snapshot = GetTransactionSnapshot();
1272 :
1273 : /* If told to, register the snapshot and save in portal */
1274 89998 : if (setHoldSnapshot)
1275 : {
1276 3044 : snapshot = RegisterSnapshot(snapshot);
1277 3044 : portal->holdSnapshot = snapshot;
1278 : }
1279 :
1280 : /*
1281 : * We can't have the holdSnapshot also be the active one,
1282 : * because UpdateActiveSnapshotCommandId would complain. So
1283 : * force an extra snapshot copy. Plain PushActiveSnapshot
1284 : * would have copied the transaction snapshot anyway, so this
1285 : * only adds a copy step when setHoldSnapshot is true. (It's
1286 : * okay for the command ID of the active snapshot to diverge
1287 : * from what holdSnapshot has.)
1288 : */
1289 89998 : PushCopiedSnapshot(snapshot);
1290 :
1291 : /*
1292 : * As for PORTAL_ONE_SELECT portals, it does not seem
1293 : * necessary to maintain portal->portalSnapshot here.
1294 : */
1295 :
1296 89998 : active_snapshot_set = true;
1297 : }
1298 : else
1299 588 : UpdateActiveSnapshotCommandId();
1300 :
1301 90586 : if (pstmt->canSetTag)
1302 : {
1303 : /* statement can set tag string */
1304 89956 : ProcessQuery(pstmt,
1305 : portal->cplan,
1306 : portal->plansource,
1307 : query_index,
1308 : portal->sourceText,
1309 : portal->portalParams,
1310 : portal->queryEnv,
1311 : dest, qc);
1312 : }
1313 : else
1314 : {
1315 : /* stmt added by rewrite cannot set tag */
1316 630 : ProcessQuery(pstmt,
1317 : portal->cplan,
1318 : portal->plansource,
1319 : query_index,
1320 : portal->sourceText,
1321 : portal->portalParams,
1322 : portal->queryEnv,
1323 : altdest, NULL);
1324 : }
1325 :
1326 85050 : if (log_executor_stats)
1327 0 : ShowUsage("EXECUTOR STATISTICS");
1328 :
1329 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1330 : }
1331 : else
1332 : {
1333 : /*
1334 : * process utility functions (create, destroy, etc..)
1335 : *
1336 : * We must not set a snapshot here for utility commands (if one is
1337 : * needed, PortalRunUtility will do it). If a utility command is
1338 : * alone in a portal then everything's fine. The only case where
1339 : * a utility command can be part of a longer list is that rules
1340 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1341 : * whether it has a snapshot or not, so we just leave the current
1342 : * snapshot alone if we have one.
1343 : */
1344 294558 : if (pstmt->canSetTag)
1345 : {
1346 : Assert(!active_snapshot_set);
1347 : /* statement can set tag string */
1348 294558 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1349 : dest, qc);
1350 : }
1351 : else
1352 : {
1353 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1354 : /* stmt added by rewrite cannot set tag */
1355 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1356 : altdest, NULL);
1357 : }
1358 : }
1359 :
1360 : /*
1361 : * Clear subsidiary contexts to recover temporary memory.
1362 : */
1363 : Assert(portal->portalContext == CurrentMemoryContext);
1364 :
1365 364688 : MemoryContextDeleteChildren(portal->portalContext);
1366 :
1367 : /*
1368 : * Avoid crashing if portal->stmts has been reset. This can only
1369 : * occur if a CALL or DO utility statement executed an internal
1370 : * COMMIT/ROLLBACK (cf PortalReleaseCachedPlan). The CALL or DO must
1371 : * have been the only statement in the portal, so there's nothing left
1372 : * for us to do; but we don't want to dereference a now-dangling list
1373 : * pointer.
1374 : */
1375 364688 : if (portal->stmts == NIL)
1376 0 : break;
1377 :
1378 : /*
1379 : * Increment command counter between queries, but not after the last
1380 : * one.
1381 : */
1382 364688 : if (lnext(portal->stmts, stmtlist_item) != NULL)
1383 588 : CommandCounterIncrement();
1384 :
1385 364688 : query_index++;
1386 : }
1387 :
1388 : /* Pop the snapshot if we pushed one. */
1389 364100 : if (active_snapshot_set)
1390 84462 : PopActiveSnapshot();
1391 :
1392 : /*
1393 : * If a query completion data was supplied, use it. Otherwise use the
1394 : * portal's query completion data.
1395 : *
1396 : * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
1397 : * fake them with zeros. This can happen with DO INSTEAD rules if there
1398 : * is no replacement query of the same type as the original. We print "0
1399 : * 0" here because technically there is no query of the matching tag type,
1400 : * and printing a non-zero count for a different query type seems wrong,
1401 : * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
1402 : * one row was updated. See QueryRewrite(), step 3, for details.
1403 : */
1404 364100 : if (qc && qc->commandTag == CMDTAG_UNKNOWN)
1405 : {
1406 267558 : if (portal->qc.commandTag != CMDTAG_UNKNOWN)
1407 267558 : CopyQueryCompletion(qc, &portal->qc);
1408 : /* If the caller supplied a qc, we should have set it by now. */
1409 : Assert(qc->commandTag != CMDTAG_UNKNOWN);
1410 : }
1411 364100 : }
1412 :
1413 : /*
1414 : * PortalRunFetch
1415 : * Variant form of PortalRun that supports SQL FETCH directions.
1416 : *
1417 : * Note: we presently assume that no callers of this want isTopLevel = true.
1418 : *
1419 : * count <= 0 is interpreted as a no-op: the destination gets started up
1420 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1421 : * interpreted as "all rows". (cf FetchStmt.howMany)
1422 : *
1423 : * Returns number of rows processed (suitable for use in result tag)
1424 : */
1425 : uint64
1426 49912 : PortalRunFetch(Portal portal,
1427 : FetchDirection fdirection,
1428 : long count,
1429 : DestReceiver *dest)
1430 : {
1431 : uint64 result;
1432 : Portal saveActivePortal;
1433 : ResourceOwner saveResourceOwner;
1434 : MemoryContext savePortalContext;
1435 : MemoryContext oldContext;
1436 :
1437 : Assert(PortalIsValid(portal));
1438 :
1439 : /*
1440 : * Check for improper portal use, and mark portal active.
1441 : */
1442 49912 : MarkPortalActive(portal);
1443 :
1444 : /*
1445 : * Set up global portal context pointers.
1446 : */
1447 49894 : saveActivePortal = ActivePortal;
1448 49894 : saveResourceOwner = CurrentResourceOwner;
1449 49894 : savePortalContext = PortalContext;
1450 49894 : PG_TRY();
1451 : {
1452 49894 : ActivePortal = portal;
1453 49894 : if (portal->resowner)
1454 49702 : CurrentResourceOwner = portal->resowner;
1455 49894 : PortalContext = portal->portalContext;
1456 :
1457 49894 : oldContext = MemoryContextSwitchTo(PortalContext);
1458 :
1459 49894 : switch (portal->strategy)
1460 : {
1461 18160 : case PORTAL_ONE_SELECT:
1462 18160 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1463 18114 : break;
1464 :
1465 31734 : case PORTAL_ONE_RETURNING:
1466 : case PORTAL_ONE_MOD_WITH:
1467 : case PORTAL_UTIL_SELECT:
1468 :
1469 : /*
1470 : * If we have not yet run the command, do so, storing its
1471 : * results in the portal's tuplestore.
1472 : */
1473 31734 : if (!portal->holdStore)
1474 8056 : FillPortalStore(portal, false /* isTopLevel */ );
1475 :
1476 : /*
1477 : * Now fetch desired portion of results.
1478 : */
1479 31728 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1480 31728 : break;
1481 :
1482 0 : default:
1483 0 : elog(ERROR, "unsupported portal strategy");
1484 : result = 0; /* keep compiler quiet */
1485 : break;
1486 : }
1487 : }
1488 52 : PG_CATCH();
1489 : {
1490 : /* Uncaught error while executing portal: mark it dead */
1491 52 : MarkPortalFailed(portal);
1492 :
1493 : /* Restore global vars and propagate error */
1494 52 : ActivePortal = saveActivePortal;
1495 52 : CurrentResourceOwner = saveResourceOwner;
1496 52 : PortalContext = savePortalContext;
1497 :
1498 52 : PG_RE_THROW();
1499 : }
1500 49842 : PG_END_TRY();
1501 :
1502 49842 : MemoryContextSwitchTo(oldContext);
1503 :
1504 : /* Mark portal not active */
1505 49842 : portal->status = PORTAL_READY;
1506 :
1507 49842 : ActivePortal = saveActivePortal;
1508 49842 : CurrentResourceOwner = saveResourceOwner;
1509 49842 : PortalContext = savePortalContext;
1510 :
1511 49842 : return result;
1512 : }
1513 :
1514 : /*
1515 : * DoPortalRunFetch
1516 : * Guts of PortalRunFetch --- the portal context is already set up
1517 : *
1518 : * Here, count < 0 typically reverses the direction. Also, count == FETCH_ALL
1519 : * is interpreted as "all rows". (cf FetchStmt.howMany)
1520 : *
1521 : * Returns number of rows processed (suitable for use in result tag)
1522 : */
1523 : static uint64
1524 49888 : DoPortalRunFetch(Portal portal,
1525 : FetchDirection fdirection,
1526 : long count,
1527 : DestReceiver *dest)
1528 : {
1529 : bool forward;
1530 :
1531 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1532 : portal->strategy == PORTAL_ONE_RETURNING ||
1533 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1534 : portal->strategy == PORTAL_UTIL_SELECT);
1535 :
1536 : /*
1537 : * Note: we disallow backwards fetch (including re-fetch of current row)
1538 : * for NO SCROLL cursors, but we interpret that very loosely: you can use
1539 : * any of the FetchDirection options, so long as the end result is to move
1540 : * forwards by at least one row. Currently it's sufficient to check for
1541 : * NO SCROLL in DoPortalRewind() and in the forward == false path in
1542 : * PortalRunSelect(); but someday we might prefer to account for that
1543 : * restriction explicitly here.
1544 : */
1545 49888 : switch (fdirection)
1546 : {
1547 49148 : case FETCH_FORWARD:
1548 49148 : if (count < 0)
1549 : {
1550 0 : fdirection = FETCH_BACKWARD;
1551 0 : count = -count;
1552 : }
1553 : /* fall out of switch to share code with FETCH_BACKWARD */
1554 49148 : break;
1555 496 : case FETCH_BACKWARD:
1556 496 : if (count < 0)
1557 : {
1558 0 : fdirection = FETCH_FORWARD;
1559 0 : count = -count;
1560 : }
1561 : /* fall out of switch to share code with FETCH_FORWARD */
1562 496 : break;
1563 160 : case FETCH_ABSOLUTE:
1564 160 : if (count > 0)
1565 : {
1566 : /*
1567 : * Definition: Rewind to start, advance count-1 rows, return
1568 : * next row (if any).
1569 : *
1570 : * In practice, if the goal is less than halfway back to the
1571 : * start, it's better to scan from where we are.
1572 : *
1573 : * Also, if current portalPos is outside the range of "long",
1574 : * do it the hard way to avoid possible overflow of the count
1575 : * argument to PortalRunSelect. We must exclude exactly
1576 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1577 : *
1578 : * In any case, we arrange to fetch the target row going
1579 : * forwards.
1580 : */
1581 90 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1582 36 : portal->portalPos >= (uint64) LONG_MAX)
1583 : {
1584 54 : DoPortalRewind(portal);
1585 48 : if (count > 1)
1586 0 : PortalRunSelect(portal, true, count - 1,
1587 : None_Receiver);
1588 : }
1589 : else
1590 : {
1591 36 : long pos = (long) portal->portalPos;
1592 :
1593 36 : if (portal->atEnd)
1594 0 : pos++; /* need one extra fetch if off end */
1595 36 : if (count <= pos)
1596 12 : PortalRunSelect(portal, false, pos - count + 1,
1597 : None_Receiver);
1598 24 : else if (count > pos + 1)
1599 12 : PortalRunSelect(portal, true, count - pos - 1,
1600 : None_Receiver);
1601 : }
1602 78 : return PortalRunSelect(portal, true, 1L, dest);
1603 : }
1604 70 : else if (count < 0)
1605 : {
1606 : /*
1607 : * Definition: Advance to end, back up abs(count)-1 rows,
1608 : * return prior row (if any). We could optimize this if we
1609 : * knew in advance where the end was, but typically we won't.
1610 : * (Is it worth considering case where count > half of size of
1611 : * query? We could rewind once we know the size ...)
1612 : */
1613 54 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1614 54 : if (count < -1)
1615 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1616 54 : return PortalRunSelect(portal, false, 1L, dest);
1617 : }
1618 : else
1619 : {
1620 : /* count == 0 */
1621 : /* Rewind to start, return zero rows */
1622 16 : DoPortalRewind(portal);
1623 16 : return PortalRunSelect(portal, true, 0L, dest);
1624 : }
1625 : break;
1626 84 : case FETCH_RELATIVE:
1627 84 : if (count > 0)
1628 : {
1629 : /*
1630 : * Definition: advance count-1 rows, return next row (if any).
1631 : */
1632 36 : if (count > 1)
1633 24 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1634 36 : return PortalRunSelect(portal, true, 1L, dest);
1635 : }
1636 48 : else if (count < 0)
1637 : {
1638 : /*
1639 : * Definition: back up abs(count)-1 rows, return prior row (if
1640 : * any).
1641 : */
1642 30 : if (count < -1)
1643 18 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1644 30 : return PortalRunSelect(portal, false, 1L, dest);
1645 : }
1646 : else
1647 : {
1648 : /* count == 0 */
1649 : /* Same as FETCH FORWARD 0, so fall out of switch */
1650 18 : fdirection = FETCH_FORWARD;
1651 : }
1652 18 : break;
1653 0 : default:
1654 0 : elog(ERROR, "bogus direction");
1655 : break;
1656 : }
1657 :
1658 : /*
1659 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1660 : * >= 0.
1661 : */
1662 49662 : forward = (fdirection == FETCH_FORWARD);
1663 :
1664 : /*
1665 : * Zero count means to re-fetch the current row, if any (per SQL)
1666 : */
1667 49662 : if (count == 0)
1668 : {
1669 : bool on_row;
1670 :
1671 : /* Are we sitting on a row? */
1672 18 : on_row = (!portal->atStart && !portal->atEnd);
1673 :
1674 18 : if (dest->mydest == DestNone)
1675 : {
1676 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1677 0 : return on_row ? 1 : 0;
1678 : }
1679 : else
1680 : {
1681 : /*
1682 : * If we are sitting on a row, back up one so we can re-fetch it.
1683 : * If we are not sitting on a row, we still have to start up and
1684 : * shut down the executor so that the destination is initialized
1685 : * and shut down correctly; so keep going. To PortalRunSelect,
1686 : * count == 0 means we will retrieve no row.
1687 : */
1688 18 : if (on_row)
1689 : {
1690 18 : PortalRunSelect(portal, false, 1L, None_Receiver);
1691 : /* Set up to fetch one row forward */
1692 12 : count = 1;
1693 12 : forward = true;
1694 : }
1695 : }
1696 : }
1697 :
1698 : /*
1699 : * Optimize MOVE BACKWARD ALL into a Rewind.
1700 : */
1701 49656 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1702 : {
1703 24 : uint64 result = portal->portalPos;
1704 :
1705 24 : if (result > 0 && !portal->atEnd)
1706 0 : result--;
1707 24 : DoPortalRewind(portal);
1708 24 : return result;
1709 : }
1710 :
1711 49632 : return PortalRunSelect(portal, forward, count, dest);
1712 : }
1713 :
1714 : /*
1715 : * DoPortalRewind - rewind a Portal to starting point
1716 : */
1717 : static void
1718 94 : DoPortalRewind(Portal portal)
1719 : {
1720 : QueryDesc *queryDesc;
1721 :
1722 : /*
1723 : * No work is needed if we've not advanced nor attempted to advance the
1724 : * cursor (and we don't want to throw a NO SCROLL error in this case).
1725 : */
1726 94 : if (portal->atStart && !portal->atEnd)
1727 18 : return;
1728 :
1729 : /* Otherwise, cursor must allow scrolling */
1730 76 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
1731 6 : ereport(ERROR,
1732 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1733 : errmsg("cursor can only scan forward"),
1734 : errhint("Declare it with SCROLL option to enable backward scan.")));
1735 :
1736 : /* Rewind holdStore, if we have one */
1737 70 : if (portal->holdStore)
1738 : {
1739 : MemoryContext oldcontext;
1740 :
1741 6 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1742 6 : tuplestore_rescan(portal->holdStore);
1743 6 : MemoryContextSwitchTo(oldcontext);
1744 : }
1745 :
1746 : /* Rewind executor, if active */
1747 70 : queryDesc = portal->queryDesc;
1748 70 : if (queryDesc)
1749 : {
1750 64 : PushActiveSnapshot(queryDesc->snapshot);
1751 64 : ExecutorRewind(queryDesc);
1752 64 : PopActiveSnapshot();
1753 : }
1754 :
1755 70 : portal->atStart = true;
1756 70 : portal->atEnd = false;
1757 70 : portal->portalPos = 0;
1758 : }
1759 :
1760 : /*
1761 : * PlannedStmtRequiresSnapshot - what it says on the tin
1762 : */
1763 : bool
1764 447380 : PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
1765 : {
1766 447380 : Node *utilityStmt = pstmt->utilityStmt;
1767 :
1768 : /* If it's not a utility statement, it definitely needs a snapshot */
1769 447380 : if (utilityStmt == NULL)
1770 82922 : return true;
1771 :
1772 : /*
1773 : * Most utility statements need a snapshot, and the default presumption
1774 : * about new ones should be that they do too. Hence, enumerate those that
1775 : * do not need one.
1776 : *
1777 : * Transaction control, LOCK, and SET must *not* set a snapshot, since
1778 : * they need to be executable at the start of a transaction-snapshot-mode
1779 : * transaction without freezing a snapshot. By extension we allow SHOW
1780 : * not to set a snapshot. The other stmts listed are just efficiency
1781 : * hacks. Beware of listing anything that can modify the database --- if,
1782 : * say, it has to update an index with expressions that invoke
1783 : * user-defined functions, then it had better have a snapshot.
1784 : */
1785 364458 : if (IsA(utilityStmt, TransactionStmt) ||
1786 329072 : IsA(utilityStmt, LockStmt) ||
1787 327970 : IsA(utilityStmt, VariableSetStmt) ||
1788 292470 : IsA(utilityStmt, VariableShowStmt) ||
1789 291672 : IsA(utilityStmt, ConstraintsSetStmt) ||
1790 : /* efficiency hacks from here down */
1791 291570 : IsA(utilityStmt, FetchStmt) ||
1792 285814 : IsA(utilityStmt, ListenStmt) ||
1793 285740 : IsA(utilityStmt, NotifyStmt) ||
1794 285652 : IsA(utilityStmt, UnlistenStmt) ||
1795 285614 : IsA(utilityStmt, CheckPointStmt))
1796 79052 : return false;
1797 :
1798 285406 : return true;
1799 : }
1800 :
1801 : /*
1802 : * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
1803 : *
1804 : * Generally, we will have an active snapshot whenever we are executing
1805 : * inside a Portal, unless the Portal's query is one of the utility
1806 : * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
1807 : * However, procedures and DO blocks can commit or abort the transaction,
1808 : * and thereby destroy all snapshots. This function can be called to
1809 : * re-establish the Portal-level snapshot when none exists.
1810 : */
1811 : void
1812 401528 : EnsurePortalSnapshotExists(void)
1813 : {
1814 : Portal portal;
1815 :
1816 : /*
1817 : * Nothing to do if a snapshot is set. (We take it on faith that the
1818 : * outermost active snapshot belongs to some Portal; or if there is no
1819 : * Portal, it's somebody else's responsibility to manage things.)
1820 : */
1821 401528 : if (ActiveSnapshotSet())
1822 397078 : return;
1823 :
1824 : /* Otherwise, we'd better have an active Portal */
1825 4450 : portal = ActivePortal;
1826 4450 : if (unlikely(portal == NULL))
1827 0 : elog(ERROR, "cannot execute SQL without an outer snapshot or portal");
1828 : Assert(portal->portalSnapshot == NULL);
1829 :
1830 : /*
1831 : * Create a new snapshot, make it active, and remember it in portal.
1832 : * Because the portal now references the snapshot, we must tell snapmgr.c
1833 : * that the snapshot belongs to the portal's transaction level, else we
1834 : * risk portalSnapshot becoming a dangling pointer.
1835 : */
1836 4450 : PushActiveSnapshotWithLevel(GetTransactionSnapshot(), portal->createLevel);
1837 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1838 4450 : portal->portalSnapshot = GetActiveSnapshot();
1839 : }
|