Line data Source code
1 : /*
2 : * the PLyCursor class
3 : *
4 : * src/pl/plpython/plpy_cursorobject.c
5 : */
6 :
7 : #include "postgres.h"
8 :
9 : #include <limits.h>
10 :
11 : #include "catalog/pg_type.h"
12 : #include "mb/pg_wchar.h"
13 : #include "plpy_cursorobject.h"
14 : #include "plpy_elog.h"
15 : #include "plpy_main.h"
16 : #include "plpy_planobject.h"
17 : #include "plpy_resultobject.h"
18 : #include "plpy_spi.h"
19 : #include "plpython.h"
20 : #include "utils/memutils.h"
21 :
22 : static PyObject *PLy_cursor_query(const char *query);
23 : static void PLy_cursor_dealloc(PyObject *arg);
24 : static PyObject *PLy_cursor_iternext(PyObject *self);
25 : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26 : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
27 :
28 : static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
29 :
30 : static PyMethodDef PLy_cursor_methods[] = {
31 : {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32 : {"close", PLy_cursor_close, METH_NOARGS, NULL},
33 : {NULL, NULL, 0, NULL}
34 : };
35 :
36 : static PyTypeObject PLy_CursorType = {
37 : PyVarObject_HEAD_INIT(NULL, 0)
38 : .tp_name = "PLyCursor",
39 : .tp_basicsize = sizeof(PLyCursorObject),
40 : .tp_dealloc = PLy_cursor_dealloc,
41 : .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
42 : .tp_doc = PLy_cursor_doc,
43 : .tp_iter = PyObject_SelfIter,
44 : .tp_iternext = PLy_cursor_iternext,
45 : .tp_methods = PLy_cursor_methods,
46 : };
47 :
48 : void
49 46 : PLy_cursor_init_type(void)
50 : {
51 46 : if (PyType_Ready(&PLy_CursorType) < 0)
52 0 : elog(ERROR, "could not initialize PLy_CursorType");
53 46 : }
54 :
55 : PyObject *
56 36 : PLy_cursor(PyObject *self, PyObject *args)
57 : {
58 : char *query;
59 : PyObject *plan;
60 36 : PyObject *planargs = NULL;
61 :
62 36 : if (PyArg_ParseTuple(args, "s", &query))
63 28 : return PLy_cursor_query(query);
64 :
65 8 : PyErr_Clear();
66 :
67 8 : if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
68 8 : return PLy_cursor_plan(plan, planargs);
69 :
70 0 : PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
71 0 : return NULL;
72 : }
73 :
74 :
75 : static PyObject *
76 28 : PLy_cursor_query(const char *query)
77 : {
78 : PLyCursorObject *cursor;
79 28 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
80 : volatile MemoryContext oldcontext;
81 : volatile ResourceOwner oldowner;
82 :
83 28 : if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
84 0 : return NULL;
85 28 : cursor->portalname = NULL;
86 28 : cursor->closed = false;
87 28 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
88 : "PL/Python cursor context",
89 : ALLOCSET_DEFAULT_SIZES);
90 :
91 : /* Initialize for converting result tuples to Python */
92 28 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
93 : RECORDOID, -1,
94 28 : exec_ctx->curr_proc);
95 :
96 28 : oldcontext = CurrentMemoryContext;
97 28 : oldowner = CurrentResourceOwner;
98 :
99 28 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
100 :
101 28 : PG_TRY();
102 : {
103 : SPIPlanPtr plan;
104 : Portal portal;
105 :
106 28 : pg_verifymbstr(query, strlen(query), false);
107 :
108 28 : plan = SPI_prepare(query, 0, NULL);
109 28 : if (plan == NULL)
110 0 : elog(ERROR, "SPI_prepare failed: %s",
111 : SPI_result_code_string(SPI_result));
112 :
113 56 : portal = SPI_cursor_open(NULL, plan, NULL, NULL,
114 28 : exec_ctx->curr_proc->fn_readonly);
115 28 : SPI_freeplan(plan);
116 :
117 28 : if (portal == NULL)
118 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
119 : SPI_result_code_string(SPI_result));
120 :
121 28 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
122 :
123 28 : PinPortal(portal);
124 :
125 28 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
126 : }
127 0 : PG_CATCH();
128 : {
129 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
130 0 : return NULL;
131 : }
132 28 : PG_END_TRY();
133 :
134 : Assert(cursor->portalname != NULL);
135 28 : return (PyObject *) cursor;
136 : }
137 :
138 : PyObject *
139 10 : PLy_cursor_plan(PyObject *ob, PyObject *args)
140 : {
141 : PLyCursorObject *cursor;
142 : volatile int nargs;
143 : PLyPlanObject *plan;
144 10 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
145 : volatile MemoryContext oldcontext;
146 : volatile ResourceOwner oldowner;
147 :
148 10 : if (args)
149 : {
150 6 : if (!PySequence_Check(args) || PyUnicode_Check(args))
151 : {
152 0 : PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
153 0 : return NULL;
154 : }
155 6 : nargs = PySequence_Length(args);
156 : }
157 : else
158 4 : nargs = 0;
159 :
160 10 : plan = (PLyPlanObject *) ob;
161 :
162 10 : if (nargs != plan->nargs)
163 : {
164 : char *sv;
165 2 : PyObject *so = PyObject_Str(args);
166 :
167 2 : if (!so)
168 0 : PLy_elog(ERROR, "could not execute plan");
169 2 : sv = PLyUnicode_AsString(so);
170 2 : PLy_exception_set_plural(PyExc_TypeError,
171 : "Expected sequence of %d argument, got %d: %s",
172 : "Expected sequence of %d arguments, got %d: %s",
173 2 : plan->nargs,
174 : plan->nargs, nargs, sv);
175 2 : Py_DECREF(so);
176 :
177 2 : return NULL;
178 : }
179 :
180 8 : if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
181 0 : return NULL;
182 8 : cursor->portalname = NULL;
183 8 : cursor->closed = false;
184 8 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
185 : "PL/Python cursor context",
186 : ALLOCSET_DEFAULT_SIZES);
187 :
188 : /* Initialize for converting result tuples to Python */
189 8 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
190 : RECORDOID, -1,
191 8 : exec_ctx->curr_proc);
192 :
193 8 : oldcontext = CurrentMemoryContext;
194 8 : oldowner = CurrentResourceOwner;
195 :
196 8 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
197 :
198 8 : PG_TRY();
199 : {
200 : Portal portal;
201 : MemoryContext tmpcontext;
202 : Datum *volatile values;
203 : char *volatile nulls;
204 : volatile int j;
205 :
206 : /*
207 : * Converted arguments and associated cruft will be in this context,
208 : * which is local to our subtransaction.
209 : */
210 8 : tmpcontext = AllocSetContextCreate(CurTransactionContext,
211 : "PL/Python temporary context",
212 : ALLOCSET_SMALL_SIZES);
213 8 : MemoryContextSwitchTo(tmpcontext);
214 :
215 8 : if (nargs > 0)
216 : {
217 4 : values = (Datum *) palloc(nargs * sizeof(Datum));
218 4 : nulls = (char *) palloc(nargs * sizeof(char));
219 : }
220 : else
221 : {
222 4 : values = NULL;
223 4 : nulls = NULL;
224 : }
225 :
226 12 : for (j = 0; j < nargs; j++)
227 : {
228 4 : PLyObToDatum *arg = &plan->args[j];
229 : PyObject *elem;
230 :
231 4 : elem = PySequence_GetItem(args, j);
232 4 : PG_TRY(2);
233 : {
234 : bool isnull;
235 :
236 4 : values[j] = PLy_output_convert(arg, elem, &isnull);
237 4 : nulls[j] = isnull ? 'n' : ' ';
238 : }
239 0 : PG_FINALLY(2);
240 : {
241 4 : Py_DECREF(elem);
242 : }
243 4 : PG_END_TRY(2);
244 : }
245 :
246 8 : MemoryContextSwitchTo(oldcontext);
247 :
248 16 : portal = SPI_cursor_open(NULL, plan->plan, values, nulls,
249 8 : exec_ctx->curr_proc->fn_readonly);
250 8 : if (portal == NULL)
251 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
252 : SPI_result_code_string(SPI_result));
253 :
254 8 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
255 :
256 8 : PinPortal(portal);
257 :
258 8 : MemoryContextDelete(tmpcontext);
259 8 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
260 : }
261 0 : PG_CATCH();
262 : {
263 0 : Py_DECREF(cursor);
264 : /* Subtransaction abort will remove the tmpcontext */
265 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
266 0 : return NULL;
267 : }
268 8 : PG_END_TRY();
269 :
270 : Assert(cursor->portalname != NULL);
271 8 : return (PyObject *) cursor;
272 : }
273 :
274 : static void
275 36 : PLy_cursor_dealloc(PyObject *arg)
276 : {
277 : PLyCursorObject *cursor;
278 : Portal portal;
279 :
280 36 : cursor = (PLyCursorObject *) arg;
281 :
282 36 : if (!cursor->closed)
283 : {
284 30 : portal = GetPortalByName(cursor->portalname);
285 :
286 30 : if (PortalIsValid(portal))
287 : {
288 24 : UnpinPortal(portal);
289 24 : SPI_cursor_close(portal);
290 : }
291 30 : cursor->closed = true;
292 : }
293 36 : if (cursor->mcxt)
294 : {
295 36 : MemoryContextDelete(cursor->mcxt);
296 36 : cursor->mcxt = NULL;
297 : }
298 36 : arg->ob_type->tp_free(arg);
299 36 : }
300 :
301 : static PyObject *
302 72 : PLy_cursor_iternext(PyObject *self)
303 : {
304 : PLyCursorObject *cursor;
305 : PyObject *ret;
306 72 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
307 : volatile MemoryContext oldcontext;
308 : volatile ResourceOwner oldowner;
309 : Portal portal;
310 :
311 72 : cursor = (PLyCursorObject *) self;
312 :
313 72 : if (cursor->closed)
314 : {
315 2 : PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
316 2 : return NULL;
317 : }
318 :
319 70 : portal = GetPortalByName(cursor->portalname);
320 70 : if (!PortalIsValid(portal))
321 : {
322 0 : PLy_exception_set(PyExc_ValueError,
323 : "iterating a cursor in an aborted subtransaction");
324 0 : return NULL;
325 : }
326 :
327 70 : oldcontext = CurrentMemoryContext;
328 70 : oldowner = CurrentResourceOwner;
329 :
330 70 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
331 :
332 70 : PG_TRY();
333 : {
334 70 : SPI_cursor_fetch(portal, true, 1);
335 68 : if (SPI_processed == 0)
336 : {
337 16 : PyErr_SetNone(PyExc_StopIteration);
338 16 : ret = NULL;
339 : }
340 : else
341 : {
342 52 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
343 52 : exec_ctx->curr_proc);
344 :
345 52 : ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
346 52 : SPI_tuptable->tupdesc, true);
347 : }
348 :
349 68 : SPI_freetuptable(SPI_tuptable);
350 :
351 68 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
352 : }
353 2 : PG_CATCH();
354 : {
355 2 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
356 2 : return NULL;
357 : }
358 68 : PG_END_TRY();
359 :
360 68 : return ret;
361 : }
362 :
363 : static PyObject *
364 26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
365 : {
366 : PLyCursorObject *cursor;
367 : int count;
368 : PLyResultObject *ret;
369 26 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
370 : volatile MemoryContext oldcontext;
371 : volatile ResourceOwner oldowner;
372 : Portal portal;
373 :
374 26 : if (!PyArg_ParseTuple(args, "i:fetch", &count))
375 0 : return NULL;
376 :
377 26 : cursor = (PLyCursorObject *) self;
378 :
379 26 : if (cursor->closed)
380 : {
381 2 : PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
382 2 : return NULL;
383 : }
384 :
385 24 : portal = GetPortalByName(cursor->portalname);
386 24 : if (!PortalIsValid(portal))
387 : {
388 4 : PLy_exception_set(PyExc_ValueError,
389 : "iterating a cursor in an aborted subtransaction");
390 4 : return NULL;
391 : }
392 :
393 20 : ret = (PLyResultObject *) PLy_result_new();
394 20 : if (ret == NULL)
395 0 : return NULL;
396 :
397 20 : oldcontext = CurrentMemoryContext;
398 20 : oldowner = CurrentResourceOwner;
399 :
400 20 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
401 :
402 20 : PG_TRY();
403 : {
404 20 : SPI_cursor_fetch(portal, true, count);
405 :
406 20 : Py_DECREF(ret->status);
407 20 : ret->status = PyLong_FromLong(SPI_OK_FETCH);
408 :
409 20 : Py_DECREF(ret->nrows);
410 20 : ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
411 :
412 20 : if (SPI_processed != 0)
413 : {
414 : uint64 i;
415 :
416 : /*
417 : * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
418 : * and list indices; so we cannot support a result larger than
419 : * PY_SSIZE_T_MAX.
420 : */
421 14 : if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
422 0 : ereport(ERROR,
423 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
424 : errmsg("query result has too many rows to fit in a Python list")));
425 :
426 14 : Py_DECREF(ret->rows);
427 14 : ret->rows = PyList_New(SPI_processed);
428 14 : if (!ret->rows)
429 : {
430 0 : Py_DECREF(ret);
431 0 : ret = NULL;
432 : }
433 : else
434 : {
435 14 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
436 14 : exec_ctx->curr_proc);
437 :
438 88 : for (i = 0; i < SPI_processed; i++)
439 : {
440 148 : PyObject *row = PLy_input_from_tuple(&cursor->result,
441 74 : SPI_tuptable->vals[i],
442 74 : SPI_tuptable->tupdesc,
443 : true);
444 :
445 74 : PyList_SetItem(ret->rows, i, row);
446 : }
447 : }
448 : }
449 :
450 20 : SPI_freetuptable(SPI_tuptable);
451 :
452 20 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
453 : }
454 0 : PG_CATCH();
455 : {
456 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
457 0 : return NULL;
458 : }
459 20 : PG_END_TRY();
460 :
461 20 : return (PyObject *) ret;
462 : }
463 :
464 : static PyObject *
465 10 : PLy_cursor_close(PyObject *self, PyObject *unused)
466 : {
467 10 : PLyCursorObject *cursor = (PLyCursorObject *) self;
468 :
469 10 : if (!cursor->closed)
470 : {
471 8 : Portal portal = GetPortalByName(cursor->portalname);
472 :
473 8 : if (!PortalIsValid(portal))
474 : {
475 2 : PLy_exception_set(PyExc_ValueError,
476 : "closing a cursor in an aborted subtransaction");
477 2 : return NULL;
478 : }
479 :
480 6 : UnpinPortal(portal);
481 6 : SPI_cursor_close(portal);
482 6 : cursor->closed = true;
483 : }
484 :
485 8 : Py_RETURN_NONE;
486 : }
|