Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
38 : #ifdef DEBUG_OUTPUT
39 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 : #else
41 : #define pg_debug(...)
42 : #endif
43 :
44 : static const char *const drop_table_sql =
45 : "DROP TABLE IF EXISTS pq_pipeline_demo";
46 : static const char *const create_table_sql =
47 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 : "int8filler int8);";
49 : static const char *const insert_sql =
50 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 : static const char *const insert_sql2 =
52 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53 :
/*
 * Buffer sizes for the decimal text form of an int32/int64: the maximum
 * number of digits, plus room for a sign and the null terminator.
 *
 * INT32_MIN ("-2147483648") is 11 characters and INT64_MIN
 * ("-9223372036854775808") is 20 characters; each needs one more byte for
 * the terminator.
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : pg_noreturn static void
75 0 : pg_fatal_impl(int line, const char *fmt,...)
76 : {
77 : va_list args;
78 :
79 0 : fflush(stdout);
80 :
81 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
82 0 : va_start(args, fmt);
83 0 : vfprintf(stderr, fmt, args);
84 0 : va_end(args);
85 : Assert(fmt[strlen(fmt) - 1] != '\n');
86 0 : fprintf(stderr, "\n");
87 0 : exit(1);
88 : }
89 :
90 : /*
91 : * Check that the query on the given connection got canceled.
92 : */
93 : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
94 : static void
95 12 : confirm_query_canceled_impl(int line, PGconn *conn)
96 : {
97 12 : PGresult *res = NULL;
98 :
99 12 : res = PQgetResult(conn);
100 12 : if (res == NULL)
101 0 : pg_fatal_impl(line, "PQgetResult returned null: %s",
102 : PQerrorMessage(conn));
103 12 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
104 0 : pg_fatal_impl(line, "query did not fail when it was expected");
105 12 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
106 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
107 : PQerrorMessage(conn));
108 12 : PQclear(res);
109 :
110 12 : while (PQisBusy(conn))
111 0 : PQconsumeInput(conn);
112 12 : }
113 :
114 : /*
115 : * Using monitorConn, query pg_stat_activity to see that the connection with
116 : * the given PID is either in the given state, or waiting on the given event
117 : * (only one of them can be given).
118 : */
119 : static void
120 24 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
121 : char *state, char *event)
122 : {
123 24 : const Oid paramTypes[] = {INT4OID, TEXTOID};
124 : const char *paramValues[2];
125 24 : char *pidstr = psprintf("%d", procpid);
126 :
127 : Assert((state == NULL) ^ (event == NULL));
128 :
129 24 : paramValues[0] = pidstr;
130 24 : paramValues[1] = state ? state : event;
131 :
132 : while (true)
133 0 : {
134 : PGresult *res;
135 : char *value;
136 :
137 24 : if (state != NULL)
138 12 : res = PQexecParams(monitorConn,
139 : "SELECT count(*) FROM pg_stat_activity WHERE "
140 : "pid = $1 AND state = $2",
141 : 2, paramTypes, paramValues, NULL, NULL, 0);
142 : else
143 12 : res = PQexecParams(monitorConn,
144 : "SELECT count(*) FROM pg_stat_activity WHERE "
145 : "pid = $1 AND wait_event = $2",
146 : 2, paramTypes, paramValues, NULL, NULL, 0);
147 :
148 24 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
149 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
150 24 : if (PQntuples(res) != 1)
151 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
152 24 : if (PQnfields(res) != 1)
153 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
154 24 : value = PQgetvalue(res, 0, 0);
155 24 : if (strcmp(value, "0") != 0)
156 : {
157 24 : PQclear(res);
158 24 : break;
159 : }
160 0 : PQclear(res);
161 :
162 : /* wait 10ms before polling again */
163 0 : pg_usleep(10000);
164 : }
165 :
166 24 : pfree(pidstr);
167 24 : }
168 :
169 : #define send_cancellable_query(conn, monitorConn) \
170 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
171 : static void
172 12 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
173 : {
174 : const char *env_wait;
175 12 : const Oid paramTypes[1] = {INT4OID};
176 :
177 : /*
178 : * Wait for the connection to be idle, so that our check for an active
179 : * connection below is reliable, instead of possibly seeing an outdated
180 : * state.
181 : */
182 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
183 :
184 12 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
185 12 : if (env_wait == NULL)
186 12 : env_wait = "180";
187 :
188 12 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
189 : &env_wait, NULL, NULL, 0) != 1)
190 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
191 :
192 : /*
193 : * Wait for the sleep to be active, because if the query is not running
194 : * yet, the cancel request that we send won't have any effect.
195 : */
196 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
197 12 : }
198 :
199 : /*
200 : * Create a new connection with the same conninfo as the given one.
201 : */
202 : static PGconn *
203 2 : copy_connection(PGconn *conn)
204 : {
205 : PGconn *copyConn;
206 2 : PQconninfoOption *opts = PQconninfo(conn);
207 : const char **keywords;
208 : const char **vals;
209 2 : int nopts = 1;
210 2 : int i = 0;
211 :
212 96 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
213 94 : nopts++;
214 :
215 2 : keywords = pg_malloc(sizeof(char *) * nopts);
216 2 : vals = pg_malloc(sizeof(char *) * nopts);
217 :
218 96 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
219 : {
220 94 : if (opt->val)
221 : {
222 38 : keywords[i] = opt->keyword;
223 38 : vals[i] = opt->val;
224 38 : i++;
225 : }
226 : }
227 2 : keywords[i] = vals[i] = NULL;
228 :
229 2 : copyConn = PQconnectdbParams(keywords, vals, false);
230 :
231 2 : if (PQstatus(copyConn) != CONNECTION_OK)
232 0 : pg_fatal("Connection to database failed: %s",
233 : PQerrorMessage(copyConn));
234 :
235 2 : return copyConn;
236 : }
237 :
238 : /*
239 : * Test query cancellation routines
240 : */
241 : static void
242 2 : test_cancel(PGconn *conn)
243 : {
244 : PGcancel *cancel;
245 : PGcancelConn *cancelConn;
246 : PGconn *monitorConn;
247 : char errorbuf[256];
248 :
249 2 : fprintf(stderr, "test cancellations... ");
250 :
251 2 : if (PQsetnonblocking(conn, 1) != 0)
252 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
253 :
254 : /*
255 : * Make a separate connection to the database to monitor the query on the
256 : * main connection.
257 : */
258 2 : monitorConn = copy_connection(conn);
259 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
260 :
261 : /* test PQcancel */
262 2 : send_cancellable_query(conn, monitorConn);
263 2 : cancel = PQgetCancel(conn);
264 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
265 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
266 2 : confirm_query_canceled(conn);
267 :
268 : /* PGcancel object can be reused for the next query */
269 2 : send_cancellable_query(conn, monitorConn);
270 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
271 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
272 2 : confirm_query_canceled(conn);
273 :
274 2 : PQfreeCancel(cancel);
275 :
276 : /* test PQrequestCancel */
277 2 : send_cancellable_query(conn, monitorConn);
278 2 : if (!PQrequestCancel(conn))
279 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
280 2 : confirm_query_canceled(conn);
281 :
282 : /* test PQcancelBlocking */
283 2 : send_cancellable_query(conn, monitorConn);
284 2 : cancelConn = PQcancelCreate(conn);
285 2 : if (!PQcancelBlocking(cancelConn))
286 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
287 2 : confirm_query_canceled(conn);
288 2 : PQcancelFinish(cancelConn);
289 :
290 : /* test PQcancelCreate and then polling with PQcancelPoll */
291 2 : send_cancellable_query(conn, monitorConn);
292 2 : cancelConn = PQcancelCreate(conn);
293 2 : if (!PQcancelStart(cancelConn))
294 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
295 : while (true)
296 2 : {
297 : struct timeval tv;
298 : fd_set input_mask;
299 : fd_set output_mask;
300 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
301 4 : int sock = PQcancelSocket(cancelConn);
302 :
303 4 : if (pollres == PGRES_POLLING_OK)
304 2 : break;
305 :
306 2 : FD_ZERO(&input_mask);
307 2 : FD_ZERO(&output_mask);
308 2 : switch (pollres)
309 : {
310 2 : case PGRES_POLLING_READING:
311 : pg_debug("polling for reads\n");
312 2 : FD_SET(sock, &input_mask);
313 2 : break;
314 0 : case PGRES_POLLING_WRITING:
315 : pg_debug("polling for writes\n");
316 0 : FD_SET(sock, &output_mask);
317 0 : break;
318 0 : default:
319 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
320 : }
321 :
322 2 : if (sock < 0)
323 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
324 :
325 2 : tv.tv_sec = 3;
326 2 : tv.tv_usec = 0;
327 :
328 : while (true)
329 : {
330 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
331 : {
332 0 : if (errno == EINTR)
333 0 : continue;
334 0 : pg_fatal("select() failed: %m");
335 : }
336 2 : break;
337 : }
338 : }
339 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
340 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
341 2 : confirm_query_canceled(conn);
342 :
343 : /*
344 : * test PQcancelReset works on the cancel connection and it can be reused
345 : * afterwards
346 : */
347 2 : PQcancelReset(cancelConn);
348 :
349 2 : send_cancellable_query(conn, monitorConn);
350 2 : if (!PQcancelStart(cancelConn))
351 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
352 : while (true)
353 2 : {
354 : struct timeval tv;
355 : fd_set input_mask;
356 : fd_set output_mask;
357 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
358 4 : int sock = PQcancelSocket(cancelConn);
359 :
360 4 : if (pollres == PGRES_POLLING_OK)
361 2 : break;
362 :
363 2 : FD_ZERO(&input_mask);
364 2 : FD_ZERO(&output_mask);
365 2 : switch (pollres)
366 : {
367 2 : case PGRES_POLLING_READING:
368 : pg_debug("polling for reads\n");
369 2 : FD_SET(sock, &input_mask);
370 2 : break;
371 0 : case PGRES_POLLING_WRITING:
372 : pg_debug("polling for writes\n");
373 0 : FD_SET(sock, &output_mask);
374 0 : break;
375 0 : default:
376 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
377 : }
378 :
379 2 : if (sock < 0)
380 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
381 :
382 2 : tv.tv_sec = 3;
383 2 : tv.tv_usec = 0;
384 :
385 : while (true)
386 : {
387 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
388 : {
389 0 : if (errno == EINTR)
390 0 : continue;
391 0 : pg_fatal("select() failed: %m");
392 : }
393 2 : break;
394 : }
395 : }
396 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
397 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
398 2 : confirm_query_canceled(conn);
399 :
400 2 : PQcancelFinish(cancelConn);
401 :
402 2 : fprintf(stderr, "ok\n");
403 2 : }
404 :
405 : static void
406 2 : test_disallowed_in_pipeline(PGconn *conn)
407 : {
408 2 : PGresult *res = NULL;
409 :
410 2 : fprintf(stderr, "test error cases... ");
411 :
412 2 : if (PQisnonblocking(conn))
413 0 : pg_fatal("Expected blocking connection mode");
414 :
415 2 : if (PQenterPipelineMode(conn) != 1)
416 0 : pg_fatal("Unable to enter pipeline mode");
417 :
418 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
419 0 : pg_fatal("Pipeline mode not activated properly");
420 :
421 : /* PQexec should fail in pipeline mode */
422 2 : res = PQexec(conn, "SELECT 1");
423 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
424 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
425 2 : if (strcmp(PQerrorMessage(conn),
426 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
427 0 : pg_fatal("did not get expected error message; got: \"%s\"",
428 : PQerrorMessage(conn));
429 :
430 : /* PQsendQuery should fail in pipeline mode */
431 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
432 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
433 2 : if (strcmp(PQerrorMessage(conn),
434 : "PQsendQuery not allowed in pipeline mode\n") != 0)
435 0 : pg_fatal("did not get expected error message; got: \"%s\"",
436 : PQerrorMessage(conn));
437 :
438 : /* Entering pipeline mode when already in pipeline mode is OK */
439 2 : if (PQenterPipelineMode(conn) != 1)
440 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
441 :
442 2 : if (PQisBusy(conn) != 0)
443 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
444 :
445 : /* ok, back to normal command mode */
446 2 : if (PQexitPipelineMode(conn) != 1)
447 0 : pg_fatal("couldn't exit idle empty pipeline mode");
448 :
449 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
450 0 : pg_fatal("Pipeline mode not terminated properly");
451 :
452 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
453 2 : if (PQexitPipelineMode(conn) != 1)
454 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
455 :
456 : /* can now PQexec again */
457 2 : res = PQexec(conn, "SELECT 1");
458 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
459 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
460 : PQerrorMessage(conn));
461 :
462 2 : fprintf(stderr, "ok\n");
463 2 : }
464 :
465 : static void
466 2 : test_multi_pipelines(PGconn *conn)
467 : {
468 2 : PGresult *res = NULL;
469 2 : const char *dummy_params[1] = {"1"};
470 2 : Oid dummy_param_oids[1] = {INT4OID};
471 :
472 2 : fprintf(stderr, "multi pipeline... ");
473 :
474 : /*
475 : * Queue up a couple of small pipelines and process each without returning
476 : * to command mode first.
477 : */
478 2 : if (PQenterPipelineMode(conn) != 1)
479 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
480 :
481 : /* first pipeline */
482 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
483 : dummy_params, NULL, NULL, 0) != 1)
484 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
485 :
486 2 : if (PQpipelineSync(conn) != 1)
487 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
488 :
489 : /* second pipeline */
490 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
491 : dummy_params, NULL, NULL, 0) != 1)
492 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
493 :
494 : /* Skip flushing once. */
495 2 : if (PQsendPipelineSync(conn) != 1)
496 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
497 :
498 : /* third pipeline */
499 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
500 : dummy_params, NULL, NULL, 0) != 1)
501 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
502 :
503 2 : if (PQpipelineSync(conn) != 1)
504 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
505 :
506 : /* OK, start processing the results */
507 :
508 : /* first pipeline */
509 :
510 2 : res = PQgetResult(conn);
511 2 : if (res == NULL)
512 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
513 : PQerrorMessage(conn));
514 :
515 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
516 0 : pg_fatal("Unexpected result code %s from first pipeline item",
517 : PQresStatus(PQresultStatus(res)));
518 2 : PQclear(res);
519 2 : res = NULL;
520 :
521 2 : if (PQgetResult(conn) != NULL)
522 0 : pg_fatal("PQgetResult returned something extra after first result");
523 :
524 2 : if (PQexitPipelineMode(conn) != 0)
525 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
526 :
527 2 : res = PQgetResult(conn);
528 2 : if (res == NULL)
529 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
530 : PQerrorMessage(conn));
531 :
532 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
533 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
534 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
535 2 : PQclear(res);
536 :
537 : /* second pipeline */
538 :
539 2 : res = PQgetResult(conn);
540 2 : if (res == NULL)
541 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
542 : PQerrorMessage(conn));
543 :
544 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
545 0 : pg_fatal("Unexpected result code %s from second pipeline item",
546 : PQresStatus(PQresultStatus(res)));
547 2 : PQclear(res);
548 2 : res = NULL;
549 :
550 2 : if (PQgetResult(conn) != NULL)
551 0 : pg_fatal("PQgetResult returned something extra after first result");
552 :
553 2 : if (PQexitPipelineMode(conn) != 0)
554 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
555 :
556 2 : res = PQgetResult(conn);
557 2 : if (res == NULL)
558 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
559 : PQerrorMessage(conn));
560 :
561 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
562 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
563 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
564 2 : PQclear(res);
565 :
566 : /* third pipeline */
567 :
568 2 : res = PQgetResult(conn);
569 2 : if (res == NULL)
570 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
571 : PQerrorMessage(conn));
572 :
573 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
574 0 : pg_fatal("Unexpected result code %s from third pipeline item",
575 : PQresStatus(PQresultStatus(res)));
576 :
577 2 : res = PQgetResult(conn);
578 2 : if (res != NULL)
579 0 : pg_fatal("Expected null result, got %s",
580 : PQresStatus(PQresultStatus(res)));
581 :
582 2 : res = PQgetResult(conn);
583 2 : if (res == NULL)
584 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
585 : PQerrorMessage(conn));
586 :
587 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
588 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
589 : PQresStatus(PQresultStatus(res)));
590 :
591 : /* We're still in pipeline mode ... */
592 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
593 0 : pg_fatal("Fell out of pipeline mode somehow");
594 :
595 : /* until we end it, which we can safely do now */
596 2 : if (PQexitPipelineMode(conn) != 1)
597 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
598 : PQerrorMessage(conn));
599 :
600 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
601 0 : pg_fatal("exiting pipeline mode didn't seem to work");
602 :
603 2 : fprintf(stderr, "ok\n");
604 2 : }
605 :
606 : /*
607 : * Test behavior when a pipeline dispatches a number of commands that are
608 : * not flushed by a sync point.
609 : */
610 : static void
611 2 : test_nosync(PGconn *conn)
612 : {
613 2 : int numqueries = 10;
614 2 : int results = 0;
615 2 : int sock = PQsocket(conn);
616 :
617 2 : fprintf(stderr, "nosync... ");
618 :
619 2 : if (sock < 0)
620 0 : pg_fatal("invalid socket");
621 :
622 2 : if (PQenterPipelineMode(conn) != 1)
623 0 : pg_fatal("could not enter pipeline mode");
624 22 : for (int i = 0; i < numqueries; i++)
625 : {
626 : fd_set input_mask;
627 : struct timeval tv;
628 :
629 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
630 : 0, NULL, NULL, NULL, NULL, 0) != 1)
631 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
632 20 : PQflush(conn);
633 :
634 : /*
635 : * If the server has written anything to us, read (some of) it now.
636 : */
637 20 : FD_ZERO(&input_mask);
638 20 : FD_SET(sock, &input_mask);
639 20 : tv.tv_sec = 0;
640 20 : tv.tv_usec = 0;
641 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
642 : {
643 0 : fprintf(stderr, "select() failed: %m\n");
644 0 : exit_nicely(conn);
645 : }
646 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
647 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
648 : }
649 :
650 : /* tell server to flush its output buffer */
651 2 : if (PQsendFlushRequest(conn) != 1)
652 0 : pg_fatal("failed to send flush request");
653 2 : PQflush(conn);
654 :
655 : /* Now read all results */
656 : for (;;)
657 18 : {
658 : PGresult *res;
659 :
660 20 : res = PQgetResult(conn);
661 :
662 : /* NULL results are only expected after TUPLES_OK */
663 20 : if (res == NULL)
664 0 : pg_fatal("got unexpected NULL result after %d results", results);
665 :
666 : /* We expect exactly one TUPLES_OK result for each query we sent */
667 20 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
668 : {
669 : PGresult *res2;
670 :
671 : /* and one NULL result should follow each */
672 20 : res2 = PQgetResult(conn);
673 20 : if (res2 != NULL)
674 0 : pg_fatal("expected NULL, got %s",
675 : PQresStatus(PQresultStatus(res2)));
676 20 : PQclear(res);
677 20 : results++;
678 :
679 : /* if we're done, we're done */
680 20 : if (results == numqueries)
681 2 : break;
682 :
683 18 : continue;
684 : }
685 :
686 : /* anything else is unexpected */
687 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
688 : }
689 :
690 2 : fprintf(stderr, "ok\n");
691 2 : }
692 :
693 : /*
694 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
695 : * still have to get results for each pipeline item, but the item will just be
696 : * a PGRES_PIPELINE_ABORTED code.
697 : *
698 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
699 : * usually use an xact, but in this case we want to observe the effects of each
700 : * statement.
701 : */
702 : static void
703 2 : test_pipeline_abort(PGconn *conn)
704 : {
705 2 : PGresult *res = NULL;
706 2 : const char *dummy_params[1] = {"1"};
707 2 : Oid dummy_param_oids[1] = {INT4OID};
708 : int i;
709 : int gotrows;
710 : bool goterror;
711 :
712 2 : fprintf(stderr, "aborted pipeline... ");
713 :
714 2 : res = PQexec(conn, drop_table_sql);
715 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
716 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
717 :
718 2 : res = PQexec(conn, create_table_sql);
719 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
720 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
721 :
722 : /*
723 : * Queue up a couple of small pipelines and process each without returning
724 : * to command mode first. Make sure the second operation in the first
725 : * pipeline ERRORs.
726 : */
727 2 : if (PQenterPipelineMode(conn) != 1)
728 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
729 :
730 2 : dummy_params[0] = "1";
731 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
732 : dummy_params, NULL, NULL, 0) != 1)
733 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
734 :
735 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
736 : 1, dummy_param_oids, dummy_params,
737 : NULL, NULL, 0) != 1)
738 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
739 :
740 2 : dummy_params[0] = "2";
741 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
742 : dummy_params, NULL, NULL, 0) != 1)
743 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
744 :
745 2 : if (PQpipelineSync(conn) != 1)
746 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
747 :
748 2 : dummy_params[0] = "3";
749 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
750 : dummy_params, NULL, NULL, 0) != 1)
751 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
752 : PQerrorMessage(conn));
753 :
754 2 : if (PQpipelineSync(conn) != 1)
755 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
756 :
757 : /*
758 : * OK, start processing the pipeline results.
759 : *
760 : * We should get a command-ok for the first query, then a fatal error and
761 : * a pipeline aborted message for the second insert, a pipeline-end, then
762 : * a command-ok and a pipeline-ok for the second pipeline operation.
763 : */
764 2 : res = PQgetResult(conn);
765 2 : if (res == NULL)
766 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
767 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
768 0 : pg_fatal("Unexpected result status %s: %s",
769 : PQresStatus(PQresultStatus(res)),
770 : PQresultErrorMessage(res));
771 2 : PQclear(res);
772 :
773 : /* NULL result to signal end-of-results for this command */
774 2 : if ((res = PQgetResult(conn)) != NULL)
775 0 : pg_fatal("Expected null result, got %s",
776 : PQresStatus(PQresultStatus(res)));
777 :
778 : /* Second query caused error, so we expect an error next */
779 2 : res = PQgetResult(conn);
780 2 : if (res == NULL)
781 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
782 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
783 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
784 : PQresStatus(PQresultStatus(res)));
785 2 : PQclear(res);
786 :
787 : /* NULL result to signal end-of-results for this command */
788 2 : if ((res = PQgetResult(conn)) != NULL)
789 0 : pg_fatal("Expected null result, got %s",
790 : PQresStatus(PQresultStatus(res)));
791 :
792 : /*
793 : * pipeline should now be aborted.
794 : *
795 : * Note that we could still queue more queries at this point if we wanted;
796 : * they'd get added to a new third pipeline since we've already sent a
797 : * second. The aborted flag relates only to the pipeline being received.
798 : */
799 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
800 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
801 :
802 : /* third query in pipeline, the second insert */
803 2 : res = PQgetResult(conn);
804 2 : if (res == NULL)
805 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
806 2 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
807 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
808 : PQresStatus(PQresultStatus(res)));
809 2 : PQclear(res);
810 :
811 : /* NULL result to signal end-of-results for this command */
812 2 : if ((res = PQgetResult(conn)) != NULL)
813 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
814 :
815 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
816 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
817 :
818 : /* Ensure we're still in pipeline */
819 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
820 0 : pg_fatal("Fell out of pipeline mode somehow");
821 :
822 : /*
823 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
824 : *
825 : * (This is so clients know to start processing results normally again and
826 : * can tell the difference between skipped commands and the sync.)
827 : */
828 2 : res = PQgetResult(conn);
829 2 : if (res == NULL)
830 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
831 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
832 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
833 : "Expected PGRES_PIPELINE_SYNC, got %s",
834 : PQresStatus(PQresultStatus(res)));
835 2 : PQclear(res);
836 :
837 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
838 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
839 :
840 : /* We're still in pipeline mode... */
841 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
842 0 : pg_fatal("Fell out of pipeline mode somehow");
843 :
844 : /* the insert from the second pipeline */
845 2 : res = PQgetResult(conn);
846 2 : if (res == NULL)
847 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
848 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
849 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
850 : PQresStatus(PQresultStatus(res)));
851 2 : PQclear(res);
852 :
853 : /* Read the NULL result at the end of the command */
854 2 : if ((res = PQgetResult(conn)) != NULL)
855 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
856 :
857 : /* the second pipeline sync */
858 2 : if ((res = PQgetResult(conn)) == NULL)
859 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
860 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
861 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
862 : PQresStatus(PQresultStatus(res)));
863 2 : PQclear(res);
864 :
865 2 : if ((res = PQgetResult(conn)) != NULL)
866 0 : pg_fatal("Expected null result, got %s: %s",
867 : PQresStatus(PQresultStatus(res)),
868 : PQerrorMessage(conn));
869 :
870 : /* Try to send two queries in one command */
871 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
872 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
873 2 : if (PQpipelineSync(conn) != 1)
874 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
875 2 : goterror = false;
876 4 : while ((res = PQgetResult(conn)) != NULL)
877 : {
878 2 : switch (PQresultStatus(res))
879 : {
880 2 : case PGRES_FATAL_ERROR:
881 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
882 0 : pg_fatal("expected error about multiple commands, got %s",
883 : PQerrorMessage(conn));
884 2 : printf("got expected %s", PQerrorMessage(conn));
885 2 : goterror = true;
886 2 : break;
887 0 : default:
888 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
889 : break;
890 : }
891 : }
892 2 : if (!goterror)
893 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
894 2 : res = PQgetResult(conn);
895 2 : if (res == NULL)
896 0 : pg_fatal("got NULL result");
897 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
898 0 : pg_fatal("Unexpected result code %s from pipeline sync",
899 : PQresStatus(PQresultStatus(res)));
900 2 : fprintf(stderr, "ok\n");
901 :
902 : /* Test single-row mode with an error partways */
903 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
904 : 0, NULL, NULL, NULL, NULL, 0) != 1)
905 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
906 2 : if (PQpipelineSync(conn) != 1)
907 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
908 2 : PQsetSingleRowMode(conn);
909 2 : goterror = false;
910 2 : gotrows = 0;
911 10 : while ((res = PQgetResult(conn)) != NULL)
912 : {
913 8 : switch (PQresultStatus(res))
914 : {
915 6 : case PGRES_SINGLE_TUPLE:
916 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
917 6 : gotrows++;
918 6 : break;
919 2 : case PGRES_FATAL_ERROR:
920 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
921 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
922 : PQerrorMessage(conn),
923 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
924 2 : printf("got expected division-by-zero\n");
925 2 : goterror = true;
926 2 : break;
927 0 : default:
928 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
929 : }
930 8 : PQclear(res);
931 : }
932 2 : if (!goterror)
933 0 : pg_fatal("did not get division-by-zero error");
934 2 : if (gotrows != 3)
935 0 : pg_fatal("did not get three rows");
936 : /* the third pipeline sync */
937 2 : if ((res = PQgetResult(conn)) == NULL)
938 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
939 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
940 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
941 : PQresStatus(PQresultStatus(res)));
942 2 : PQclear(res);
943 :
944 : /* We're still in pipeline mode... */
945 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
946 0 : pg_fatal("Fell out of pipeline mode somehow");
947 :
948 : /* until we end it, which we can safely do now */
949 2 : if (PQexitPipelineMode(conn) != 1)
950 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
951 : PQerrorMessage(conn));
952 :
953 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
954 0 : pg_fatal("exiting pipeline mode didn't seem to work");
955 :
956 : /*-
957 : * Since we fired the pipelines off without a surrounding xact, the results
958 : * should be:
959 : *
960 : * - Implicit xact started by server around 1st pipeline
961 : * - First insert applied
962 : * - Second statement aborted xact
963 : * - Third insert skipped
964 : * - Sync rolled back first implicit xact
965 : * - Implicit xact created by server around 2nd pipeline
966 : * - insert applied from 2nd pipeline
967 : * - Sync commits 2nd xact
968 : *
969 : * So we should only have the value 3 that we inserted.
970 : */
971 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
972 :
973 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
974 0 : pg_fatal("Expected tuples, got %s: %s",
975 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
976 2 : if (PQntuples(res) != 1)
977 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
978 4 : for (i = 0; i < PQntuples(res); i++)
979 : {
980 2 : const char *val = PQgetvalue(res, i, 0);
981 :
982 2 : if (strcmp(val, "3") != 0)
983 0 : pg_fatal("expected only insert with value 3, got %s", val);
984 : }
985 :
986 2 : PQclear(res);
987 :
988 2 : fprintf(stderr, "ok\n");
989 2 : }
990 :
/*
 * Steps of the state machine driven by test_pipelined_insert.
 *
 * The same set of values is used for two independent cursors: what has been
 * sent so far, and what has been received so far.  The numeric order is
 * significant: test_pipelined_insert moves to the following step with "++"
 * and compares the current step against BI_DONE with "<", so enumerators
 * must stay listed in the order in which the steps occur.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX = 0,			/* BEGIN TRANSACTION */
	BI_DROP_TABLE = 1,			/* DROP TABLE IF EXISTS ... */
	BI_CREATE_TABLE = 2,		/* CREATE UNLOGGED TABLE ... */
	BI_PREPARE = 3,				/* prepare the INSERT statement */
	BI_INSERT_ROWS = 4,			/* execute the prepared INSERT, once per row */
	BI_COMMIT_TX = 5,			/* COMMIT */
	BI_SYNC = 6,				/* pipeline sync */
	BI_DONE = 7					/* nothing left to do */
};
1003 :
1004 : static void
1005 2 : test_pipelined_insert(PGconn *conn, int n_rows)
1006 : {
1007 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
1008 : const char *insert_params[2];
1009 : char insert_param_0[MAXINTLEN];
1010 : char insert_param_1[MAXINT8LEN];
1011 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
1012 2 : recv_step = BI_BEGIN_TX;
1013 : int rows_to_send,
1014 : rows_to_receive;
1015 :
1016 2 : insert_params[0] = insert_param_0;
1017 2 : insert_params[1] = insert_param_1;
1018 :
1019 2 : rows_to_send = rows_to_receive = n_rows;
1020 :
1021 : /*
1022 : * Do a pipelined insert into a table created at the start of the pipeline
1023 : */
1024 2 : if (PQenterPipelineMode(conn) != 1)
1025 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1026 :
1027 8 : while (send_step != BI_PREPARE)
1028 : {
1029 : const char *sql;
1030 :
1031 6 : switch (send_step)
1032 : {
1033 2 : case BI_BEGIN_TX:
1034 2 : sql = "BEGIN TRANSACTION";
1035 2 : send_step = BI_DROP_TABLE;
1036 2 : break;
1037 :
1038 2 : case BI_DROP_TABLE:
1039 2 : sql = drop_table_sql;
1040 2 : send_step = BI_CREATE_TABLE;
1041 2 : break;
1042 :
1043 2 : case BI_CREATE_TABLE:
1044 2 : sql = create_table_sql;
1045 2 : send_step = BI_PREPARE;
1046 2 : break;
1047 :
1048 0 : default:
1049 0 : pg_fatal("invalid state");
1050 : sql = NULL; /* keep compiler quiet */
1051 : }
1052 :
1053 : pg_debug("sending: %s\n", sql);
1054 6 : if (PQsendQueryParams(conn, sql,
1055 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1056 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1057 : }
1058 :
1059 : Assert(send_step == BI_PREPARE);
1060 : pg_debug("sending: %s\n", insert_sql2);
1061 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1062 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1063 2 : send_step = BI_INSERT_ROWS;
1064 :
1065 : /*
1066 : * Now we start inserting. We'll be sending enough data that we could fill
1067 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1068 : * mode and consume input while we send more output. As results of each
1069 : * query are processed we should pop them to allow processing of the next
1070 : * query. There's no need to finish the pipeline before processing
1071 : * results.
1072 : */
1073 2 : if (PQsetnonblocking(conn, 1) != 0)
1074 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1075 :
1076 28018 : while (recv_step != BI_DONE)
1077 : {
1078 : int sock;
1079 : fd_set input_mask;
1080 : fd_set output_mask;
1081 :
1082 28016 : sock = PQsocket(conn);
1083 :
1084 28016 : if (sock < 0)
1085 0 : break; /* shouldn't happen */
1086 :
1087 28016 : FD_ZERO(&input_mask);
1088 28016 : FD_SET(sock, &input_mask);
1089 28016 : FD_ZERO(&output_mask);
1090 28016 : FD_SET(sock, &output_mask);
1091 :
1092 28016 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1093 : {
1094 0 : fprintf(stderr, "select() failed: %m\n");
1095 0 : exit_nicely(conn);
1096 : }
1097 :
1098 : /*
1099 : * Process any results, so we keep the server's output buffer free
1100 : * flowing and it can continue to process input
1101 : */
1102 28016 : if (FD_ISSET(sock, &input_mask))
1103 : {
1104 4 : PQconsumeInput(conn);
1105 :
1106 : /* Read until we'd block if we tried to read */
1107 2826 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1108 : {
1109 : PGresult *res;
1110 2822 : const char *cmdtag = "";
1111 2822 : const char *description = "";
1112 : int status;
1113 :
1114 : /*
1115 : * Read next result. If no more results from this query,
1116 : * advance to the next query
1117 : */
1118 2822 : res = PQgetResult(conn);
1119 2822 : if (res == NULL)
1120 1410 : continue;
1121 :
1122 1412 : status = PGRES_COMMAND_OK;
1123 1412 : switch (recv_step)
1124 : {
1125 2 : case BI_BEGIN_TX:
1126 2 : cmdtag = "BEGIN";
1127 2 : recv_step++;
1128 2 : break;
1129 2 : case BI_DROP_TABLE:
1130 2 : cmdtag = "DROP TABLE";
1131 2 : recv_step++;
1132 2 : break;
1133 2 : case BI_CREATE_TABLE:
1134 2 : cmdtag = "CREATE TABLE";
1135 2 : recv_step++;
1136 2 : break;
1137 2 : case BI_PREPARE:
1138 2 : cmdtag = "";
1139 2 : description = "PREPARE";
1140 2 : recv_step++;
1141 2 : break;
1142 1400 : case BI_INSERT_ROWS:
1143 1400 : cmdtag = "INSERT";
1144 1400 : rows_to_receive--;
1145 1400 : if (rows_to_receive == 0)
1146 2 : recv_step++;
1147 1400 : break;
1148 2 : case BI_COMMIT_TX:
1149 2 : cmdtag = "COMMIT";
1150 2 : recv_step++;
1151 2 : break;
1152 2 : case BI_SYNC:
1153 2 : cmdtag = "";
1154 2 : description = "SYNC";
1155 2 : status = PGRES_PIPELINE_SYNC;
1156 2 : recv_step++;
1157 2 : break;
1158 0 : case BI_DONE:
1159 : /* unreachable */
1160 0 : pg_fatal("unreachable state");
1161 : }
1162 :
1163 1412 : if (PQresultStatus(res) != status)
1164 0 : pg_fatal("%s reported status %s, expected %s\n"
1165 : "Error message: \"%s\"",
1166 : description, PQresStatus(PQresultStatus(res)),
1167 : PQresStatus(status), PQerrorMessage(conn));
1168 :
1169 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1170 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1171 : description, cmdtag, PQcmdStatus(res));
1172 :
1173 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1174 :
1175 1412 : PQclear(res);
1176 : }
1177 : }
1178 :
1179 : /* Write more rows and/or the end pipeline message, if needed */
1180 28016 : if (FD_ISSET(sock, &output_mask))
1181 : {
1182 28014 : PQflush(conn);
1183 :
1184 28014 : if (send_step == BI_INSERT_ROWS)
1185 : {
1186 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1187 : /* use up some buffer space with a wide value */
1188 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1189 :
1190 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1191 : 2, insert_params, NULL, NULL, 0) == 1)
1192 : {
1193 : pg_debug("sent row %d\n", rows_to_send);
1194 :
1195 1400 : rows_to_send--;
1196 1400 : if (rows_to_send == 0)
1197 2 : send_step++;
1198 : }
1199 : else
1200 : {
1201 : /*
1202 : * in nonblocking mode, so it's OK for an insert to fail
1203 : * to send
1204 : */
1205 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1206 : rows_to_send, PQerrorMessage(conn));
1207 : }
1208 : }
1209 26614 : else if (send_step == BI_COMMIT_TX)
1210 : {
1211 2 : if (PQsendQueryParams(conn, "COMMIT",
1212 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1213 : {
1214 : pg_debug("sent COMMIT\n");
1215 2 : send_step++;
1216 : }
1217 : else
1218 : {
1219 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1220 : PQerrorMessage(conn));
1221 : }
1222 : }
1223 26612 : else if (send_step == BI_SYNC)
1224 : {
1225 2 : if (PQpipelineSync(conn) == 1)
1226 : {
1227 2 : fprintf(stdout, "pipeline sync sent\n");
1228 2 : send_step++;
1229 : }
1230 : else
1231 : {
1232 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1233 : PQerrorMessage(conn));
1234 : }
1235 : }
1236 : }
1237 : }
1238 :
1239 : /* We've got the sync message and the pipeline should be done */
1240 2 : if (PQexitPipelineMode(conn) != 1)
1241 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1242 : PQerrorMessage(conn));
1243 :
1244 2 : if (PQsetnonblocking(conn, 0) != 0)
1245 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1246 :
1247 2 : fprintf(stderr, "ok\n");
1248 2 : }
1249 :
1250 : static void
1251 2 : test_prepared(PGconn *conn)
1252 : {
1253 2 : PGresult *res = NULL;
1254 2 : Oid param_oids[1] = {INT4OID};
1255 : Oid expected_oids[4];
1256 : Oid typ;
1257 :
1258 2 : fprintf(stderr, "prepared... ");
1259 :
1260 2 : if (PQenterPipelineMode(conn) != 1)
1261 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1262 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1263 : "interval '1 sec'",
1264 : 1, param_oids) != 1)
1265 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1266 2 : expected_oids[0] = INT4OID;
1267 2 : expected_oids[1] = TEXTOID;
1268 2 : expected_oids[2] = NUMERICOID;
1269 2 : expected_oids[3] = INTERVALOID;
1270 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1271 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1272 2 : if (PQpipelineSync(conn) != 1)
1273 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1274 :
1275 2 : res = PQgetResult(conn);
1276 2 : if (res == NULL)
1277 0 : pg_fatal("PQgetResult returned null");
1278 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1279 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1280 2 : PQclear(res);
1281 2 : res = PQgetResult(conn);
1282 2 : if (res != NULL)
1283 0 : pg_fatal("expected NULL result");
1284 :
1285 2 : res = PQgetResult(conn);
1286 2 : if (res == NULL)
1287 0 : pg_fatal("PQgetResult returned NULL");
1288 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1289 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1290 2 : if (PQnfields(res) != lengthof(expected_oids))
1291 0 : pg_fatal("expected %zu columns, got %d",
1292 : lengthof(expected_oids), PQnfields(res));
1293 10 : for (int i = 0; i < PQnfields(res); i++)
1294 : {
1295 8 : typ = PQftype(res, i);
1296 8 : if (typ != expected_oids[i])
1297 0 : pg_fatal("field %d: expected type %u, got %u",
1298 : i, expected_oids[i], typ);
1299 : }
1300 2 : PQclear(res);
1301 2 : res = PQgetResult(conn);
1302 2 : if (res != NULL)
1303 0 : pg_fatal("expected NULL result");
1304 :
1305 2 : res = PQgetResult(conn);
1306 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1307 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1308 :
1309 2 : fprintf(stderr, "closing statement..");
1310 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1311 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1312 2 : if (PQpipelineSync(conn) != 1)
1313 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1314 :
1315 2 : res = PQgetResult(conn);
1316 2 : if (res == NULL)
1317 0 : pg_fatal("expected non-NULL result");
1318 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1319 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1320 2 : PQclear(res);
1321 2 : res = PQgetResult(conn);
1322 2 : if (res != NULL)
1323 0 : pg_fatal("expected NULL result");
1324 2 : res = PQgetResult(conn);
1325 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1326 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1327 :
1328 2 : if (PQexitPipelineMode(conn) != 1)
1329 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1330 :
1331 : /* Now that it's closed we should get an error when describing */
1332 2 : res = PQdescribePrepared(conn, "select_one");
1333 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1334 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1335 :
1336 : /*
1337 : * Also test the blocking close, this should not fail since closing a
1338 : * non-existent prepared statement is a no-op
1339 : */
1340 2 : res = PQclosePrepared(conn, "select_one");
1341 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1342 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1343 :
1344 2 : fprintf(stderr, "creating portal... ");
1345 2 : PQexec(conn, "BEGIN");
1346 2 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1347 2 : PQenterPipelineMode(conn);
1348 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1349 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1350 2 : if (PQpipelineSync(conn) != 1)
1351 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1352 2 : res = PQgetResult(conn);
1353 2 : if (res == NULL)
1354 0 : pg_fatal("PQgetResult returned null");
1355 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1356 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1357 :
1358 2 : typ = PQftype(res, 0);
1359 2 : if (typ != INT4OID)
1360 0 : pg_fatal("portal: expected type %u, got %u",
1361 : INT4OID, typ);
1362 2 : PQclear(res);
1363 2 : res = PQgetResult(conn);
1364 2 : if (res != NULL)
1365 0 : pg_fatal("expected NULL result");
1366 2 : res = PQgetResult(conn);
1367 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1368 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1369 :
1370 2 : fprintf(stderr, "closing portal... ");
1371 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1372 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1373 2 : if (PQpipelineSync(conn) != 1)
1374 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1375 :
1376 2 : res = PQgetResult(conn);
1377 2 : if (res == NULL)
1378 0 : pg_fatal("expected non-NULL result");
1379 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1380 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1381 2 : PQclear(res);
1382 2 : res = PQgetResult(conn);
1383 2 : if (res != NULL)
1384 0 : pg_fatal("expected NULL result");
1385 2 : res = PQgetResult(conn);
1386 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1387 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1388 :
1389 2 : if (PQexitPipelineMode(conn) != 1)
1390 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1391 :
1392 : /* Now that it's closed we should get an error when describing */
1393 2 : res = PQdescribePortal(conn, "cursor_one");
1394 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1395 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1396 :
1397 : /*
1398 : * Also test the blocking close, this should not fail since closing a
1399 : * non-existent portal is a no-op
1400 : */
1401 2 : res = PQclosePortal(conn, "cursor_one");
1402 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1403 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1404 :
1405 2 : fprintf(stderr, "ok\n");
1406 2 : }
1407 :
/*
 * Notice processor callback: bump the counter that "arg" points to and echo
 * the notice, prefixed with its running number, to stderr.
 *
 * arg must point to an int.  The message is printed as-is; no newline is
 * appended here.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;

	*counter += 1;
	fprintf(stderr, "NOTICE %d: %s", *counter, message);
}
1417 :
1418 : /* Verify behavior in "idle" state */
1419 : static void
1420 2 : test_pipeline_idle(PGconn *conn)
1421 : {
1422 : PGresult *res;
1423 2 : int n_notices = 0;
1424 :
1425 2 : fprintf(stderr, "\npipeline idle...\n");
1426 :
1427 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1428 :
1429 : /* Try to exit pipeline mode in pipeline-idle state */
1430 2 : if (PQenterPipelineMode(conn) != 1)
1431 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1432 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1433 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1434 2 : PQsendFlushRequest(conn);
1435 2 : res = PQgetResult(conn);
1436 2 : if (res == NULL)
1437 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1438 : PQerrorMessage(conn));
1439 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1440 0 : pg_fatal("unexpected result code %s from first pipeline item",
1441 : PQresStatus(PQresultStatus(res)));
1442 2 : PQclear(res);
1443 2 : res = PQgetResult(conn);
1444 2 : if (res != NULL)
1445 0 : pg_fatal("did not receive terminating NULL");
1446 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1447 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1448 2 : if (PQexitPipelineMode(conn) == 1)
1449 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1450 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1451 : strlen("cannot exit pipeline mode")) != 0)
1452 0 : pg_fatal("did not get expected error; got: %s",
1453 : PQerrorMessage(conn));
1454 2 : PQsendFlushRequest(conn);
1455 2 : res = PQgetResult(conn);
1456 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1457 0 : pg_fatal("unexpected result code %s from second pipeline item",
1458 : PQresStatus(PQresultStatus(res)));
1459 2 : PQclear(res);
1460 2 : res = PQgetResult(conn);
1461 2 : if (res != NULL)
1462 0 : pg_fatal("did not receive terminating NULL");
1463 2 : if (PQexitPipelineMode(conn) != 1)
1464 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1465 :
1466 2 : if (n_notices > 0)
1467 0 : pg_fatal("got %d notice(s)", n_notices);
1468 2 : fprintf(stderr, "ok - 1\n");
1469 :
1470 : /* Have a WARNING in the middle of a resultset */
1471 2 : if (PQenterPipelineMode(conn) != 1)
1472 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1473 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1474 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1475 2 : PQsendFlushRequest(conn);
1476 2 : res = PQgetResult(conn);
1477 2 : if (res == NULL)
1478 0 : pg_fatal("unexpected NULL result received");
1479 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1480 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1481 2 : if (PQexitPipelineMode(conn) != 1)
1482 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1483 2 : fprintf(stderr, "ok - 2\n");
1484 2 : }
1485 :
1486 : static void
1487 2 : test_simple_pipeline(PGconn *conn)
1488 : {
1489 2 : PGresult *res = NULL;
1490 2 : const char *dummy_params[1] = {"1"};
1491 2 : Oid dummy_param_oids[1] = {INT4OID};
1492 :
1493 2 : fprintf(stderr, "simple pipeline... ");
1494 :
1495 : /*
1496 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1497 : * process the results of as they come in.
1498 : *
1499 : * For a simple case we should be able to do this without interim
1500 : * processing of results since our output buffer will give us enough slush
1501 : * to work with and we won't block on sending. So blocking mode is fine.
1502 : */
1503 2 : if (PQisnonblocking(conn))
1504 0 : pg_fatal("Expected blocking connection mode");
1505 :
1506 2 : if (PQenterPipelineMode(conn) != 1)
1507 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1508 :
1509 2 : if (PQsendQueryParams(conn, "SELECT $1",
1510 : 1, dummy_param_oids, dummy_params,
1511 : NULL, NULL, 0) != 1)
1512 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1513 :
1514 2 : if (PQexitPipelineMode(conn) != 0)
1515 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1516 :
1517 2 : if (PQpipelineSync(conn) != 1)
1518 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1519 :
1520 2 : res = PQgetResult(conn);
1521 2 : if (res == NULL)
1522 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1523 : PQerrorMessage(conn));
1524 :
1525 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1526 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1527 : PQresStatus(PQresultStatus(res)));
1528 :
1529 2 : PQclear(res);
1530 2 : res = NULL;
1531 :
1532 2 : if (PQgetResult(conn) != NULL)
1533 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1534 :
1535 : /*
1536 : * Even though we've processed the result there's still a sync to come and
1537 : * we can't exit pipeline mode yet
1538 : */
1539 2 : if (PQexitPipelineMode(conn) != 0)
1540 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1541 :
1542 2 : res = PQgetResult(conn);
1543 2 : if (res == NULL)
1544 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1545 : PQerrorMessage(conn));
1546 :
1547 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1548 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1549 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1550 :
1551 2 : PQclear(res);
1552 2 : res = NULL;
1553 :
1554 2 : if (PQgetResult(conn) != NULL)
1555 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1556 : PQresStatus(PQresultStatus(res)));
1557 :
1558 : /* We're still in pipeline mode... */
1559 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1560 0 : pg_fatal("Fell out of pipeline mode somehow");
1561 :
1562 : /* ... until we end it, which we can safely do now */
1563 2 : if (PQexitPipelineMode(conn) != 1)
1564 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1565 : PQerrorMessage(conn));
1566 :
1567 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1568 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1569 :
1570 2 : fprintf(stderr, "ok\n");
1571 2 : }
1572 :
1573 : static void
1574 2 : test_singlerowmode(PGconn *conn)
1575 : {
1576 : PGresult *res;
1577 : int i;
1578 2 : bool pipeline_ended = false;
1579 :
1580 2 : if (PQenterPipelineMode(conn) != 1)
1581 0 : pg_fatal("failed to enter pipeline mode: %s",
1582 : PQerrorMessage(conn));
1583 :
1584 : /* One series of three commands, using single-row mode for the first two. */
1585 8 : for (i = 0; i < 3; i++)
1586 : {
1587 : char *param[1];
1588 :
1589 6 : param[0] = psprintf("%d", 44 + i);
1590 :
1591 6 : if (PQsendQueryParams(conn,
1592 : "SELECT generate_series(42, $1)",
1593 : 1,
1594 : NULL,
1595 : (const char **) param,
1596 : NULL,
1597 : NULL,
1598 : 0) != 1)
1599 0 : pg_fatal("failed to send query: %s",
1600 : PQerrorMessage(conn));
1601 6 : pfree(param[0]);
1602 : }
1603 2 : if (PQpipelineSync(conn) != 1)
1604 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1605 :
1606 10 : for (i = 0; !pipeline_ended; i++)
1607 : {
1608 8 : bool first = true;
1609 : bool saw_ending_tuplesok;
1610 8 : bool isSingleTuple = false;
1611 :
1612 : /* Set single row mode for only first 2 SELECT queries */
1613 8 : if (i < 2)
1614 : {
1615 4 : if (PQsetSingleRowMode(conn) != 1)
1616 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1617 : }
1618 :
1619 : /* Consume rows for this query */
1620 8 : saw_ending_tuplesok = false;
1621 28 : while ((res = PQgetResult(conn)) != NULL)
1622 : {
1623 22 : ExecStatusType est = PQresultStatus(res);
1624 :
1625 22 : if (est == PGRES_PIPELINE_SYNC)
1626 : {
1627 2 : fprintf(stderr, "end of pipeline reached\n");
1628 2 : pipeline_ended = true;
1629 2 : PQclear(res);
1630 2 : if (i != 3)
1631 0 : pg_fatal("Expected three results, got %d", i);
1632 2 : break;
1633 : }
1634 :
1635 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1636 20 : if (first)
1637 : {
1638 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1639 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1640 : i, PQresStatus(est));
1641 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1642 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1643 : i, PQresStatus(est));
1644 6 : first = false;
1645 : }
1646 :
1647 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1648 20 : switch (est)
1649 : {
1650 6 : case PGRES_TUPLES_OK:
1651 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1652 6 : saw_ending_tuplesok = true;
1653 6 : if (isSingleTuple)
1654 : {
1655 4 : if (PQntuples(res) == 0)
1656 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1657 : else
1658 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1659 : }
1660 6 : break;
1661 :
1662 14 : case PGRES_SINGLE_TUPLE:
1663 14 : isSingleTuple = true;
1664 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1665 14 : break;
1666 :
1667 0 : default:
1668 0 : pg_fatal("unexpected");
1669 : }
1670 20 : PQclear(res);
1671 : }
1672 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1673 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1674 : }
1675 :
1676 : /*
1677 : * Now issue one command, get its results in with single-row mode, then
1678 : * issue another command, and get its results in normal mode; make sure
1679 : * the single-row mode flag is reset as expected.
1680 : */
1681 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1682 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1683 0 : pg_fatal("failed to send query: %s",
1684 : PQerrorMessage(conn));
1685 2 : if (PQsendFlushRequest(conn) != 1)
1686 0 : pg_fatal("failed to send flush request");
1687 2 : if (PQsetSingleRowMode(conn) != 1)
1688 0 : pg_fatal("PQsetSingleRowMode() failed");
1689 2 : res = PQgetResult(conn);
1690 2 : if (res == NULL)
1691 0 : pg_fatal("unexpected NULL");
1692 2 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1693 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1694 : PQresStatus(PQresultStatus(res)));
1695 2 : res = PQgetResult(conn);
1696 2 : if (res == NULL)
1697 0 : pg_fatal("unexpected NULL");
1698 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1699 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1700 : PQresStatus(PQresultStatus(res)));
1701 2 : if (PQgetResult(conn) != NULL)
1702 0 : pg_fatal("expected NULL result");
1703 :
1704 2 : if (PQsendQueryParams(conn, "SELECT 1",
1705 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1706 0 : pg_fatal("failed to send query: %s",
1707 : PQerrorMessage(conn));
1708 2 : if (PQsendFlushRequest(conn) != 1)
1709 0 : pg_fatal("failed to send flush request");
1710 2 : res = PQgetResult(conn);
1711 2 : if (res == NULL)
1712 0 : pg_fatal("unexpected NULL");
1713 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1714 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1715 : PQresStatus(PQresultStatus(res)));
1716 2 : if (PQgetResult(conn) != NULL)
1717 0 : pg_fatal("expected NULL result");
1718 :
1719 : /*
1720 : * Try chunked mode as well; make sure that it correctly delivers a
1721 : * partial final chunk.
1722 : */
1723 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1724 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1725 0 : pg_fatal("failed to send query: %s",
1726 : PQerrorMessage(conn));
1727 2 : if (PQsendFlushRequest(conn) != 1)
1728 0 : pg_fatal("failed to send flush request");
1729 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1730 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1731 2 : res = PQgetResult(conn);
1732 2 : if (res == NULL)
1733 0 : pg_fatal("unexpected NULL");
1734 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1735 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1736 : PQresStatus(PQresultStatus(res)),
1737 : PQerrorMessage(conn));
1738 2 : if (PQntuples(res) != 3)
1739 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1740 2 : res = PQgetResult(conn);
1741 2 : if (res == NULL)
1742 0 : pg_fatal("unexpected NULL");
1743 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1744 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1745 : PQresStatus(PQresultStatus(res)));
1746 2 : if (PQntuples(res) != 2)
1747 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1748 2 : res = PQgetResult(conn);
1749 2 : if (res == NULL)
1750 0 : pg_fatal("unexpected NULL");
1751 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1752 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1753 : PQresStatus(PQresultStatus(res)));
1754 2 : if (PQntuples(res) != 0)
1755 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1756 2 : if (PQgetResult(conn) != NULL)
1757 0 : pg_fatal("expected NULL result");
1758 :
1759 2 : if (PQexitPipelineMode(conn) != 1)
1760 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1761 :
1762 2 : fprintf(stderr, "ok\n");
1763 2 : }
1764 :
1765 : /*
1766 : * Simple test to verify that a pipeline is discarded as a whole when there's
1767 : * an error, ignoring transaction commands.
1768 : */
1769 : static void
1770 2 : test_transaction(PGconn *conn)
1771 : {
1772 : PGresult *res;
1773 : bool expect_null;
1774 2 : int num_syncs = 0;
1775 :
1776 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1777 : "CREATE TABLE pq_pipeline_tst (id int)");
1778 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1779 0 : pg_fatal("failed to create test table: %s",
1780 : PQerrorMessage(conn));
1781 2 : PQclear(res);
1782 :
1783 2 : if (PQenterPipelineMode(conn) != 1)
1784 0 : pg_fatal("failed to enter pipeline mode: %s",
1785 : PQerrorMessage(conn));
1786 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1787 0 : pg_fatal("could not send prepare on pipeline: %s",
1788 : PQerrorMessage(conn));
1789 :
1790 2 : if (PQsendQueryParams(conn,
1791 : "BEGIN",
1792 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1793 0 : pg_fatal("failed to send query: %s",
1794 : PQerrorMessage(conn));
1795 2 : if (PQsendQueryParams(conn,
1796 : "SELECT 0/0",
1797 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1798 0 : pg_fatal("failed to send query: %s",
1799 : PQerrorMessage(conn));
1800 :
1801 : /*
1802 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1803 : * get out of the pipeline-aborted state first.
1804 : */
1805 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1806 0 : pg_fatal("failed to execute prepared: %s",
1807 : PQerrorMessage(conn));
1808 :
1809 : /* This insert fails because we're in pipeline-aborted state */
1810 2 : if (PQsendQueryParams(conn,
1811 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1812 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1813 0 : pg_fatal("failed to send query: %s",
1814 : PQerrorMessage(conn));
1815 2 : if (PQpipelineSync(conn) != 1)
1816 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1817 2 : num_syncs++;
1818 :
1819 : /*
1820 : * This insert fails even though the pipeline got a SYNC, because we're in
1821 : * an aborted transaction
1822 : */
1823 2 : if (PQsendQueryParams(conn,
1824 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1825 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1826 0 : pg_fatal("failed to send query: %s",
1827 : PQerrorMessage(conn));
1828 2 : if (PQpipelineSync(conn) != 1)
1829 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1830 2 : num_syncs++;
1831 :
1832 : /*
1833 : * Send ROLLBACK using prepared stmt. This one works because we just did
1834 : * PQpipelineSync above.
1835 : */
1836 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1837 0 : pg_fatal("failed to execute prepared: %s",
1838 : PQerrorMessage(conn));
1839 :
1840 : /*
1841 : * Now that we're out of a transaction and in pipeline-good mode, this
1842 : * insert works
1843 : */
1844 2 : if (PQsendQueryParams(conn,
1845 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1846 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1847 0 : pg_fatal("failed to send query: %s",
1848 : PQerrorMessage(conn));
1849 : /* Send two syncs now -- match up to SYNC messages below */
1850 2 : if (PQpipelineSync(conn) != 1)
1851 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1852 2 : num_syncs++;
1853 2 : if (PQpipelineSync(conn) != 1)
1854 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1855 2 : num_syncs++;
1856 :
1857 2 : expect_null = false;
1858 2 : for (int i = 0;; i++)
1859 38 : {
1860 : ExecStatusType restype;
1861 :
1862 40 : res = PQgetResult(conn);
1863 40 : if (res == NULL)
1864 : {
1865 16 : printf("%d: got NULL result\n", i);
1866 16 : if (!expect_null)
1867 0 : pg_fatal("did not expect NULL here");
1868 16 : expect_null = false;
1869 16 : continue;
1870 : }
1871 24 : restype = PQresultStatus(res);
1872 24 : printf("%d: got status %s", i, PQresStatus(restype));
1873 24 : if (expect_null)
1874 0 : pg_fatal("expected NULL");
1875 24 : if (restype == PGRES_FATAL_ERROR)
1876 4 : printf("; error: %s", PQerrorMessage(conn));
1877 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1878 : {
1879 4 : printf(": command didn't run because pipeline aborted\n");
1880 : }
1881 : else
1882 16 : printf("\n");
1883 24 : PQclear(res);
1884 :
1885 24 : if (restype == PGRES_PIPELINE_SYNC)
1886 8 : num_syncs--;
1887 : else
1888 16 : expect_null = true;
1889 24 : if (num_syncs <= 0)
1890 2 : break;
1891 : }
1892 2 : if (PQgetResult(conn) != NULL)
1893 0 : pg_fatal("returned something extra after all the syncs: %s",
1894 : PQresStatus(PQresultStatus(res)));
1895 :
1896 2 : if (PQexitPipelineMode(conn) != 1)
1897 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1898 :
1899 : /* We expect to find one tuple containing the value "3" */
1900 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1901 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1902 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1903 2 : if (PQntuples(res) != 1)
1904 0 : pg_fatal("did not get 1 tuple");
1905 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1906 0 : pg_fatal("did not get expected tuple");
1907 2 : PQclear(res);
1908 :
1909 2 : fprintf(stderr, "ok\n");
1910 2 : }
1911 :
1912 : /*
1913 : * In this test mode we send a stream of queries, with one in the middle
1914 : * causing an error. Verify that we can still send some more after the
1915 : * error and have libpq work properly.
1916 : */
1917 : static void
1918 2 : test_uniqviol(PGconn *conn)
1919 : {
1920 2 : int sock = PQsocket(conn);
1921 : PGresult *res;
1922 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
1923 : const char *paramValues[2];
1924 : char paramValue0[MAXINT8LEN];
1925 : char paramValue1[MAXINT8LEN];
1926 2 : int ctr = 0;
1927 2 : int numsent = 0;
1928 2 : int results = 0;
1929 2 : bool read_done = false;
1930 2 : bool write_done = false;
1931 2 : bool error_sent = false;
1932 2 : bool got_error = false;
1933 2 : int switched = 0;
1934 2 : int socketful = 0;
1935 : fd_set in_fds;
1936 : fd_set out_fds;
1937 :
1938 2 : fprintf(stderr, "uniqviol ...");
1939 :
1940 2 : PQsetnonblocking(conn, 1);
1941 :
1942 2 : paramValues[0] = paramValue0;
1943 2 : paramValues[1] = paramValue1;
1944 2 : sprintf(paramValue1, "42");
1945 :
1946 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1947 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1948 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1949 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1950 :
1951 2 : res = PQexec(conn, "begin");
1952 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1953 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1954 :
1955 2 : res = PQprepare(conn, "insertion",
1956 : "insert into ppln_uniqviol values ($1, $2) returning id",
1957 : 2, paramTypes);
1958 2 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1959 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1960 :
1961 2 : if (PQenterPipelineMode(conn) != 1)
1962 0 : pg_fatal("failed to enter pipeline mode");
1963 :
1964 16 : while (!read_done)
1965 : {
1966 : /*
1967 : * Avoid deadlocks by reading everything the server has sent before
1968 : * sending anything. (Special precaution is needed here to process
1969 : * PQisBusy before testing the socket for read-readiness, because the
1970 : * socket does not turn read-ready after "sending" queries in aborted
1971 : * pipeline mode.)
1972 : */
1973 1212 : while (PQisBusy(conn) == 0)
1974 : {
1975 : bool new_error;
1976 :
1977 1202 : if (results >= numsent)
1978 : {
1979 2 : if (write_done)
1980 0 : read_done = true;
1981 2 : break;
1982 : }
1983 :
1984 1200 : res = PQgetResult(conn);
1985 1200 : new_error = process_result(conn, res, results, numsent);
1986 1200 : if (new_error && got_error)
1987 0 : pg_fatal("got two errors");
1988 1200 : got_error |= new_error;
1989 1200 : if (results++ >= numsent - 1)
1990 : {
1991 4 : if (write_done)
1992 2 : read_done = true;
1993 4 : break;
1994 : }
1995 : }
1996 :
1997 16 : if (read_done)
1998 2 : break;
1999 :
2000 14 : FD_ZERO(&out_fds);
2001 14 : FD_SET(sock, &out_fds);
2002 :
2003 14 : FD_ZERO(&in_fds);
2004 14 : FD_SET(sock, &in_fds);
2005 :
2006 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2007 : {
2008 0 : if (errno == EINTR)
2009 0 : continue;
2010 0 : pg_fatal("select() failed: %m");
2011 : }
2012 :
2013 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2014 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2015 :
2016 : /*
2017 : * If the socket is writable and we haven't finished sending queries,
2018 : * send some.
2019 : */
2020 14 : if (!write_done && FD_ISSET(sock, &out_fds))
2021 : {
2022 : for (;;)
2023 1194 : {
2024 : int flush;
2025 :
2026 : /*
2027 : * provoke uniqueness violation exactly once after having
2028 : * switched to read mode.
2029 : */
2030 1200 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2031 : {
2032 2 : sprintf(paramValue0, "%d", numsent / 2);
2033 2 : fprintf(stderr, "E");
2034 2 : error_sent = true;
2035 : }
2036 : else
2037 : {
2038 1198 : fprintf(stderr, ".");
2039 1198 : sprintf(paramValue0, "%d", ctr++);
2040 : }
2041 :
2042 1200 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2043 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2044 1200 : numsent++;
2045 :
2046 : /* Are we done writing? */
2047 1200 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2048 : {
2049 2 : if (PQsendFlushRequest(conn) != 1)
2050 0 : pg_fatal("failed to send flush request");
2051 2 : write_done = true;
2052 2 : fprintf(stderr, "\ndone writing\n");
2053 2 : PQflush(conn);
2054 2 : break;
2055 : }
2056 :
2057 : /* is the outgoing socket full? */
2058 1198 : flush = PQflush(conn);
2059 1198 : if (flush == -1)
2060 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2061 1198 : if (flush == 1)
2062 : {
2063 4 : if (socketful == 0)
2064 2 : socketful = numsent;
2065 4 : fprintf(stderr, "\nswitch to reading\n");
2066 4 : switched++;
2067 4 : break;
2068 : }
2069 : }
2070 : }
2071 : }
2072 :
2073 2 : if (!got_error)
2074 0 : pg_fatal("did not get expected error");
2075 :
2076 2 : fprintf(stderr, "ok\n");
2077 2 : }
2078 :
2079 : /*
2080 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2081 : * the expected NULL that should follow it.
2082 : *
2083 : * Returns true if we read a fatal error message, otherwise false.
2084 : */
2085 : static bool
2086 1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2087 : {
2088 : PGresult *res2;
2089 1200 : bool got_error = false;
2090 :
2091 1200 : if (res == NULL)
2092 0 : pg_fatal("got unexpected NULL");
2093 :
2094 1200 : switch (PQresultStatus(res))
2095 : {
2096 2 : case PGRES_FATAL_ERROR:
2097 2 : got_error = true;
2098 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2099 2 : PQclear(res);
2100 :
2101 2 : res2 = PQgetResult(conn);
2102 2 : if (res2 != NULL)
2103 0 : pg_fatal("expected NULL, got %s",
2104 : PQresStatus(PQresultStatus(res2)));
2105 2 : break;
2106 :
2107 836 : case PGRES_TUPLES_OK:
2108 836 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2109 836 : PQclear(res);
2110 :
2111 836 : res2 = PQgetResult(conn);
2112 836 : if (res2 != NULL)
2113 0 : pg_fatal("expected NULL, got %s",
2114 : PQresStatus(PQresultStatus(res2)));
2115 836 : break;
2116 :
2117 362 : case PGRES_PIPELINE_ABORTED:
2118 362 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2119 362 : res2 = PQgetResult(conn);
2120 362 : if (res2 != NULL)
2121 0 : pg_fatal("expected NULL, got %s",
2122 : PQresStatus(PQresultStatus(res2)));
2123 362 : break;
2124 :
2125 0 : default:
2126 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2127 : }
2128 :
2129 1200 : return got_error;
2130 : }
2131 :
2132 :
/*
 * Print a short help text to stderr: a one-line description, the two
 * supported invocation forms, and the available options.
 *
 * progname is the program name to show in the description and usage lines.
 */
static void
usage(const char *progname)
{
	/* Description and invocation forms; each needs the program name */
	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n",
			progname, progname, progname);

	/* Option summary; constant text, so no formatting is required */
	fputs("\nOptions:\n"
		  " -t TRACEFILE generate a libpq trace to TRACEFILE\n"
		  " -r NUMROWS use NUMROWS as the test size\n",
		  stderr);
}
2144 :
/*
 * Write the name of every test that main() can dispatch to stdout, one name
 * per line, in alphabetical order.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};
	const size_t ntests = sizeof(test_names) / sizeof(test_names[0]);

	for (size_t idx = 0; idx < ntests; idx++)
		printf("%s\n", test_names[idx]);
}
2161 :
2162 : int
2163 26 : main(int argc, char **argv)
2164 : {
2165 26 : const char *conninfo = "";
2166 : PGconn *conn;
2167 : FILE *trace;
2168 : char *testname;
2169 26 : int numrows = 10000;
2170 : PGresult *res;
2171 : int c;
2172 :
2173 94 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2174 : {
2175 42 : switch (c)
2176 : {
2177 24 : case 'r': /* numrows */
2178 24 : errno = 0;
2179 24 : numrows = strtol(optarg, NULL, 10);
2180 24 : if (errno != 0 || numrows <= 0)
2181 : {
2182 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2183 : optarg);
2184 0 : exit(1);
2185 : }
2186 24 : break;
2187 18 : case 't': /* trace file */
2188 18 : tracefile = pg_strdup(optarg);
2189 18 : break;
2190 : }
2191 68 : }
2192 :
2193 26 : if (optind < argc)
2194 : {
2195 26 : testname = pg_strdup(argv[optind]);
2196 26 : optind++;
2197 : }
2198 : else
2199 : {
2200 0 : usage(argv[0]);
2201 0 : exit(1);
2202 : }
2203 :
2204 26 : if (strcmp(testname, "tests") == 0)
2205 : {
2206 2 : print_test_list();
2207 2 : exit(0);
2208 : }
2209 :
2210 24 : if (optind < argc)
2211 : {
2212 24 : conninfo = pg_strdup(argv[optind]);
2213 24 : optind++;
2214 : }
2215 :
2216 : /* Make a connection to the database */
2217 24 : conn = PQconnectdb(conninfo);
2218 24 : if (PQstatus(conn) != CONNECTION_OK)
2219 : {
2220 0 : fprintf(stderr, "Connection to database failed: %s\n",
2221 : PQerrorMessage(conn));
2222 0 : exit_nicely(conn);
2223 : }
2224 :
2225 24 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2226 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2227 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2228 24 : res = PQexec(conn, "SET debug_parallel_query = off");
2229 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2230 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2231 :
2232 : /* Set the trace file, if requested */
2233 24 : if (tracefile != NULL)
2234 : {
2235 18 : if (strcmp(tracefile, "-") == 0)
2236 0 : trace = stdout;
2237 : else
2238 18 : trace = fopen(tracefile, "w");
2239 18 : if (trace == NULL)
2240 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2241 :
2242 : /* Make it line-buffered */
2243 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2244 :
2245 18 : PQtrace(conn, trace);
2246 18 : PQsetTraceFlags(conn,
2247 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2248 : }
2249 :
2250 24 : if (strcmp(testname, "cancel") == 0)
2251 2 : test_cancel(conn);
2252 22 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2253 2 : test_disallowed_in_pipeline(conn);
2254 20 : else if (strcmp(testname, "multi_pipelines") == 0)
2255 2 : test_multi_pipelines(conn);
2256 18 : else if (strcmp(testname, "nosync") == 0)
2257 2 : test_nosync(conn);
2258 16 : else if (strcmp(testname, "pipeline_abort") == 0)
2259 2 : test_pipeline_abort(conn);
2260 14 : else if (strcmp(testname, "pipeline_idle") == 0)
2261 2 : test_pipeline_idle(conn);
2262 12 : else if (strcmp(testname, "pipelined_insert") == 0)
2263 2 : test_pipelined_insert(conn, numrows);
2264 10 : else if (strcmp(testname, "prepared") == 0)
2265 2 : test_prepared(conn);
2266 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2267 2 : test_simple_pipeline(conn);
2268 6 : else if (strcmp(testname, "singlerow") == 0)
2269 2 : test_singlerowmode(conn);
2270 4 : else if (strcmp(testname, "transaction") == 0)
2271 2 : test_transaction(conn);
2272 2 : else if (strcmp(testname, "uniqviol") == 0)
2273 2 : test_uniqviol(conn);
2274 : else
2275 : {
2276 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2277 0 : exit(1);
2278 : }
2279 :
2280 : /* close the connection to the database and cleanup */
2281 24 : PQfinish(conn);
2282 24 : return 0;
2283 : }
|