LCOV - code coverage report
Current view: top level - src/backend/executor - execAsync.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.4 % 59 51
Test Date: 2026-04-16 02:16:28 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execAsync.c
       4              :  *    Support routines for asynchronous execution
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/executor/execAsync.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "executor/execAsync.h"
      18              : #include "executor/executor.h"
      19              : #include "executor/instrument.h"
      20              : #include "executor/nodeAppend.h"
      21              : #include "executor/nodeForeignscan.h"
      22              : 
      23              : /*
      24              :  * Asynchronously request a tuple from a designated async-capable node.
      25              :  */
      26              : void
      27         6175 : ExecAsyncRequest(AsyncRequest *areq)
      28              : {
      29         6175 :     if (areq->requestee->chgParam != NULL)    /* something changed? */
      30            2 :         ExecReScan(areq->requestee); /* let ReScan handle this */
      31              : 
      32              :     /* must provide our own instrumentation support */
      33         6175 :     if (areq->requestee->instrument)
      34          810 :         InstrStartNode(areq->requestee->instrument);
      35              : 
      36         6175 :     switch (nodeTag(areq->requestee))
      37              :     {
      38         6175 :         case T_ForeignScanState:
      39         6175 :             ExecAsyncForeignScanRequest(areq);
      40         6175 :             break;
      41            0 :         default:
      42              :             /* If the node doesn't support async, caller messed up. */
      43            0 :             elog(ERROR, "unrecognized node type: %d",
      44              :                  (int) nodeTag(areq->requestee));
      45              :     }
      46              : 
      47         6175 :     ExecAsyncResponse(areq);
      48              : 
      49              :     /* must provide our own instrumentation support */
      50         6175 :     if (areq->requestee->instrument)
      51          810 :         InstrStopNode(areq->requestee->instrument,
      52          810 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
      53         6175 : }
      54              : 
      55              : /*
      56              :  * Give the asynchronous node a chance to configure the file descriptor event
      57              :  * for which it wishes to wait.  We expect the node-type specific callback to
      58              :  * make a single call of the following form:
      59              :  *
      60              :  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
      61              :  */
      62              : void
      63          210 : ExecAsyncConfigureWait(AsyncRequest *areq)
      64              : {
      65              :     /* must provide our own instrumentation support */
      66          210 :     if (areq->requestee->instrument)
      67           20 :         InstrStartNode(areq->requestee->instrument);
      68              : 
      69          210 :     switch (nodeTag(areq->requestee))
      70              :     {
      71          210 :         case T_ForeignScanState:
      72          210 :             ExecAsyncForeignScanConfigureWait(areq);
      73          209 :             break;
      74            0 :         default:
      75              :             /* If the node doesn't support async, caller messed up. */
      76            0 :             elog(ERROR, "unrecognized node type: %d",
      77              :                  (int) nodeTag(areq->requestee));
      78              :     }
      79              : 
      80              :     /* must provide our own instrumentation support */
      81          209 :     if (areq->requestee->instrument)
      82           20 :         InstrStopNode(areq->requestee->instrument, 0.0);
      83          209 : }
      84              : 
      85              : /*
      86              :  * Call the asynchronous node back when a relevant event has occurred.
      87              :  */
      88              : void
      89          149 : ExecAsyncNotify(AsyncRequest *areq)
      90              : {
      91              :     /* must provide our own instrumentation support */
      92          149 :     if (areq->requestee->instrument)
      93           15 :         InstrStartNode(areq->requestee->instrument);
      94              : 
      95          149 :     switch (nodeTag(areq->requestee))
      96              :     {
      97          149 :         case T_ForeignScanState:
      98          149 :             ExecAsyncForeignScanNotify(areq);
      99          149 :             break;
     100            0 :         default:
     101              :             /* If the node doesn't support async, caller messed up. */
     102            0 :             elog(ERROR, "unrecognized node type: %d",
     103              :                  (int) nodeTag(areq->requestee));
     104              :     }
     105              : 
     106          149 :     ExecAsyncResponse(areq);
     107              : 
     108              :     /* must provide our own instrumentation support */
     109          149 :     if (areq->requestee->instrument)
     110           15 :         InstrStopNode(areq->requestee->instrument,
     111           15 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
     112          149 : }
     113              : 
     114              : /*
     115              :  * Call the requestor back when an asynchronous node has produced a result.
     116              :  */
     117              : void
     118         6328 : ExecAsyncResponse(AsyncRequest *areq)
     119              : {
     120         6328 :     switch (nodeTag(areq->requestor))
     121              :     {
     122         6328 :         case T_AppendState:
     123         6328 :             ExecAsyncAppendResponse(areq);
     124         6328 :             break;
     125            0 :         default:
     126              :             /* If the node doesn't support async, caller messed up. */
     127            0 :             elog(ERROR, "unrecognized node type: %d",
     128              :                  (int) nodeTag(areq->requestor));
     129              :     }
     130         6328 : }
     131              : 
     132              : /*
     133              :  * A requestee node should call this function to deliver the tuple to its
     134              :  * requestor node.  The requestee node can call this from its ExecAsyncRequest
     135              :  * or ExecAsyncNotify callback.
     136              :  */
     137              : void
     138         6167 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
     139              : {
     140         6167 :     areq->request_complete = true;
     141         6167 :     areq->result = result;
     142         6167 : }
     143              : 
     144              : /*
     145              :  * A requestee node should call this function to indicate that the request is
     146              :  * not yet complete and a callback is pending.  The requestee node can call
     147              :  * this from its ExecAsyncRequest or ExecAsyncNotify callback.
     148              :  */
     149              : void
     150          161 : ExecAsyncRequestPending(AsyncRequest *areq)
     151              : {
     152          161 :     areq->callback_pending = true;
     153          161 :     areq->request_complete = false;
     154          161 :     areq->result = NULL;
     155          161 : }
        

Generated by: LCOV version 2.0-1