LCOV - code coverage report
Current view: top level - src/pl/plpython - plpy_cursorobject.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 172 206 83.5 %
Date: 2024-11-21 08:14:44 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * the PLyCursor class
       3             :  *
       4             :  * src/pl/plpython/plpy_cursorobject.c
       5             :  */
       6             : 
       7             : #include "postgres.h"
       8             : 
       9             : #include <limits.h>
      10             : 
      11             : #include "catalog/pg_type.h"
      12             : #include "mb/pg_wchar.h"
      13             : #include "plpy_cursorobject.h"
      14             : #include "plpy_elog.h"
      15             : #include "plpy_main.h"
      16             : #include "plpy_planobject.h"
      17             : #include "plpy_resultobject.h"
      18             : #include "plpy_spi.h"
      19             : #include "plpython.h"
      20             : #include "utils/memutils.h"
      21             : 
      22             : static PyObject *PLy_cursor_query(const char *query);
      23             : static void PLy_cursor_dealloc(PyObject *arg);
      24             : static PyObject *PLy_cursor_iternext(PyObject *self);
      25             : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
      26             : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
      27             : 
      28             : static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
      29             : 
      30             : static PyMethodDef PLy_cursor_methods[] = {
      31             :     {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
      32             :     {"close", PLy_cursor_close, METH_NOARGS, NULL},
      33             :     {NULL, NULL, 0, NULL}
      34             : };
      35             : 
      36             : static PyTypeObject PLy_CursorType = {
      37             :     PyVarObject_HEAD_INIT(NULL, 0)
      38             :     .tp_name = "PLyCursor",
      39             :     .tp_basicsize = sizeof(PLyCursorObject),
      40             :     .tp_dealloc = PLy_cursor_dealloc,
      41             :     .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
      42             :     .tp_doc = PLy_cursor_doc,
      43             :     .tp_iter = PyObject_SelfIter,
      44             :     .tp_iternext = PLy_cursor_iternext,
      45             :     .tp_methods = PLy_cursor_methods,
      46             : };
      47             : 
      48             : void
      49          46 : PLy_cursor_init_type(void)
      50             : {
      51          46 :     if (PyType_Ready(&PLy_CursorType) < 0)
      52           0 :         elog(ERROR, "could not initialize PLy_CursorType");
      53          46 : }
      54             : 
      55             : PyObject *
      56          36 : PLy_cursor(PyObject *self, PyObject *args)
      57             : {
      58             :     char       *query;
      59             :     PyObject   *plan;
      60          36 :     PyObject   *planargs = NULL;
      61             : 
      62          36 :     if (PyArg_ParseTuple(args, "s", &query))
      63          28 :         return PLy_cursor_query(query);
      64             : 
      65           8 :     PyErr_Clear();
      66             : 
      67           8 :     if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
      68           8 :         return PLy_cursor_plan(plan, planargs);
      69             : 
      70           0 :     PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
      71           0 :     return NULL;
      72             : }
      73             : 
      74             : 
      75             : static PyObject *
      76          28 : PLy_cursor_query(const char *query)
      77             : {
      78             :     PLyCursorObject *cursor;
      79          28 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
      80             :     volatile MemoryContext oldcontext;
      81             :     volatile ResourceOwner oldowner;
      82             : 
      83          28 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
      84           0 :         return NULL;
      85          28 :     cursor->portalname = NULL;
      86          28 :     cursor->closed = false;
      87          28 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
      88             :                                          "PL/Python cursor context",
      89             :                                          ALLOCSET_DEFAULT_SIZES);
      90             : 
      91             :     /* Initialize for converting result tuples to Python */
      92          28 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
      93             :                          RECORDOID, -1,
      94          28 :                          exec_ctx->curr_proc);
      95             : 
      96          28 :     oldcontext = CurrentMemoryContext;
      97          28 :     oldowner = CurrentResourceOwner;
      98             : 
      99          28 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     100             : 
     101          28 :     PG_TRY();
     102             :     {
     103             :         SPIPlanPtr  plan;
     104             :         Portal      portal;
     105             : 
     106          28 :         pg_verifymbstr(query, strlen(query), false);
     107             : 
     108          28 :         plan = SPI_prepare(query, 0, NULL);
     109          28 :         if (plan == NULL)
     110           0 :             elog(ERROR, "SPI_prepare failed: %s",
     111             :                  SPI_result_code_string(SPI_result));
     112             : 
     113          56 :         portal = SPI_cursor_open(NULL, plan, NULL, NULL,
     114          28 :                                  exec_ctx->curr_proc->fn_readonly);
     115          28 :         SPI_freeplan(plan);
     116             : 
     117          28 :         if (portal == NULL)
     118           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     119             :                  SPI_result_code_string(SPI_result));
     120             : 
     121          28 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     122             : 
     123          28 :         PinPortal(portal);
     124             : 
     125          28 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     126             :     }
     127           0 :     PG_CATCH();
     128             :     {
     129           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     130           0 :         return NULL;
     131             :     }
     132          28 :     PG_END_TRY();
     133             : 
     134             :     Assert(cursor->portalname != NULL);
     135          28 :     return (PyObject *) cursor;
     136             : }
     137             : 
     138             : PyObject *
     139          10 : PLy_cursor_plan(PyObject *ob, PyObject *args)
     140             : {
     141             :     PLyCursorObject *cursor;
     142             :     volatile int nargs;
     143             :     int         i;
     144             :     PLyPlanObject *plan;
     145          10 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     146             :     volatile MemoryContext oldcontext;
     147             :     volatile ResourceOwner oldowner;
     148             : 
     149          10 :     if (args)
     150             :     {
     151           6 :         if (!PySequence_Check(args) || PyUnicode_Check(args))
     152             :         {
     153           0 :             PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
     154           0 :             return NULL;
     155             :         }
     156           6 :         nargs = PySequence_Length(args);
     157             :     }
     158             :     else
     159           4 :         nargs = 0;
     160             : 
     161          10 :     plan = (PLyPlanObject *) ob;
     162             : 
     163          10 :     if (nargs != plan->nargs)
     164             :     {
     165             :         char       *sv;
     166           2 :         PyObject   *so = PyObject_Str(args);
     167             : 
     168           2 :         if (!so)
     169           0 :             PLy_elog(ERROR, "could not execute plan");
     170           2 :         sv = PLyUnicode_AsString(so);
     171           2 :         PLy_exception_set_plural(PyExc_TypeError,
     172             :                                  "Expected sequence of %d argument, got %d: %s",
     173             :                                  "Expected sequence of %d arguments, got %d: %s",
     174           2 :                                  plan->nargs,
     175             :                                  plan->nargs, nargs, sv);
     176           2 :         Py_DECREF(so);
     177             : 
     178           2 :         return NULL;
     179             :     }
     180             : 
     181           8 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
     182           0 :         return NULL;
     183           8 :     cursor->portalname = NULL;
     184           8 :     cursor->closed = false;
     185           8 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
     186             :                                          "PL/Python cursor context",
     187             :                                          ALLOCSET_DEFAULT_SIZES);
     188             : 
     189             :     /* Initialize for converting result tuples to Python */
     190           8 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
     191             :                          RECORDOID, -1,
     192           8 :                          exec_ctx->curr_proc);
     193             : 
     194           8 :     oldcontext = CurrentMemoryContext;
     195           8 :     oldowner = CurrentResourceOwner;
     196             : 
     197           8 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     198             : 
     199           8 :     PG_TRY();
     200             :     {
     201             :         Portal      portal;
     202             :         char       *volatile nulls;
     203             :         volatile int j;
     204             : 
     205           8 :         if (nargs > 0)
     206           4 :             nulls = palloc(nargs * sizeof(char));
     207             :         else
     208           4 :             nulls = NULL;
     209             : 
     210          12 :         for (j = 0; j < nargs; j++)
     211             :         {
     212           4 :             PLyObToDatum *arg = &plan->args[j];
     213             :             PyObject   *elem;
     214             : 
     215           4 :             elem = PySequence_GetItem(args, j);
     216           4 :             PG_TRY(2);
     217             :             {
     218             :                 bool        isnull;
     219             : 
     220           4 :                 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
     221           4 :                 nulls[j] = isnull ? 'n' : ' ';
     222             :             }
     223           0 :             PG_FINALLY(2);
     224             :             {
     225           4 :                 Py_DECREF(elem);
     226             :             }
     227           4 :             PG_END_TRY(2);
     228             :         }
     229             : 
     230          16 :         portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
     231           8 :                                  exec_ctx->curr_proc->fn_readonly);
     232           8 :         if (portal == NULL)
     233           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     234             :                  SPI_result_code_string(SPI_result));
     235             : 
     236           8 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     237             : 
     238           8 :         PinPortal(portal);
     239             : 
     240           8 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     241             :     }
     242           0 :     PG_CATCH();
     243             :     {
     244             :         int         k;
     245             : 
     246             :         /* cleanup plan->values array */
     247           0 :         for (k = 0; k < nargs; k++)
     248             :         {
     249           0 :             if (!plan->args[k].typbyval &&
     250           0 :                 (plan->values[k] != PointerGetDatum(NULL)))
     251             :             {
     252           0 :                 pfree(DatumGetPointer(plan->values[k]));
     253           0 :                 plan->values[k] = PointerGetDatum(NULL);
     254             :             }
     255             :         }
     256             : 
     257           0 :         Py_DECREF(cursor);
     258             : 
     259           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     260           0 :         return NULL;
     261             :     }
     262           8 :     PG_END_TRY();
     263             : 
     264          12 :     for (i = 0; i < nargs; i++)
     265             :     {
     266           4 :         if (!plan->args[i].typbyval &&
     267           4 :             (plan->values[i] != PointerGetDatum(NULL)))
     268             :         {
     269           4 :             pfree(DatumGetPointer(plan->values[i]));
     270           4 :             plan->values[i] = PointerGetDatum(NULL);
     271             :         }
     272             :     }
     273             : 
     274             :     Assert(cursor->portalname != NULL);
     275           8 :     return (PyObject *) cursor;
     276             : }
     277             : 
     278             : static void
     279          36 : PLy_cursor_dealloc(PyObject *arg)
     280             : {
     281             :     PLyCursorObject *cursor;
     282             :     Portal      portal;
     283             : 
     284          36 :     cursor = (PLyCursorObject *) arg;
     285             : 
     286          36 :     if (!cursor->closed)
     287             :     {
     288          30 :         portal = GetPortalByName(cursor->portalname);
     289             : 
     290          30 :         if (PortalIsValid(portal))
     291             :         {
     292          24 :             UnpinPortal(portal);
     293          24 :             SPI_cursor_close(portal);
     294             :         }
     295          30 :         cursor->closed = true;
     296             :     }
     297          36 :     if (cursor->mcxt)
     298             :     {
     299          36 :         MemoryContextDelete(cursor->mcxt);
     300          36 :         cursor->mcxt = NULL;
     301             :     }
     302          36 :     arg->ob_type->tp_free(arg);
     303          36 : }
     304             : 
     305             : static PyObject *
     306          72 : PLy_cursor_iternext(PyObject *self)
     307             : {
     308             :     PLyCursorObject *cursor;
     309             :     PyObject   *ret;
     310          72 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     311             :     volatile MemoryContext oldcontext;
     312             :     volatile ResourceOwner oldowner;
     313             :     Portal      portal;
     314             : 
     315          72 :     cursor = (PLyCursorObject *) self;
     316             : 
     317          72 :     if (cursor->closed)
     318             :     {
     319           2 :         PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
     320           2 :         return NULL;
     321             :     }
     322             : 
     323          70 :     portal = GetPortalByName(cursor->portalname);
     324          70 :     if (!PortalIsValid(portal))
     325             :     {
     326           0 :         PLy_exception_set(PyExc_ValueError,
     327             :                           "iterating a cursor in an aborted subtransaction");
     328           0 :         return NULL;
     329             :     }
     330             : 
     331          70 :     oldcontext = CurrentMemoryContext;
     332          70 :     oldowner = CurrentResourceOwner;
     333             : 
     334          70 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     335             : 
     336          70 :     PG_TRY();
     337             :     {
     338          70 :         SPI_cursor_fetch(portal, true, 1);
     339          68 :         if (SPI_processed == 0)
     340             :         {
     341          16 :             PyErr_SetNone(PyExc_StopIteration);
     342          16 :             ret = NULL;
     343             :         }
     344             :         else
     345             :         {
     346          52 :             PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     347          52 :                                   exec_ctx->curr_proc);
     348             : 
     349          52 :             ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
     350          52 :                                        SPI_tuptable->tupdesc, true);
     351             :         }
     352             : 
     353          68 :         SPI_freetuptable(SPI_tuptable);
     354             : 
     355          68 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     356             :     }
     357           2 :     PG_CATCH();
     358             :     {
     359           2 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     360           2 :         return NULL;
     361             :     }
     362          68 :     PG_END_TRY();
     363             : 
     364          68 :     return ret;
     365             : }
     366             : 
     367             : static PyObject *
     368          26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
     369             : {
     370             :     PLyCursorObject *cursor;
     371             :     int         count;
     372             :     PLyResultObject *ret;
     373          26 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     374             :     volatile MemoryContext oldcontext;
     375             :     volatile ResourceOwner oldowner;
     376             :     Portal      portal;
     377             : 
     378          26 :     if (!PyArg_ParseTuple(args, "i:fetch", &count))
     379           0 :         return NULL;
     380             : 
     381          26 :     cursor = (PLyCursorObject *) self;
     382             : 
     383          26 :     if (cursor->closed)
     384             :     {
     385           2 :         PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
     386           2 :         return NULL;
     387             :     }
     388             : 
     389          24 :     portal = GetPortalByName(cursor->portalname);
     390          24 :     if (!PortalIsValid(portal))
     391             :     {
     392           4 :         PLy_exception_set(PyExc_ValueError,
     393             :                           "iterating a cursor in an aborted subtransaction");
     394           4 :         return NULL;
     395             :     }
     396             : 
     397          20 :     ret = (PLyResultObject *) PLy_result_new();
     398          20 :     if (ret == NULL)
     399           0 :         return NULL;
     400             : 
     401          20 :     oldcontext = CurrentMemoryContext;
     402          20 :     oldowner = CurrentResourceOwner;
     403             : 
     404          20 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     405             : 
     406          20 :     PG_TRY();
     407             :     {
     408          20 :         SPI_cursor_fetch(portal, true, count);
     409             : 
     410          20 :         Py_DECREF(ret->status);
     411          20 :         ret->status = PyLong_FromLong(SPI_OK_FETCH);
     412             : 
     413          20 :         Py_DECREF(ret->nrows);
     414          20 :         ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
     415             : 
     416          20 :         if (SPI_processed != 0)
     417             :         {
     418             :             uint64      i;
     419             : 
     420             :             /*
     421             :              * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
     422             :              * and list indices; so we cannot support a result larger than
     423             :              * PY_SSIZE_T_MAX.
     424             :              */
     425          14 :             if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
     426           0 :                 ereport(ERROR,
     427             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     428             :                          errmsg("query result has too many rows to fit in a Python list")));
     429             : 
     430          14 :             Py_DECREF(ret->rows);
     431          14 :             ret->rows = PyList_New(SPI_processed);
     432          14 :             if (!ret->rows)
     433             :             {
     434           0 :                 Py_DECREF(ret);
     435           0 :                 ret = NULL;
     436             :             }
     437             :             else
     438             :             {
     439          14 :                 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     440          14 :                                       exec_ctx->curr_proc);
     441             : 
     442          88 :                 for (i = 0; i < SPI_processed; i++)
     443             :                 {
     444         148 :                     PyObject   *row = PLy_input_from_tuple(&cursor->result,
     445          74 :                                                            SPI_tuptable->vals[i],
     446          74 :                                                            SPI_tuptable->tupdesc,
     447             :                                                            true);
     448             : 
     449          74 :                     PyList_SetItem(ret->rows, i, row);
     450             :                 }
     451             :             }
     452             :         }
     453             : 
     454          20 :         SPI_freetuptable(SPI_tuptable);
     455             : 
     456          20 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     457             :     }
     458           0 :     PG_CATCH();
     459             :     {
     460           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     461           0 :         return NULL;
     462             :     }
     463          20 :     PG_END_TRY();
     464             : 
     465          20 :     return (PyObject *) ret;
     466             : }
     467             : 
     468             : static PyObject *
     469          10 : PLy_cursor_close(PyObject *self, PyObject *unused)
     470             : {
     471          10 :     PLyCursorObject *cursor = (PLyCursorObject *) self;
     472             : 
     473          10 :     if (!cursor->closed)
     474             :     {
     475           8 :         Portal      portal = GetPortalByName(cursor->portalname);
     476             : 
     477           8 :         if (!PortalIsValid(portal))
     478             :         {
     479           2 :             PLy_exception_set(PyExc_ValueError,
     480             :                               "closing a cursor in an aborted subtransaction");
     481           2 :             return NULL;
     482             :         }
     483             : 
     484           6 :         UnpinPortal(portal);
     485           6 :         SPI_cursor_close(portal);
     486           6 :         cursor->closed = true;
     487             :     }
     488             : 
     489           8 :     Py_RETURN_NONE;
     490             : }

Generated by: LCOV version 1.14