Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
38 : #ifdef DEBUG_OUTPUT
39 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 : #else
41 : #define pg_debug(...)
42 : #endif
43 :
44 : static const char *const drop_table_sql =
45 : "DROP TABLE IF EXISTS pq_pipeline_demo";
46 : static const char *const create_table_sql =
47 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 : "int8filler int8);";
49 : static const char *const insert_sql =
50 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 : static const char *const insert_sql2 =
52 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53 :
54 : /* max char length of an int32/64, plus sign and null terminator */
55 : #define MAXINTLEN 12
56 : #define MAXINT8LEN 20
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : pg_noreturn static void
75 0 : pg_fatal_impl(int line, const char *fmt,...)
76 : {
77 : va_list args;
78 :
79 0 : fflush(stdout);
80 :
81 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
82 0 : va_start(args, fmt);
83 0 : vfprintf(stderr, fmt, args);
84 0 : va_end(args);
85 : Assert(fmt[strlen(fmt) - 1] != '\n');
86 0 : fprintf(stderr, "\n");
87 0 : exit(1);
88 : }
89 :
90 : /*
91 : * Check that libpq next returns a PGresult with the specified status,
92 : * returning the PGresult so that caller can perform additional checks.
93 : */
94 : #define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
95 : static PGresult *
96 112 : confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
97 : {
98 : PGresult *res;
99 :
100 112 : res = PQgetResult(conn);
101 112 : if (res == NULL)
102 0 : pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
103 : PQerrorMessage(conn));
104 112 : if (PQresultStatus(res) != status)
105 0 : pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
106 : PQresStatus(PQresultStatus(res)),
107 : PQresStatus(status),
108 : PQerrorMessage(conn));
109 112 : return res;
110 : }
111 :
112 : /*
113 : * Check that libpq next returns a PGresult with the specified status,
114 : * then free the PGresult.
115 : */
116 : #define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
117 : static void
118 78 : consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
119 : {
120 : PGresult *res;
121 :
122 78 : res = confirm_result_status_impl(line, conn, status);
123 78 : PQclear(res);
124 78 : }
125 :
126 : /*
127 : * Check that libpq next returns a null PGresult.
128 : */
129 : #define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
130 : static void
131 1638 : consume_null_result_impl(int line, PGconn *conn)
132 : {
133 : PGresult *res;
134 :
135 1638 : res = PQgetResult(conn);
136 1638 : if (res != NULL)
137 0 : pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
138 : PQresStatus(PQresultStatus(res)),
139 : PQerrorMessage(conn));
140 1638 : }
141 :
142 : /*
143 : * Check that the query on the given connection got canceled.
144 : */
145 : #define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
146 : static void
147 24 : consume_query_cancel_impl(int line, PGconn *conn)
148 : {
149 : PGresult *res;
150 :
151 24 : res = confirm_result_status_impl(line, conn, PGRES_FATAL_ERROR);
152 24 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
153 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
154 : PQerrorMessage(conn));
155 24 : PQclear(res);
156 :
157 24 : while (PQisBusy(conn))
158 0 : PQconsumeInput(conn);
159 24 : }
160 :
161 : /*
162 : * Using monitorConn, query pg_stat_activity to see that the connection with
163 : * the given PID is either in the given state, or waiting on the given event
164 : * (only one of them can be given).
165 : */
166 : static void
167 48 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
168 : char *state, char *event)
169 : {
170 48 : const Oid paramTypes[] = {INT4OID, TEXTOID};
171 : const char *paramValues[2];
172 48 : char *pidstr = psprintf("%d", procpid);
173 :
174 : Assert((state == NULL) ^ (event == NULL));
175 :
176 48 : paramValues[0] = pidstr;
177 48 : paramValues[1] = state ? state : event;
178 :
179 : while (true)
180 2 : {
181 : PGresult *res;
182 : char *value;
183 :
184 50 : if (state != NULL)
185 24 : res = PQexecParams(monitorConn,
186 : "SELECT count(*) FROM pg_stat_activity WHERE "
187 : "pid = $1 AND state = $2",
188 : 2, paramTypes, paramValues, NULL, NULL, 0);
189 : else
190 26 : res = PQexecParams(monitorConn,
191 : "SELECT count(*) FROM pg_stat_activity WHERE "
192 : "pid = $1 AND wait_event = $2",
193 : 2, paramTypes, paramValues, NULL, NULL, 0);
194 :
195 50 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
196 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
197 50 : if (PQntuples(res) != 1)
198 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
199 50 : if (PQnfields(res) != 1)
200 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
201 50 : value = PQgetvalue(res, 0, 0);
202 50 : if (strcmp(value, "0") != 0)
203 : {
204 48 : PQclear(res);
205 48 : break;
206 : }
207 2 : PQclear(res);
208 :
209 : /* wait 10ms before polling again */
210 2 : pg_usleep(10000);
211 : }
212 :
213 48 : pfree(pidstr);
214 48 : }
215 :
216 : #define send_cancellable_query(conn, monitorConn) \
217 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
218 : static void
219 24 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
220 : {
221 : const char *env_wait;
222 24 : const Oid paramTypes[1] = {INT4OID};
223 :
224 : /*
225 : * Wait for the connection to be idle, so that our check for an active
226 : * connection below is reliable, instead of possibly seeing an outdated
227 : * state.
228 : */
229 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
230 :
231 24 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
232 24 : if (env_wait == NULL)
233 24 : env_wait = "180";
234 :
235 24 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
236 : &env_wait, NULL, NULL, 0) != 1)
237 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
238 :
239 : /*
240 : * Wait for the sleep to be active, because if the query is not running
241 : * yet, the cancel request that we send won't have any effect.
242 : */
243 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
244 24 : }
245 :
246 : /*
247 : * Create a new connection with the same conninfo as the given one.
248 : */
249 : static PGconn *
250 4 : copy_connection(PGconn *conn)
251 : {
252 : PGconn *copyConn;
253 4 : PQconninfoOption *opts = PQconninfo(conn);
254 : const char **keywords;
255 : const char **vals;
256 4 : int nopts = 0;
257 : int i;
258 :
259 208 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
260 204 : nopts++;
261 4 : nopts++; /* for the NULL terminator */
262 :
263 4 : keywords = pg_malloc(sizeof(char *) * nopts);
264 4 : vals = pg_malloc(sizeof(char *) * nopts);
265 :
266 4 : i = 0;
267 208 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
268 : {
269 204 : if (opt->val)
270 : {
271 80 : keywords[i] = opt->keyword;
272 80 : vals[i] = opt->val;
273 80 : i++;
274 : }
275 : }
276 4 : keywords[i] = vals[i] = NULL;
277 :
278 4 : copyConn = PQconnectdbParams(keywords, vals, false);
279 :
280 4 : if (PQstatus(copyConn) != CONNECTION_OK)
281 0 : pg_fatal("Connection to database failed: %s",
282 : PQerrorMessage(copyConn));
283 :
284 4 : pfree(keywords);
285 4 : pfree(vals);
286 4 : PQconninfoFree(opts);
287 :
288 4 : return copyConn;
289 : }
290 :
291 : /*
292 : * Test query cancellation routines
293 : */
294 : static void
295 4 : test_cancel(PGconn *conn)
296 : {
297 : PGcancel *cancel;
298 : PGcancelConn *cancelConn;
299 : PGconn *monitorConn;
300 : char errorbuf[256];
301 :
302 4 : fprintf(stderr, "test cancellations... ");
303 :
304 4 : if (PQsetnonblocking(conn, 1) != 0)
305 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
306 :
307 : /*
308 : * Make a separate connection to the database to monitor the query on the
309 : * main connection.
310 : */
311 4 : monitorConn = copy_connection(conn);
312 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
313 :
314 : /* test PQcancel */
315 4 : send_cancellable_query(conn, monitorConn);
316 4 : cancel = PQgetCancel(conn);
317 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
318 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
319 4 : consume_query_cancel(conn);
320 :
321 : /* PGcancel object can be reused for the next query */
322 4 : send_cancellable_query(conn, monitorConn);
323 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
324 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
325 4 : consume_query_cancel(conn);
326 :
327 4 : PQfreeCancel(cancel);
328 :
329 : /* test PQrequestCancel */
330 4 : send_cancellable_query(conn, monitorConn);
331 4 : if (!PQrequestCancel(conn))
332 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
333 4 : consume_query_cancel(conn);
334 :
335 : /* test PQcancelBlocking */
336 4 : send_cancellable_query(conn, monitorConn);
337 4 : cancelConn = PQcancelCreate(conn);
338 4 : if (!PQcancelBlocking(cancelConn))
339 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
340 4 : consume_query_cancel(conn);
341 4 : PQcancelFinish(cancelConn);
342 :
343 : /* test PQcancelCreate and then polling with PQcancelPoll */
344 4 : send_cancellable_query(conn, monitorConn);
345 4 : cancelConn = PQcancelCreate(conn);
346 4 : if (!PQcancelStart(cancelConn))
347 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
348 : while (true)
349 4 : {
350 : struct timeval tv;
351 : fd_set input_mask;
352 : fd_set output_mask;
353 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
354 8 : int sock = PQcancelSocket(cancelConn);
355 :
356 8 : if (pollres == PGRES_POLLING_OK)
357 4 : break;
358 :
359 68 : FD_ZERO(&input_mask);
360 68 : FD_ZERO(&output_mask);
361 4 : switch (pollres)
362 : {
363 4 : case PGRES_POLLING_READING:
364 : pg_debug("polling for reads\n");
365 4 : FD_SET(sock, &input_mask);
366 4 : break;
367 0 : case PGRES_POLLING_WRITING:
368 : pg_debug("polling for writes\n");
369 0 : FD_SET(sock, &output_mask);
370 0 : break;
371 0 : default:
372 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
373 : }
374 :
375 4 : if (sock < 0)
376 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
377 :
378 4 : tv.tv_sec = 3;
379 4 : tv.tv_usec = 0;
380 :
381 : while (true)
382 : {
383 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
384 : {
385 0 : if (errno == EINTR)
386 0 : continue;
387 0 : pg_fatal("select() failed: %m");
388 : }
389 4 : break;
390 : }
391 : }
392 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
393 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
394 4 : consume_query_cancel(conn);
395 :
396 : /*
397 : * test PQcancelReset works on the cancel connection and it can be reused
398 : * afterwards
399 : */
400 4 : PQcancelReset(cancelConn);
401 :
402 4 : send_cancellable_query(conn, monitorConn);
403 4 : if (!PQcancelStart(cancelConn))
404 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
405 : while (true)
406 4 : {
407 : struct timeval tv;
408 : fd_set input_mask;
409 : fd_set output_mask;
410 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
411 8 : int sock = PQcancelSocket(cancelConn);
412 :
413 8 : if (pollres == PGRES_POLLING_OK)
414 4 : break;
415 :
416 68 : FD_ZERO(&input_mask);
417 68 : FD_ZERO(&output_mask);
418 4 : switch (pollres)
419 : {
420 4 : case PGRES_POLLING_READING:
421 : pg_debug("polling for reads\n");
422 4 : FD_SET(sock, &input_mask);
423 4 : break;
424 0 : case PGRES_POLLING_WRITING:
425 : pg_debug("polling for writes\n");
426 0 : FD_SET(sock, &output_mask);
427 0 : break;
428 0 : default:
429 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
430 : }
431 :
432 4 : if (sock < 0)
433 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
434 :
435 4 : tv.tv_sec = 3;
436 4 : tv.tv_usec = 0;
437 :
438 : while (true)
439 : {
440 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
441 : {
442 0 : if (errno == EINTR)
443 0 : continue;
444 0 : pg_fatal("select() failed: %m");
445 : }
446 4 : break;
447 : }
448 : }
449 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
450 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
451 4 : consume_query_cancel(conn);
452 :
453 4 : PQcancelFinish(cancelConn);
454 4 : PQfinish(monitorConn);
455 :
456 4 : fprintf(stderr, "ok\n");
457 4 : }
458 :
459 : static void
460 2 : test_disallowed_in_pipeline(PGconn *conn)
461 : {
462 2 : PGresult *res = NULL;
463 :
464 2 : fprintf(stderr, "test error cases... ");
465 :
466 2 : if (PQisnonblocking(conn))
467 0 : pg_fatal("Expected blocking connection mode");
468 :
469 2 : if (PQenterPipelineMode(conn) != 1)
470 0 : pg_fatal("Unable to enter pipeline mode");
471 :
472 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
473 0 : pg_fatal("Pipeline mode not activated properly");
474 :
475 : /* PQexec should fail in pipeline mode */
476 2 : res = PQexec(conn, "SELECT 1");
477 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
478 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
479 2 : if (strcmp(PQerrorMessage(conn),
480 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
481 0 : pg_fatal("did not get expected error message; got: \"%s\"",
482 : PQerrorMessage(conn));
483 2 : PQclear(res);
484 :
485 : /* PQsendQuery should fail in pipeline mode */
486 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
487 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
488 2 : if (strcmp(PQerrorMessage(conn),
489 : "PQsendQuery not allowed in pipeline mode\n") != 0)
490 0 : pg_fatal("did not get expected error message; got: \"%s\"",
491 : PQerrorMessage(conn));
492 :
493 : /* Entering pipeline mode when already in pipeline mode is OK */
494 2 : if (PQenterPipelineMode(conn) != 1)
495 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
496 :
497 2 : if (PQisBusy(conn) != 0)
498 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
499 :
500 : /* ok, back to normal command mode */
501 2 : if (PQexitPipelineMode(conn) != 1)
502 0 : pg_fatal("couldn't exit idle empty pipeline mode");
503 :
504 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
505 0 : pg_fatal("Pipeline mode not terminated properly");
506 :
507 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
508 2 : if (PQexitPipelineMode(conn) != 1)
509 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
510 :
511 : /* can now PQexec again */
512 2 : res = PQexec(conn, "SELECT 1");
513 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
514 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
515 : PQerrorMessage(conn));
516 2 : PQclear(res);
517 :
518 2 : fprintf(stderr, "ok\n");
519 2 : }
520 :
521 : static void
522 2 : test_multi_pipelines(PGconn *conn)
523 : {
524 2 : const char *dummy_params[1] = {"1"};
525 2 : Oid dummy_param_oids[1] = {INT4OID};
526 :
527 2 : fprintf(stderr, "multi pipeline... ");
528 :
529 : /*
530 : * Queue up a couple of small pipelines and process each without returning
531 : * to command mode first.
532 : */
533 2 : if (PQenterPipelineMode(conn) != 1)
534 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
535 :
536 : /* first pipeline */
537 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
538 : dummy_params, NULL, NULL, 0) != 1)
539 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
540 :
541 2 : if (PQpipelineSync(conn) != 1)
542 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
543 :
544 : /* second pipeline */
545 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
546 : dummy_params, NULL, NULL, 0) != 1)
547 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
548 :
549 : /* Skip flushing once. */
550 2 : if (PQsendPipelineSync(conn) != 1)
551 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
552 :
553 : /* third pipeline */
554 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
555 : dummy_params, NULL, NULL, 0) != 1)
556 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
557 :
558 2 : if (PQpipelineSync(conn) != 1)
559 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
560 :
561 : /* OK, start processing the results */
562 :
563 : /* first pipeline */
564 2 : consume_result_status(conn, PGRES_TUPLES_OK);
565 :
566 2 : consume_null_result(conn);
567 :
568 2 : if (PQexitPipelineMode(conn) != 0)
569 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
570 :
571 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
572 :
573 : /* second pipeline */
574 2 : consume_result_status(conn, PGRES_TUPLES_OK);
575 :
576 2 : consume_null_result(conn);
577 :
578 2 : if (PQexitPipelineMode(conn) != 0)
579 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
580 :
581 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
582 :
583 : /* third pipeline */
584 2 : consume_result_status(conn, PGRES_TUPLES_OK);
585 :
586 2 : consume_null_result(conn);
587 :
588 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
589 :
590 : /* We're still in pipeline mode ... */
591 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
592 0 : pg_fatal("Fell out of pipeline mode somehow");
593 :
594 : /* until we end it, which we can safely do now */
595 2 : if (PQexitPipelineMode(conn) != 1)
596 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
597 : PQerrorMessage(conn));
598 :
599 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
600 0 : pg_fatal("exiting pipeline mode didn't seem to work");
601 :
602 2 : fprintf(stderr, "ok\n");
603 2 : }
604 :
605 : /*
606 : * Test behavior when a pipeline dispatches a number of commands that are
607 : * not flushed by a sync point.
608 : */
609 : static void
610 2 : test_nosync(PGconn *conn)
611 : {
612 2 : int numqueries = 10;
613 2 : int results = 0;
614 2 : int sock = PQsocket(conn);
615 :
616 2 : fprintf(stderr, "nosync... ");
617 :
618 2 : if (sock < 0)
619 0 : pg_fatal("invalid socket");
620 :
621 2 : if (PQenterPipelineMode(conn) != 1)
622 0 : pg_fatal("could not enter pipeline mode");
623 22 : for (int i = 0; i < numqueries; i++)
624 : {
625 : fd_set input_mask;
626 : struct timeval tv;
627 :
628 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
629 : 0, NULL, NULL, NULL, NULL, 0) != 1)
630 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
631 20 : PQflush(conn);
632 :
633 : /*
634 : * If the server has written anything to us, read (some of) it now.
635 : */
636 340 : FD_ZERO(&input_mask);
637 20 : FD_SET(sock, &input_mask);
638 20 : tv.tv_sec = 0;
639 20 : tv.tv_usec = 0;
640 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
641 : {
642 0 : fprintf(stderr, "select() failed: %m\n");
643 0 : exit_nicely(conn);
644 : }
645 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
646 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
647 : }
648 :
649 : /* tell server to flush its output buffer */
650 2 : if (PQsendFlushRequest(conn) != 1)
651 0 : pg_fatal("failed to send flush request");
652 2 : PQflush(conn);
653 :
654 : /* Now read all results */
655 : for (;;)
656 : {
657 : /* We expect exactly one TUPLES_OK result for each query we sent */
658 20 : consume_result_status(conn, PGRES_TUPLES_OK);
659 :
660 : /* and one NULL result should follow each */
661 20 : consume_null_result(conn);
662 :
663 20 : results++;
664 :
665 : /* if we're done, we're done */
666 20 : if (results == numqueries)
667 2 : break;
668 : }
669 :
670 2 : fprintf(stderr, "ok\n");
671 2 : }
672 :
673 : /*
674 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
675 : * still have to get results for each pipeline item, but the item will just be
676 : * a PGRES_PIPELINE_ABORTED code.
677 : *
678 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
679 : * usually use an xact, but in this case we want to observe the effects of each
680 : * statement.
681 : */
682 : static void
683 2 : test_pipeline_abort(PGconn *conn)
684 : {
685 2 : PGresult *res = NULL;
686 2 : const char *dummy_params[1] = {"1"};
687 2 : Oid dummy_param_oids[1] = {INT4OID};
688 : int i;
689 : int gotrows;
690 : bool goterror;
691 :
692 2 : fprintf(stderr, "aborted pipeline... ");
693 :
694 2 : res = PQexec(conn, drop_table_sql);
695 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
696 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
697 2 : PQclear(res);
698 :
699 2 : res = PQexec(conn, create_table_sql);
700 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
701 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
702 2 : PQclear(res);
703 :
704 : /*
705 : * Queue up a couple of small pipelines and process each without returning
706 : * to command mode first. Make sure the second operation in the first
707 : * pipeline ERRORs.
708 : */
709 2 : if (PQenterPipelineMode(conn) != 1)
710 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
711 :
712 2 : dummy_params[0] = "1";
713 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
714 : dummy_params, NULL, NULL, 0) != 1)
715 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
716 :
717 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
718 : 1, dummy_param_oids, dummy_params,
719 : NULL, NULL, 0) != 1)
720 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
721 :
722 2 : dummy_params[0] = "2";
723 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
724 : dummy_params, NULL, NULL, 0) != 1)
725 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
726 :
727 2 : if (PQpipelineSync(conn) != 1)
728 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
729 :
730 2 : dummy_params[0] = "3";
731 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
732 : dummy_params, NULL, NULL, 0) != 1)
733 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
734 : PQerrorMessage(conn));
735 :
736 2 : if (PQpipelineSync(conn) != 1)
737 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
738 :
739 : /*
740 : * OK, start processing the pipeline results.
741 : *
742 : * We should get a command-ok for the first query, then a fatal error and
743 : * a pipeline aborted message for the second insert, a pipeline-end, then
744 : * a command-ok and a pipeline-ok for the second pipeline operation.
745 : */
746 2 : consume_result_status(conn, PGRES_COMMAND_OK);
747 :
748 : /* NULL result to signal end-of-results for this command */
749 2 : consume_null_result(conn);
750 :
751 : /* Second query caused error, so we expect an error next */
752 2 : consume_result_status(conn, PGRES_FATAL_ERROR);
753 :
754 : /* NULL result to signal end-of-results for this command */
755 2 : consume_null_result(conn);
756 :
757 : /*
758 : * pipeline should now be aborted.
759 : *
760 : * Note that we could still queue more queries at this point if we wanted;
761 : * they'd get added to a new third pipeline since we've already sent a
762 : * second. The aborted flag relates only to the pipeline being received.
763 : */
764 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
765 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
766 :
767 : /* third query in pipeline, the second insert */
768 2 : consume_result_status(conn, PGRES_PIPELINE_ABORTED);
769 :
770 : /* NULL result to signal end-of-results for this command */
771 2 : consume_null_result(conn);
772 :
773 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
774 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
775 :
776 : /* Ensure we're still in pipeline */
777 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
778 0 : pg_fatal("Fell out of pipeline mode somehow");
779 :
780 : /*
781 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
782 : *
783 : * (This is so clients know to start processing results normally again and
784 : * can tell the difference between skipped commands and the sync.)
785 : */
786 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
787 :
788 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
789 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
790 :
791 : /* We're still in pipeline mode... */
792 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
793 0 : pg_fatal("Fell out of pipeline mode somehow");
794 :
795 : /* the insert from the second pipeline */
796 2 : consume_result_status(conn, PGRES_COMMAND_OK);
797 :
798 : /* Read the NULL result at the end of the command */
799 2 : consume_null_result(conn);
800 :
801 : /* the second pipeline sync */
802 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
803 :
804 : /* Read the NULL result at the end of the command */
805 2 : consume_null_result(conn);
806 :
807 : /* Try to send two queries in one command */
808 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
809 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
810 2 : if (PQpipelineSync(conn) != 1)
811 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
812 2 : goterror = false;
813 4 : while ((res = PQgetResult(conn)) != NULL)
814 : {
815 2 : switch (PQresultStatus(res))
816 : {
817 2 : case PGRES_FATAL_ERROR:
818 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
819 0 : pg_fatal("expected error about multiple commands, got %s",
820 : PQerrorMessage(conn));
821 2 : printf("got expected %s", PQerrorMessage(conn));
822 2 : goterror = true;
823 2 : break;
824 0 : default:
825 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
826 : break;
827 : }
828 2 : PQclear(res);
829 : }
830 2 : if (!goterror)
831 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
832 :
833 : /* the second pipeline sync */
834 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
835 :
836 2 : fprintf(stderr, "ok\n");
837 :
838 : /* Test single-row mode with an error partways */
839 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
840 : 0, NULL, NULL, NULL, NULL, 0) != 1)
841 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
842 2 : if (PQpipelineSync(conn) != 1)
843 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
844 2 : PQsetSingleRowMode(conn);
845 2 : goterror = false;
846 2 : gotrows = 0;
847 10 : while ((res = PQgetResult(conn)) != NULL)
848 : {
849 8 : switch (PQresultStatus(res))
850 : {
851 6 : case PGRES_SINGLE_TUPLE:
852 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
853 6 : gotrows++;
854 6 : break;
855 2 : case PGRES_FATAL_ERROR:
856 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
857 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
858 : PQerrorMessage(conn),
859 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
860 2 : printf("got expected division-by-zero\n");
861 2 : goterror = true;
862 2 : break;
863 0 : default:
864 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
865 : }
866 8 : PQclear(res);
867 : }
868 2 : if (!goterror)
869 0 : pg_fatal("did not get division-by-zero error");
870 2 : if (gotrows != 3)
871 0 : pg_fatal("did not get three rows");
872 :
873 : /* the third pipeline sync */
874 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
875 :
876 : /* We're still in pipeline mode... */
877 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
878 0 : pg_fatal("Fell out of pipeline mode somehow");
879 :
880 : /* until we end it, which we can safely do now */
881 2 : if (PQexitPipelineMode(conn) != 1)
882 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
883 : PQerrorMessage(conn));
884 :
885 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
886 0 : pg_fatal("exiting pipeline mode didn't seem to work");
887 :
888 : /*-
889 : * Since we fired the pipelines off without a surrounding xact, the results
890 : * should be:
891 : *
892 : * - Implicit xact started by server around 1st pipeline
893 : * - First insert applied
894 : * - Second statement aborted xact
895 : * - Third insert skipped
896 : * - Sync rolled back first implicit xact
897 : * - Implicit xact created by server around 2nd pipeline
898 : * - insert applied from 2nd pipeline
899 : * - Sync commits 2nd xact
900 : *
901 : * So we should only have the value 3 that we inserted.
902 : */
903 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
904 :
905 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
906 0 : pg_fatal("Expected tuples, got %s: %s",
907 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
908 2 : if (PQntuples(res) != 1)
909 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
910 4 : for (i = 0; i < PQntuples(res); i++)
911 : {
912 2 : const char *val = PQgetvalue(res, i, 0);
913 :
914 2 : if (strcmp(val, "3") != 0)
915 0 : pg_fatal("expected only insert with value 3, got %s", val);
916 : }
917 :
918 2 : PQclear(res);
919 :
920 2 : fprintf(stderr, "ok\n");
921 2 : }
922 :
923 : /* State machine enum for test_pipelined_insert */
924 : enum PipelineInsertStep
925 : {
926 : BI_BEGIN_TX,
927 : BI_DROP_TABLE,
928 : BI_CREATE_TABLE,
929 : BI_PREPARE,
930 : BI_INSERT_ROWS,
931 : BI_COMMIT_TX,
932 : BI_SYNC,
933 : BI_DONE,
934 : };
935 :
936 : static void
937 2 : test_pipelined_insert(PGconn *conn, int n_rows)
938 : {
939 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
940 : const char *insert_params[2];
941 : char insert_param_0[MAXINTLEN];
942 : char insert_param_1[MAXINT8LEN];
943 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
944 2 : recv_step = BI_BEGIN_TX;
945 : int rows_to_send,
946 : rows_to_receive;
947 :
948 2 : insert_params[0] = insert_param_0;
949 2 : insert_params[1] = insert_param_1;
950 :
951 2 : rows_to_send = rows_to_receive = n_rows;
952 :
953 : /*
954 : * Do a pipelined insert into a table created at the start of the pipeline
955 : */
956 2 : if (PQenterPipelineMode(conn) != 1)
957 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
958 :
959 8 : while (send_step != BI_PREPARE)
960 : {
961 : const char *sql;
962 :
963 6 : switch (send_step)
964 : {
965 2 : case BI_BEGIN_TX:
966 2 : sql = "BEGIN TRANSACTION";
967 2 : send_step = BI_DROP_TABLE;
968 2 : break;
969 :
970 2 : case BI_DROP_TABLE:
971 2 : sql = drop_table_sql;
972 2 : send_step = BI_CREATE_TABLE;
973 2 : break;
974 :
975 2 : case BI_CREATE_TABLE:
976 2 : sql = create_table_sql;
977 2 : send_step = BI_PREPARE;
978 2 : break;
979 :
980 0 : default:
981 0 : pg_fatal("invalid state");
982 : sql = NULL; /* keep compiler quiet */
983 : }
984 :
985 : pg_debug("sending: %s\n", sql);
986 6 : if (PQsendQueryParams(conn, sql,
987 : 0, NULL, NULL, NULL, NULL, 0) != 1)
988 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
989 : }
990 :
991 : Assert(send_step == BI_PREPARE);
992 : pg_debug("sending: %s\n", insert_sql2);
993 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
994 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
995 2 : send_step = BI_INSERT_ROWS;
996 :
997 : /*
998 : * Now we start inserting. We'll be sending enough data that we could fill
999 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1000 : * mode and consume input while we send more output. As results of each
1001 : * query are processed we should pop them to allow processing of the next
1002 : * query. There's no need to finish the pipeline before processing
1003 : * results.
1004 : */
1005 2 : if (PQsetnonblocking(conn, 1) != 0)
1006 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1007 :
1008 25114 : while (recv_step != BI_DONE)
1009 : {
1010 : int sock;
1011 : fd_set input_mask;
1012 : fd_set output_mask;
1013 :
1014 25112 : sock = PQsocket(conn);
1015 :
1016 25112 : if (sock < 0)
1017 0 : break; /* shouldn't happen */
1018 :
1019 426904 : FD_ZERO(&input_mask);
1020 25112 : FD_SET(sock, &input_mask);
1021 426904 : FD_ZERO(&output_mask);
1022 25112 : FD_SET(sock, &output_mask);
1023 :
1024 25112 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1025 : {
1026 0 : fprintf(stderr, "select() failed: %m\n");
1027 0 : exit_nicely(conn);
1028 : }
1029 :
1030 : /*
1031 : * Process any results, so we keep the server's output buffer free
1032 : * flowing and it can continue to process input
1033 : */
1034 25112 : if (FD_ISSET(sock, &input_mask))
1035 : {
1036 6 : PQconsumeInput(conn);
1037 :
1038 : /* Read until we'd block if we tried to read */
1039 2828 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1040 : {
1041 : PGresult *res;
1042 2822 : const char *cmdtag = "";
1043 2822 : const char *description = "";
1044 : int status;
1045 :
1046 : /*
1047 : * Read next result. If no more results from this query,
1048 : * advance to the next query
1049 : */
1050 2822 : res = PQgetResult(conn);
1051 2822 : if (res == NULL)
1052 1410 : continue;
1053 :
1054 1412 : status = PGRES_COMMAND_OK;
1055 1412 : switch (recv_step)
1056 : {
1057 2 : case BI_BEGIN_TX:
1058 2 : cmdtag = "BEGIN";
1059 2 : recv_step++;
1060 2 : break;
1061 2 : case BI_DROP_TABLE:
1062 2 : cmdtag = "DROP TABLE";
1063 2 : recv_step++;
1064 2 : break;
1065 2 : case BI_CREATE_TABLE:
1066 2 : cmdtag = "CREATE TABLE";
1067 2 : recv_step++;
1068 2 : break;
1069 2 : case BI_PREPARE:
1070 2 : cmdtag = "";
1071 2 : description = "PREPARE";
1072 2 : recv_step++;
1073 2 : break;
1074 1400 : case BI_INSERT_ROWS:
1075 1400 : cmdtag = "INSERT";
1076 1400 : rows_to_receive--;
1077 1400 : if (rows_to_receive == 0)
1078 2 : recv_step++;
1079 1400 : break;
1080 2 : case BI_COMMIT_TX:
1081 2 : cmdtag = "COMMIT";
1082 2 : recv_step++;
1083 2 : break;
1084 2 : case BI_SYNC:
1085 2 : cmdtag = "";
1086 2 : description = "SYNC";
1087 2 : status = PGRES_PIPELINE_SYNC;
1088 2 : recv_step++;
1089 2 : break;
1090 0 : case BI_DONE:
1091 : /* unreachable */
1092 0 : pg_fatal("unreachable state");
1093 : }
1094 :
1095 1412 : if (PQresultStatus(res) != status)
1096 0 : pg_fatal("%s reported status %s, expected %s\n"
1097 : "Error message: \"%s\"",
1098 : description, PQresStatus(PQresultStatus(res)),
1099 : PQresStatus(status), PQerrorMessage(conn));
1100 :
1101 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1102 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1103 : description, cmdtag, PQcmdStatus(res));
1104 :
1105 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1106 :
1107 1412 : PQclear(res);
1108 : }
1109 : }
1110 :
1111 : /* Write more rows and/or the end pipeline message, if needed */
1112 25112 : if (FD_ISSET(sock, &output_mask))
1113 : {
1114 25108 : PQflush(conn);
1115 :
1116 25108 : if (send_step == BI_INSERT_ROWS)
1117 : {
1118 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1119 : /* use up some buffer space with a wide value */
1120 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1121 :
1122 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1123 : 2, insert_params, NULL, NULL, 0) == 1)
1124 : {
1125 : pg_debug("sent row %d\n", rows_to_send);
1126 :
1127 1400 : rows_to_send--;
1128 1400 : if (rows_to_send == 0)
1129 2 : send_step++;
1130 : }
1131 : else
1132 : {
1133 : /*
1134 : * in nonblocking mode, so it's OK for an insert to fail
1135 : * to send
1136 : */
1137 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1138 : rows_to_send, PQerrorMessage(conn));
1139 : }
1140 : }
1141 23708 : else if (send_step == BI_COMMIT_TX)
1142 : {
1143 2 : if (PQsendQueryParams(conn, "COMMIT",
1144 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1145 : {
1146 : pg_debug("sent COMMIT\n");
1147 2 : send_step++;
1148 : }
1149 : else
1150 : {
1151 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1152 : PQerrorMessage(conn));
1153 : }
1154 : }
1155 23706 : else if (send_step == BI_SYNC)
1156 : {
1157 2 : if (PQpipelineSync(conn) == 1)
1158 : {
1159 2 : fprintf(stdout, "pipeline sync sent\n");
1160 2 : send_step++;
1161 : }
1162 : else
1163 : {
1164 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1165 : PQerrorMessage(conn));
1166 : }
1167 : }
1168 : }
1169 : }
1170 :
1171 : /* We've got the sync message and the pipeline should be done */
1172 2 : if (PQexitPipelineMode(conn) != 1)
1173 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1174 : PQerrorMessage(conn));
1175 :
1176 2 : if (PQsetnonblocking(conn, 0) != 0)
1177 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1178 :
1179 2 : fprintf(stderr, "ok\n");
1180 2 : }
1181 :
1182 : static void
1183 2 : test_prepared(PGconn *conn)
1184 : {
1185 2 : PGresult *res = NULL;
1186 2 : Oid param_oids[1] = {INT4OID};
1187 : Oid expected_oids[4];
1188 : Oid typ;
1189 :
1190 2 : fprintf(stderr, "prepared... ");
1191 :
1192 2 : if (PQenterPipelineMode(conn) != 1)
1193 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1194 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1195 : "interval '1 sec'",
1196 : 1, param_oids) != 1)
1197 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1198 2 : expected_oids[0] = INT4OID;
1199 2 : expected_oids[1] = TEXTOID;
1200 2 : expected_oids[2] = NUMERICOID;
1201 2 : expected_oids[3] = INTERVALOID;
1202 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1203 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1204 2 : if (PQpipelineSync(conn) != 1)
1205 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1206 :
1207 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1208 :
1209 2 : consume_null_result(conn);
1210 :
1211 2 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1212 2 : if (PQnfields(res) != lengthof(expected_oids))
1213 0 : pg_fatal("expected %zu columns, got %d",
1214 : lengthof(expected_oids), PQnfields(res));
1215 10 : for (int i = 0; i < PQnfields(res); i++)
1216 : {
1217 8 : typ = PQftype(res, i);
1218 8 : if (typ != expected_oids[i])
1219 0 : pg_fatal("field %d: expected type %u, got %u",
1220 : i, expected_oids[i], typ);
1221 : }
1222 2 : PQclear(res);
1223 :
1224 2 : consume_null_result(conn);
1225 :
1226 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1227 :
1228 2 : fprintf(stderr, "closing statement..");
1229 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1230 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1231 2 : if (PQpipelineSync(conn) != 1)
1232 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1233 :
1234 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1235 :
1236 2 : consume_null_result(conn);
1237 :
1238 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1239 :
1240 2 : if (PQexitPipelineMode(conn) != 1)
1241 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1242 :
1243 : /* Now that it's closed we should get an error when describing */
1244 2 : res = PQdescribePrepared(conn, "select_one");
1245 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1246 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1247 2 : PQclear(res);
1248 :
1249 : /*
1250 : * Also test the blocking close, this should not fail since closing a
1251 : * non-existent prepared statement is a no-op
1252 : */
1253 2 : res = PQclosePrepared(conn, "select_one");
1254 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1256 2 : PQclear(res);
1257 :
1258 2 : fprintf(stderr, "creating portal... ");
1259 :
1260 2 : res = PQexec(conn, "BEGIN");
1261 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1262 0 : pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
1263 2 : PQclear(res);
1264 :
1265 2 : res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1266 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1267 0 : pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
1268 2 : PQclear(res);
1269 :
1270 2 : PQenterPipelineMode(conn);
1271 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1272 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1273 2 : if (PQpipelineSync(conn) != 1)
1274 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275 :
1276 2 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1277 2 : typ = PQftype(res, 0);
1278 2 : if (typ != INT4OID)
1279 0 : pg_fatal("portal: expected type %u, got %u",
1280 : INT4OID, typ);
1281 2 : PQclear(res);
1282 :
1283 2 : consume_null_result(conn);
1284 :
1285 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1286 :
1287 2 : fprintf(stderr, "closing portal... ");
1288 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1289 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1290 2 : if (PQpipelineSync(conn) != 1)
1291 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1292 :
1293 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1294 :
1295 2 : consume_null_result(conn);
1296 :
1297 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1298 :
1299 2 : if (PQexitPipelineMode(conn) != 1)
1300 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1301 :
1302 : /* Now that it's closed we should get an error when describing */
1303 2 : res = PQdescribePortal(conn, "cursor_one");
1304 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1305 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1306 2 : PQclear(res);
1307 :
1308 : /*
1309 : * Also test the blocking close, this should not fail since closing a
1310 : * non-existent portal is a no-op
1311 : */
1312 2 : res = PQclosePortal(conn, "cursor_one");
1313 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1314 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1315 2 : PQclear(res);
1316 :
1317 2 : fprintf(stderr, "ok\n");
1318 2 : }
1319 :
1320 : /*
1321 : * Test max_protocol_version options.
1322 : */
1323 : static void
1324 2 : test_protocol_version(PGconn *conn)
1325 : {
1326 : const char **keywords;
1327 : const char **vals;
1328 : int nopts;
1329 2 : PQconninfoOption *opts = PQconninfo(conn);
1330 : int protocol_version;
1331 : int max_protocol_version_index;
1332 : int i;
1333 :
1334 : /*
1335 : * Prepare keywords/vals arrays, copied from the existing connection, with
1336 : * an extra slot for 'max_protocol_version'.
1337 : */
1338 2 : nopts = 0;
1339 104 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1340 102 : nopts++;
1341 2 : nopts++; /* max_protocol_version */
1342 2 : nopts++; /* NULL terminator */
1343 :
1344 2 : keywords = pg_malloc0(sizeof(char *) * nopts);
1345 2 : vals = pg_malloc0(sizeof(char *) * nopts);
1346 :
1347 2 : i = 0;
1348 104 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1349 : {
1350 102 : if (opt->val)
1351 : {
1352 40 : keywords[i] = opt->keyword;
1353 40 : vals[i] = opt->val;
1354 40 : i++;
1355 : }
1356 : }
1357 :
1358 2 : max_protocol_version_index = i;
1359 2 : keywords[i] = "max_protocol_version"; /* value is filled in below */
1360 2 : i++;
1361 2 : keywords[i] = vals[i] = NULL;
1362 :
1363 : /*
1364 : * Test max_protocol_version=3.0
1365 : */
1366 2 : vals[max_protocol_version_index] = "3.0";
1367 2 : conn = PQconnectdbParams(keywords, vals, false);
1368 :
1369 2 : if (PQstatus(conn) != CONNECTION_OK)
1370 0 : pg_fatal("Connection to database failed: %s",
1371 : PQerrorMessage(conn));
1372 :
1373 2 : protocol_version = PQfullProtocolVersion(conn);
1374 2 : if (protocol_version != 30000)
1375 0 : pg_fatal("expected 30000, got %d", protocol_version);
1376 :
1377 2 : PQfinish(conn);
1378 :
1379 : /*
1380 : * Test max_protocol_version=3.1. It's not valid, we went straight from
1381 : * 3.0 to 3.2.
1382 : */
1383 2 : vals[max_protocol_version_index] = "3.1";
1384 2 : conn = PQconnectdbParams(keywords, vals, false);
1385 :
1386 2 : if (PQstatus(conn) != CONNECTION_BAD)
1387 0 : pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1388 :
1389 2 : PQfinish(conn);
1390 :
1391 : /*
1392 : * Test max_protocol_version=3.2
1393 : */
1394 2 : vals[max_protocol_version_index] = "3.2";
1395 2 : conn = PQconnectdbParams(keywords, vals, false);
1396 :
1397 2 : if (PQstatus(conn) != CONNECTION_OK)
1398 0 : pg_fatal("Connection to database failed: %s",
1399 : PQerrorMessage(conn));
1400 :
1401 2 : protocol_version = PQfullProtocolVersion(conn);
1402 2 : if (protocol_version != 30002)
1403 0 : pg_fatal("expected 30002, got %d", protocol_version);
1404 :
1405 2 : PQfinish(conn);
1406 :
1407 : /*
1408 : * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1409 : */
1410 2 : vals[max_protocol_version_index] = "latest";
1411 2 : conn = PQconnectdbParams(keywords, vals, false);
1412 :
1413 2 : if (PQstatus(conn) != CONNECTION_OK)
1414 0 : pg_fatal("Connection to database failed: %s",
1415 : PQerrorMessage(conn));
1416 :
1417 2 : protocol_version = PQfullProtocolVersion(conn);
1418 2 : if (protocol_version != 30002)
1419 0 : pg_fatal("expected 30002, got %d", protocol_version);
1420 :
1421 2 : PQfinish(conn);
1422 :
1423 2 : pfree(keywords);
1424 2 : pfree(vals);
1425 2 : PQconninfoFree(opts);
1426 2 : }
1427 :
1428 : /* Notice processor: print notices, and count how many we got */
1429 : static void
1430 2 : notice_processor(void *arg, const char *message)
1431 : {
1432 2 : int *n_notices = (int *) arg;
1433 :
1434 2 : (*n_notices)++;
1435 2 : fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1436 2 : }
1437 :
1438 : /* Verify behavior in "idle" state */
1439 : static void
1440 2 : test_pipeline_idle(PGconn *conn)
1441 : {
1442 2 : int n_notices = 0;
1443 :
1444 2 : fprintf(stderr, "\npipeline idle...\n");
1445 :
1446 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1447 :
1448 : /* Try to exit pipeline mode in pipeline-idle state */
1449 2 : if (PQenterPipelineMode(conn) != 1)
1450 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1451 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1452 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1453 2 : PQsendFlushRequest(conn);
1454 :
1455 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1456 :
1457 2 : consume_null_result(conn);
1458 :
1459 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1460 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1461 2 : if (PQexitPipelineMode(conn) == 1)
1462 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1463 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1464 : strlen("cannot exit pipeline mode")) != 0)
1465 0 : pg_fatal("did not get expected error; got: %s",
1466 : PQerrorMessage(conn));
1467 2 : PQsendFlushRequest(conn);
1468 :
1469 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1470 :
1471 2 : consume_null_result(conn);
1472 :
1473 2 : if (PQexitPipelineMode(conn) != 1)
1474 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1475 :
1476 2 : if (n_notices > 0)
1477 0 : pg_fatal("got %d notice(s)", n_notices);
1478 2 : fprintf(stderr, "ok - 1\n");
1479 :
1480 : /* Have a WARNING in the middle of a resultset */
1481 2 : if (PQenterPipelineMode(conn) != 1)
1482 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1483 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1484 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1485 2 : PQsendFlushRequest(conn);
1486 :
1487 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1488 :
1489 2 : if (PQexitPipelineMode(conn) != 1)
1490 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1491 2 : fprintf(stderr, "ok - 2\n");
1492 2 : }
1493 :
1494 : static void
1495 2 : test_simple_pipeline(PGconn *conn)
1496 : {
1497 2 : const char *dummy_params[1] = {"1"};
1498 2 : Oid dummy_param_oids[1] = {INT4OID};
1499 :
1500 2 : fprintf(stderr, "simple pipeline... ");
1501 :
1502 : /*
1503 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1504 : * process the results of as they come in.
1505 : *
1506 : * For a simple case we should be able to do this without interim
1507 : * processing of results since our output buffer will give us enough slush
1508 : * to work with and we won't block on sending. So blocking mode is fine.
1509 : */
1510 2 : if (PQisnonblocking(conn))
1511 0 : pg_fatal("Expected blocking connection mode");
1512 :
1513 2 : if (PQenterPipelineMode(conn) != 1)
1514 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1515 :
1516 2 : if (PQsendQueryParams(conn, "SELECT $1",
1517 : 1, dummy_param_oids, dummy_params,
1518 : NULL, NULL, 0) != 1)
1519 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1520 :
1521 2 : if (PQexitPipelineMode(conn) != 0)
1522 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1523 :
1524 2 : if (PQpipelineSync(conn) != 1)
1525 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1526 :
1527 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1528 :
1529 2 : consume_null_result(conn);
1530 :
1531 : /*
1532 : * Even though we've processed the result there's still a sync to come and
1533 : * we can't exit pipeline mode yet
1534 : */
1535 2 : if (PQexitPipelineMode(conn) != 0)
1536 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1537 :
1538 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1539 :
1540 2 : consume_null_result(conn);
1541 :
1542 : /* We're still in pipeline mode... */
1543 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1544 0 : pg_fatal("Fell out of pipeline mode somehow");
1545 :
1546 : /* ... until we end it, which we can safely do now */
1547 2 : if (PQexitPipelineMode(conn) != 1)
1548 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1549 : PQerrorMessage(conn));
1550 :
1551 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1552 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1553 :
1554 2 : fprintf(stderr, "ok\n");
1555 2 : }
1556 :
1557 : static void
1558 2 : test_singlerowmode(PGconn *conn)
1559 : {
1560 : PGresult *res;
1561 : int i;
1562 2 : bool pipeline_ended = false;
1563 :
1564 2 : if (PQenterPipelineMode(conn) != 1)
1565 0 : pg_fatal("failed to enter pipeline mode: %s",
1566 : PQerrorMessage(conn));
1567 :
1568 : /* One series of three commands, using single-row mode for the first two. */
1569 8 : for (i = 0; i < 3; i++)
1570 : {
1571 : char *param[1];
1572 :
1573 6 : param[0] = psprintf("%d", 44 + i);
1574 :
1575 6 : if (PQsendQueryParams(conn,
1576 : "SELECT generate_series(42, $1)",
1577 : 1,
1578 : NULL,
1579 : (const char **) param,
1580 : NULL,
1581 : NULL,
1582 : 0) != 1)
1583 0 : pg_fatal("failed to send query: %s",
1584 : PQerrorMessage(conn));
1585 6 : pfree(param[0]);
1586 : }
1587 2 : if (PQpipelineSync(conn) != 1)
1588 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1589 :
1590 10 : for (i = 0; !pipeline_ended; i++)
1591 : {
1592 8 : bool first = true;
1593 : bool saw_ending_tuplesok;
1594 8 : bool isSingleTuple = false;
1595 :
1596 : /* Set single row mode for only first 2 SELECT queries */
1597 8 : if (i < 2)
1598 : {
1599 4 : if (PQsetSingleRowMode(conn) != 1)
1600 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1601 : }
1602 :
1603 : /* Consume rows for this query */
1604 8 : saw_ending_tuplesok = false;
1605 28 : while ((res = PQgetResult(conn)) != NULL)
1606 : {
1607 22 : ExecStatusType est = PQresultStatus(res);
1608 :
1609 22 : if (est == PGRES_PIPELINE_SYNC)
1610 : {
1611 2 : fprintf(stderr, "end of pipeline reached\n");
1612 2 : pipeline_ended = true;
1613 2 : PQclear(res);
1614 2 : if (i != 3)
1615 0 : pg_fatal("Expected three results, got %d", i);
1616 2 : break;
1617 : }
1618 :
1619 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1620 20 : if (first)
1621 : {
1622 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1623 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1624 : i, PQresStatus(est));
1625 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1626 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1627 : i, PQresStatus(est));
1628 6 : first = false;
1629 : }
1630 :
1631 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1632 20 : switch (est)
1633 : {
1634 6 : case PGRES_TUPLES_OK:
1635 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1636 6 : saw_ending_tuplesok = true;
1637 6 : if (isSingleTuple)
1638 : {
1639 4 : if (PQntuples(res) == 0)
1640 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1641 : else
1642 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1643 : }
1644 6 : break;
1645 :
1646 14 : case PGRES_SINGLE_TUPLE:
1647 14 : isSingleTuple = true;
1648 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1649 14 : break;
1650 :
1651 0 : default:
1652 0 : pg_fatal("unexpected");
1653 : }
1654 20 : PQclear(res);
1655 : }
1656 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1657 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1658 : }
1659 :
1660 : /*
1661 : * Now issue one command, get its results in with single-row mode, then
1662 : * issue another command, and get its results in normal mode; make sure
1663 : * the single-row mode flag is reset as expected.
1664 : */
1665 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1666 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1667 0 : pg_fatal("failed to send query: %s",
1668 : PQerrorMessage(conn));
1669 2 : if (PQsendFlushRequest(conn) != 1)
1670 0 : pg_fatal("failed to send flush request");
1671 2 : if (PQsetSingleRowMode(conn) != 1)
1672 0 : pg_fatal("PQsetSingleRowMode() failed");
1673 :
1674 2 : consume_result_status(conn, PGRES_SINGLE_TUPLE);
1675 :
1676 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1677 :
1678 2 : consume_null_result(conn);
1679 :
1680 2 : if (PQsendQueryParams(conn, "SELECT 1",
1681 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1682 0 : pg_fatal("failed to send query: %s",
1683 : PQerrorMessage(conn));
1684 2 : if (PQsendFlushRequest(conn) != 1)
1685 0 : pg_fatal("failed to send flush request");
1686 :
1687 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1688 :
1689 2 : consume_null_result(conn);
1690 :
1691 : /*
1692 : * Try chunked mode as well; make sure that it correctly delivers a
1693 : * partial final chunk.
1694 : */
1695 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1696 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1697 0 : pg_fatal("failed to send query: %s",
1698 : PQerrorMessage(conn));
1699 2 : if (PQsendFlushRequest(conn) != 1)
1700 0 : pg_fatal("failed to send flush request");
1701 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1702 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1703 :
1704 2 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1705 2 : if (PQntuples(res) != 3)
1706 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1707 2 : PQclear(res);
1708 :
1709 2 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1710 2 : if (PQntuples(res) != 2)
1711 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1712 2 : PQclear(res);
1713 :
1714 2 : res = confirm_result_status(conn, PGRES_TUPLES_OK);
1715 2 : if (PQntuples(res) != 0)
1716 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1717 2 : PQclear(res);
1718 :
1719 2 : consume_null_result(conn);
1720 :
1721 2 : if (PQexitPipelineMode(conn) != 1)
1722 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1723 :
1724 2 : fprintf(stderr, "ok\n");
1725 2 : }
1726 :
1727 : /*
1728 : * Simple test to verify that a pipeline is discarded as a whole when there's
1729 : * an error, ignoring transaction commands.
1730 : */
1731 : static void
1732 2 : test_transaction(PGconn *conn)
1733 : {
1734 : PGresult *res;
1735 : bool expect_null;
1736 2 : int num_syncs = 0;
1737 :
1738 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1739 : "CREATE TABLE pq_pipeline_tst (id int)");
1740 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1741 0 : pg_fatal("failed to create test table: %s",
1742 : PQerrorMessage(conn));
1743 2 : PQclear(res);
1744 :
1745 2 : if (PQenterPipelineMode(conn) != 1)
1746 0 : pg_fatal("failed to enter pipeline mode: %s",
1747 : PQerrorMessage(conn));
1748 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1749 0 : pg_fatal("could not send prepare on pipeline: %s",
1750 : PQerrorMessage(conn));
1751 :
1752 2 : if (PQsendQueryParams(conn,
1753 : "BEGIN",
1754 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1755 0 : pg_fatal("failed to send query: %s",
1756 : PQerrorMessage(conn));
1757 2 : if (PQsendQueryParams(conn,
1758 : "SELECT 0/0",
1759 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1760 0 : pg_fatal("failed to send query: %s",
1761 : PQerrorMessage(conn));
1762 :
1763 : /*
1764 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1765 : * get out of the pipeline-aborted state first.
1766 : */
1767 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1768 0 : pg_fatal("failed to execute prepared: %s",
1769 : PQerrorMessage(conn));
1770 :
1771 : /* This insert fails because we're in pipeline-aborted state */
1772 2 : if (PQsendQueryParams(conn,
1773 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1774 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1775 0 : pg_fatal("failed to send query: %s",
1776 : PQerrorMessage(conn));
1777 2 : if (PQpipelineSync(conn) != 1)
1778 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1779 2 : num_syncs++;
1780 :
1781 : /*
1782 : * This insert fails even though the pipeline got a SYNC, because we're in
1783 : * an aborted transaction
1784 : */
1785 2 : if (PQsendQueryParams(conn,
1786 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1787 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1788 0 : pg_fatal("failed to send query: %s",
1789 : PQerrorMessage(conn));
1790 2 : if (PQpipelineSync(conn) != 1)
1791 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1792 2 : num_syncs++;
1793 :
1794 : /*
1795 : * Send ROLLBACK using prepared stmt. This one works because we just did
1796 : * PQpipelineSync above.
1797 : */
1798 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1799 0 : pg_fatal("failed to execute prepared: %s",
1800 : PQerrorMessage(conn));
1801 :
1802 : /*
1803 : * Now that we're out of a transaction and in pipeline-good mode, this
1804 : * insert works
1805 : */
1806 2 : if (PQsendQueryParams(conn,
1807 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1808 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1809 0 : pg_fatal("failed to send query: %s",
1810 : PQerrorMessage(conn));
1811 : /* Send two syncs now -- match up to SYNC messages below */
1812 2 : if (PQpipelineSync(conn) != 1)
1813 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1814 2 : num_syncs++;
1815 2 : if (PQpipelineSync(conn) != 1)
1816 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1817 2 : num_syncs++;
1818 :
1819 2 : expect_null = false;
1820 2 : for (int i = 0;; i++)
1821 38 : {
1822 : ExecStatusType restype;
1823 :
1824 40 : res = PQgetResult(conn);
1825 40 : if (res == NULL)
1826 : {
1827 16 : printf("%d: got NULL result\n", i);
1828 16 : if (!expect_null)
1829 0 : pg_fatal("did not expect NULL here");
1830 16 : expect_null = false;
1831 16 : continue;
1832 : }
1833 24 : restype = PQresultStatus(res);
1834 24 : printf("%d: got status %s", i, PQresStatus(restype));
1835 24 : if (expect_null)
1836 0 : pg_fatal("expected NULL");
1837 24 : if (restype == PGRES_FATAL_ERROR)
1838 4 : printf("; error: %s", PQerrorMessage(conn));
1839 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1840 : {
1841 4 : printf(": command didn't run because pipeline aborted\n");
1842 : }
1843 : else
1844 16 : printf("\n");
1845 24 : PQclear(res);
1846 :
1847 24 : if (restype == PGRES_PIPELINE_SYNC)
1848 8 : num_syncs--;
1849 : else
1850 16 : expect_null = true;
1851 24 : if (num_syncs <= 0)
1852 2 : break;
1853 : }
1854 :
1855 2 : consume_null_result(conn);
1856 :
1857 2 : if (PQexitPipelineMode(conn) != 1)
1858 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1859 :
1860 : /* We expect to find one tuple containing the value "3" */
1861 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1862 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1863 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1864 2 : if (PQntuples(res) != 1)
1865 0 : pg_fatal("did not get 1 tuple");
1866 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1867 0 : pg_fatal("did not get expected tuple");
1868 2 : PQclear(res);
1869 :
1870 2 : fprintf(stderr, "ok\n");
1871 2 : }
1872 :
1873 : /*
1874 : * In this test mode we send a stream of queries, with one in the middle
1875 : * causing an error. Verify that we can still send some more after the
1876 : * error and have libpq work properly.
1877 : */
1878 : static void
1879 2 : test_uniqviol(PGconn *conn)
1880 : {
1881 2 : int sock = PQsocket(conn);
1882 : PGresult *res;
1883 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
1884 : const char *paramValues[2];
1885 : char paramValue0[MAXINT8LEN];
1886 : char paramValue1[MAXINT8LEN];
1887 2 : int ctr = 0;
1888 2 : int numsent = 0;
1889 2 : int results = 0;
1890 2 : bool read_done = false;
1891 2 : bool write_done = false;
1892 2 : bool error_sent = false;
1893 2 : bool got_error = false;
1894 2 : int switched = 0;
1895 2 : int socketful = 0;
1896 : fd_set in_fds;
1897 : fd_set out_fds;
1898 :
1899 2 : fprintf(stderr, "uniqviol ...");
1900 :
1901 2 : PQsetnonblocking(conn, 1);
1902 :
1903 2 : paramValues[0] = paramValue0;
1904 2 : paramValues[1] = paramValue1;
1905 2 : sprintf(paramValue1, "42");
1906 :
1907 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1908 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1909 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1910 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1911 2 : PQclear(res);
1912 :
1913 2 : res = PQexec(conn, "begin");
1914 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1915 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1916 2 : PQclear(res);
1917 :
1918 2 : res = PQprepare(conn, "insertion",
1919 : "insert into ppln_uniqviol values ($1, $2) returning id",
1920 : 2, paramTypes);
1921 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1922 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1923 2 : PQclear(res);
1924 :
1925 2 : if (PQenterPipelineMode(conn) != 1)
1926 0 : pg_fatal("failed to enter pipeline mode");
1927 :
1928 16 : while (!read_done)
1929 : {
1930 : /*
1931 : * Avoid deadlocks by reading everything the server has sent before
1932 : * sending anything. (Special precaution is needed here to process
1933 : * PQisBusy before testing the socket for read-readiness, because the
1934 : * socket does not turn read-ready after "sending" queries in aborted
1935 : * pipeline mode.)
1936 : */
1937 1590 : while (PQisBusy(conn) == 0)
1938 : {
1939 : bool new_error;
1940 :
1941 1578 : if (results >= numsent)
1942 : {
1943 2 : if (write_done)
1944 0 : read_done = true;
1945 2 : break;
1946 : }
1947 :
1948 1576 : res = PQgetResult(conn);
1949 1576 : new_error = process_result(conn, res, results, numsent);
1950 1576 : if (new_error && got_error)
1951 0 : pg_fatal("got two errors");
1952 1576 : got_error |= new_error;
1953 1576 : if (results++ >= numsent - 1)
1954 : {
1955 2 : if (write_done)
1956 2 : read_done = true;
1957 2 : break;
1958 : }
1959 : }
1960 :
1961 16 : if (read_done)
1962 2 : break;
1963 :
1964 238 : FD_ZERO(&out_fds);
1965 14 : FD_SET(sock, &out_fds);
1966 :
1967 238 : FD_ZERO(&in_fds);
1968 14 : FD_SET(sock, &in_fds);
1969 :
1970 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1971 : {
1972 0 : if (errno == EINTR)
1973 0 : continue;
1974 0 : pg_fatal("select() failed: %m");
1975 : }
1976 :
1977 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1978 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1979 :
1980 : /*
1981 : * If the socket is writable and we haven't finished sending queries,
1982 : * send some.
1983 : */
1984 14 : if (!write_done && FD_ISSET(sock, &out_fds))
1985 : {
1986 : for (;;)
1987 1570 : {
1988 : int flush;
1989 :
1990 : /*
1991 : * provoke uniqueness violation exactly once after having
1992 : * switched to read mode.
1993 : */
1994 1576 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1995 : {
1996 2 : sprintf(paramValue0, "%d", numsent / 2);
1997 2 : fprintf(stderr, "E");
1998 2 : error_sent = true;
1999 : }
2000 : else
2001 : {
2002 1574 : fprintf(stderr, ".");
2003 1574 : sprintf(paramValue0, "%d", ctr++);
2004 : }
2005 :
2006 1576 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2007 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2008 1576 : numsent++;
2009 :
2010 : /* Are we done writing? */
2011 1576 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2012 : {
2013 2 : if (PQsendFlushRequest(conn) != 1)
2014 0 : pg_fatal("failed to send flush request");
2015 2 : write_done = true;
2016 2 : fprintf(stderr, "\ndone writing\n");
2017 2 : PQflush(conn);
2018 2 : break;
2019 : }
2020 :
2021 : /* is the outgoing socket full? */
2022 1574 : flush = PQflush(conn);
2023 1574 : if (flush == -1)
2024 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2025 1574 : if (flush == 1)
2026 : {
2027 4 : if (socketful == 0)
2028 2 : socketful = numsent;
2029 4 : fprintf(stderr, "\nswitch to reading\n");
2030 4 : switched++;
2031 4 : break;
2032 : }
2033 : }
2034 : }
2035 : }
2036 :
2037 2 : if (!got_error)
2038 0 : pg_fatal("did not get expected error");
2039 :
2040 2 : fprintf(stderr, "ok\n");
2041 2 : }
2042 :
2043 : /*
2044 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2045 : * the expected NULL that should follow it.
2046 : *
2047 : * Returns true if we read a fatal error message, otherwise false.
2048 : */
2049 : static bool
2050 1576 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2051 : {
2052 1576 : bool got_error = false;
2053 :
2054 1576 : if (res == NULL)
2055 0 : pg_fatal("got unexpected NULL");
2056 :
2057 1576 : switch (PQresultStatus(res))
2058 : {
2059 2 : case PGRES_FATAL_ERROR:
2060 2 : got_error = true;
2061 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2062 2 : PQclear(res);
2063 2 : consume_null_result(conn);
2064 2 : break;
2065 :
2066 1118 : case PGRES_TUPLES_OK:
2067 1118 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2068 1118 : PQclear(res);
2069 1118 : consume_null_result(conn);
2070 1118 : break;
2071 :
2072 456 : case PGRES_PIPELINE_ABORTED:
2073 456 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2074 456 : PQclear(res);
2075 456 : consume_null_result(conn);
2076 456 : break;
2077 :
2078 0 : default:
2079 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2080 : }
2081 :
2082 1576 : return got_error;
2083 : }
2084 :
2085 :
2086 : static void
2087 0 : usage(const char *progname)
2088 : {
2089 0 : fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2090 0 : fprintf(stderr, "Usage:\n");
2091 0 : fprintf(stderr, " %s [OPTION] tests\n", progname);
2092 0 : fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2093 0 : fprintf(stderr, "\nOptions:\n");
2094 0 : fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2095 0 : fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2096 0 : }
2097 :
2098 : static void
2099 2 : print_test_list(void)
2100 : {
2101 2 : printf("cancel\n");
2102 2 : printf("disallowed_in_pipeline\n");
2103 2 : printf("multi_pipelines\n");
2104 2 : printf("nosync\n");
2105 2 : printf("pipeline_abort\n");
2106 2 : printf("pipeline_idle\n");
2107 2 : printf("pipelined_insert\n");
2108 2 : printf("prepared\n");
2109 2 : printf("protocol_version\n");
2110 2 : printf("simple_pipeline\n");
2111 2 : printf("singlerow\n");
2112 2 : printf("transaction\n");
2113 2 : printf("uniqviol\n");
2114 2 : }
2115 :
2116 : int
2117 30 : main(int argc, char **argv)
2118 : {
2119 30 : const char *conninfo = "";
2120 : PGconn *conn;
2121 30 : FILE *trace = NULL;
2122 : char *testname;
2123 30 : int numrows = 10000;
2124 : PGresult *res;
2125 : int c;
2126 :
2127 104 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2128 : {
2129 44 : switch (c)
2130 : {
2131 26 : case 'r': /* numrows */
2132 26 : errno = 0;
2133 26 : numrows = strtol(optarg, NULL, 10);
2134 26 : if (errno != 0 || numrows <= 0)
2135 : {
2136 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2137 : optarg);
2138 0 : exit(1);
2139 : }
2140 26 : break;
2141 18 : case 't': /* trace file */
2142 18 : tracefile = pg_strdup(optarg);
2143 18 : break;
2144 : }
2145 : }
2146 :
2147 30 : if (optind < argc)
2148 : {
2149 30 : testname = pg_strdup(argv[optind]);
2150 30 : optind++;
2151 : }
2152 : else
2153 : {
2154 0 : usage(argv[0]);
2155 0 : exit(1);
2156 : }
2157 :
2158 30 : if (strcmp(testname, "tests") == 0)
2159 : {
2160 2 : print_test_list();
2161 2 : exit(0);
2162 : }
2163 :
2164 28 : if (optind < argc)
2165 : {
2166 28 : conninfo = pg_strdup(argv[optind]);
2167 28 : optind++;
2168 : }
2169 :
2170 : /* Make a connection to the database */
2171 28 : conn = PQconnectdb(conninfo);
2172 28 : if (PQstatus(conn) != CONNECTION_OK)
2173 : {
2174 0 : fprintf(stderr, "Connection to database failed: %s\n",
2175 : PQerrorMessage(conn));
2176 0 : exit_nicely(conn);
2177 : }
2178 :
2179 28 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2180 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2181 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2182 28 : PQclear(res);
2183 28 : res = PQexec(conn, "SET debug_parallel_query = off");
2184 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2185 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2186 28 : PQclear(res);
2187 :
2188 : /* Set the trace file, if requested */
2189 28 : if (tracefile != NULL)
2190 : {
2191 18 : if (strcmp(tracefile, "-") == 0)
2192 0 : trace = stdout;
2193 : else
2194 18 : trace = fopen(tracefile, "w");
2195 18 : if (trace == NULL)
2196 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2197 :
2198 : /* Make it line-buffered */
2199 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2200 :
2201 18 : PQtrace(conn, trace);
2202 18 : PQsetTraceFlags(conn,
2203 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2204 : }
2205 :
2206 28 : if (strcmp(testname, "cancel") == 0)
2207 4 : test_cancel(conn);
2208 24 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2209 2 : test_disallowed_in_pipeline(conn);
2210 22 : else if (strcmp(testname, "multi_pipelines") == 0)
2211 2 : test_multi_pipelines(conn);
2212 20 : else if (strcmp(testname, "nosync") == 0)
2213 2 : test_nosync(conn);
2214 18 : else if (strcmp(testname, "pipeline_abort") == 0)
2215 2 : test_pipeline_abort(conn);
2216 16 : else if (strcmp(testname, "pipeline_idle") == 0)
2217 2 : test_pipeline_idle(conn);
2218 14 : else if (strcmp(testname, "pipelined_insert") == 0)
2219 2 : test_pipelined_insert(conn, numrows);
2220 12 : else if (strcmp(testname, "prepared") == 0)
2221 2 : test_prepared(conn);
2222 10 : else if (strcmp(testname, "protocol_version") == 0)
2223 2 : test_protocol_version(conn);
2224 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2225 2 : test_simple_pipeline(conn);
2226 6 : else if (strcmp(testname, "singlerow") == 0)
2227 2 : test_singlerowmode(conn);
2228 4 : else if (strcmp(testname, "transaction") == 0)
2229 2 : test_transaction(conn);
2230 2 : else if (strcmp(testname, "uniqviol") == 0)
2231 2 : test_uniqviol(conn);
2232 : else
2233 : {
2234 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2235 0 : exit(1);
2236 : }
2237 :
2238 : /* close the connection to the database and cleanup */
2239 28 : PQfinish(conn);
2240 :
2241 28 : if (trace && trace != stdout)
2242 18 : fclose(trace);
2243 :
2244 28 : return 0;
2245 : }
|