Line data Source code
1 : /*
2 : * the PLyCursor class
3 : *
4 : * src/pl/plpython/plpy_cursorobject.c
5 : */
6 :
7 : #include "postgres.h"
8 :
9 : #include <limits.h>
10 :
11 : #include "catalog/pg_type.h"
12 : #include "mb/pg_wchar.h"
13 : #include "plpy_cursorobject.h"
14 : #include "plpy_elog.h"
15 : #include "plpy_main.h"
16 : #include "plpy_planobject.h"
17 : #include "plpy_resultobject.h"
18 : #include "plpy_spi.h"
19 : #include "plpython.h"
20 : #include "utils/memutils.h"
21 :
22 : static PyObject *PLy_cursor_query(const char *query);
23 : static void PLy_cursor_dealloc(PyObject *arg);
24 : static PyObject *PLy_cursor_iternext(PyObject *self);
25 : static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26 : static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
27 :
28 : static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
29 :
30 : static PyMethodDef PLy_cursor_methods[] = {
31 : {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32 : {"close", PLy_cursor_close, METH_NOARGS, NULL},
33 : {NULL, NULL, 0, NULL}
34 : };
35 :
36 : static PyTypeObject PLy_CursorType = {
37 : PyVarObject_HEAD_INIT(NULL, 0)
38 : .tp_name = "PLyCursor",
39 : .tp_basicsize = sizeof(PLyCursorObject),
40 : .tp_dealloc = PLy_cursor_dealloc,
41 : .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
42 : .tp_doc = PLy_cursor_doc,
43 : .tp_iter = PyObject_SelfIter,
44 : .tp_iternext = PLy_cursor_iternext,
45 : .tp_methods = PLy_cursor_methods,
46 : };
47 :
48 : void
49 46 : PLy_cursor_init_type(void)
50 : {
51 46 : if (PyType_Ready(&PLy_CursorType) < 0)
52 0 : elog(ERROR, "could not initialize PLy_CursorType");
53 46 : }
54 :
55 : PyObject *
56 36 : PLy_cursor(PyObject *self, PyObject *args)
57 : {
58 : char *query;
59 : PyObject *plan;
60 36 : PyObject *planargs = NULL;
61 :
62 36 : if (PyArg_ParseTuple(args, "s", &query))
63 28 : return PLy_cursor_query(query);
64 :
65 8 : PyErr_Clear();
66 :
67 8 : if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
68 8 : return PLy_cursor_plan(plan, planargs);
69 :
70 0 : PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
71 0 : return NULL;
72 : }
73 :
74 :
75 : static PyObject *
76 28 : PLy_cursor_query(const char *query)
77 : {
78 : PLyCursorObject *cursor;
79 28 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
80 : volatile MemoryContext oldcontext;
81 : volatile ResourceOwner oldowner;
82 :
83 28 : if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
84 0 : return NULL;
85 28 : cursor->portalname = NULL;
86 28 : cursor->closed = false;
87 28 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
88 : "PL/Python cursor context",
89 : ALLOCSET_DEFAULT_SIZES);
90 :
91 : /* Initialize for converting result tuples to Python */
92 28 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
93 : RECORDOID, -1,
94 28 : exec_ctx->curr_proc);
95 :
96 28 : oldcontext = CurrentMemoryContext;
97 28 : oldowner = CurrentResourceOwner;
98 :
99 28 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
100 :
101 28 : PG_TRY();
102 : {
103 : SPIPlanPtr plan;
104 : Portal portal;
105 :
106 28 : pg_verifymbstr(query, strlen(query), false);
107 :
108 28 : plan = SPI_prepare(query, 0, NULL);
109 28 : if (plan == NULL)
110 0 : elog(ERROR, "SPI_prepare failed: %s",
111 : SPI_result_code_string(SPI_result));
112 :
113 56 : portal = SPI_cursor_open(NULL, plan, NULL, NULL,
114 28 : exec_ctx->curr_proc->fn_readonly);
115 28 : SPI_freeplan(plan);
116 :
117 28 : if (portal == NULL)
118 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
119 : SPI_result_code_string(SPI_result));
120 :
121 28 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
122 :
123 28 : PinPortal(portal);
124 :
125 28 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
126 : }
127 0 : PG_CATCH();
128 : {
129 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
130 0 : return NULL;
131 : }
132 28 : PG_END_TRY();
133 :
134 : Assert(cursor->portalname != NULL);
135 28 : return (PyObject *) cursor;
136 : }
137 :
138 : PyObject *
139 10 : PLy_cursor_plan(PyObject *ob, PyObject *args)
140 : {
141 : PLyCursorObject *cursor;
142 : volatile int nargs;
143 : int i;
144 : PLyPlanObject *plan;
145 10 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
146 : volatile MemoryContext oldcontext;
147 : volatile ResourceOwner oldowner;
148 :
149 10 : if (args)
150 : {
151 6 : if (!PySequence_Check(args) || PyUnicode_Check(args))
152 : {
153 0 : PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
154 0 : return NULL;
155 : }
156 6 : nargs = PySequence_Length(args);
157 : }
158 : else
159 4 : nargs = 0;
160 :
161 10 : plan = (PLyPlanObject *) ob;
162 :
163 10 : if (nargs != plan->nargs)
164 : {
165 : char *sv;
166 2 : PyObject *so = PyObject_Str(args);
167 :
168 2 : if (!so)
169 0 : PLy_elog(ERROR, "could not execute plan");
170 2 : sv = PLyUnicode_AsString(so);
171 2 : PLy_exception_set_plural(PyExc_TypeError,
172 : "Expected sequence of %d argument, got %d: %s",
173 : "Expected sequence of %d arguments, got %d: %s",
174 2 : plan->nargs,
175 : plan->nargs, nargs, sv);
176 2 : Py_DECREF(so);
177 :
178 2 : return NULL;
179 : }
180 :
181 8 : if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
182 0 : return NULL;
183 8 : cursor->portalname = NULL;
184 8 : cursor->closed = false;
185 8 : cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
186 : "PL/Python cursor context",
187 : ALLOCSET_DEFAULT_SIZES);
188 :
189 : /* Initialize for converting result tuples to Python */
190 8 : PLy_input_setup_func(&cursor->result, cursor->mcxt,
191 : RECORDOID, -1,
192 8 : exec_ctx->curr_proc);
193 :
194 8 : oldcontext = CurrentMemoryContext;
195 8 : oldowner = CurrentResourceOwner;
196 :
197 8 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
198 :
199 8 : PG_TRY();
200 : {
201 : Portal portal;
202 : char *volatile nulls;
203 : volatile int j;
204 :
205 8 : if (nargs > 0)
206 4 : nulls = palloc(nargs * sizeof(char));
207 : else
208 4 : nulls = NULL;
209 :
210 12 : for (j = 0; j < nargs; j++)
211 : {
212 4 : PLyObToDatum *arg = &plan->args[j];
213 : PyObject *elem;
214 :
215 4 : elem = PySequence_GetItem(args, j);
216 4 : PG_TRY(2);
217 : {
218 : bool isnull;
219 :
220 4 : plan->values[j] = PLy_output_convert(arg, elem, &isnull);
221 4 : nulls[j] = isnull ? 'n' : ' ';
222 : }
223 0 : PG_FINALLY(2);
224 : {
225 4 : Py_DECREF(elem);
226 : }
227 4 : PG_END_TRY(2);
228 : }
229 :
230 16 : portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
231 8 : exec_ctx->curr_proc->fn_readonly);
232 8 : if (portal == NULL)
233 0 : elog(ERROR, "SPI_cursor_open() failed: %s",
234 : SPI_result_code_string(SPI_result));
235 :
236 8 : cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
237 :
238 8 : PinPortal(portal);
239 :
240 8 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
241 : }
242 0 : PG_CATCH();
243 : {
244 : int k;
245 :
246 : /* cleanup plan->values array */
247 0 : for (k = 0; k < nargs; k++)
248 : {
249 0 : if (!plan->args[k].typbyval &&
250 0 : (plan->values[k] != PointerGetDatum(NULL)))
251 : {
252 0 : pfree(DatumGetPointer(plan->values[k]));
253 0 : plan->values[k] = PointerGetDatum(NULL);
254 : }
255 : }
256 :
257 0 : Py_DECREF(cursor);
258 :
259 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
260 0 : return NULL;
261 : }
262 8 : PG_END_TRY();
263 :
264 12 : for (i = 0; i < nargs; i++)
265 : {
266 4 : if (!plan->args[i].typbyval &&
267 4 : (plan->values[i] != PointerGetDatum(NULL)))
268 : {
269 4 : pfree(DatumGetPointer(plan->values[i]));
270 4 : plan->values[i] = PointerGetDatum(NULL);
271 : }
272 : }
273 :
274 : Assert(cursor->portalname != NULL);
275 8 : return (PyObject *) cursor;
276 : }
277 :
278 : static void
279 36 : PLy_cursor_dealloc(PyObject *arg)
280 : {
281 : PLyCursorObject *cursor;
282 : Portal portal;
283 :
284 36 : cursor = (PLyCursorObject *) arg;
285 :
286 36 : if (!cursor->closed)
287 : {
288 30 : portal = GetPortalByName(cursor->portalname);
289 :
290 30 : if (PortalIsValid(portal))
291 : {
292 24 : UnpinPortal(portal);
293 24 : SPI_cursor_close(portal);
294 : }
295 30 : cursor->closed = true;
296 : }
297 36 : if (cursor->mcxt)
298 : {
299 36 : MemoryContextDelete(cursor->mcxt);
300 36 : cursor->mcxt = NULL;
301 : }
302 36 : arg->ob_type->tp_free(arg);
303 36 : }
304 :
305 : static PyObject *
306 72 : PLy_cursor_iternext(PyObject *self)
307 : {
308 : PLyCursorObject *cursor;
309 : PyObject *ret;
310 72 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
311 : volatile MemoryContext oldcontext;
312 : volatile ResourceOwner oldowner;
313 : Portal portal;
314 :
315 72 : cursor = (PLyCursorObject *) self;
316 :
317 72 : if (cursor->closed)
318 : {
319 2 : PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
320 2 : return NULL;
321 : }
322 :
323 70 : portal = GetPortalByName(cursor->portalname);
324 70 : if (!PortalIsValid(portal))
325 : {
326 0 : PLy_exception_set(PyExc_ValueError,
327 : "iterating a cursor in an aborted subtransaction");
328 0 : return NULL;
329 : }
330 :
331 70 : oldcontext = CurrentMemoryContext;
332 70 : oldowner = CurrentResourceOwner;
333 :
334 70 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
335 :
336 70 : PG_TRY();
337 : {
338 70 : SPI_cursor_fetch(portal, true, 1);
339 68 : if (SPI_processed == 0)
340 : {
341 16 : PyErr_SetNone(PyExc_StopIteration);
342 16 : ret = NULL;
343 : }
344 : else
345 : {
346 52 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
347 52 : exec_ctx->curr_proc);
348 :
349 52 : ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
350 52 : SPI_tuptable->tupdesc, true);
351 : }
352 :
353 68 : SPI_freetuptable(SPI_tuptable);
354 :
355 68 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
356 : }
357 2 : PG_CATCH();
358 : {
359 2 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
360 2 : return NULL;
361 : }
362 68 : PG_END_TRY();
363 :
364 68 : return ret;
365 : }
366 :
367 : static PyObject *
368 26 : PLy_cursor_fetch(PyObject *self, PyObject *args)
369 : {
370 : PLyCursorObject *cursor;
371 : int count;
372 : PLyResultObject *ret;
373 26 : PLyExecutionContext *exec_ctx = PLy_current_execution_context();
374 : volatile MemoryContext oldcontext;
375 : volatile ResourceOwner oldowner;
376 : Portal portal;
377 :
378 26 : if (!PyArg_ParseTuple(args, "i:fetch", &count))
379 0 : return NULL;
380 :
381 26 : cursor = (PLyCursorObject *) self;
382 :
383 26 : if (cursor->closed)
384 : {
385 2 : PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
386 2 : return NULL;
387 : }
388 :
389 24 : portal = GetPortalByName(cursor->portalname);
390 24 : if (!PortalIsValid(portal))
391 : {
392 4 : PLy_exception_set(PyExc_ValueError,
393 : "iterating a cursor in an aborted subtransaction");
394 4 : return NULL;
395 : }
396 :
397 20 : ret = (PLyResultObject *) PLy_result_new();
398 20 : if (ret == NULL)
399 0 : return NULL;
400 :
401 20 : oldcontext = CurrentMemoryContext;
402 20 : oldowner = CurrentResourceOwner;
403 :
404 20 : PLy_spi_subtransaction_begin(oldcontext, oldowner);
405 :
406 20 : PG_TRY();
407 : {
408 20 : SPI_cursor_fetch(portal, true, count);
409 :
410 20 : Py_DECREF(ret->status);
411 20 : ret->status = PyLong_FromLong(SPI_OK_FETCH);
412 :
413 20 : Py_DECREF(ret->nrows);
414 20 : ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
415 :
416 20 : if (SPI_processed != 0)
417 : {
418 : uint64 i;
419 :
420 : /*
421 : * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
422 : * and list indices; so we cannot support a result larger than
423 : * PY_SSIZE_T_MAX.
424 : */
425 14 : if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
426 0 : ereport(ERROR,
427 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
428 : errmsg("query result has too many rows to fit in a Python list")));
429 :
430 14 : Py_DECREF(ret->rows);
431 14 : ret->rows = PyList_New(SPI_processed);
432 14 : if (!ret->rows)
433 : {
434 0 : Py_DECREF(ret);
435 0 : ret = NULL;
436 : }
437 : else
438 : {
439 14 : PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
440 14 : exec_ctx->curr_proc);
441 :
442 88 : for (i = 0; i < SPI_processed; i++)
443 : {
444 148 : PyObject *row = PLy_input_from_tuple(&cursor->result,
445 74 : SPI_tuptable->vals[i],
446 74 : SPI_tuptable->tupdesc,
447 : true);
448 :
449 74 : PyList_SetItem(ret->rows, i, row);
450 : }
451 : }
452 : }
453 :
454 20 : SPI_freetuptable(SPI_tuptable);
455 :
456 20 : PLy_spi_subtransaction_commit(oldcontext, oldowner);
457 : }
458 0 : PG_CATCH();
459 : {
460 0 : PLy_spi_subtransaction_abort(oldcontext, oldowner);
461 0 : return NULL;
462 : }
463 20 : PG_END_TRY();
464 :
465 20 : return (PyObject *) ret;
466 : }
467 :
468 : static PyObject *
469 10 : PLy_cursor_close(PyObject *self, PyObject *unused)
470 : {
471 10 : PLyCursorObject *cursor = (PLyCursorObject *) self;
472 :
473 10 : if (!cursor->closed)
474 : {
475 8 : Portal portal = GetPortalByName(cursor->portalname);
476 :
477 8 : if (!PortalIsValid(portal))
478 : {
479 2 : PLy_exception_set(PyExc_ValueError,
480 : "closing a cursor in an aborted subtransaction");
481 2 : return NULL;
482 : }
483 :
484 6 : UnpinPortal(portal);
485 6 : SPI_cursor_close(portal);
486 6 : cursor->closed = true;
487 : }
488 :
489 8 : Py_RETURN_NONE;
490 : }
|