Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/executor.h"
23 : #include "executor/tstoreReceiver.h"
24 : #include "miscadmin.h"
25 : #include "pg_trace.h"
26 : #include "tcop/pquery.h"
27 : #include "tcop/utility.h"
28 : #include "utils/memutils.h"
29 : #include "utils/snapmgr.h"
30 :
31 :
32 : /*
33 : * ActivePortal is the currently executing Portal (the most closely nested,
34 : * if there are several).
35 : */
36 : Portal ActivePortal = NULL;
37 :
38 :
39 : static void ProcessQuery(PlannedStmt *plan,
40 : const char *sourceText,
41 : ParamListInfo params,
42 : QueryEnvironment *queryEnv,
43 : DestReceiver *dest,
44 : QueryCompletion *qc);
45 : static void FillPortalStore(Portal portal, bool isTopLevel);
46 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
47 : DestReceiver *dest);
48 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
49 : DestReceiver *dest);
50 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
51 : bool isTopLevel, bool setHoldSnapshot,
52 : DestReceiver *dest, QueryCompletion *qc);
53 : static void PortalRunMulti(Portal portal,
54 : bool isTopLevel, bool setHoldSnapshot,
55 : DestReceiver *dest, DestReceiver *altdest,
56 : QueryCompletion *qc);
57 : static uint64 DoPortalRunFetch(Portal portal,
58 : FetchDirection fdirection,
59 : long count,
60 : DestReceiver *dest);
61 : static void DoPortalRewind(Portal portal);
62 :
63 :
64 : /*
65 : * CreateQueryDesc
66 : */
67 : QueryDesc *
68 589922 : CreateQueryDesc(PlannedStmt *plannedstmt,
69 : const char *sourceText,
70 : Snapshot snapshot,
71 : Snapshot crosscheck_snapshot,
72 : DestReceiver *dest,
73 : ParamListInfo params,
74 : QueryEnvironment *queryEnv,
75 : int instrument_options)
76 : {
77 589922 : QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
78 :
79 589922 : qd->operation = plannedstmt->commandType; /* operation */
80 589922 : qd->plannedstmt = plannedstmt; /* plan */
81 589922 : qd->sourceText = sourceText; /* query text */
82 589922 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
83 : /* RI check snapshot */
84 589922 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
85 589922 : qd->dest = dest; /* output dest */
86 589922 : qd->params = params; /* parameter values passed into query */
87 589922 : qd->queryEnv = queryEnv;
88 589922 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
89 :
90 : /* null these fields until set by ExecutorStart */
91 589922 : qd->tupDesc = NULL;
92 589922 : qd->estate = NULL;
93 589922 : qd->planstate = NULL;
94 589922 : qd->totaltime = NULL;
95 :
96 : /* not yet executed */
97 589922 : qd->already_executed = false;
98 :
99 589922 : return qd;
100 : }
101 :
102 : /*
103 : * FreeQueryDesc
104 : */
105 : void
106 561042 : FreeQueryDesc(QueryDesc *qdesc)
107 : {
108 : /* Can't be a live query */
109 : Assert(qdesc->estate == NULL);
110 :
111 : /* forget our snapshots */
112 561042 : UnregisterSnapshot(qdesc->snapshot);
113 561042 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
114 :
115 : /* Only the QueryDesc itself need be freed */
116 561042 : pfree(qdesc);
117 561042 : }
118 :
119 :
120 : /*
121 : * ProcessQuery
122 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
123 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
124 : *
125 : * plan: the plan tree for the query
126 : * sourceText: the source text of the query
127 : * params: any parameters needed
128 : * dest: where to send results
129 : * qc: where to store the command completion status data.
130 : *
131 : * qc may be NULL if caller doesn't want a status string.
132 : *
133 : * Must be called in a memory context that will be reset or deleted on
134 : * error; otherwise the executor's memory usage will be leaked.
135 : */
136 : static void
137 91188 : ProcessQuery(PlannedStmt *plan,
138 : const char *sourceText,
139 : ParamListInfo params,
140 : QueryEnvironment *queryEnv,
141 : DestReceiver *dest,
142 : QueryCompletion *qc)
143 : {
144 : QueryDesc *queryDesc;
145 :
146 : /*
147 : * Create the QueryDesc object
148 : */
149 91188 : queryDesc = CreateQueryDesc(plan, sourceText,
150 : GetActiveSnapshot(), InvalidSnapshot,
151 : dest, params, queryEnv, 0);
152 :
153 : /*
154 : * Call ExecutorStart to prepare the plan for execution
155 : */
156 91188 : ExecutorStart(queryDesc, 0);
157 :
158 : /*
159 : * Run the plan to completion.
160 : */
161 89910 : ExecutorRun(queryDesc, ForwardScanDirection, 0);
162 :
163 : /*
164 : * Build command completion status data, if caller wants one.
165 : */
166 86716 : if (qc)
167 : {
168 86092 : switch (queryDesc->operation)
169 : {
170 116 : case CMD_SELECT:
171 116 : SetQueryCompletion(qc, CMDTAG_SELECT, queryDesc->estate->es_processed);
172 116 : break;
173 70486 : case CMD_INSERT:
174 70486 : SetQueryCompletion(qc, CMDTAG_INSERT, queryDesc->estate->es_processed);
175 70486 : break;
176 10960 : case CMD_UPDATE:
177 10960 : SetQueryCompletion(qc, CMDTAG_UPDATE, queryDesc->estate->es_processed);
178 10960 : break;
179 3462 : case CMD_DELETE:
180 3462 : SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed);
181 3462 : break;
182 1068 : case CMD_MERGE:
183 1068 : SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed);
184 1068 : break;
185 0 : default:
186 0 : SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed);
187 0 : break;
188 : }
189 : }
190 :
191 : /*
192 : * Now, we close down all the scans and free allocated resources.
193 : */
194 86716 : ExecutorFinish(queryDesc);
195 85638 : ExecutorEnd(queryDesc);
196 :
197 85638 : FreeQueryDesc(queryDesc);
198 85638 : }
199 :
200 : /*
201 : * ChoosePortalStrategy
202 : * Select portal execution strategy given the intended statement list.
203 : *
204 : * The list elements can be Querys or PlannedStmts.
205 : * That's more general than portals need, but plancache.c uses this too.
206 : *
207 : * See the comments in portal.h.
208 : */
209 : PortalStrategy
210 850154 : ChoosePortalStrategy(List *stmts)
211 : {
212 : int nSetTag;
213 : ListCell *lc;
214 :
215 : /*
216 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
217 : * single-statement case, since there are no rewrite rules that can add
218 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
219 : * likewise allows only one top-level statement.
220 : */
221 850154 : if (list_length(stmts) == 1)
222 : {
223 849818 : Node *stmt = (Node *) linitial(stmts);
224 :
225 849818 : if (IsA(stmt, Query))
226 : {
227 84338 : Query *query = (Query *) stmt;
228 :
229 84338 : if (query->canSetTag)
230 : {
231 84338 : if (query->commandType == CMD_SELECT)
232 : {
233 55932 : if (query->hasModifyingCTE)
234 0 : return PORTAL_ONE_MOD_WITH;
235 : else
236 55932 : return PORTAL_ONE_SELECT;
237 : }
238 28406 : if (query->commandType == CMD_UTILITY)
239 : {
240 20282 : if (UtilityReturnsTuples(query->utilityStmt))
241 10050 : return PORTAL_UTIL_SELECT;
242 : /* it can't be ONE_RETURNING, so give up */
243 10232 : return PORTAL_MULTI_QUERY;
244 : }
245 : }
246 : }
247 765480 : else if (IsA(stmt, PlannedStmt))
248 : {
249 765480 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
250 :
251 765480 : if (pstmt->canSetTag)
252 : {
253 765444 : if (pstmt->commandType == CMD_SELECT)
254 : {
255 297646 : if (pstmt->hasModifyingCTE)
256 128 : return PORTAL_ONE_MOD_WITH;
257 : else
258 297518 : return PORTAL_ONE_SELECT;
259 : }
260 467798 : if (pstmt->commandType == CMD_UTILITY)
261 : {
262 377686 : if (UtilityReturnsTuples(pstmt->utilityStmt))
263 49788 : return PORTAL_UTIL_SELECT;
264 : /* it can't be ONE_RETURNING, so give up */
265 327898 : return PORTAL_MULTI_QUERY;
266 : }
267 : }
268 : }
269 : else
270 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
271 : }
272 :
273 : /*
274 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
275 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
276 : * it has a RETURNING list.
277 : */
278 98608 : nSetTag = 0;
279 102108 : foreach(lc, stmts)
280 : {
281 98776 : Node *stmt = (Node *) lfirst(lc);
282 :
283 98776 : if (IsA(stmt, Query))
284 : {
285 8136 : Query *query = (Query *) stmt;
286 :
287 8136 : if (query->canSetTag)
288 : {
289 8130 : if (++nSetTag > 1)
290 95276 : return PORTAL_MULTI_QUERY; /* no need to look further */
291 8130 : if (query->commandType == CMD_UTILITY ||
292 8130 : query->returningList == NIL)
293 7824 : return PORTAL_MULTI_QUERY; /* no need to look further */
294 : }
295 : }
296 90640 : else if (IsA(stmt, PlannedStmt))
297 : {
298 90640 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
299 :
300 90640 : if (pstmt->canSetTag)
301 : {
302 90430 : if (++nSetTag > 1)
303 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
304 90430 : if (pstmt->commandType == CMD_UTILITY ||
305 90430 : !pstmt->hasReturning)
306 87452 : return PORTAL_MULTI_QUERY; /* no need to look further */
307 : }
308 : }
309 : else
310 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
311 : }
312 3332 : if (nSetTag == 1)
313 3284 : return PORTAL_ONE_RETURNING;
314 :
315 : /* Else, it's the general case... */
316 48 : return PORTAL_MULTI_QUERY;
317 : }
318 :
319 : /*
320 : * FetchPortalTargetList
321 : * Given a portal that returns tuples, extract the query targetlist.
322 : * Returns NIL if the portal doesn't have a determinable targetlist.
323 : *
324 : * Note: do not modify the result.
325 : */
326 : List *
327 318704 : FetchPortalTargetList(Portal portal)
328 : {
329 : /* no point in looking if we determined it doesn't return tuples */
330 318704 : if (portal->strategy == PORTAL_MULTI_QUERY)
331 30 : return NIL;
332 : /* get the primary statement and find out what it returns */
333 318674 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
334 : }
335 :
336 : /*
337 : * FetchStatementTargetList
338 : * Given a statement that returns tuples, extract the query targetlist.
339 : * Returns NIL if the statement doesn't have a determinable targetlist.
340 : *
341 : * This can be applied to a Query or a PlannedStmt.
342 : * That's more general than portals need, but plancache.c uses this too.
343 : *
344 : * Note: do not modify the result.
345 : *
346 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
347 : */
348 : List *
349 338148 : FetchStatementTargetList(Node *stmt)
350 : {
351 338148 : if (stmt == NULL)
352 0 : return NIL;
353 338148 : if (IsA(stmt, Query))
354 : {
355 19474 : Query *query = (Query *) stmt;
356 :
357 19474 : if (query->commandType == CMD_UTILITY)
358 : {
359 : /* transfer attention to utility statement */
360 12 : stmt = query->utilityStmt;
361 : }
362 : else
363 : {
364 19462 : if (query->commandType == CMD_SELECT)
365 19450 : return query->targetList;
366 12 : if (query->returningList)
367 12 : return query->returningList;
368 0 : return NIL;
369 : }
370 : }
371 318686 : if (IsA(stmt, PlannedStmt))
372 : {
373 318674 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
374 :
375 318674 : if (pstmt->commandType == CMD_UTILITY)
376 : {
377 : /* transfer attention to utility statement */
378 41422 : stmt = pstmt->utilityStmt;
379 : }
380 : else
381 : {
382 277252 : if (pstmt->commandType == CMD_SELECT)
383 274424 : return pstmt->planTree->targetlist;
384 2828 : if (pstmt->hasReturning)
385 2828 : return pstmt->planTree->targetlist;
386 0 : return NIL;
387 : }
388 : }
389 41434 : if (IsA(stmt, FetchStmt))
390 : {
391 5654 : FetchStmt *fstmt = (FetchStmt *) stmt;
392 : Portal subportal;
393 :
394 : Assert(!fstmt->ismove);
395 5654 : subportal = GetPortalByName(fstmt->portalname);
396 : Assert(PortalIsValid(subportal));
397 5654 : return FetchPortalTargetList(subportal);
398 : }
399 35780 : if (IsA(stmt, ExecuteStmt))
400 : {
401 19346 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
402 : PreparedStatement *entry;
403 :
404 19346 : entry = FetchPreparedStatement(estmt->name, true);
405 19346 : return FetchPreparedStatementTargetList(entry);
406 : }
407 16434 : return NIL;
408 : }
409 :
410 : /*
411 : * PortalStart
412 : * Prepare a portal for execution.
413 : *
414 : * Caller must already have created the portal, done PortalDefineQuery(),
415 : * and adjusted portal options if needed.
416 : *
417 : * If parameters are needed by the query, they must be passed in "params"
418 : * (caller is responsible for giving them appropriate lifetime).
419 : *
420 : * The caller can also provide an initial set of "eflags" to be passed to
421 : * ExecutorStart (but note these can be modified internally, and they are
422 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
423 : * should simply pass zero.
424 : *
425 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
426 : * for the normal behavior of setting a new snapshot. This parameter is
427 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
428 : * to be used for cursors).
429 : *
430 : * On return, portal is ready to accept PortalRun() calls, and the result
431 : * tupdesc (if any) is known.
432 : */
433 : void
434 765804 : PortalStart(Portal portal, ParamListInfo params,
435 : int eflags, Snapshot snapshot)
436 : {
437 : Portal saveActivePortal;
438 : ResourceOwner saveResourceOwner;
439 : MemoryContext savePortalContext;
440 : MemoryContext oldContext;
441 : QueryDesc *queryDesc;
442 : int myeflags;
443 :
444 : Assert(PortalIsValid(portal));
445 : Assert(portal->status == PORTAL_DEFINED);
446 :
447 : /*
448 : * Set up global portal context pointers.
449 : */
450 765804 : saveActivePortal = ActivePortal;
451 765804 : saveResourceOwner = CurrentResourceOwner;
452 765804 : savePortalContext = PortalContext;
453 765804 : PG_TRY();
454 : {
455 765804 : ActivePortal = portal;
456 765804 : if (portal->resowner)
457 765804 : CurrentResourceOwner = portal->resowner;
458 765804 : PortalContext = portal->portalContext;
459 :
460 765804 : oldContext = MemoryContextSwitchTo(PortalContext);
461 :
462 : /* Must remember portal param list, if any */
463 765804 : portal->portalParams = params;
464 :
465 : /*
466 : * Determine the portal execution strategy
467 : */
468 765804 : portal->strategy = ChoosePortalStrategy(portal->stmts);
469 :
470 : /*
471 : * Fire her up according to the strategy
472 : */
473 765804 : switch (portal->strategy)
474 : {
475 297518 : case PORTAL_ONE_SELECT:
476 :
477 : /* Must set snapshot before starting executor. */
478 297518 : if (snapshot)
479 28020 : PushActiveSnapshot(snapshot);
480 : else
481 269498 : PushActiveSnapshot(GetTransactionSnapshot());
482 :
483 : /*
484 : * We could remember the snapshot in portal->portalSnapshot,
485 : * but presently there seems no need to, as this code path
486 : * cannot be used for non-atomic execution. Hence there can't
487 : * be any commit/abort that might destroy the snapshot. Since
488 : * we don't do that, there's also no need to force a
489 : * non-default nesting level for the snapshot.
490 : */
491 :
492 : /*
493 : * Create QueryDesc in portal's context; for the moment, set
494 : * the destination to DestNone.
495 : */
496 297518 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
497 : portal->sourceText,
498 : GetActiveSnapshot(),
499 : InvalidSnapshot,
500 : None_Receiver,
501 : params,
502 : portal->queryEnv,
503 : 0);
504 :
505 : /*
506 : * If it's a scrollable cursor, executor needs to support
507 : * REWIND and backwards scan, as well as whatever the caller
508 : * might've asked for.
509 : */
510 297518 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
511 3974 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
512 : else
513 293544 : myeflags = eflags;
514 :
515 : /*
516 : * Call ExecutorStart to prepare the plan for execution
517 : */
518 297518 : ExecutorStart(queryDesc, myeflags);
519 :
520 : /*
521 : * This tells PortalCleanup to shut down the executor
522 : */
523 296860 : portal->queryDesc = queryDesc;
524 :
525 : /*
526 : * Remember tuple descriptor (computed by ExecutorStart)
527 : */
528 296860 : portal->tupDesc = queryDesc->tupDesc;
529 :
530 : /*
531 : * Reset cursor position data to "start of query"
532 : */
533 296860 : portal->atStart = true;
534 296860 : portal->atEnd = false; /* allow fetches */
535 296860 : portal->portalPos = 0;
536 :
537 296860 : PopActiveSnapshot();
538 296860 : break;
539 :
540 3106 : case PORTAL_ONE_RETURNING:
541 : case PORTAL_ONE_MOD_WITH:
542 :
543 : /*
544 : * We don't start the executor until we are told to run the
545 : * portal. We do need to set up the result tupdesc.
546 : */
547 : {
548 : PlannedStmt *pstmt;
549 :
550 3106 : pstmt = PortalGetPrimaryStmt(portal);
551 3106 : portal->tupDesc =
552 3106 : ExecCleanTypeFromTL(pstmt->planTree->targetlist);
553 : }
554 :
555 : /*
556 : * Reset cursor position data to "start of query"
557 : */
558 3106 : portal->atStart = true;
559 3106 : portal->atEnd = false; /* allow fetches */
560 3106 : portal->portalPos = 0;
561 3106 : break;
562 :
563 49788 : case PORTAL_UTIL_SELECT:
564 :
565 : /*
566 : * We don't set snapshot here, because PortalRunUtility will
567 : * take care of it if needed.
568 : */
569 : {
570 49788 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
571 :
572 : Assert(pstmt->commandType == CMD_UTILITY);
573 49788 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
574 : }
575 :
576 : /*
577 : * Reset cursor position data to "start of query"
578 : */
579 49760 : portal->atStart = true;
580 49760 : portal->atEnd = false; /* allow fetches */
581 49760 : portal->portalPos = 0;
582 49760 : break;
583 :
584 415392 : case PORTAL_MULTI_QUERY:
585 : /* Need do nothing now */
586 415392 : portal->tupDesc = NULL;
587 415392 : break;
588 : }
589 : }
590 686 : PG_CATCH();
591 : {
592 : /* Uncaught error while executing portal: mark it dead */
593 686 : MarkPortalFailed(portal);
594 :
595 : /* Restore global vars and propagate error */
596 686 : ActivePortal = saveActivePortal;
597 686 : CurrentResourceOwner = saveResourceOwner;
598 686 : PortalContext = savePortalContext;
599 :
600 686 : PG_RE_THROW();
601 : }
602 765118 : PG_END_TRY();
603 :
604 765118 : MemoryContextSwitchTo(oldContext);
605 :
606 765118 : ActivePortal = saveActivePortal;
607 765118 : CurrentResourceOwner = saveResourceOwner;
608 765118 : PortalContext = savePortalContext;
609 :
610 765118 : portal->status = PORTAL_READY;
611 765118 : }
612 :
613 : /*
614 : * PortalSetResultFormat
615 : * Select the format codes for a portal's output.
616 : *
617 : * This must be run after PortalStart for a portal that will be read by
618 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
619 : * for other destination types.
620 : *
621 : * formats[] is the client format request, as per Bind message conventions.
622 : */
623 : void
624 728882 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
625 : {
626 : int natts;
627 : int i;
628 :
629 : /* Do nothing if portal won't return tuples */
630 728882 : if (portal->tupDesc == NULL)
631 415272 : return;
632 313610 : natts = portal->tupDesc->natts;
633 313610 : portal->formats = (int16 *)
634 313610 : MemoryContextAlloc(portal->portalContext,
635 : natts * sizeof(int16));
636 313610 : if (nFormats > 1)
637 : {
638 : /* format specified for each column */
639 0 : if (nFormats != natts)
640 0 : ereport(ERROR,
641 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
642 : errmsg("bind message has %d result formats but query has %d columns",
643 : nFormats, natts)));
644 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
645 : }
646 313610 : else if (nFormats > 0)
647 : {
648 : /* single format specified, use for all columns */
649 313610 : int16 format1 = formats[0];
650 :
651 1374818 : for (i = 0; i < natts; i++)
652 1061208 : portal->formats[i] = format1;
653 : }
654 : else
655 : {
656 : /* use default format for all columns */
657 0 : for (i = 0; i < natts; i++)
658 0 : portal->formats[i] = 0;
659 : }
660 : }
661 :
662 : /*
663 : * PortalRun
664 : * Run a portal's query or queries.
665 : *
666 : * count <= 0 is interpreted as a no-op: the destination gets started up
667 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
668 : * interpreted as "all rows". Note that count is ignored in multi-query
669 : * situations, where we always run the portal to completion.
670 : *
671 : * isTopLevel: true if query is being executed at backend "top level"
672 : * (that is, directly from a client command message)
673 : *
674 : * dest: where to send output of primary (canSetTag) query
675 : *
676 : * altdest: where to send output of non-primary queries
677 : *
678 : * qc: where to store command completion status data.
679 : * May be NULL if caller doesn't want status data.
680 : *
681 : * Returns true if the portal's execution is complete, false if it was
682 : * suspended due to exhaustion of the count parameter.
683 : */
684 : bool
685 748396 : PortalRun(Portal portal, long count, bool isTopLevel,
686 : DestReceiver *dest, DestReceiver *altdest,
687 : QueryCompletion *qc)
688 : {
689 : bool result;
690 : uint64 nprocessed;
691 : ResourceOwner saveTopTransactionResourceOwner;
692 : MemoryContext saveTopTransactionContext;
693 : Portal saveActivePortal;
694 : ResourceOwner saveResourceOwner;
695 : MemoryContext savePortalContext;
696 : MemoryContext saveMemoryContext;
697 :
698 : Assert(PortalIsValid(portal));
699 :
700 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
701 :
702 : /* Initialize empty completion data */
703 748396 : if (qc)
704 748396 : InitializeQueryCompletion(qc);
705 :
706 748396 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
707 : {
708 0 : elog(DEBUG3, "PortalRun");
709 : /* PORTAL_MULTI_QUERY logs its own stats per query */
710 0 : ResetUsage();
711 : }
712 :
713 : /*
714 : * Check for improper portal use, and mark portal active.
715 : */
716 748396 : MarkPortalActive(portal);
717 :
718 : /*
719 : * Set up global portal context pointers.
720 : *
721 : * We have to play a special game here to support utility commands like
722 : * VACUUM and CLUSTER, which internally start and commit transactions.
723 : * When we are called to execute such a command, CurrentResourceOwner will
724 : * be pointing to the TopTransactionResourceOwner --- which will be
725 : * destroyed and replaced in the course of the internal commit and
726 : * restart. So we need to be prepared to restore it as pointing to the
727 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
728 : * internally starting whole new transactions is not good.)
729 : * CurrentMemoryContext has a similar problem, but the other pointers we
730 : * save here will be NULL or pointing to longer-lived objects.
731 : */
732 748396 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
733 748396 : saveTopTransactionContext = TopTransactionContext;
734 748396 : saveActivePortal = ActivePortal;
735 748396 : saveResourceOwner = CurrentResourceOwner;
736 748396 : savePortalContext = PortalContext;
737 748396 : saveMemoryContext = CurrentMemoryContext;
738 748396 : PG_TRY();
739 : {
740 748396 : ActivePortal = portal;
741 748396 : if (portal->resowner)
742 748396 : CurrentResourceOwner = portal->resowner;
743 748396 : PortalContext = portal->portalContext;
744 :
745 748396 : MemoryContextSwitchTo(PortalContext);
746 :
747 748396 : switch (portal->strategy)
748 : {
749 333004 : case PORTAL_ONE_SELECT:
750 : case PORTAL_ONE_RETURNING:
751 : case PORTAL_ONE_MOD_WITH:
752 : case PORTAL_UTIL_SELECT:
753 :
754 : /*
755 : * If we have not yet run the command, do so, storing its
756 : * results in the portal's tuplestore. But we don't do that
757 : * for the PORTAL_ONE_SELECT case.
758 : */
759 333004 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
760 44788 : FillPortalStore(portal, isTopLevel);
761 :
762 : /*
763 : * Now fetch desired portion of results.
764 : */
765 332590 : nprocessed = PortalRunSelect(portal, true, count, dest);
766 :
767 : /*
768 : * If the portal result contains a command tag and the caller
769 : * gave us a pointer to store it, copy it and update the
770 : * rowcount.
771 : */
772 325394 : if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
773 : {
774 325394 : CopyQueryCompletion(qc, &portal->qc);
775 325394 : qc->nprocessed = nprocessed;
776 : }
777 :
778 : /* Mark portal not active */
779 325394 : portal->status = PORTAL_READY;
780 :
781 : /*
782 : * Since it's a forward fetch, say DONE iff atEnd is now true.
783 : */
784 325394 : result = portal->atEnd;
785 325394 : break;
786 :
787 415392 : case PORTAL_MULTI_QUERY:
788 415392 : PortalRunMulti(portal, isTopLevel, false,
789 : dest, altdest, qc);
790 :
791 : /* Prevent portal's commands from being re-executed */
792 394754 : MarkPortalDone(portal);
793 :
794 : /* Always complete at end of RunMulti */
795 394754 : result = true;
796 394754 : break;
797 :
798 0 : default:
799 0 : elog(ERROR, "unrecognized portal strategy: %d",
800 : (int) portal->strategy);
801 : result = false; /* keep compiler quiet */
802 : break;
803 : }
804 : }
805 28240 : PG_CATCH();
806 : {
807 : /* Uncaught error while executing portal: mark it dead */
808 28240 : MarkPortalFailed(portal);
809 :
810 : /* Restore global vars and propagate error */
811 28240 : if (saveMemoryContext == saveTopTransactionContext)
812 27852 : MemoryContextSwitchTo(TopTransactionContext);
813 : else
814 388 : MemoryContextSwitchTo(saveMemoryContext);
815 28240 : ActivePortal = saveActivePortal;
816 28240 : if (saveResourceOwner == saveTopTransactionResourceOwner)
817 27952 : CurrentResourceOwner = TopTransactionResourceOwner;
818 : else
819 288 : CurrentResourceOwner = saveResourceOwner;
820 28240 : PortalContext = savePortalContext;
821 :
822 28240 : PG_RE_THROW();
823 : }
824 720148 : PG_END_TRY();
825 :
826 720148 : if (saveMemoryContext == saveTopTransactionContext)
827 673160 : MemoryContextSwitchTo(TopTransactionContext);
828 : else
829 46988 : MemoryContextSwitchTo(saveMemoryContext);
830 720148 : ActivePortal = saveActivePortal;
831 720148 : if (saveResourceOwner == saveTopTransactionResourceOwner)
832 694510 : CurrentResourceOwner = TopTransactionResourceOwner;
833 : else
834 25638 : CurrentResourceOwner = saveResourceOwner;
835 720148 : PortalContext = savePortalContext;
836 :
837 720148 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
838 0 : ShowUsage("EXECUTOR STATISTICS");
839 :
840 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
841 :
842 720148 : return result;
843 : }
844 :
845 : /*
846 : * PortalRunSelect
847 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
848 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
849 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
850 : *
851 : * This handles simple N-rows-forward-or-backward cases. For more complex
852 : * nonsequential access to a portal, see PortalRunFetch.
853 : *
854 : * count <= 0 is interpreted as a no-op: the destination gets started up
855 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
856 : * interpreted as "all rows". (cf FetchStmt.howMany)
857 : *
858 : * Caller must already have validated the Portal and done appropriate
859 : * setup (cf. PortalRun).
860 : *
861 : * Returns number of rows processed (suitable for use in result tag)
862 : */
863 : static uint64
864 384518 : PortalRunSelect(Portal portal,
865 : bool forward,
866 : long count,
867 : DestReceiver *dest)
868 : {
869 : QueryDesc *queryDesc;
870 : ScanDirection direction;
871 : uint64 nprocessed;
872 :
873 : /*
874 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
875 : * completed utility query; can't use it in that path.
876 : */
877 384518 : queryDesc = portal->queryDesc;
878 :
879 : /* Caller messed up if we have neither a ready query nor held data. */
880 : Assert(queryDesc || portal->holdStore);
881 :
882 : /*
883 : * Force the queryDesc destination to the right thing. This supports
884 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
885 : * change as long as we do it on every fetch. (The Executor must not
886 : * assume that dest never changes.)
887 : */
888 384518 : if (queryDesc)
889 308088 : queryDesc->dest = dest;
890 :
891 : /*
892 : * Determine which direction to go in, and check to see if we're already
893 : * at the end of the available tuples in that direction. If so, set the
894 : * direction to NoMovement to avoid trying to fetch any tuples. (This
895 : * check exists because not all plan node types are robust about being
896 : * called again if they've already returned NULL once.) Then call the
897 : * executor (we must not skip this, because the destination needs to see a
898 : * setup and shutdown even if no tuples are available). Finally, update
899 : * the portal position state depending on the number of tuples that were
900 : * retrieved.
901 : */
902 384518 : if (forward)
903 : {
904 383914 : if (portal->atEnd || count <= 0)
905 : {
906 3686 : direction = NoMovementScanDirection;
907 3686 : count = 0; /* don't pass negative count to executor */
908 : }
909 : else
910 380228 : direction = ForwardScanDirection;
911 :
912 : /* In the executor, zero count processes all rows */
913 383914 : if (count == FETCH_ALL)
914 332914 : count = 0;
915 :
916 383914 : if (portal->holdStore)
917 76412 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
918 : else
919 : {
920 307502 : PushActiveSnapshot(queryDesc->snapshot);
921 307502 : ExecutorRun(queryDesc, direction, (uint64) count);
922 300262 : nprocessed = queryDesc->estate->es_processed;
923 300262 : PopActiveSnapshot();
924 : }
925 :
926 376674 : if (!ScanDirectionIsNoMovement(direction))
927 : {
928 372988 : if (nprocessed > 0)
929 313626 : portal->atStart = false; /* OK to go backward now */
930 372988 : if (count == 0 || nprocessed < (uint64) count)
931 339804 : portal->atEnd = true; /* we retrieved 'em all */
932 372988 : portal->portalPos += nprocessed;
933 : }
934 : }
935 : else
936 : {
937 604 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
938 24 : ereport(ERROR,
939 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
940 : errmsg("cursor can only scan forward"),
941 : errhint("Declare it with SCROLL option to enable backward scan.")));
942 :
943 580 : if (portal->atStart || count <= 0)
944 : {
945 72 : direction = NoMovementScanDirection;
946 72 : count = 0; /* don't pass negative count to executor */
947 : }
948 : else
949 508 : direction = BackwardScanDirection;
950 :
951 : /* In the executor, zero count processes all rows */
952 580 : if (count == FETCH_ALL)
953 60 : count = 0;
954 :
955 580 : if (portal->holdStore)
956 12 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
957 : else
958 : {
959 568 : PushActiveSnapshot(queryDesc->snapshot);
960 568 : ExecutorRun(queryDesc, direction, (uint64) count);
961 568 : nprocessed = queryDesc->estate->es_processed;
962 568 : PopActiveSnapshot();
963 : }
964 :
965 580 : if (!ScanDirectionIsNoMovement(direction))
966 : {
967 508 : if (nprocessed > 0 && portal->atEnd)
968 : {
969 154 : portal->atEnd = false; /* OK to go forward now */
970 154 : portal->portalPos++; /* adjust for endpoint case */
971 : }
972 508 : if (count == 0 || nprocessed < (uint64) count)
973 : {
974 186 : portal->atStart = true; /* we retrieved 'em all */
975 186 : portal->portalPos = 0;
976 : }
977 : else
978 : {
979 322 : portal->portalPos -= nprocessed;
980 : }
981 : }
982 : }
983 :
984 377254 : return nprocessed;
985 : }
986 :
987 : /*
988 : * FillPortalStore
989 : * Run the query and load result tuples into the portal's tuple store.
990 : *
991 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
992 : * PORTAL_UTIL_SELECT cases only.
993 : */
994 : static void
995 52866 : FillPortalStore(Portal portal, bool isTopLevel)
996 : {
997 : DestReceiver *treceiver;
998 : QueryCompletion qc;
999 :
1000 52866 : InitializeQueryCompletion(&qc);
1001 52866 : PortalCreateHoldStore(portal);
1002 52866 : treceiver = CreateDestReceiver(DestTuplestore);
1003 52866 : SetTuplestoreDestReceiverParams(treceiver,
1004 : portal->holdStore,
1005 : portal->holdContext,
1006 : false,
1007 : NULL,
1008 : NULL);
1009 :
1010 52866 : switch (portal->strategy)
1011 : {
1012 3106 : case PORTAL_ONE_RETURNING:
1013 : case PORTAL_ONE_MOD_WITH:
1014 :
1015 : /*
1016 : * Run the portal to completion just as for the default
1017 : * PORTAL_MULTI_QUERY case, but send the primary query's output to
1018 : * the tuplestore. Auxiliary query outputs are discarded. Set the
1019 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1020 : */
1021 3106 : PortalRunMulti(portal, isTopLevel, true,
1022 : treceiver, None_Receiver, &qc);
1023 2954 : break;
1024 :
1025 49760 : case PORTAL_UTIL_SELECT:
1026 49760 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1027 : isTopLevel, true, treceiver, &qc);
1028 49492 : break;
1029 :
1030 0 : default:
1031 0 : elog(ERROR, "unsupported portal strategy: %d",
1032 : (int) portal->strategy);
1033 : break;
1034 : }
1035 :
1036 : /* Override portal completion data with actual command results */
1037 52446 : if (qc.commandTag != CMDTAG_UNKNOWN)
1038 27948 : CopyQueryCompletion(&portal->qc, &qc);
1039 :
1040 52446 : treceiver->rDestroy(treceiver);
1041 52446 : }
1042 :
1043 : /*
1044 : * RunFromStore
1045 : * Fetch tuples from the portal's tuple store.
1046 : *
1047 : * Calling conventions are similar to ExecutorRun, except that we
1048 : * do not depend on having a queryDesc or estate. Therefore we return the
1049 : * number of tuples processed as the result, not in estate->es_processed.
1050 : *
1051 : * One difference from ExecutorRun is that the destination receiver functions
1052 : * are run in the caller's memory context (since we have no estate). Watch
1053 : * out for memory leaks.
1054 : */
1055 : static uint64
1056 76424 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1057 : DestReceiver *dest)
1058 : {
1059 76424 : uint64 current_tuple_count = 0;
1060 : TupleTableSlot *slot;
1061 :
1062 76424 : slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);
1063 :
1064 76424 : dest->rStartup(dest, CMD_SELECT, portal->tupDesc);
1065 :
1066 76424 : if (ScanDirectionIsNoMovement(direction))
1067 : {
1068 : /* do nothing except start/stop the destination */
1069 : }
1070 : else
1071 : {
1072 73862 : bool forward = ScanDirectionIsForward(direction);
1073 :
1074 : for (;;)
1075 422228 : {
1076 : MemoryContext oldcontext;
1077 : bool ok;
1078 :
1079 496090 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1080 :
1081 496090 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1082 : slot);
1083 :
1084 496090 : MemoryContextSwitchTo(oldcontext);
1085 :
1086 496090 : if (!ok)
1087 52500 : break;
1088 :
1089 : /*
1090 : * If we are not able to send the tuple, we assume the destination
1091 : * has closed and no more tuples can be sent. If that's the case,
1092 : * end the loop.
1093 : */
1094 443590 : if (!dest->receiveSlot(slot, dest))
1095 0 : break;
1096 :
1097 443590 : ExecClearTuple(slot);
1098 :
1099 : /*
1100 : * check our tuple count.. if we've processed the proper number
1101 : * then quit, else loop again and process more tuples. Zero count
1102 : * means no limit.
1103 : */
1104 443590 : current_tuple_count++;
1105 443590 : if (count && count == current_tuple_count)
1106 21362 : break;
1107 : }
1108 : }
1109 :
1110 76424 : dest->rShutdown(dest);
1111 :
1112 76424 : ExecDropSingleTupleTableSlot(slot);
1113 :
1114 76424 : return current_tuple_count;
1115 : }
1116 :
1117 : /*
1118 : * PortalRunUtility
1119 : * Execute a utility statement inside a portal.
1120 : */
1121 : static void
1122 377658 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1123 : bool isTopLevel, bool setHoldSnapshot,
1124 : DestReceiver *dest, QueryCompletion *qc)
1125 : {
1126 : /*
1127 : * Set snapshot if utility stmt needs one.
1128 : */
1129 377658 : if (PlannedStmtRequiresSnapshot(pstmt))
1130 : {
1131 302214 : Snapshot snapshot = GetTransactionSnapshot();
1132 :
1133 : /* If told to, register the snapshot we're using and save in portal */
1134 302214 : if (setHoldSnapshot)
1135 : {
1136 43198 : snapshot = RegisterSnapshot(snapshot);
1137 43198 : portal->holdSnapshot = snapshot;
1138 : }
1139 :
1140 : /*
1141 : * In any case, make the snapshot active and remember it in portal.
1142 : * Because the portal now references the snapshot, we must tell
1143 : * snapmgr.c that the snapshot belongs to the portal's transaction
1144 : * level, else we risk portalSnapshot becoming a dangling pointer.
1145 : */
1146 302214 : PushActiveSnapshotWithLevel(snapshot, portal->createLevel);
1147 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1148 302214 : portal->portalSnapshot = GetActiveSnapshot();
1149 : }
1150 : else
1151 75444 : portal->portalSnapshot = NULL;
1152 :
1153 377658 : ProcessUtility(pstmt,
1154 : portal->sourceText,
1155 377658 : (portal->cplan != NULL), /* protect tree if in plancache */
1156 377658 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1157 : portal->portalParams,
1158 : portal->queryEnv,
1159 : dest,
1160 : qc);
1161 :
1162 : /* Some utility statements may change context on us */
1163 362150 : MemoryContextSwitchTo(portal->portalContext);
1164 :
1165 : /*
1166 : * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
1167 : * under us, so don't complain if it's now empty. Otherwise, our snapshot
1168 : * should be the top one; pop it. Note that this could be a different
1169 : * snapshot from the one we made above; see EnsurePortalSnapshotExists.
1170 : */
1171 362150 : if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
1172 : {
1173 : Assert(portal->portalSnapshot == GetActiveSnapshot());
1174 276688 : PopActiveSnapshot();
1175 : }
1176 362150 : portal->portalSnapshot = NULL;
1177 362150 : }
1178 :
1179 : /*
1180 : * PortalRunMulti
1181 : * Execute a portal's queries in the general case (multi queries
1182 : * or non-SELECT-like queries)
1183 : */
1184 : static void
1185 418498 : PortalRunMulti(Portal portal,
1186 : bool isTopLevel, bool setHoldSnapshot,
1187 : DestReceiver *dest, DestReceiver *altdest,
1188 : QueryCompletion *qc)
1189 : {
1190 418498 : bool active_snapshot_set = false;
1191 : ListCell *stmtlist_item;
1192 :
1193 : /*
1194 : * If the destination is DestRemoteExecute, change to DestNone. The
1195 : * reason is that the client won't be expecting any tuples, and indeed has
1196 : * no way to know what they are, since there is no provision for Describe
1197 : * to send a RowDescription message when this portal execution strategy is
1198 : * in effect. This presently will only affect SELECT commands added to
1199 : * non-SELECT queries by rewrite rules: such commands will be executed,
1200 : * but the results will be discarded unless you use "simple Query"
1201 : * protocol.
1202 : */
1203 418498 : if (dest->mydest == DestRemoteExecute)
1204 12682 : dest = None_Receiver;
1205 418498 : if (altdest->mydest == DestRemoteExecute)
1206 12682 : altdest = None_Receiver;
1207 :
1208 : /*
1209 : * Loop to handle the individual queries generated from a single parsetree
1210 : * by analysis and rewrite.
1211 : */
1212 816794 : foreach(stmtlist_item, portal->stmts)
1213 : {
1214 419086 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1215 :
1216 : /*
1217 : * If we got a cancel signal in prior command, quit
1218 : */
1219 419086 : CHECK_FOR_INTERRUPTS();
1220 :
1221 419086 : if (pstmt->utilityStmt == NULL)
1222 : {
1223 : /*
1224 : * process a plannable query.
1225 : */
1226 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1227 :
1228 91188 : if (log_executor_stats)
1229 0 : ResetUsage();
1230 :
1231 : /*
1232 : * Must always have a snapshot for plannable queries. First time
1233 : * through, take a new snapshot; for subsequent queries in the
1234 : * same portal, just update the snapshot's copy of the command
1235 : * counter.
1236 : */
1237 91188 : if (!active_snapshot_set)
1238 : {
1239 90600 : Snapshot snapshot = GetTransactionSnapshot();
1240 :
1241 : /* If told to, register the snapshot and save in portal */
1242 90600 : if (setHoldSnapshot)
1243 : {
1244 3106 : snapshot = RegisterSnapshot(snapshot);
1245 3106 : portal->holdSnapshot = snapshot;
1246 : }
1247 :
1248 : /*
1249 : * We can't have the holdSnapshot also be the active one,
1250 : * because UpdateActiveSnapshotCommandId would complain. So
1251 : * force an extra snapshot copy. Plain PushActiveSnapshot
1252 : * would have copied the transaction snapshot anyway, so this
1253 : * only adds a copy step when setHoldSnapshot is true. (It's
1254 : * okay for the command ID of the active snapshot to diverge
1255 : * from what holdSnapshot has.)
1256 : */
1257 90600 : PushCopiedSnapshot(snapshot);
1258 :
1259 : /*
1260 : * As for PORTAL_ONE_SELECT portals, it does not seem
1261 : * necessary to maintain portal->portalSnapshot here.
1262 : */
1263 :
1264 90600 : active_snapshot_set = true;
1265 : }
1266 : else
1267 588 : UpdateActiveSnapshotCommandId();
1268 :
1269 91188 : if (pstmt->canSetTag)
1270 : {
1271 : /* statement can set tag string */
1272 90558 : ProcessQuery(pstmt,
1273 : portal->sourceText,
1274 : portal->portalParams,
1275 : portal->queryEnv,
1276 : dest, qc);
1277 : }
1278 : else
1279 : {
1280 : /* stmt added by rewrite cannot set tag */
1281 630 : ProcessQuery(pstmt,
1282 : portal->sourceText,
1283 : portal->portalParams,
1284 : portal->queryEnv,
1285 : altdest, NULL);
1286 : }
1287 :
1288 85638 : if (log_executor_stats)
1289 0 : ShowUsage("EXECUTOR STATISTICS");
1290 :
1291 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1292 : }
1293 : else
1294 : {
1295 : /*
1296 : * process utility functions (create, destroy, etc..)
1297 : *
1298 : * We must not set a snapshot here for utility commands (if one is
1299 : * needed, PortalRunUtility will do it). If a utility command is
1300 : * alone in a portal then everything's fine. The only case where
1301 : * a utility command can be part of a longer list is that rules
1302 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1303 : * whether it has a snapshot or not, so we just leave the current
1304 : * snapshot alone if we have one.
1305 : */
1306 327898 : if (pstmt->canSetTag)
1307 : {
1308 : Assert(!active_snapshot_set);
1309 : /* statement can set tag string */
1310 327898 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1311 : dest, qc);
1312 : }
1313 : else
1314 : {
1315 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1316 : /* stmt added by rewrite cannot set tag */
1317 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1318 : altdest, NULL);
1319 : }
1320 : }
1321 :
1322 : /*
1323 : * Clear subsidiary contexts to recover temporary memory.
1324 : */
1325 : Assert(portal->portalContext == CurrentMemoryContext);
1326 :
1327 398296 : MemoryContextDeleteChildren(portal->portalContext);
1328 :
1329 : /*
1330 : * Avoid crashing if portal->stmts has been reset. This can only
1331 : * occur if a CALL or DO utility statement executed an internal
1332 : * COMMIT/ROLLBACK (cf PortalReleaseCachedPlan). The CALL or DO must
1333 : * have been the only statement in the portal, so there's nothing left
1334 : * for us to do; but we don't want to dereference a now-dangling list
1335 : * pointer.
1336 : */
1337 398296 : if (portal->stmts == NIL)
1338 0 : break;
1339 :
1340 : /*
1341 : * Increment command counter between queries, but not after the last
1342 : * one.
1343 : */
1344 398296 : if (lnext(portal->stmts, stmtlist_item) != NULL)
1345 588 : CommandCounterIncrement();
1346 : }
1347 :
1348 : /* Pop the snapshot if we pushed one. */
1349 397708 : if (active_snapshot_set)
1350 85050 : PopActiveSnapshot();
1351 :
1352 : /*
1353 : * If a query completion data was supplied, use it. Otherwise use the
1354 : * portal's query completion data.
1355 : *
1356 : * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
1357 : * fake them with zeros. This can happen with DO INSTEAD rules if there
1358 : * is no replacement query of the same type as the original. We print "0
1359 : * 0" here because technically there is no query of the matching tag type,
1360 : * and printing a non-zero count for a different query type seems wrong,
1361 : * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
1362 : * one row was updated. See QueryRewrite(), step 3, for details.
1363 : */
1364 397708 : if (qc && qc->commandTag == CMDTAG_UNKNOWN)
1365 : {
1366 294656 : if (portal->qc.commandTag != CMDTAG_UNKNOWN)
1367 294656 : CopyQueryCompletion(qc, &portal->qc);
1368 : /* If the caller supplied a qc, we should have set it by now. */
1369 : Assert(qc->commandTag != CMDTAG_UNKNOWN);
1370 : }
1371 397708 : }
1372 :
1373 : /*
1374 : * PortalRunFetch
1375 : * Variant form of PortalRun that supports SQL FETCH directions.
1376 : *
1377 : * Note: we presently assume that no callers of this want isTopLevel = true.
1378 : *
1379 : * count <= 0 is interpreted as a no-op: the destination gets started up
1380 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1381 : * interpreted as "all rows". (cf FetchStmt.howMany)
1382 : *
1383 : * Returns number of rows processed (suitable for use in result tag)
1384 : */
1385 : uint64
1386 51856 : PortalRunFetch(Portal portal,
1387 : FetchDirection fdirection,
1388 : long count,
1389 : DestReceiver *dest)
1390 : {
1391 : uint64 result;
1392 : Portal saveActivePortal;
1393 : ResourceOwner saveResourceOwner;
1394 : MemoryContext savePortalContext;
1395 : MemoryContext oldContext;
1396 :
1397 : Assert(PortalIsValid(portal));
1398 :
1399 : /*
1400 : * Check for improper portal use, and mark portal active.
1401 : */
1402 51856 : MarkPortalActive(portal);
1403 :
1404 : /*
1405 : * Set up global portal context pointers.
1406 : */
1407 51838 : saveActivePortal = ActivePortal;
1408 51838 : saveResourceOwner = CurrentResourceOwner;
1409 51838 : savePortalContext = PortalContext;
1410 51838 : PG_TRY();
1411 : {
1412 51838 : ActivePortal = portal;
1413 51838 : if (portal->resowner)
1414 51646 : CurrentResourceOwner = portal->resowner;
1415 51838 : PortalContext = portal->portalContext;
1416 :
1417 51838 : oldContext = MemoryContextSwitchTo(PortalContext);
1418 :
1419 51838 : switch (portal->strategy)
1420 : {
1421 19968 : case PORTAL_ONE_SELECT:
1422 19968 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1423 19894 : break;
1424 :
1425 31870 : case PORTAL_ONE_RETURNING:
1426 : case PORTAL_ONE_MOD_WITH:
1427 : case PORTAL_UTIL_SELECT:
1428 :
1429 : /*
1430 : * If we have not yet run the command, do so, storing its
1431 : * results in the portal's tuplestore.
1432 : */
1433 31870 : if (!portal->holdStore)
1434 8078 : FillPortalStore(portal, false /* isTopLevel */ );
1435 :
1436 : /*
1437 : * Now fetch desired portion of results.
1438 : */
1439 31864 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1440 31864 : break;
1441 :
1442 0 : default:
1443 0 : elog(ERROR, "unsupported portal strategy");
1444 : result = 0; /* keep compiler quiet */
1445 : break;
1446 : }
1447 : }
1448 80 : PG_CATCH();
1449 : {
1450 : /* Uncaught error while executing portal: mark it dead */
1451 80 : MarkPortalFailed(portal);
1452 :
1453 : /* Restore global vars and propagate error */
1454 80 : ActivePortal = saveActivePortal;
1455 80 : CurrentResourceOwner = saveResourceOwner;
1456 80 : PortalContext = savePortalContext;
1457 :
1458 80 : PG_RE_THROW();
1459 : }
1460 51758 : PG_END_TRY();
1461 :
1462 51758 : MemoryContextSwitchTo(oldContext);
1463 :
1464 : /* Mark portal not active */
1465 51758 : portal->status = PORTAL_READY;
1466 :
1467 51758 : ActivePortal = saveActivePortal;
1468 51758 : CurrentResourceOwner = saveResourceOwner;
1469 51758 : PortalContext = savePortalContext;
1470 :
1471 51758 : return result;
1472 : }
1473 :
1474 : /*
1475 : * DoPortalRunFetch
1476 : * Guts of PortalRunFetch --- the portal context is already set up
1477 : *
1478 : * Here, count < 0 typically reverses the direction. Also, count == FETCH_ALL
1479 : * is interpreted as "all rows". (cf FetchStmt.howMany)
1480 : *
1481 : * Returns number of rows processed (suitable for use in result tag)
1482 : */
1483 : static uint64
1484 51832 : DoPortalRunFetch(Portal portal,
1485 : FetchDirection fdirection,
1486 : long count,
1487 : DestReceiver *dest)
1488 : {
1489 : bool forward;
1490 :
1491 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1492 : portal->strategy == PORTAL_ONE_RETURNING ||
1493 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1494 : portal->strategy == PORTAL_UTIL_SELECT);
1495 :
1496 : /*
1497 : * Note: we disallow backwards fetch (including re-fetch of current row)
1498 : * for NO SCROLL cursors, but we interpret that very loosely: you can use
1499 : * any of the FetchDirection options, so long as the end result is to move
1500 : * forwards by at least one row. Currently it's sufficient to check for
1501 : * NO SCROLL in DoPortalRewind() and in the forward == false path in
1502 : * PortalRunSelect(); but someday we might prefer to account for that
1503 : * restriction explicitly here.
1504 : */
1505 51832 : switch (fdirection)
1506 : {
1507 51092 : case FETCH_FORWARD:
1508 51092 : if (count < 0)
1509 : {
1510 0 : fdirection = FETCH_BACKWARD;
1511 0 : count = -count;
1512 : }
1513 : /* fall out of switch to share code with FETCH_BACKWARD */
1514 51092 : break;
1515 496 : case FETCH_BACKWARD:
1516 496 : if (count < 0)
1517 : {
1518 0 : fdirection = FETCH_FORWARD;
1519 0 : count = -count;
1520 : }
1521 : /* fall out of switch to share code with FETCH_FORWARD */
1522 496 : break;
1523 160 : case FETCH_ABSOLUTE:
1524 160 : if (count > 0)
1525 : {
1526 : /*
1527 : * Definition: Rewind to start, advance count-1 rows, return
1528 : * next row (if any).
1529 : *
1530 : * In practice, if the goal is less than halfway back to the
1531 : * start, it's better to scan from where we are.
1532 : *
1533 : * Also, if current portalPos is outside the range of "long",
1534 : * do it the hard way to avoid possible overflow of the count
1535 : * argument to PortalRunSelect. We must exclude exactly
1536 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1537 : *
1538 : * In any case, we arrange to fetch the target row going
1539 : * forwards.
1540 : */
1541 90 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1542 36 : portal->portalPos >= (uint64) LONG_MAX)
1543 : {
1544 54 : DoPortalRewind(portal);
1545 48 : if (count > 1)
1546 0 : PortalRunSelect(portal, true, count - 1,
1547 : None_Receiver);
1548 : }
1549 : else
1550 : {
1551 36 : long pos = (long) portal->portalPos;
1552 :
1553 36 : if (portal->atEnd)
1554 0 : pos++; /* need one extra fetch if off end */
1555 36 : if (count <= pos)
1556 12 : PortalRunSelect(portal, false, pos - count + 1,
1557 : None_Receiver);
1558 24 : else if (count > pos + 1)
1559 12 : PortalRunSelect(portal, true, count - pos - 1,
1560 : None_Receiver);
1561 : }
1562 78 : return PortalRunSelect(portal, true, 1L, dest);
1563 : }
1564 70 : else if (count < 0)
1565 : {
1566 : /*
1567 : * Definition: Advance to end, back up abs(count)-1 rows,
1568 : * return prior row (if any). We could optimize this if we
1569 : * knew in advance where the end was, but typically we won't.
1570 : * (Is it worth considering case where count > half of size of
1571 : * query? We could rewind once we know the size ...)
1572 : */
1573 54 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1574 54 : if (count < -1)
1575 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1576 54 : return PortalRunSelect(portal, false, 1L, dest);
1577 : }
1578 : else
1579 : {
1580 : /* count == 0 */
1581 : /* Rewind to start, return zero rows */
1582 16 : DoPortalRewind(portal);
1583 16 : return PortalRunSelect(portal, true, 0L, dest);
1584 : }
1585 : break;
1586 84 : case FETCH_RELATIVE:
1587 84 : if (count > 0)
1588 : {
1589 : /*
1590 : * Definition: advance count-1 rows, return next row (if any).
1591 : */
1592 36 : if (count > 1)
1593 24 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1594 36 : return PortalRunSelect(portal, true, 1L, dest);
1595 : }
1596 48 : else if (count < 0)
1597 : {
1598 : /*
1599 : * Definition: back up abs(count)-1 rows, return prior row (if
1600 : * any).
1601 : */
1602 30 : if (count < -1)
1603 18 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1604 30 : return PortalRunSelect(portal, false, 1L, dest);
1605 : }
1606 : else
1607 : {
1608 : /* count == 0 */
1609 : /* Same as FETCH FORWARD 0, so fall out of switch */
1610 18 : fdirection = FETCH_FORWARD;
1611 : }
1612 18 : break;
1613 0 : default:
1614 0 : elog(ERROR, "bogus direction");
1615 : break;
1616 : }
1617 :
1618 : /*
1619 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1620 : * >= 0.
1621 : */
1622 51606 : forward = (fdirection == FETCH_FORWARD);
1623 :
1624 : /*
1625 : * Zero count means to re-fetch the current row, if any (per SQL)
1626 : */
1627 51606 : if (count == 0)
1628 : {
1629 : bool on_row;
1630 :
1631 : /* Are we sitting on a row? */
1632 18 : on_row = (!portal->atStart && !portal->atEnd);
1633 :
1634 18 : if (dest->mydest == DestNone)
1635 : {
1636 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1637 0 : return on_row ? 1 : 0;
1638 : }
1639 : else
1640 : {
1641 : /*
1642 : * If we are sitting on a row, back up one so we can re-fetch it.
1643 : * If we are not sitting on a row, we still have to start up and
1644 : * shut down the executor so that the destination is initialized
1645 : * and shut down correctly; so keep going. To PortalRunSelect,
1646 : * count == 0 means we will retrieve no row.
1647 : */
1648 18 : if (on_row)
1649 : {
1650 18 : PortalRunSelect(portal, false, 1L, None_Receiver);
1651 : /* Set up to fetch one row forward */
1652 12 : count = 1;
1653 12 : forward = true;
1654 : }
1655 : }
1656 : }
1657 :
1658 : /*
1659 : * Optimize MOVE BACKWARD ALL into a Rewind.
1660 : */
1661 51600 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1662 : {
1663 24 : uint64 result = portal->portalPos;
1664 :
1665 24 : if (result > 0 && !portal->atEnd)
1666 0 : result--;
1667 24 : DoPortalRewind(portal);
1668 24 : return result;
1669 : }
1670 :
1671 51576 : return PortalRunSelect(portal, forward, count, dest);
1672 : }
1673 :
1674 : /*
1675 : * DoPortalRewind - rewind a Portal to starting point
1676 : */
1677 : static void
1678 94 : DoPortalRewind(Portal portal)
1679 : {
1680 : QueryDesc *queryDesc;
1681 :
1682 : /*
1683 : * No work is needed if we've not advanced nor attempted to advance the
1684 : * cursor (and we don't want to throw a NO SCROLL error in this case).
1685 : */
1686 94 : if (portal->atStart && !portal->atEnd)
1687 18 : return;
1688 :
1689 : /* Otherwise, cursor must allow scrolling */
1690 76 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
1691 6 : ereport(ERROR,
1692 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1693 : errmsg("cursor can only scan forward"),
1694 : errhint("Declare it with SCROLL option to enable backward scan.")));
1695 :
1696 : /* Rewind holdStore, if we have one */
1697 70 : if (portal->holdStore)
1698 : {
1699 : MemoryContext oldcontext;
1700 :
1701 6 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1702 6 : tuplestore_rescan(portal->holdStore);
1703 6 : MemoryContextSwitchTo(oldcontext);
1704 : }
1705 :
1706 : /* Rewind executor, if active */
1707 70 : queryDesc = portal->queryDesc;
1708 70 : if (queryDesc)
1709 : {
1710 64 : PushActiveSnapshot(queryDesc->snapshot);
1711 64 : ExecutorRewind(queryDesc);
1712 64 : PopActiveSnapshot();
1713 : }
1714 :
1715 70 : portal->atStart = true;
1716 70 : portal->atEnd = false;
1717 70 : portal->portalPos = 0;
1718 : }
1719 :
1720 : /*
1721 : * PlannedStmtRequiresSnapshot - what it says on the tin
1722 : */
1723 : bool
1724 492094 : PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
1725 : {
1726 492094 : Node *utilityStmt = pstmt->utilityStmt;
1727 :
1728 : /* If it's not a utility statement, it definitely needs a snapshot */
1729 492094 : if (utilityStmt == NULL)
1730 85598 : return true;
1731 :
1732 : /*
1733 : * Most utility statements need a snapshot, and the default presumption
1734 : * about new ones should be that they do too. Hence, enumerate those that
1735 : * do not need one.
1736 : *
1737 : * Transaction control, LOCK, and SET must *not* set a snapshot, since
1738 : * they need to be executable at the start of a transaction-snapshot-mode
1739 : * transaction without freezing a snapshot. By extension we allow SHOW
1740 : * not to set a snapshot. The other stmts listed are just efficiency
1741 : * hacks. Beware of listing anything that can modify the database --- if,
1742 : * say, it has to update an index with expressions that invoke
1743 : * user-defined functions, then it had better have a snapshot.
1744 : */
1745 406496 : if (IsA(utilityStmt, TransactionStmt) ||
1746 368432 : IsA(utilityStmt, LockStmt) ||
1747 365918 : IsA(utilityStmt, VariableSetStmt) ||
1748 327482 : IsA(utilityStmt, VariableShowStmt) ||
1749 326646 : IsA(utilityStmt, ConstraintsSetStmt) ||
1750 : /* efficiency hacks from here down */
1751 326544 : IsA(utilityStmt, FetchStmt) ||
1752 318982 : IsA(utilityStmt, ListenStmt) ||
1753 318908 : IsA(utilityStmt, NotifyStmt) ||
1754 318820 : IsA(utilityStmt, UnlistenStmt) ||
1755 318782 : IsA(utilityStmt, CheckPointStmt))
1756 88576 : return false;
1757 :
1758 317920 : return true;
1759 : }
1760 :
1761 : /*
1762 : * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
1763 : *
1764 : * Generally, we will have an active snapshot whenever we are executing
1765 : * inside a Portal, unless the Portal's query is one of the utility
1766 : * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
1767 : * However, procedures and DO blocks can commit or abort the transaction,
1768 : * and thereby destroy all snapshots. This function can be called to
1769 : * re-establish the Portal-level snapshot when none exists.
1770 : */
1771 : void
1772 419260 : EnsurePortalSnapshotExists(void)
1773 : {
1774 : Portal portal;
1775 :
1776 : /*
1777 : * Nothing to do if a snapshot is set. (We take it on faith that the
1778 : * outermost active snapshot belongs to some Portal; or if there is no
1779 : * Portal, it's somebody else's responsibility to manage things.)
1780 : */
1781 419260 : if (ActiveSnapshotSet())
1782 414804 : return;
1783 :
1784 : /* Otherwise, we'd better have an active Portal */
1785 4456 : portal = ActivePortal;
1786 4456 : if (unlikely(portal == NULL))
1787 0 : elog(ERROR, "cannot execute SQL without an outer snapshot or portal");
1788 : Assert(portal->portalSnapshot == NULL);
1789 :
1790 : /*
1791 : * Create a new snapshot, make it active, and remember it in portal.
1792 : * Because the portal now references the snapshot, we must tell snapmgr.c
1793 : * that the snapshot belongs to the portal's transaction level, else we
1794 : * risk portalSnapshot becoming a dangling pointer.
1795 : */
1796 4456 : PushActiveSnapshotWithLevel(GetTransactionSnapshot(), portal->createLevel);
1797 : /* PushActiveSnapshotWithLevel might have copied the snapshot */
1798 4456 : portal->portalSnapshot = GetActiveSnapshot();
1799 : }
|