LCOV - code coverage report
Current view: top level - src/backend/executor - execAsync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 51 59 86.4 %
Date: 2024-04-16 13:11:18 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execAsync.c
       4             :  *    Support routines for asynchronous execution
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execAsync.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "executor/execAsync.h"
      18             : #include "executor/executor.h"
      19             : #include "executor/nodeAppend.h"
      20             : #include "executor/nodeForeignscan.h"
      21             : 
      22             : /*
      23             :  * Asynchronously request a tuple from a designed async-capable node.
      24             :  */
      25             : void
      26       12350 : ExecAsyncRequest(AsyncRequest *areq)
      27             : {
      28       12350 :     if (areq->requestee->chgParam != NULL)    /* something changed? */
      29           4 :         ExecReScan(areq->requestee); /* let ReScan handle this */
      30             : 
      31             :     /* must provide our own instrumentation support */
      32       12350 :     if (areq->requestee->instrument)
      33        1620 :         InstrStartNode(areq->requestee->instrument);
      34             : 
      35       12350 :     switch (nodeTag(areq->requestee))
      36             :     {
      37       12350 :         case T_ForeignScanState:
      38       12350 :             ExecAsyncForeignScanRequest(areq);
      39       12350 :             break;
      40           0 :         default:
      41             :             /* If the node doesn't support async, caller messed up. */
      42           0 :             elog(ERROR, "unrecognized node type: %d",
      43             :                  (int) nodeTag(areq->requestee));
      44             :     }
      45             : 
      46       12350 :     ExecAsyncResponse(areq);
      47             : 
      48             :     /* must provide our own instrumentation support */
      49       12350 :     if (areq->requestee->instrument)
      50        1620 :         InstrStopNode(areq->requestee->instrument,
      51        1620 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
      52       12350 : }
      53             : 
      54             : /*
      55             :  * Give the asynchronous node a chance to configure the file descriptor event
      56             :  * for which it wishes to wait.  We expect the node-type specific callback to
      57             :  * make a single call of the following form:
      58             :  *
      59             :  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
      60             :  */
      61             : void
      62         360 : ExecAsyncConfigureWait(AsyncRequest *areq)
      63             : {
      64             :     /* must provide our own instrumentation support */
      65         360 :     if (areq->requestee->instrument)
      66          40 :         InstrStartNode(areq->requestee->instrument);
      67             : 
      68         360 :     switch (nodeTag(areq->requestee))
      69             :     {
      70         360 :         case T_ForeignScanState:
      71         360 :             ExecAsyncForeignScanConfigureWait(areq);
      72         358 :             break;
      73           0 :         default:
      74             :             /* If the node doesn't support async, caller messed up. */
      75           0 :             elog(ERROR, "unrecognized node type: %d",
      76             :                  (int) nodeTag(areq->requestee));
      77             :     }
      78             : 
      79             :     /* must provide our own instrumentation support */
      80         358 :     if (areq->requestee->instrument)
      81          40 :         InstrStopNode(areq->requestee->instrument, 0.0);
      82         358 : }
      83             : 
      84             : /*
      85             :  * Call the asynchronous node back when a relevant event has occurred.
      86             :  */
      87             : void
      88         298 : ExecAsyncNotify(AsyncRequest *areq)
      89             : {
      90             :     /* must provide our own instrumentation support */
      91         298 :     if (areq->requestee->instrument)
      92          28 :         InstrStartNode(areq->requestee->instrument);
      93             : 
      94         298 :     switch (nodeTag(areq->requestee))
      95             :     {
      96         298 :         case T_ForeignScanState:
      97         298 :             ExecAsyncForeignScanNotify(areq);
      98         298 :             break;
      99           0 :         default:
     100             :             /* If the node doesn't support async, caller messed up. */
     101           0 :             elog(ERROR, "unrecognized node type: %d",
     102             :                  (int) nodeTag(areq->requestee));
     103             :     }
     104             : 
     105         298 :     ExecAsyncResponse(areq);
     106             : 
     107             :     /* must provide our own instrumentation support */
     108         298 :     if (areq->requestee->instrument)
     109          28 :         InstrStopNode(areq->requestee->instrument,
     110          28 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
     111         298 : }
     112             : 
     113             : /*
     114             :  * Call the requestor back when an asynchronous node has produced a result.
     115             :  */
     116             : void
     117       12658 : ExecAsyncResponse(AsyncRequest *areq)
     118             : {
     119       12658 :     switch (nodeTag(areq->requestor))
     120             :     {
     121       12658 :         case T_AppendState:
     122       12658 :             ExecAsyncAppendResponse(areq);
     123       12658 :             break;
     124           0 :         default:
     125             :             /* If the node doesn't support async, caller messed up. */
     126           0 :             elog(ERROR, "unrecognized node type: %d",
     127             :                  (int) nodeTag(areq->requestor));
     128             :     }
     129       12658 : }
     130             : 
     131             : /*
     132             :  * A requestee node should call this function to deliver the tuple to its
     133             :  * requestor node.  The requestee node can call this from its ExecAsyncRequest
     134             :  * or ExecAsyncNotify callback.
     135             :  */
     136             : void
     137       12336 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
     138             : {
     139       12336 :     areq->request_complete = true;
     140       12336 :     areq->result = result;
     141       12336 : }
     142             : 
     143             : /*
     144             :  * A requestee node should call this function to indicate that it is pending
     145             :  * for a callback.  The requestee node can call this from its ExecAsyncRequest
     146             :  * or ExecAsyncNotify callback.
     147             :  */
     148             : void
     149         322 : ExecAsyncRequestPending(AsyncRequest *areq)
     150             : {
     151         322 :     areq->callback_pending = true;
     152         322 :     areq->request_complete = false;
     153         322 :     areq->result = NULL;
     154         322 : }

Generated by: LCOV version 1.14