LCOV - code coverage report
Current view: top level - src/pl/plpython - plpy_cursorobject.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 169 206 82.0 %
Date: 2019-11-21 13:06:38 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * the PLyCursor class
       3             :  *
       4             :  * src/pl/plpython/plpy_cursorobject.c
       5             :  */
       6             : 
       7             : #include "postgres.h"
       8             : 
       9             : #include <limits.h>
      10             : 
      11             : #include "access/xact.h"
      12             : #include "catalog/pg_type.h"
      13             : #include "mb/pg_wchar.h"
      14             : #include "utils/memutils.h"
      15             : 
      16             : #include "plpython.h"
      17             : 
      18             : #include "plpy_cursorobject.h"
      19             : 
      20             : #include "plpy_elog.h"
      21             : #include "plpy_main.h"
      22             : #include "plpy_planobject.h"
      23             : #include "plpy_procedure.h"
      24             : #include "plpy_resultobject.h"
      25             : #include "plpy_spi.h"
      26             : 
      27             : 
      28             : static PyObject *PLy_cursor_query(const char *query);
      29             : static void PLy_cursor_dealloc(PyObject *arg);
      30             : static PyObject *PLy_cursor_iternext(PyObject *self);
      31             : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
      32             : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
      33             : 
      34             : static char PLy_cursor_doc[] = {
      35             :     "Wrapper around a PostgreSQL cursor"
      36             : };
      37             : 
      38             : static PyMethodDef PLy_cursor_methods[] = {
      39             :     {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
      40             :     {"close", PLy_cursor_close, METH_NOARGS, NULL},
      41             :     {NULL, NULL, 0, NULL}
      42             : };
      43             : 
      44             : static PyTypeObject PLy_CursorType = {
      45             :     PyVarObject_HEAD_INIT(NULL, 0)
      46             :     .tp_name = "PLyCursor",
      47             :     .tp_basicsize = sizeof(PLyCursorObject),
      48             :     .tp_dealloc = PLy_cursor_dealloc,
      49             :     .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
      50             :     .tp_doc = PLy_cursor_doc,
      51             :     .tp_iter = PyObject_SelfIter,
      52             :     .tp_iternext = PLy_cursor_iternext,
      53             :     .tp_methods = PLy_cursor_methods,
      54             : };
      55             : 
      56             : void
      57          46 : PLy_cursor_init_type(void)
      58             : {
      59          46 :     if (PyType_Ready(&PLy_CursorType) < 0)
      60           0 :         elog(ERROR, "could not initialize PLy_CursorType");
      61          46 : }
      62             : 
      63             : PyObject *
      64          34 : PLy_cursor(PyObject *self, PyObject *args)
      65             : {
      66             :     char       *query;
      67             :     PyObject   *plan;
      68          34 :     PyObject   *planargs = NULL;
      69             : 
      70          34 :     if (PyArg_ParseTuple(args, "s", &query))
      71          28 :         return PLy_cursor_query(query);
      72             : 
      73           6 :     PyErr_Clear();
      74             : 
      75           6 :     if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
      76           6 :         return PLy_cursor_plan(plan, planargs);
      77             : 
      78           0 :     PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
      79           0 :     return NULL;
      80             : }
      81             : 
      82             : 
      83             : static PyObject *
      84          28 : PLy_cursor_query(const char *query)
      85             : {
      86             :     PLyCursorObject *cursor;
      87          28 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
      88             :     volatile MemoryContext oldcontext;
      89             :     volatile ResourceOwner oldowner;
      90             : 
      91          28 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
      92           0 :         return NULL;
      93          28 :     cursor->portalname = NULL;
      94          28 :     cursor->closed = false;
      95          28 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
      96             :                                          "PL/Python cursor context",
      97             :                                          ALLOCSET_DEFAULT_SIZES);
      98             : 
      99             :     /* Initialize for converting result tuples to Python */
     100          28 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
     101             :                          RECORDOID, -1,
     102          28 :                          exec_ctx->curr_proc);
     103             : 
     104          28 :     oldcontext = CurrentMemoryContext;
     105          28 :     oldowner = CurrentResourceOwner;
     106             : 
     107          28 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     108             : 
     109          28 :     PG_TRY();
     110             :     {
     111             :         SPIPlanPtr  plan;
     112             :         Portal      portal;
     113             : 
     114          28 :         pg_verifymbstr(query, strlen(query), false);
     115             : 
     116          28 :         plan = SPI_prepare(query, 0, NULL);
     117          28 :         if (plan == NULL)
     118           0 :             elog(ERROR, "SPI_prepare failed: %s",
     119             :                  SPI_result_code_string(SPI_result));
     120             : 
     121          28 :         portal = SPI_cursor_open(NULL, plan, NULL, NULL,
     122          28 :                                  exec_ctx->curr_proc->fn_readonly);
     123          28 :         SPI_freeplan(plan);
     124             : 
     125          28 :         if (portal == NULL)
     126           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     127             :                  SPI_result_code_string(SPI_result));
     128             : 
     129          28 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     130             : 
     131          28 :         PinPortal(portal);
     132             : 
     133          28 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     134             :     }
     135           0 :     PG_CATCH();
     136             :     {
     137           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     138           0 :         return NULL;
     139             :     }
     140          28 :     PG_END_TRY();
     141             : 
     142             :     Assert(cursor->portalname != NULL);
     143          28 :     return (PyObject *) cursor;
     144             : }
     145             : 
     146             : PyObject *
     147           8 : PLy_cursor_plan(PyObject *ob, PyObject *args)
     148             : {
     149             :     PLyCursorObject *cursor;
     150             :     volatile int nargs;
     151             :     int         i;
     152             :     PLyPlanObject *plan;
     153           8 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     154             :     volatile MemoryContext oldcontext;
     155             :     volatile ResourceOwner oldowner;
     156             : 
     157           8 :     if (args)
     158             :     {
     159           6 :         if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
     160             :         {
     161           0 :             PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
     162           0 :             return NULL;
     163             :         }
     164           6 :         nargs = PySequence_Length(args);
     165             :     }
     166             :     else
     167           2 :         nargs = 0;
     168             : 
     169           8 :     plan = (PLyPlanObject *) ob;
     170             : 
     171           8 :     if (nargs != plan->nargs)
     172             :     {
     173             :         char       *sv;
     174           2 :         PyObject   *so = PyObject_Str(args);
     175             : 
     176           2 :         if (!so)
     177           0 :             PLy_elog(ERROR, "could not execute plan");
     178           2 :         sv = PyString_AsString(so);
     179           4 :         PLy_exception_set_plural(PyExc_TypeError,
     180             :                                  "Expected sequence of %d argument, got %d: %s",
     181             :                                  "Expected sequence of %d arguments, got %d: %s",
     182           2 :                                  plan->nargs,
     183             :                                  plan->nargs, nargs, sv);
     184           2 :         Py_DECREF(so);
     185             : 
     186           2 :         return NULL;
     187             :     }
     188             : 
     189           6 :     if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
     190           0 :         return NULL;
     191           6 :     cursor->portalname = NULL;
     192           6 :     cursor->closed = false;
     193           6 :     cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
     194             :                                          "PL/Python cursor context",
     195             :                                          ALLOCSET_DEFAULT_SIZES);
     196             : 
     197             :     /* Initialize for converting result tuples to Python */
     198           6 :     PLy_input_setup_func(&cursor->result, cursor->mcxt,
     199             :                          RECORDOID, -1,
     200           6 :                          exec_ctx->curr_proc);
     201             : 
     202           6 :     oldcontext = CurrentMemoryContext;
     203           6 :     oldowner = CurrentResourceOwner;
     204             : 
     205           6 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     206             : 
     207           6 :     PG_TRY();
     208             :     {
     209             :         Portal      portal;
     210             :         char       *volatile nulls;
     211             :         volatile int j;
     212             : 
     213           6 :         if (nargs > 0)
     214           4 :             nulls = palloc(nargs * sizeof(char));
     215             :         else
     216           2 :             nulls = NULL;
     217             : 
     218          10 :         for (j = 0; j < nargs; j++)
     219             :         {
     220           4 :             PLyObToDatum *arg = &plan->args[j];
     221             :             PyObject   *elem;
     222             : 
     223           4 :             elem = PySequence_GetItem(args, j);
     224           4 :             PG_TRY();
     225             :             {
     226             :                 bool        isnull;
     227             : 
     228           4 :                 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
     229           4 :                 nulls[j] = isnull ? 'n' : ' ';
     230             :             }
     231           0 :             PG_FINALLY();
     232             :             {
     233           4 :                 Py_DECREF(elem);
     234             :             }
     235           4 :             PG_END_TRY();
     236             :         }
     237             : 
     238           6 :         portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
     239           6 :                                  exec_ctx->curr_proc->fn_readonly);
     240           6 :         if (portal == NULL)
     241           0 :             elog(ERROR, "SPI_cursor_open() failed: %s",
     242             :                  SPI_result_code_string(SPI_result));
     243             : 
     244           6 :         cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
     245             : 
     246           6 :         PinPortal(portal);
     247             : 
     248           6 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     249             :     }
     250           0 :     PG_CATCH();
     251             :     {
     252             :         int         k;
     253             : 
     254             :         /* cleanup plan->values array */
     255           0 :         for (k = 0; k < nargs; k++)
     256             :         {
     257           0 :             if (!plan->args[k].typbyval &&
     258           0 :                 (plan->values[k] != PointerGetDatum(NULL)))
     259             :             {
     260           0 :                 pfree(DatumGetPointer(plan->values[k]));
     261           0 :                 plan->values[k] = PointerGetDatum(NULL);
     262             :             }
     263             :         }
     264             : 
     265           0 :         Py_DECREF(cursor);
     266             : 
     267           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     268           0 :         return NULL;
     269             :     }
     270           6 :     PG_END_TRY();
     271             : 
     272          10 :     for (i = 0; i < nargs; i++)
     273             :     {
     274           8 :         if (!plan->args[i].typbyval &&
     275           4 :             (plan->values[i] != PointerGetDatum(NULL)))
     276             :         {
     277           4 :             pfree(DatumGetPointer(plan->values[i]));
     278           4 :             plan->values[i] = PointerGetDatum(NULL);
     279             :         }
     280             :     }
     281             : 
     282             :     Assert(cursor->portalname != NULL);
     283           6 :     return (PyObject *) cursor;
     284             : }
     285             : 
     286             : static void
     287          34 : PLy_cursor_dealloc(PyObject *arg)
     288             : {
     289             :     PLyCursorObject *cursor;
     290             :     Portal      portal;
     291             : 
     292          34 :     cursor = (PLyCursorObject *) arg;
     293             : 
     294          34 :     if (!cursor->closed)
     295             :     {
     296          28 :         portal = GetPortalByName(cursor->portalname);
     297             : 
     298          28 :         if (PortalIsValid(portal))
     299             :         {
     300          22 :             UnpinPortal(portal);
     301          22 :             SPI_cursor_close(portal);
     302             :         }
     303          28 :         cursor->closed = true;
     304             :     }
     305          34 :     if (cursor->mcxt)
     306             :     {
     307          34 :         MemoryContextDelete(cursor->mcxt);
     308          34 :         cursor->mcxt = NULL;
     309             :     }
     310          34 :     arg->ob_type->tp_free(arg);
     311          34 : }
     312             : 
     313             : static PyObject *
     314          70 : PLy_cursor_iternext(PyObject *self)
     315             : {
     316             :     PLyCursorObject *cursor;
     317             :     PyObject   *ret;
     318          70 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     319             :     volatile MemoryContext oldcontext;
     320             :     volatile ResourceOwner oldowner;
     321             :     Portal      portal;
     322             : 
     323          70 :     cursor = (PLyCursorObject *) self;
     324             : 
     325          70 :     if (cursor->closed)
     326             :     {
     327           2 :         PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
     328           2 :         return NULL;
     329             :     }
     330             : 
     331          68 :     portal = GetPortalByName(cursor->portalname);
     332          68 :     if (!PortalIsValid(portal))
     333             :     {
     334           0 :         PLy_exception_set(PyExc_ValueError,
     335             :                           "iterating a cursor in an aborted subtransaction");
     336           0 :         return NULL;
     337             :     }
     338             : 
     339          68 :     oldcontext = CurrentMemoryContext;
     340          68 :     oldowner = CurrentResourceOwner;
     341             : 
     342          68 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     343             : 
     344          68 :     PG_TRY();
     345             :     {
     346          68 :         SPI_cursor_fetch(portal, true, 1);
     347          68 :         if (SPI_processed == 0)
     348             :         {
     349          16 :             PyErr_SetNone(PyExc_StopIteration);
     350          16 :             ret = NULL;
     351             :         }
     352             :         else
     353             :         {
     354          52 :             PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     355          52 :                                   exec_ctx->curr_proc);
     356             : 
     357          52 :             ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
     358          52 :                                        SPI_tuptable->tupdesc, true);
     359             :         }
     360             : 
     361          68 :         SPI_freetuptable(SPI_tuptable);
     362             : 
     363          68 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     364             :     }
     365           0 :     PG_CATCH();
     366             :     {
     367           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     368           0 :         return NULL;
     369             :     }
     370          68 :     PG_END_TRY();
     371             : 
     372          68 :     return ret;
     373             : }
     374             : 
     375             : static PyObject *
     376          26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
     377             : {
     378             :     PLyCursorObject *cursor;
     379             :     int         count;
     380             :     PLyResultObject *ret;
     381          26 :     PLyExecutionContext *exec_ctx = PLy_current_execution_context();
     382             :     volatile MemoryContext oldcontext;
     383             :     volatile ResourceOwner oldowner;
     384             :     Portal      portal;
     385             : 
     386          26 :     if (!PyArg_ParseTuple(args, "i:fetch", &count))
     387           0 :         return NULL;
     388             : 
     389          26 :     cursor = (PLyCursorObject *) self;
     390             : 
     391          26 :     if (cursor->closed)
     392             :     {
     393           2 :         PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
     394           2 :         return NULL;
     395             :     }
     396             : 
     397          24 :     portal = GetPortalByName(cursor->portalname);
     398          24 :     if (!PortalIsValid(portal))
     399             :     {
     400           4 :         PLy_exception_set(PyExc_ValueError,
     401             :                           "iterating a cursor in an aborted subtransaction");
     402           4 :         return NULL;
     403             :     }
     404             : 
     405          20 :     ret = (PLyResultObject *) PLy_result_new();
     406          20 :     if (ret == NULL)
     407           0 :         return NULL;
     408             : 
     409          20 :     oldcontext = CurrentMemoryContext;
     410          20 :     oldowner = CurrentResourceOwner;
     411             : 
     412          20 :     PLy_spi_subtransaction_begin(oldcontext, oldowner);
     413             : 
     414          20 :     PG_TRY();
     415             :     {
     416          20 :         SPI_cursor_fetch(portal, true, count);
     417             : 
     418          20 :         Py_DECREF(ret->status);
     419          20 :         ret->status = PyInt_FromLong(SPI_OK_FETCH);
     420             : 
     421          20 :         Py_DECREF(ret->nrows);
     422          20 :         ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
     423             : 
     424          20 :         if (SPI_processed != 0)
     425             :         {
     426             :             uint64      i;
     427             : 
     428             :             /*
     429             :              * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
     430             :              * and list indices; so we cannot support a result larger than
     431             :              * PY_SSIZE_T_MAX.
     432             :              */
     433          14 :             if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
     434           0 :                 ereport(ERROR,
     435             :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     436             :                          errmsg("query result has too many rows to fit in a Python list")));
     437             : 
     438          14 :             Py_DECREF(ret->rows);
     439          14 :             ret->rows = PyList_New(SPI_processed);
     440          14 :             if (!ret->rows)
     441             :             {
     442           0 :                 Py_DECREF(ret);
     443           0 :                 ret = NULL;
     444             :             }
     445             :             else
     446             :             {
     447          14 :                 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
     448          14 :                                       exec_ctx->curr_proc);
     449             : 
     450          88 :                 for (i = 0; i < SPI_processed; i++)
     451             :                 {
     452         148 :                     PyObject   *row = PLy_input_from_tuple(&cursor->result,
     453          74 :                                                            SPI_tuptable->vals[i],
     454          74 :                                                            SPI_tuptable->tupdesc,
     455             :                                                            true);
     456             : 
     457          74 :                     PyList_SetItem(ret->rows, i, row);
     458             :                 }
     459             :             }
     460             :         }
     461             : 
     462          20 :         SPI_freetuptable(SPI_tuptable);
     463             : 
     464          20 :         PLy_spi_subtransaction_commit(oldcontext, oldowner);
     465             :     }
     466           0 :     PG_CATCH();
     467             :     {
     468           0 :         PLy_spi_subtransaction_abort(oldcontext, oldowner);
     469           0 :         return NULL;
     470             :     }
     471          20 :     PG_END_TRY();
     472             : 
     473          20 :     return (PyObject *) ret;
     474             : }
     475             : 
     476             : static PyObject *
     477          10 : PLy_cursor_close(PyObject *self, PyObject *unused)
     478             : {
     479          10 :     PLyCursorObject *cursor = (PLyCursorObject *) self;
     480             : 
     481          10 :     if (!cursor->closed)
     482             :     {
     483           8 :         Portal      portal = GetPortalByName(cursor->portalname);
     484             : 
     485           8 :         if (!PortalIsValid(portal))
     486             :         {
     487           2 :             PLy_exception_set(PyExc_ValueError,
     488             :                               "closing a cursor in an aborted subtransaction");
     489           2 :             return NULL;
     490             :         }
     491             : 
     492           6 :         UnpinPortal(portal);
     493           6 :         SPI_cursor_close(portal);
     494           6 :         cursor->closed = true;
     495             :     }
     496             : 
     497           8 :     Py_RETURN_NONE;
     498             : }

Generated by: LCOV version 1.13