LCOV - code coverage report
Current view: top level - src/backend/executor - execAsync.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.4 % 59 51
Test Date: 2026-03-21 18:16:11 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execAsync.c
       4              :  *    Support routines for asynchronous execution
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/executor/execAsync.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "executor/execAsync.h"
      18              : #include "executor/executor.h"
      19              : #include "executor/instrument.h"
      20              : #include "executor/nodeAppend.h"
      21              : #include "executor/nodeForeignscan.h"
      22              : 
      23              : /*
      24              :  * Asynchronously request a tuple from a designated async-capable node, then call the requestor back via ExecAsyncResponse.
      25              :  */
      26              : void
      27         5975 : ExecAsyncRequest(AsyncRequest *areq)
      28              : {
      29         5975 :     if (areq->requestee->chgParam != NULL)    /* something changed? */
      30            2 :         ExecReScan(areq->requestee); /* let ReScan handle this */
      31              : 
      32              :     /* must provide our own instrumentation support: start the requestee's instrumentation, if it has any */
      33         5975 :     if (areq->requestee->instrument)
      34          810 :         InstrStartNode(areq->requestee->instrument);
      35              : 
      36         5975 :     switch (nodeTag(areq->requestee))
      37              :     {
      38         5975 :         case T_ForeignScanState:
      39         5975 :             ExecAsyncForeignScanRequest(areq);
      40         5975 :             break;
      41            0 :         default:
      42              :             /* Only ForeignScanState is dispatched here; if the node doesn't support async, caller messed up. */
      43            0 :             elog(ERROR, "unrecognized node type: %d",
      44              :                  (int) nodeTag(areq->requestee));
      45              :     }
      46              : 
      47         5975 :     ExecAsyncResponse(areq);    /* call the requestor back with the current state of the request */
      48              : 
      49              :     /* must provide our own instrumentation support: pass 1.0 to InstrStopNode unless TupIsNull(areq->result) */
      50         5975 :     if (areq->requestee->instrument)
      51          810 :         InstrStopNode(areq->requestee->instrument,
      52          810 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
      53         5975 : }
      54              : 
      55              : /*
      56              :  * Give the asynchronous node a chance to configure the file descriptor event
      57              :  * on which it wishes to wait.  We expect the node-type-specific callback to
      58              :  * make a single call of the following form:
      59              :  *
      60              :  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
      61              :  */
      62              : void
      63          222 : ExecAsyncConfigureWait(AsyncRequest *areq)
      64              : {
      65              :     /* must provide our own instrumentation support: start the requestee's instrumentation, if it has any */
      66          222 :     if (areq->requestee->instrument)
      67           22 :         InstrStartNode(areq->requestee->instrument);
      68              : 
      69          222 :     switch (nodeTag(areq->requestee))
      70              :     {
      71          222 :         case T_ForeignScanState:
      72          222 :             ExecAsyncForeignScanConfigureWait(areq);
      73          221 :             break;
      74            0 :         default:
      75              :             /* Only ForeignScanState is dispatched here; if the node doesn't support async, caller messed up. */
      76            0 :             elog(ERROR, "unrecognized node type: %d",
      77              :                  (int) nodeTag(areq->requestee));
      78              :     }
      79              : 
      80              :     /* must provide our own instrumentation support: InstrStopNode always receives 0.0 here */
      81          221 :     if (areq->requestee->instrument)
      82           22 :         InstrStopNode(areq->requestee->instrument, 0.0);
      83          221 : }
      84              : 
      85              : /*
      86              :  * Call the asynchronous node back when a relevant event has occurred, then call the requestor back via ExecAsyncResponse.
      87              :  */
      88              : void
      89          146 : ExecAsyncNotify(AsyncRequest *areq)
      90              : {
      91              :     /* must provide our own instrumentation support: start the requestee's instrumentation, if it has any */
      92          146 :     if (areq->requestee->instrument)
      93           14 :         InstrStartNode(areq->requestee->instrument);
      94              : 
      95          146 :     switch (nodeTag(areq->requestee))
      96              :     {
      97          146 :         case T_ForeignScanState:
      98          146 :             ExecAsyncForeignScanNotify(areq);
      99          146 :             break;
     100            0 :         default:
     101              :             /* Only ForeignScanState is dispatched here; if the node doesn't support async, caller messed up. */
     102            0 :             elog(ERROR, "unrecognized node type: %d",
     103              :                  (int) nodeTag(areq->requestee));
     104              :     }
     105              : 
     106          146 :     ExecAsyncResponse(areq);    /* call the requestor back with the current state of the request */
     107              : 
     108              :     /* must provide our own instrumentation support: pass 1.0 to InstrStopNode unless TupIsNull(areq->result) */
     109          146 :     if (areq->requestee->instrument)
     110           14 :         InstrStopNode(areq->requestee->instrument,
     111           14 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
     112          146 : }
     113              : 
     114              : /*
     115              :  * Call the requestor back when an asynchronous node has produced a result.  Dispatches on the requestor's node tag.
     116              :  */
     117              : void
     118         6126 : ExecAsyncResponse(AsyncRequest *areq)
     119              : {
     120         6126 :     switch (nodeTag(areq->requestor))
     121              :     {
     122         6126 :         case T_AppendState:
     123         6126 :             ExecAsyncAppendResponse(areq);
     124         6126 :             break;
     125            0 :         default:
     126              :             /* Only AppendState is dispatched here; if the node doesn't support async, caller messed up. */
     127            0 :             elog(ERROR, "unrecognized node type: %d",
     128              :                  (int) nodeTag(areq->requestor));
     129              :     }
     130         6126 : }
     131              : 
     132              : /*
     133              :  * A requestee node should call this function to deliver the tuple to its
     134              :  * requestor node.  The requestee node can call this from its ExecAsyncRequest
     135              :  * or ExecAsyncNotify callback.
     136              :  */
     137              : void
     138         5967 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
     139              : {
     140         5967 :     areq->request_complete = true;    /* mark the request as complete */
     141         5967 :     areq->result = result;    /* record the slot being delivered */
     142         5967 : }
     143              : 
     144              : /*
     145              :  * A requestee node should call this function to indicate that it is waiting
     146              :  * for a callback.  The requestee node can call this from its ExecAsyncRequest
     147              :  * or ExecAsyncNotify callback.
     148              :  */
     149              : void
     150          159 : ExecAsyncRequestPending(AsyncRequest *areq)
     151              : {
     152          159 :     areq->callback_pending = true;    /* a callback is still expected */
     153          159 :     areq->request_complete = false;    /* the request is not complete yet */
     154          159 :     areq->result = NULL;    /* no result is available yet */
     155          159 : }
        

Generated by: LCOV version 2.0-1