Line data Source code
1 : /*
2 : * the PLyCursor class
3 : *
4 : * src/pl/plpython/plpy_cursorobject.c
5 : */
6 :
7 : #include "postgres.h"
8 :
9 : #include <limits.h>
10 :
11 : #include "catalog/pg_type.h"
12 : #include "mb/pg_wchar.h"
13 : #include "plpy_cursorobject.h"
14 : #include "plpy_elog.h"
15 : #include "plpy_main.h"
16 : #include "plpy_planobject.h"
17 : #include "plpy_resultobject.h"
18 : #include "plpy_spi.h"
19 : #include "plpython.h"
20 : #include "utils/memutils.h"
21 :
22 : static PyObject *PLy_cursor_query(const char *query);
23 : static void PLy_cursor_dealloc(PLyCursorObject *self);
24 : static PyObject *PLy_cursor_iternext(PyObject *self);
25 : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26 : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
27 :
28 : static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
29 :
30 : static PyMethodDef PLy_cursor_methods[] = {
31 : {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32 : {"close", PLy_cursor_close, METH_NOARGS, NULL},
33 : {NULL, NULL, 0, NULL}
34 : };
35 :
36 : static PyType_Slot PLyCursor_slots[] =
37 : {
38 : {
39 : Py_tp_dealloc, PLy_cursor_dealloc
40 : },
41 : {
42 : Py_tp_doc, (char *) PLy_cursor_doc
43 : },
44 : {
45 : Py_tp_iter, PyObject_SelfIter
46 : },
47 : {
48 : Py_tp_iternext, PLy_cursor_iternext
49 : },
50 : {
51 : Py_tp_methods, PLy_cursor_methods
52 : },
53 : {
54 : 0, NULL
55 : }
56 : };
57 :
58 : static PyType_Spec PLyCursor_spec =
59 : {
60 : .name = "PLyCursor",
61 : .basicsize = sizeof(PLyCursorObject),
62 : .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
63 : .slots = PLyCursor_slots,
64 : };
65 :
66 : static PyTypeObject *PLy_CursorType;
67 :
68 : void
69 46 : PLy_cursor_init_type(void)
70 : {
71 46 : PLy_CursorType = (PyTypeObject *) PyType_FromSpec(&PLyCursor_spec);
72 46 : if (!PLy_CursorType)
73 0 : elog(ERROR, "could not initialize PLy_CursorType");
74 46 : }
75 :
76 : PyObject *
77 36 : PLy_cursor(PyObject *self, PyObject *args)
78 : {
79 : char *query;
80 : PyObject *plan;
81 36 : PyObject *planargs = NULL;
82 :
83 36 : if (PyArg_ParseTuple(args, "s", &query))
84 28 : return PLy_cursor_query(query);
85 :
86 8 : PyErr_Clear();
87 :
88 8 : if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
89 8 : return PLy_cursor_plan(plan, planargs);
90 :
91 0 : PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
92 0 : return NULL;
93 : }
94 :
95 :
96 : static PyObject *
97 28 : PLy_cursor_query(const char *query)
98 : {
99 : PLyCursorObject *cursor;
100 28 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
101 : volatile MemoryContext oldcontext;
102 : volatile ResourceOwner oldowner;
103 :
104 28 : if ((cursor = PyObject_New(PLyCursorObject, PLy_CursorType)) == NULL)
105 0 : return NULL;
106 : #if PY_VERSION_HEX < 0x03080000
107 : /* Workaround for Python issue 35810; no longer necessary in Python 3.8 */
108 : Py_INCREF(PLy_CursorType);
109 : #endif
110 28 : cursor->portalname = NULL;
111 28 : cursor->closed = false;
112 28 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
113 : "PL/Python cursor context",
114 : ALLOCSET_DEFAULT_SIZES);
115 :
116 : /* Initialize for converting result tuples to Python */
117 28 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
118 : RECORDOID, -1,
119 28 : exec_ctx->curr_proc);
120 :
121 28 : oldcontext = CurrentMemoryContext;
122 28 : oldowner = CurrentResourceOwner;
123 :
124 28 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
125 :
126 28 : PG_TRY();
127 : {
128 : SPIPlanPtr plan;
129 : Portal portal;
130 :
131 28 : pg_verifymbstr(query, strlen(query), false);
132 :
133 28 : plan = SPI_prepare(query, 0, NULL);
134 28 : if (plan == NULL)
135 0 : elog(ERROR, "SPI_prepare failed: %s",
136 : SPI_result_code_string(SPI_result));
137 :
138 56 : portal = SPI_cursor_open(NULL, plan, NULL, NULL,
139 28 : exec_ctx->curr_proc->fn_readonly);
140 28 : SPI_freeplan(plan);
141 :
142 28 : if (portal == NULL)
143 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
144 : SPI_result_code_string(SPI_result));
145 :
146 28 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
147 :
148 28 : PinPortal(portal);
149 :
150 28 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
151 : }
152 0 : PG_CATCH();
153 : {
154 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
155 0 : return NULL;
156 : }
157 28 : PG_END_TRY();
158 :
159 : Assert(cursor->portalname != NULL);
160 28 : return (PyObject *) cursor;
161 : }
162 :
163 : PyObject *
164 10 : PLy_cursor_plan(PyObject *ob, PyObject *args)
165 : {
166 : PLyCursorObject *cursor;
167 : volatile int nargs;
168 : PLyPlanObject *plan;
169 10 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
170 : volatile MemoryContext oldcontext;
171 : volatile ResourceOwner oldowner;
172 :
173 10 : if (args)
174 : {
175 6 : if (!PySequence_Check(args) || PyUnicode_Check(args))
176 : {
177 0 : PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
178 0 : return NULL;
179 : }
180 6 : nargs = PySequence_Length(args);
181 : }
182 : else
183 4 : nargs = 0;
184 :
185 10 : plan = (PLyPlanObject *) ob;
186 :
187 10 : if (nargs != plan->nargs)
188 : {
189 : char *sv;
190 2 : PyObject *so = PyObject_Str(args);
191 :
192 2 : if (!so)
193 0 : PLy_elog(ERROR, "could not execute plan");
194 2 : sv = PLyUnicode_AsString(so);
195 2 : PLy_exception_set_plural(PyExc_TypeError,
196 : "Expected sequence of %d argument, got %d: %s",
197 : "Expected sequence of %d arguments, got %d: %s",
198 2 : plan->nargs,
199 : plan->nargs, nargs, sv);
200 2 : Py_DECREF(so);
201 :
202 2 : return NULL;
203 : }
204 :
205 8 : if ((cursor = PyObject_New(PLyCursorObject, PLy_CursorType)) == NULL)
206 0 : return NULL;
207 : #if PY_VERSION_HEX < 0x03080000
208 : /* Workaround for Python issue 35810; no longer necessary in Python 3.8 */
209 : Py_INCREF(PLy_CursorType);
210 : #endif
211 8 : cursor->portalname = NULL;
212 8 : cursor->closed = false;
213 8 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
214 : "PL/Python cursor context",
215 : ALLOCSET_DEFAULT_SIZES);
216 :
217 : /* Initialize for converting result tuples to Python */
218 8 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
219 : RECORDOID, -1,
220 8 : exec_ctx->curr_proc);
221 :
222 8 : oldcontext = CurrentMemoryContext;
223 8 : oldowner = CurrentResourceOwner;
224 :
225 8 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
226 :
227 8 : PG_TRY();
228 : {
229 : Portal portal;
230 : MemoryContext tmpcontext;
231 : Datum *volatile values;
232 : char *volatile nulls;
233 : volatile int j;
234 :
235 : /*
236 : * Converted arguments and associated cruft will be in this context,
237 : * which is local to our subtransaction.
238 : */
239 8 : tmpcontext = AllocSetContextCreate(CurTransactionContext,
240 : "PL/Python temporary context",
241 : ALLOCSET_SMALL_SIZES);
242 8 : MemoryContextSwitchTo(tmpcontext);
243 :
244 8 : if (nargs > 0)
245 : {
246 4 : values = (Datum *) palloc(nargs * sizeof(Datum));
247 4 : nulls = (char *) palloc(nargs * sizeof(char));
248 : }
249 : else
250 : {
251 4 : values = NULL;
252 4 : nulls = NULL;
253 : }
254 :
255 12 : for (j = 0; j < nargs; j++)
256 : {
257 4 : PLyObToDatum *arg = &plan->args[j];
258 : PyObject *elem;
259 :
260 4 : elem = PySequence_GetItem(args, j);
261 4 : PG_TRY(2);
262 : {
263 : bool isnull;
264 :
265 4 : values[j] = PLy_output_convert(arg, elem, &isnull);
266 4 : nulls[j] = isnull ? 'n' : ' ';
267 : }
268 0 : PG_FINALLY(2);
269 : {
270 4 : Py_DECREF(elem);
271 : }
272 4 : PG_END_TRY(2);
273 : }
274 :
275 8 : MemoryContextSwitchTo(oldcontext);
276 :
277 16 : portal = SPI_cursor_open(NULL, plan->plan, values, nulls,
278 8 : exec_ctx->curr_proc->fn_readonly);
279 8 : if (portal == NULL)
280 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
281 : SPI_result_code_string(SPI_result));
282 :
283 8 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
284 :
285 8 : PinPortal(portal);
286 :
287 8 : MemoryContextDelete(tmpcontext);
288 8 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
289 : }
290 0 : PG_CATCH();
291 : {
292 0 : Py_DECREF(cursor);
293 : /* Subtransaction abort will remove the tmpcontext */
294 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
295 0 : return NULL;
296 : }
297 8 : PG_END_TRY();
298 :
299 : Assert(cursor->portalname != NULL);
300 8 : return (PyObject *) cursor;
301 : }
302 :
303 : static void
304 36 : PLy_cursor_dealloc(PLyCursorObject *self)
305 : {
306 : #if PY_VERSION_HEX >= 0x03080000
307 36 : PyTypeObject *tp = Py_TYPE(self);
308 : #endif
309 : Portal portal;
310 :
311 36 : if (!self->closed)
312 : {
313 30 : portal = GetPortalByName(self->portalname);
314 :
315 30 : if (PortalIsValid(portal))
316 : {
317 24 : UnpinPortal(portal);
318 24 : SPI_cursor_close(portal);
319 : }
320 30 : self->closed = true;
321 : }
322 36 : if (self->mcxt)
323 : {
324 36 : MemoryContextDelete(self->mcxt);
325 36 : self->mcxt = NULL;
326 : }
327 :
328 36 : PyObject_Free(self);
329 : #if PY_VERSION_HEX >= 0x03080000
330 : /* This was not needed before Python 3.8 (Python issue 35810) */
331 36 : Py_DECREF(tp);
332 : #endif
333 36 : }
334 :
335 : static PyObject *
336 72 : PLy_cursor_iternext(PyObject *self)
337 : {
338 : PLyCursorObject *cursor;
339 : PyObject *ret;
340 72 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
341 : volatile MemoryContext oldcontext;
342 : volatile ResourceOwner oldowner;
343 : Portal portal;
344 :
345 72 : cursor = (PLyCursorObject *) self;
346 :
347 72 : if (cursor->closed)
348 : {
349 2 : PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
350 2 : return NULL;
351 : }
352 :
353 70 : portal = GetPortalByName(cursor->portalname);
354 70 : if (!PortalIsValid(portal))
355 : {
356 0 : PLy_exception_set(PyExc_ValueError,
357 : "iterating a cursor in an aborted subtransaction");
358 0 : return NULL;
359 : }
360 :
361 70 : oldcontext = CurrentMemoryContext;
362 70 : oldowner = CurrentResourceOwner;
363 :
364 70 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
365 :
366 70 : PG_TRY();
367 : {
368 70 : SPI_cursor_fetch(portal, true, 1);
369 68 : if (SPI_processed == 0)
370 : {
371 16 : PyErr_SetNone(PyExc_StopIteration);
372 16 : ret = NULL;
373 : }
374 : else
375 : {
376 52 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
377 52 : exec_ctx->curr_proc);
378 :
379 52 : ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
380 52 : SPI_tuptable->tupdesc, true);
381 : }
382 :
383 68 : SPI_freetuptable(SPI_tuptable);
384 :
385 68 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
386 : }
387 2 : PG_CATCH();
388 : {
389 2 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
390 2 : return NULL;
391 : }
392 68 : PG_END_TRY();
393 :
394 68 : return ret;
395 : }
396 :
397 : static PyObject *
398 26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
399 : {
400 : PLyCursorObject *cursor;
401 : int count;
402 : PLyResultObject *ret;
403 26 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
404 : volatile MemoryContext oldcontext;
405 : volatile ResourceOwner oldowner;
406 : Portal portal;
407 :
408 26 : if (!PyArg_ParseTuple(args, "i:fetch", &count))
409 0 : return NULL;
410 :
411 26 : cursor = (PLyCursorObject *) self;
412 :
413 26 : if (cursor->closed)
414 : {
415 2 : PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
416 2 : return NULL;
417 : }
418 :
419 24 : portal = GetPortalByName(cursor->portalname);
420 24 : if (!PortalIsValid(portal))
421 : {
422 4 : PLy_exception_set(PyExc_ValueError,
423 : "iterating a cursor in an aborted subtransaction");
424 4 : return NULL;
425 : }
426 :
427 20 : ret = (PLyResultObject *) PLy_result_new();
428 20 : if (ret == NULL)
429 0 : return NULL;
430 :
431 20 : oldcontext = CurrentMemoryContext;
432 20 : oldowner = CurrentResourceOwner;
433 :
434 20 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
435 :
436 20 : PG_TRY();
437 : {
438 20 : SPI_cursor_fetch(portal, true, count);
439 :
440 20 : Py_DECREF(ret->status);
441 20 : ret->status = PyLong_FromLong(SPI_OK_FETCH);
442 :
443 20 : Py_DECREF(ret->nrows);
444 20 : ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
445 :
446 20 : if (SPI_processed != 0)
447 : {
448 : uint64 i;
449 :
450 : /*
451 : * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
452 : * and list indices; so we cannot support a result larger than
453 : * PY_SSIZE_T_MAX.
454 : */
455 14 : if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
456 0 : ereport(ERROR,
457 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
458 : errmsg("query result has too many rows to fit in a Python list")));
459 :
460 14 : Py_DECREF(ret->rows);
461 14 : ret->rows = PyList_New(SPI_processed);
462 14 : if (!ret->rows)
463 : {
464 0 : Py_DECREF(ret);
465 0 : ret = NULL;
466 : }
467 : else
468 : {
469 14 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
470 14 : exec_ctx->curr_proc);
471 :
472 88 : for (i = 0; i < SPI_processed; i++)
473 : {
474 148 : PyObject *row = PLy_input_from_tuple(&cursor->result,
475 74 : SPI_tuptable->vals[i],
476 74 : SPI_tuptable->tupdesc,
477 : true);
478 :
479 74 : PyList_SetItem(ret->rows, i, row);
480 : }
481 : }
482 : }
483 :
484 20 : SPI_freetuptable(SPI_tuptable);
485 :
486 20 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
487 : }
488 0 : PG_CATCH();
489 : {
490 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
491 0 : return NULL;
492 : }
493 20 : PG_END_TRY();
494 :
495 20 : return (PyObject *) ret;
496 : }
497 :
498 : static PyObject *
499 10 : PLy_cursor_close(PyObject *self, PyObject *unused)
500 : {
501 10 : PLyCursorObject *cursor = (PLyCursorObject *) self;
502 :
503 10 : if (!cursor->closed)
504 : {
505 8 : Portal portal = GetPortalByName(cursor->portalname);
506 :
507 8 : if (!PortalIsValid(portal))
508 : {
509 2 : PLy_exception_set(PyExc_ValueError,
510 : "closing a cursor in an aborted subtransaction");
511 2 : return NULL;
512 : }
513 :
514 6 : UnpinPortal(portal);
515 6 : SPI_cursor_close(portal);
516 6 : cursor->closed = true;
517 : }
518 :
519 8 : Py_RETURN_NONE;
520 : }
|