LCOV - code coverage report
Current view: top level - src/pl/plpython - plpy_cursorobject.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 173 202 85.6 %
Date: 2025-01-18 04:15:08 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * the PLyCursor class
       3             :  *
       4             :  * src/pl/plpython/plpy_cursorobject.c
       5             :  */
       6             : 
       7             : #include "postgres.h"
       8             : 
       9             : #include <limits.h>
      10             : 
      11             : #include "catalog/pg_type.h"
      12             : #include "mb/pg_wchar.h"
      13             : #include "plpy_cursorobject.h"
      14             : #include "plpy_elog.h"
      15             : #include "plpy_main.h"
      16             : #include "plpy_planobject.h"
      17             : #include "plpy_resultobject.h"
      18             : #include "plpy_spi.h"
      19             : #include "plpython.h"
      20             : #include "utils/memutils.h"
      21             : 
      22             : static PyObject *PLy_cursor_query(const char *query);
      23             : static void PLy_cursor_dealloc(PyObject *arg);
      24             : static PyObject *PLy_cursor_iternext(PyObject *self);
      25             : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
      26             : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
      27             : 
      28             : static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
      29             : 
      30             : static PyMethodDef PLy_cursor_methods[] = {
      31             :     {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
      32             :     {"close", PLy_cursor_close, METH_NOARGS, NULL},
      33             :     {NULL, NULL, 0, NULL}
      34             : };
      35             : 
      36             : static PyTypeObject PLy_CursorType = {
      37             :     PyVarObject_HEAD_INIT(NULL, 0)
      38             :     .tp_name = "PLyCursor",
      39             :     .tp_basicsize = sizeof(PLyCursorObject),
      40             :     .tp_dealloc = PLy_cursor_dealloc,
      41             :     .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
      42             :     .tp_doc = PLy_cursor_doc,
      43             :     .tp_iter = PyObject_SelfIter,
      44             :     .tp_iternext = PLy_cursor_iternext,
      45             :     .tp_methods = PLy_cursor_methods,
      46             : };
      47             : 
      48             : void
      49          46 : PLy_cursor_init_type(void)
      50             : {
      51          46 :     if (PyType_Ready(&PLy_CursorType) < 0)
      52           0 :         elog(ERROR, "could not initialize PLy_CursorType");
      53          46 : }
      54             : 
      55             : PyObject *
      56          36 : PLy_cursor(PyObject *self, PyObject *args)
      57             : {
      58             :     char       *query;
      59             :     PyObject   *plan;
      60          36 :     PyObject   *planargs = NULL;
      61             : 
      62          36 :     if (PyArg_ParseTuple(args, "s", &query))
      63          28 :         return PLy_cursor_query(query);
      64             : 
      65           8 :     PyErr_Clear();
      66             : 
      67           8 :     if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
      68           8 :         return PLy_cursor_plan(plan, planargs);
      69             : 
      70           0 :     PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
      71           0 :     return NULL;
      72             : }
      73             : 
      74             : 
      75             : static PyObject *
      76          28 : PLy_cursor_query(const char *query)
      77             : {
      78             :     PLyCursorObject *cursor;
      79          28 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
      80             :     volatile MemoryContext oldcontext;
      81             :     volatile ResourceOwner oldowner;
      82             : 
      83          28 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
      84           0 :         return NULL;
      85          28 :     cursor->portalname = NULL;
      86          28 :     cursor->closed = false;
      87          28 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
      88             :                                          "PL/Python cursor context",
      89             :                                          ALLOCSET_DEFAULT_SIZES);
      90             : 
      91             :     /* Initialize for converting result tuples to Python */
      92          28 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
      93             :                          RECORDOID, -1,
      94          28 :                          exec_ctx->curr_proc);
      95             : 
      96          28 :     oldcontext = CurrentMemoryContext;
      97          28 :     oldowner = CurrentResourceOwner;
      98             : 
      99          28 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     100             : 
     101          28 :     PG_TRY();
     102             :     {
     103             :         SPIPlanPtr  plan;
     104             :         Portal      portal;
     105             : 
     106          28 :         pg_verifymbstr(query, strlen(query), false);
     107             : 
     108          28 :         plan = SPI_prepare(query, 0, NULL);
     109          28 :         if (plan == NULL)
     110           0 :             elog(ERROR, "SPI_prepare failed: %s",
     111             :                  SPI_result_code_string(SPI_result));
     112             : 
     113          56 :         portal = SPI_cursor_open(NULL, plan, NULL, NULL,
     114          28 :                                  exec_ctx->curr_proc->fn_readonly);
     115          28 :         SPI_freeplan(plan);
     116             : 
     117          28 :         if (portal == NULL)
     118           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     119             :                  SPI_result_code_string(SPI_result));
     120             : 
     121          28 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     122             : 
     123          28 :         PinPortal(portal);
     124             : 
     125          28 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     126             :     }
     127           0 :     PG_CATCH();
     128             :     {
     129           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     130           0 :         return NULL;
     131             :     }
     132          28 :     PG_END_TRY();
     133             : 
     134             :     Assert(cursor->portalname != NULL);
     135          28 :     return (PyObject *) cursor;
     136             : }
     137             : 
     138             : PyObject *
     139          10 : PLy_cursor_plan(PyObject *ob, PyObject *args)
     140             : {
     141             :     PLyCursorObject *cursor;
     142             :     volatile int nargs;
     143             :     PLyPlanObject *plan;
     144          10 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     145             :     volatile MemoryContext oldcontext;
     146             :     volatile ResourceOwner oldowner;
     147             : 
     148          10 :     if (args)
     149             :     {
     150           6 :         if (!PySequence_Check(args) || PyUnicode_Check(args))
     151             :         {
     152           0 :             PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
     153           0 :             return NULL;
     154             :         }
     155           6 :         nargs = PySequence_Length(args);
     156             :     }
     157             :     else
     158           4 :         nargs = 0;
     159             : 
     160          10 :     plan = (PLyPlanObject *) ob;
     161             : 
     162          10 :     if (nargs != plan->nargs)
     163             :     {
     164             :         char       *sv;
     165           2 :         PyObject   *so = PyObject_Str(args);
     166             : 
     167           2 :         if (!so)
     168           0 :             PLy_elog(ERROR, "could not execute plan");
     169           2 :         sv = PLyUnicode_AsString(so);
     170           2 :         PLy_exception_set_plural(PyExc_TypeError,
     171             :                                  "Expected sequence of %d argument, got %d: %s",
     172             :                                  "Expected sequence of %d arguments, got %d: %s",
     173           2 :                                  plan->nargs,
     174             :                                  plan->nargs, nargs, sv);
     175           2 :         Py_DECREF(so);
     176             : 
     177           2 :         return NULL;
     178             :     }
     179             : 
     180           8 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
     181           0 :         return NULL;
     182           8 :     cursor->portalname = NULL;
     183           8 :     cursor->closed = false;
     184           8 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
     185             :                                          "PL/Python cursor context",
     186             :                                          ALLOCSET_DEFAULT_SIZES);
     187             : 
     188             :     /* Initialize for converting result tuples to Python */
     189           8 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
     190             :                          RECORDOID, -1,
     191           8 :                          exec_ctx->curr_proc);
     192             : 
     193           8 :     oldcontext = CurrentMemoryContext;
     194           8 :     oldowner = CurrentResourceOwner;
     195             : 
     196           8 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     197             : 
     198           8 :     PG_TRY();
     199             :     {
     200             :         Portal      portal;
     201             :         MemoryContext tmpcontext;
     202             :         Datum      *volatile values;
     203             :         char       *volatile nulls;
     204             :         volatile int j;
     205             : 
     206             :         /*
     207             :          * Converted arguments and associated cruft will be in this context,
     208             :          * which is local to our subtransaction.
     209             :          */
     210           8 :         tmpcontext = AllocSetContextCreate(CurTransactionContext,
     211             :                                            "PL/Python temporary context",
     212             :                                            ALLOCSET_SMALL_SIZES);
     213           8 :         MemoryContextSwitchTo(tmpcontext);
     214             : 
     215           8 :         if (nargs > 0)
     216             :         {
     217           4 :             values = (Datum *) palloc(nargs * sizeof(Datum));
     218           4 :             nulls = (char *) palloc(nargs * sizeof(char));
     219             :         }
     220             :         else
     221             :         {
     222           4 :             values = NULL;
     223           4 :             nulls = NULL;
     224             :         }
     225             : 
     226          12 :         for (j = 0; j < nargs; j++)
     227             :         {
     228           4 :             PLyObToDatum *arg = &plan->args[j];
     229             :             PyObject   *elem;
     230             : 
     231           4 :             elem = PySequence_GetItem(args, j);
     232           4 :             PG_TRY(2);
     233             :             {
     234             :                 bool        isnull;
     235             : 
     236           4 :                 values[j] = PLy_output_convert(arg, elem, &isnull);
     237           4 :                 nulls[j] = isnull ? 'n' : ' ';
     238             :             }
     239           0 :             PG_FINALLY(2);
     240             :             {
     241           4 :                 Py_DECREF(elem);
     242             :             }
     243           4 :             PG_END_TRY(2);
     244             :         }
     245             : 
     246           8 :         MemoryContextSwitchTo(oldcontext);
     247             : 
     248          16 :         portal = SPI_cursor_open(NULL, plan->plan, values, nulls,
     249           8 :                                  exec_ctx->curr_proc->fn_readonly);
     250           8 :         if (portal == NULL)
     251           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     252             :                  SPI_result_code_string(SPI_result));
     253             : 
     254           8 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     255             : 
     256           8 :         PinPortal(portal);
     257             : 
     258           8 :         MemoryContextDelete(tmpcontext);
     259           8 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     260             :     }
     261           0 :     PG_CATCH();
     262             :     {
     263           0 :         Py_DECREF(cursor);
     264             :         /* Subtransaction abort will remove the tmpcontext */
     265           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     266           0 :         return NULL;
     267             :     }
     268           8 :     PG_END_TRY();
     269             : 
     270             :     Assert(cursor->portalname != NULL);
     271           8 :     return (PyObject *) cursor;
     272             : }
     273             : 
     274             : static void
     275          36 : PLy_cursor_dealloc(PyObject *arg)
     276             : {
     277             :     PLyCursorObject *cursor;
     278             :     Portal      portal;
     279             : 
     280          36 :     cursor = (PLyCursorObject *) arg;
     281             : 
     282          36 :     if (!cursor->closed)
     283             :     {
     284          30 :         portal = GetPortalByName(cursor->portalname);
     285             : 
     286          30 :         if (PortalIsValid(portal))
     287             :         {
     288          24 :             UnpinPortal(portal);
     289          24 :             SPI_cursor_close(portal);
     290             :         }
     291          30 :         cursor->closed = true;
     292             :     }
     293          36 :     if (cursor->mcxt)
     294             :     {
     295          36 :         MemoryContextDelete(cursor->mcxt);
     296          36 :         cursor->mcxt = NULL;
     297             :     }
     298          36 :     arg->ob_type->tp_free(arg);
     299          36 : }
     300             : 
     301             : static PyObject *
     302          72 : PLy_cursor_iternext(PyObject *self)
     303             : {
     304             :     PLyCursorObject *cursor;
     305             :     PyObject   *ret;
     306          72 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     307             :     volatile MemoryContext oldcontext;
     308             :     volatile ResourceOwner oldowner;
     309             :     Portal      portal;
     310             : 
     311          72 :     cursor = (PLyCursorObject *) self;
     312             : 
     313          72 :     if (cursor->closed)
     314             :     {
     315           2 :         PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
     316           2 :         return NULL;
     317             :     }
     318             : 
     319          70 :     portal = GetPortalByName(cursor->portalname);
     320          70 :     if (!PortalIsValid(portal))
     321             :     {
     322           0 :         PLy_exception_set(PyExc_ValueError,
     323             :                           "iterating a cursor in an aborted subtransaction");
     324           0 :         return NULL;
     325             :     }
     326             : 
     327          70 :     oldcontext = CurrentMemoryContext;
     328          70 :     oldowner = CurrentResourceOwner;
     329             : 
     330          70 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     331             : 
     332          70 :     PG_TRY();
     333             :     {
     334          70 :         SPI_cursor_fetch(portal, true, 1);
     335          68 :         if (SPI_processed == 0)
     336             :         {
     337          16 :             PyErr_SetNone(PyExc_StopIteration);
     338          16 :             ret = NULL;
     339             :         }
     340             :         else
     341             :         {
     342          52 :             PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     343          52 :                                   exec_ctx->curr_proc);
     344             : 
     345          52 :             ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
     346          52 :                                        SPI_tuptable->tupdesc, true);
     347             :         }
     348             : 
     349          68 :         SPI_freetuptable(SPI_tuptable);
     350             : 
     351          68 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     352             :     }
     353           2 :     PG_CATCH();
     354             :     {
     355           2 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     356           2 :         return NULL;
     357             :     }
     358          68 :     PG_END_TRY();
     359             : 
     360          68 :     return ret;
     361             : }
     362             : 
     363             : static PyObject *
     364          26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
     365             : {
     366             :     PLyCursorObject *cursor;
     367             :     int         count;
     368             :     PLyResultObject *ret;
     369          26 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     370             :     volatile MemoryContext oldcontext;
     371             :     volatile ResourceOwner oldowner;
     372             :     Portal      portal;
     373             : 
     374          26 :     if (!PyArg_ParseTuple(args, "i:fetch", &count))
     375           0 :         return NULL;
     376             : 
     377          26 :     cursor = (PLyCursorObject *) self;
     378             : 
     379          26 :     if (cursor->closed)
     380             :     {
     381           2 :         PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
     382           2 :         return NULL;
     383             :     }
     384             : 
     385          24 :     portal = GetPortalByName(cursor->portalname);
     386          24 :     if (!PortalIsValid(portal))
     387             :     {
     388           4 :         PLy_exception_set(PyExc_ValueError,
     389             :                           "iterating a cursor in an aborted subtransaction");
     390           4 :         return NULL;
     391             :     }
     392             : 
     393          20 :     ret = (PLyResultObject *) PLy_result_new();
     394          20 :     if (ret == NULL)
     395           0 :         return NULL;
     396             : 
     397          20 :     oldcontext = CurrentMemoryContext;
     398          20 :     oldowner = CurrentResourceOwner;
     399             : 
     400          20 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     401             : 
     402          20 :     PG_TRY();
     403             :     {
     404          20 :         SPI_cursor_fetch(portal, true, count);
     405             : 
     406          20 :         Py_DECREF(ret->status);
     407          20 :         ret->status = PyLong_FromLong(SPI_OK_FETCH);
     408             : 
     409          20 :         Py_DECREF(ret->nrows);
     410          20 :         ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
     411             : 
     412          20 :         if (SPI_processed != 0)
     413             :         {
     414             :             uint64      i;
     415             : 
     416             :             /*
     417             :              * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
     418             :              * and list indices; so we cannot support a result larger than
     419             :              * PY_SSIZE_T_MAX.
     420             :              */
     421          14 :             if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
     422           0 :                 ereport(ERROR,
     423             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     424             :                          errmsg("query result has too many rows to fit in a Python list")));
     425             : 
     426          14 :             Py_DECREF(ret->rows);
     427          14 :             ret->rows = PyList_New(SPI_processed);
     428          14 :             if (!ret->rows)
     429             :             {
     430           0 :                 Py_DECREF(ret);
     431           0 :                 ret = NULL;
     432             :             }
     433             :             else
     434             :             {
     435          14 :                 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     436          14 :                                       exec_ctx->curr_proc);
     437             : 
     438          88 :                 for (i = 0; i < SPI_processed; i++)
     439             :                 {
     440         148 :                     PyObject   *row = PLy_input_from_tuple(&cursor->result,
     441          74 :                                                            SPI_tuptable->vals[i],
     442          74 :                                                            SPI_tuptable->tupdesc,
     443             :                                                            true);
     444             : 
     445          74 :                     PyList_SetItem(ret->rows, i, row);
     446             :                 }
     447             :             }
     448             :         }
     449             : 
     450          20 :         SPI_freetuptable(SPI_tuptable);
     451             : 
     452          20 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     453             :     }
     454           0 :     PG_CATCH();
     455             :     {
     456           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     457           0 :         return NULL;
     458             :     }
     459          20 :     PG_END_TRY();
     460             : 
     461          20 :     return (PyObject *) ret;
     462             : }
     463             : 
     464             : static PyObject *
     465          10 : PLy_cursor_close(PyObject *self, PyObject *unused)
     466             : {
     467          10 :     PLyCursorObject *cursor = (PLyCursorObject *) self;
     468             : 
     469          10 :     if (!cursor->closed)
     470             :     {
     471           8 :         Portal      portal = GetPortalByName(cursor->portalname);
     472             : 
     473           8 :         if (!PortalIsValid(portal))
     474             :         {
     475           2 :             PLy_exception_set(PyExc_ValueError,
     476             :                               "closing a cursor in an aborted subtransaction");
     477           2 :             return NULL;
     478             :         }
     479             : 
     480           6 :         UnpinPortal(portal);
     481           6 :         SPI_cursor_close(portal);
     482           6 :         cursor->closed = true;
     483             :     }
     484             : 
     485           8 :     Py_RETURN_NONE;
     486             : }

Generated by: LCOV version 1.14