LCOV - code coverage report
Current view: top level - src/pl/plpython - plpy_cursorobject.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 169 206 82.0 %
Date: 2020-09-25 16:06:32 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * the PLyCursor class
       3             :  *
       4             :  * src/pl/plpython/plpy_cursorobject.c
       5             :  */
       6             : 
       7             : #include "postgres.h"
       8             : 
       9             : #include <limits.h>
      10             : 
      11             : #include "access/xact.h"
      12             : #include "catalog/pg_type.h"
      13             : #include "mb/pg_wchar.h"
      14             : #include "plpy_cursorobject.h"
      15             : #include "plpy_elog.h"
      16             : #include "plpy_main.h"
      17             : #include "plpy_planobject.h"
      18             : #include "plpy_procedure.h"
      19             : #include "plpy_resultobject.h"
      20             : #include "plpy_spi.h"
      21             : #include "plpython.h"
      22             : #include "utils/memutils.h"
      23             : 
      24             : static PyObject *PLy_cursor_query(const char *query);
      25             : static void PLy_cursor_dealloc(PyObject *arg);
      26             : static PyObject *PLy_cursor_iternext(PyObject *self);
      27             : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
      28             : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
      29             : 
      30             : static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
      31             : 
      32             : static PyMethodDef PLy_cursor_methods[] = {
      33             :     {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
      34             :     {"close", PLy_cursor_close, METH_NOARGS, NULL},
      35             :     {NULL, NULL, 0, NULL}
      36             : };
      37             : 
      38             : static PyTypeObject PLy_CursorType = {
      39             :     PyVarObject_HEAD_INIT(NULL, 0)
      40             :     .tp_name = "PLyCursor",
      41             :     .tp_basicsize = sizeof(PLyCursorObject),
      42             :     .tp_dealloc = PLy_cursor_dealloc,
      43             :     .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
      44             :     .tp_doc = PLy_cursor_doc,
      45             :     .tp_iter = PyObject_SelfIter,
      46             :     .tp_iternext = PLy_cursor_iternext,
      47             :     .tp_methods = PLy_cursor_methods,
      48             : };
      49             : 
      50             : void
      51          46 : PLy_cursor_init_type(void)
      52             : {
      53          46 :     if (PyType_Ready(&PLy_CursorType) < 0)
      54           0 :         elog(ERROR, "could not initialize PLy_CursorType");
      55          46 : }
      56             : 
      57             : PyObject *
      58          34 : PLy_cursor(PyObject *self, PyObject *args)
      59             : {
      60             :     char       *query;
      61             :     PyObject   *plan;
      62          34 :     PyObject   *planargs = NULL;
      63             : 
      64          34 :     if (PyArg_ParseTuple(args, "s", &query))
      65          28 :         return PLy_cursor_query(query);
      66             : 
      67           6 :     PyErr_Clear();
      68             : 
      69           6 :     if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
      70           6 :         return PLy_cursor_plan(plan, planargs);
      71             : 
      72           0 :     PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
      73           0 :     return NULL;
      74             : }
      75             : 
      76             : 
      77             : static PyObject *
      78          28 : PLy_cursor_query(const char *query)
      79             : {
      80             :     PLyCursorObject *cursor;
      81          28 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
      82             :     volatile MemoryContext oldcontext;
      83             :     volatile ResourceOwner oldowner;
      84             : 
      85          28 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
      86           0 :         return NULL;
      87          28 :     cursor->portalname = NULL;
      88          28 :     cursor->closed = false;
      89          28 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
      90             :                                          "PL/Python cursor context",
      91             :                                          ALLOCSET_DEFAULT_SIZES);
      92             : 
      93             :     /* Initialize for converting result tuples to Python */
      94          28 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
      95             :                          RECORDOID, -1,
      96          28 :                          exec_ctx->curr_proc);
      97             : 
      98          28 :     oldcontext = CurrentMemoryContext;
      99          28 :     oldowner = CurrentResourceOwner;
     100             : 
     101          28 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     102             : 
     103          28 :     PG_TRY();
     104             :     {
     105             :         SPIPlanPtr  plan;
     106             :         Portal      portal;
     107             : 
     108          28 :         pg_verifymbstr(query, strlen(query), false);
     109             : 
     110          28 :         plan = SPI_prepare(query, 0, NULL);
     111          28 :         if (plan == NULL)
     112           0 :             elog(ERROR, "SPI_prepare failed: %s",
     113             :                  SPI_result_code_string(SPI_result));
     114             : 
     115          56 :         portal = SPI_cursor_open(NULL, plan, NULL, NULL,
     116          28 :                                  exec_ctx->curr_proc->fn_readonly);
     117          28 :         SPI_freeplan(plan);
     118             : 
     119          28 :         if (portal == NULL)
     120           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     121             :                  SPI_result_code_string(SPI_result));
     122             : 
     123          28 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     124             : 
     125          28 :         PinPortal(portal);
     126             : 
     127          28 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     128             :     }
     129           0 :     PG_CATCH();
     130             :     {
     131           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     132           0 :         return NULL;
     133             :     }
     134          28 :     PG_END_TRY();
     135             : 
     136             :     Assert(cursor->portalname != NULL);
     137          28 :     return (PyObject *) cursor;
     138             : }
     139             : 
     140             : PyObject *
     141           8 : PLy_cursor_plan(PyObject *ob, PyObject *args)
     142             : {
     143             :     PLyCursorObject *cursor;
     144             :     volatile int nargs;
     145             :     int         i;
     146             :     PLyPlanObject *plan;
     147           8 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     148             :     volatile MemoryContext oldcontext;
     149             :     volatile ResourceOwner oldowner;
     150             : 
     151           8 :     if (args)
     152             :     {
     153           6 :         if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
     154             :         {
     155           0 :             PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
     156           0 :             return NULL;
     157             :         }
     158           6 :         nargs = PySequence_Length(args);
     159             :     }
     160             :     else
     161           2 :         nargs = 0;
     162             : 
     163           8 :     plan = (PLyPlanObject *) ob;
     164             : 
     165           8 :     if (nargs != plan->nargs)
     166             :     {
     167             :         char       *sv;
     168           2 :         PyObject   *so = PyObject_Str(args);
     169             : 
     170           2 :         if (!so)
     171           0 :             PLy_elog(ERROR, "could not execute plan");
     172           2 :         sv = PyString_AsString(so);
     173           4 :         PLy_exception_set_plural(PyExc_TypeError,
     174             :                                  "Expected sequence of %d argument, got %d: %s",
     175             :                                  "Expected sequence of %d arguments, got %d: %s",
     176           2 :                                  plan->nargs,
     177             :                                  plan->nargs, nargs, sv);
     178           2 :         Py_DECREF(so);
     179             : 
     180           2 :         return NULL;
     181             :     }
     182             : 
     183           6 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
     184           0 :         return NULL;
     185           6 :     cursor->portalname = NULL;
     186           6 :     cursor->closed = false;
     187           6 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
     188             :                                          "PL/Python cursor context",
     189             :                                          ALLOCSET_DEFAULT_SIZES);
     190             : 
     191             :     /* Initialize for converting result tuples to Python */
     192           6 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
     193             :                          RECORDOID, -1,
     194           6 :                          exec_ctx->curr_proc);
     195             : 
     196           6 :     oldcontext = CurrentMemoryContext;
     197           6 :     oldowner = CurrentResourceOwner;
     198             : 
     199           6 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     200             : 
     201           6 :     PG_TRY();
     202             :     {
     203             :         Portal      portal;
     204             :         char       *volatile nulls;
     205             :         volatile int j;
     206             : 
     207           6 :         if (nargs > 0)
     208           4 :             nulls = palloc(nargs * sizeof(char));
     209             :         else
     210           2 :             nulls = NULL;
     211             : 
     212          10 :         for (j = 0; j < nargs; j++)
     213             :         {
     214           4 :             PLyObToDatum *arg = &plan->args[j];
     215             :             PyObject   *elem;
     216             : 
     217           4 :             elem = PySequence_GetItem(args, j);
     218           4 :             PG_TRY();
     219             :             {
     220             :                 bool        isnull;
     221             : 
     222           4 :                 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
     223           4 :                 nulls[j] = isnull ? 'n' : ' ';
     224             :             }
     225           0 :             PG_FINALLY();
     226             :             {
     227           4 :                 Py_DECREF(elem);
     228             :             }
     229           4 :             PG_END_TRY();
     230             :         }
     231             : 
     232          12 :         portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
     233           6 :                                  exec_ctx->curr_proc->fn_readonly);
     234           6 :         if (portal == NULL)
     235           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     236             :                  SPI_result_code_string(SPI_result));
     237             : 
     238           6 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     239             : 
     240           6 :         PinPortal(portal);
     241             : 
     242           6 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     243             :     }
     244           0 :     PG_CATCH();
     245             :     {
     246             :         int         k;
     247             : 
     248             :         /* cleanup plan->values array */
     249           0 :         for (k = 0; k < nargs; k++)
     250             :         {
     251           0 :             if (!plan->args[k].typbyval &&
     252           0 :                 (plan->values[k] != PointerGetDatum(NULL)))
     253             :             {
     254           0 :                 pfree(DatumGetPointer(plan->values[k]));
     255           0 :                 plan->values[k] = PointerGetDatum(NULL);
     256             :             }
     257             :         }
     258             : 
     259           0 :         Py_DECREF(cursor);
     260             : 
     261           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     262           0 :         return NULL;
     263             :     }
     264           6 :     PG_END_TRY();
     265             : 
     266          10 :     for (i = 0; i < nargs; i++)
     267             :     {
     268           4 :         if (!plan->args[i].typbyval &&
     269           4 :             (plan->values[i] != PointerGetDatum(NULL)))
     270             :         {
     271           4 :             pfree(DatumGetPointer(plan->values[i]));
     272           4 :             plan->values[i] = PointerGetDatum(NULL);
     273             :         }
     274             :     }
     275             : 
     276             :     Assert(cursor->portalname != NULL);
     277           6 :     return (PyObject *) cursor;
     278             : }
     279             : 
     280             : static void
     281          34 : PLy_cursor_dealloc(PyObject *arg)
     282             : {
     283             :     PLyCursorObject *cursor;
     284             :     Portal      portal;
     285             : 
     286          34 :     cursor = (PLyCursorObject *) arg;
     287             : 
     288          34 :     if (!cursor->closed)
     289             :     {
     290          28 :         portal = GetPortalByName(cursor->portalname);
     291             : 
     292          28 :         if (PortalIsValid(portal))
     293             :         {
     294          22 :             UnpinPortal(portal);
     295          22 :             SPI_cursor_close(portal);
     296             :         }
     297          28 :         cursor->closed = true;
     298             :     }
     299          34 :     if (cursor->mcxt)
     300             :     {
     301          34 :         MemoryContextDelete(cursor->mcxt);
     302          34 :         cursor->mcxt = NULL;
     303             :     }
     304          34 :     arg->ob_type->tp_free(arg);
     305          34 : }
     306             : 
     307             : static PyObject *
     308          70 : PLy_cursor_iternext(PyObject *self)
     309             : {
     310             :     PLyCursorObject *cursor;
     311             :     PyObject   *ret;
     312          70 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     313             :     volatile MemoryContext oldcontext;
     314             :     volatile ResourceOwner oldowner;
     315             :     Portal      portal;
     316             : 
     317          70 :     cursor = (PLyCursorObject *) self;
     318             : 
     319          70 :     if (cursor->closed)
     320             :     {
     321           2 :         PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
     322           2 :         return NULL;
     323             :     }
     324             : 
     325          68 :     portal = GetPortalByName(cursor->portalname);
     326          68 :     if (!PortalIsValid(portal))
     327             :     {
     328           0 :         PLy_exception_set(PyExc_ValueError,
     329             :                           "iterating a cursor in an aborted subtransaction");
     330           0 :         return NULL;
     331             :     }
     332             : 
     333          68 :     oldcontext = CurrentMemoryContext;
     334          68 :     oldowner = CurrentResourceOwner;
     335             : 
     336          68 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     337             : 
     338          68 :     PG_TRY();
     339             :     {
     340          68 :         SPI_cursor_fetch(portal, true, 1);
     341          68 :         if (SPI_processed == 0)
     342             :         {
     343          16 :             PyErr_SetNone(PyExc_StopIteration);
     344          16 :             ret = NULL;
     345             :         }
     346             :         else
     347             :         {
     348          52 :             PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     349          52 :                                   exec_ctx->curr_proc);
     350             : 
     351          52 :             ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
     352          52 :                                        SPI_tuptable->tupdesc, true);
     353             :         }
     354             : 
     355          68 :         SPI_freetuptable(SPI_tuptable);
     356             : 
     357          68 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     358             :     }
     359           0 :     PG_CATCH();
     360             :     {
     361           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     362           0 :         return NULL;
     363             :     }
     364          68 :     PG_END_TRY();
     365             : 
     366          68 :     return ret;
     367             : }
     368             : 
     369             : static PyObject *
     370          26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
     371             : {
     372             :     PLyCursorObject *cursor;
     373             :     int         count;
     374             :     PLyResultObject *ret;
     375          26 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     376             :     volatile MemoryContext oldcontext;
     377             :     volatile ResourceOwner oldowner;
     378             :     Portal      portal;
     379             : 
     380          26 :     if (!PyArg_ParseTuple(args, "i:fetch", &count))
     381           0 :         return NULL;
     382             : 
     383          26 :     cursor = (PLyCursorObject *) self;
     384             : 
     385          26 :     if (cursor->closed)
     386             :     {
     387           2 :         PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
     388           2 :         return NULL;
     389             :     }
     390             : 
     391          24 :     portal = GetPortalByName(cursor->portalname);
     392          24 :     if (!PortalIsValid(portal))
     393             :     {
     394           4 :         PLy_exception_set(PyExc_ValueError,
     395             :                           "iterating a cursor in an aborted subtransaction");
     396           4 :         return NULL;
     397             :     }
     398             : 
     399          20 :     ret = (PLyResultObject *) PLy_result_new();
     400          20 :     if (ret == NULL)
     401           0 :         return NULL;
     402             : 
     403          20 :     oldcontext = CurrentMemoryContext;
     404          20 :     oldowner = CurrentResourceOwner;
     405             : 
     406          20 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     407             : 
     408          20 :     PG_TRY();
     409             :     {
     410          20 :         SPI_cursor_fetch(portal, true, count);
     411             : 
     412          20 :         Py_DECREF(ret->status);
     413          20 :         ret->status = PyInt_FromLong(SPI_OK_FETCH);
     414             : 
     415          20 :         Py_DECREF(ret->nrows);
     416          20 :         ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
     417             : 
     418          20 :         if (SPI_processed != 0)
     419             :         {
     420             :             uint64      i;
     421             : 
     422             :             /*
     423             :              * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
     424             :              * and list indices; so we cannot support a result larger than
     425             :              * PY_SSIZE_T_MAX.
     426             :              */
     427          14 :             if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
     428           0 :                 ereport(ERROR,
     429             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     430             :                          errmsg("query result has too many rows to fit in a Python list")));
     431             : 
     432          14 :             Py_DECREF(ret->rows);
     433          14 :             ret->rows = PyList_New(SPI_processed);
     434          14 :             if (!ret->rows)
     435             :             {
     436           0 :                 Py_DECREF(ret);
     437           0 :                 ret = NULL;
     438             :             }
     439             :             else
     440             :             {
     441          14 :                 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     442          14 :                                       exec_ctx->curr_proc);
     443             : 
     444          88 :                 for (i = 0; i < SPI_processed; i++)
     445             :                 {
     446         222 :                     PyObject   *row = PLy_input_from_tuple(&cursor->result,
     447          74 :                                                            SPI_tuptable->vals[i],
     448          74 :                                                            SPI_tuptable->tupdesc,
     449             :                                                            true);
     450             : 
     451          74 :                     PyList_SetItem(ret->rows, i, row);
     452             :                 }
     453             :             }
     454             :         }
     455             : 
     456          20 :         SPI_freetuptable(SPI_tuptable);
     457             : 
     458          20 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     459             :     }
     460           0 :     PG_CATCH();
     461             :     {
     462           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     463           0 :         return NULL;
     464             :     }
     465          20 :     PG_END_TRY();
     466             : 
     467          20 :     return (PyObject *) ret;
     468             : }
     469             : 
     470             : static PyObject *
     471          10 : PLy_cursor_close(PyObject *self, PyObject *unused)
     472             : {
     473          10 :     PLyCursorObject *cursor = (PLyCursorObject *) self;
     474             : 
     475          10 :     if (!cursor->closed)
     476             :     {
     477           8 :         Portal      portal = GetPortalByName(cursor->portalname);
     478             : 
     479           8 :         if (!PortalIsValid(portal))
     480             :         {
     481           2 :             PLy_exception_set(PyExc_ValueError,
     482             :                               "closing a cursor in an aborted subtransaction");
     483           2 :             return NULL;
     484             :         }
     485             : 
     486           6 :         UnpinPortal(portal);
     487           6 :         SPI_cursor_close(portal);
     488           6 :         cursor->closed = true;
     489             :     }
     490             : 
     491           8 :     Py_RETURN_NONE;
     492             : }

Generated by: LCOV version 1.13