LCOV - code coverage report
Current view: top level - src/backend/executor - execAsync.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.4 % 59 51
Test Date: 2026-03-01 14:14:54 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execAsync.c
       4              :  *    Support routines for asynchronous execution
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/executor/execAsync.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "executor/execAsync.h"
      18              : #include "executor/executor.h"
      19              : #include "executor/nodeAppend.h"
      20              : #include "executor/nodeForeignscan.h"
      21              : 
      22              : /*
      23              :  * Asynchronously request a tuple from a designated async-capable node.
      24              :  */
      25              : void
      26         6075 : ExecAsyncRequest(AsyncRequest *areq)
      27              : {
      28         6075 :     if (areq->requestee->chgParam != NULL)    /* something changed? */
      29            2 :         ExecReScan(areq->requestee); /* let ReScan handle this */
      30              : 
      31              :     /* must provide our own instrumentation support */
      32         6075 :     if (areq->requestee->instrument)
      33          810 :         InstrStartNode(areq->requestee->instrument);
      34              : 
      35         6075 :     switch (nodeTag(areq->requestee))
      36              :     {
      37         6075 :         case T_ForeignScanState:
      38         6075 :             ExecAsyncForeignScanRequest(areq);
      39         6075 :             break;
      40            0 :         default:
      41              :             /* If the requestee doesn't support async, caller messed up. */
      42            0 :             elog(ERROR, "unrecognized node type: %d",
      43              :                  (int) nodeTag(areq->requestee));
      44              :     }
      45              : 
      46         6075 :     ExecAsyncResponse(areq);    /* dispatch to the requestor's response callback */
      47              : 
      48              :     /* our own instrumentation again: pass 0.0 if TupIsNull(areq->result), else 1.0 */
      49         6075 :     if (areq->requestee->instrument)
      50          810 :         InstrStopNode(areq->requestee->instrument,
      51          810 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
      52         6075 : }
      53              : 
      54              : /*
      55              :  * Give the asynchronous node a chance to configure the file descriptor event
      56              :  * for which it wishes to wait.  We expect the node-type specific callback to
      57              :  * make a single call of the following form:
      58              :  *
      59              :  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
      60              :  */
      61              : void
      62          246 : ExecAsyncConfigureWait(AsyncRequest *areq)
      63              : {
      64              :     /* must provide our own instrumentation support */
      65          246 :     if (areq->requestee->instrument)
      66           19 :         InstrStartNode(areq->requestee->instrument);
      67              : 
      68          246 :     switch (nodeTag(areq->requestee))
      69              :     {
      70          246 :         case T_ForeignScanState:
      71          246 :             ExecAsyncForeignScanConfigureWait(areq);
      72          245 :             break;
      73            0 :         default:
      74              :             /* If the requestee doesn't support async, caller messed up. */
      75            0 :             elog(ERROR, "unrecognized node type: %d",
      76              :                  (int) nodeTag(areq->requestee));
      77              :     }
      78              : 
      79              :     /* must provide our own instrumentation support; always passes 0.0 here */
      80          245 :     if (areq->requestee->instrument)
      81           19 :         InstrStopNode(areq->requestee->instrument, 0.0);
      82          245 : }
      83              : 
      84              : /*
      85              :  * Call the asynchronous node back when a relevant event has occurred.
      86              :  */
      87              : void
      88          148 : ExecAsyncNotify(AsyncRequest *areq)
      89              : {
      90              :     /* must provide our own instrumentation support */
      91          148 :     if (areq->requestee->instrument)
      92           15 :         InstrStartNode(areq->requestee->instrument);
      93              : 
      94          148 :     switch (nodeTag(areq->requestee))
      95              :     {
      96          148 :         case T_ForeignScanState:
      97          148 :             ExecAsyncForeignScanNotify(areq);
      98          148 :             break;
      99            0 :         default:
     100              :             /* If the requestee doesn't support async, caller messed up. */
     101            0 :             elog(ERROR, "unrecognized node type: %d",
     102              :                  (int) nodeTag(areq->requestee));
     103              :     }
     104              : 
     105          148 :     ExecAsyncResponse(areq);    /* dispatch to the requestor's response callback */
     106              : 
     107              :     /* our own instrumentation again: pass 0.0 if TupIsNull(areq->result), else 1.0 */
     108          148 :     if (areq->requestee->instrument)
     109           15 :         InstrStopNode(areq->requestee->instrument,
     110           15 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
     111          148 : }
     112              : 
     113              : /*
     114              :  * Call the requestor back when an asynchronous node has produced a result.
     115              :  */
     116              : void
     117         6227 : ExecAsyncResponse(AsyncRequest *areq)
     118              : {
     119         6227 :     switch (nodeTag(areq->requestor))
     120              :     {
     121         6227 :         case T_AppendState:
     122         6227 :             ExecAsyncAppendResponse(areq);
     123         6227 :             break;
     124            0 :         default:
     125              :             /* If the requestor doesn't support async, caller messed up. */
     126            0 :             elog(ERROR, "unrecognized node type: %d",
     127              :                  (int) nodeTag(areq->requestor));
     128              :     }
     129         6227 : }
     130              : 
     131              : /*
     132              :  * A requestee node should call this function to deliver the tuple to its
     133              :  * requestor node.  The requestee node can call this from its ExecAsyncRequest
     134              :  * or ExecAsyncNotify callback.
     135              :  */
     136              : void
     137         6067 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
     138              : {
     139         6067 :     areq->request_complete = true;    /* mark the request as finished */
     140         6067 :     areq->result = result;    /* stored as given, for the requestor to pick up */
     141         6067 : }
     142              : 
     143              : /*
     144              :  * A requestee node should call this function to indicate that it is waiting
     145              :  * for a callback.  The requestee node can call this from its ExecAsyncRequest
     146              :  * or ExecAsyncNotify callback.
     147              :  */
     148              : void
     149          160 : ExecAsyncRequestPending(AsyncRequest *areq)
     150              : {
     151          160 :     areq->callback_pending = true;    /* a callback is still expected */
     152          160 :     areq->request_complete = false;    /* request is not finished */
     153          160 :     areq->result = NULL;    /* clear any stored result */
     154          160 : }
        

Generated by: LCOV version 2.0-1