Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "common/fe_memutils.h"
23 : #include "libpq-fe.h"
24 : #include "pg_getopt.h"
25 : #include "portability/instr_time.h"
26 :
27 :
28 : static void exit_nicely(PGconn *conn);
29 : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
30 : pg_attribute_printf(2, 3);
31 : static bool process_result(PGconn *conn, PGresult *res, int results,
32 : int numsent);
33 :
34 : const char *const progname = "libpq_pipeline";
35 :
36 : /* Options and defaults */
37 : char *tracefile = NULL; /* path to PQtrace() file */
38 :
39 :
40 : #ifdef DEBUG_OUTPUT
41 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 : #else
43 : #define pg_debug(...)
44 : #endif
45 :
46 : static const char *const drop_table_sql =
47 : "DROP TABLE IF EXISTS pq_pipeline_demo";
48 : static const char *const create_table_sql =
49 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 : "int8filler int8);";
51 : static const char *const insert_sql =
52 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 : static const char *const insert_sql2 =
54 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 :
56 : /* max char length of an int32/64, plus sign and null terminator */
57 : #define MAXINTLEN 12
58 : #define MAXINT8LEN 20
59 :
60 : static void
61 0 : exit_nicely(PGconn *conn)
62 : {
63 0 : PQfinish(conn);
64 0 : exit(1);
65 : }
66 :
67 : /*
68 : * The following few functions are wrapped in macros to make the reported line
69 : * number in an error match the line number of the invocation.
70 : */
71 :
72 : /*
73 : * Print an error to stderr and terminate the program.
74 : */
75 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
76 : static void
77 : pg_attribute_noreturn()
78 0 : pg_fatal_impl(int line, const char *fmt,...)
79 : {
80 : va_list args;
81 :
82 0 : fflush(stdout);
83 :
84 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
85 0 : va_start(args, fmt);
86 0 : vfprintf(stderr, fmt, args);
87 0 : va_end(args);
88 : Assert(fmt[strlen(fmt) - 1] != '\n');
89 0 : fprintf(stderr, "\n");
90 0 : exit(1);
91 : }
92 :
93 : /*
94 : * Check that the query on the given connection got canceled.
95 : */
96 : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
97 : static void
98 12 : confirm_query_canceled_impl(int line, PGconn *conn)
99 : {
100 12 : PGresult *res = NULL;
101 :
102 12 : res = PQgetResult(conn);
103 12 : if (res == NULL)
104 0 : pg_fatal_impl(line, "PQgetResult returned null: %s",
105 : PQerrorMessage(conn));
106 12 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
107 0 : pg_fatal_impl(line, "query did not fail when it was expected");
108 12 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
109 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
110 : PQerrorMessage(conn));
111 12 : PQclear(res);
112 :
113 12 : while (PQisBusy(conn))
114 0 : PQconsumeInput(conn);
115 12 : }
116 :
117 : /*
118 : * Using monitorConn, query pg_stat_activity to see that the connection with
119 : * the given PID is either in the given state, or waiting on the given event
120 : * (only one of them can be given).
121 : */
122 : static void
123 24 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
124 : char *state, char *event)
125 : {
126 24 : const Oid paramTypes[] = {INT4OID, TEXTOID};
127 : const char *paramValues[2];
128 24 : char *pidstr = psprintf("%d", procpid);
129 :
130 : Assert((state == NULL) ^ (event == NULL));
131 :
132 24 : paramValues[0] = pidstr;
133 24 : paramValues[1] = state ? state : event;
134 :
135 : while (true)
136 0 : {
137 : PGresult *res;
138 : char *value;
139 :
140 24 : if (state != NULL)
141 12 : res = PQexecParams(monitorConn,
142 : "SELECT count(*) FROM pg_stat_activity WHERE "
143 : "pid = $1 AND state = $2",
144 : 2, paramTypes, paramValues, NULL, NULL, 0);
145 : else
146 12 : res = PQexecParams(monitorConn,
147 : "SELECT count(*) FROM pg_stat_activity WHERE "
148 : "pid = $1 AND wait_event = $2",
149 : 2, paramTypes, paramValues, NULL, NULL, 0);
150 :
151 24 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
152 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
153 24 : if (PQntuples(res) != 1)
154 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
155 24 : if (PQnfields(res) != 1)
156 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
157 24 : value = PQgetvalue(res, 0, 0);
158 24 : if (strcmp(value, "0") != 0)
159 : {
160 24 : PQclear(res);
161 24 : break;
162 : }
163 0 : PQclear(res);
164 :
165 : /* wait 10ms before polling again */
166 0 : pg_usleep(10000);
167 : }
168 :
169 24 : pfree(pidstr);
170 24 : }
171 :
172 : #define send_cancellable_query(conn, monitorConn) \
173 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
174 : static void
175 12 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
176 : {
177 : const char *env_wait;
178 12 : const Oid paramTypes[1] = {INT4OID};
179 :
180 : /*
181 : * Wait for the connection to be idle, so that our check for an active
182 : * connection below is reliable, instead of possibly seeing an outdated
183 : * state.
184 : */
185 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
186 :
187 12 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
188 12 : if (env_wait == NULL)
189 12 : env_wait = "180";
190 :
191 12 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
192 : &env_wait, NULL, NULL, 0) != 1)
193 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
194 :
195 : /*
196 : * Wait for the sleep to be active, because if the query is not running
197 : * yet, the cancel request that we send won't have any effect.
198 : */
199 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
200 12 : }
201 :
202 : /*
203 : * Create a new connection with the same conninfo as the given one.
204 : */
205 : static PGconn *
206 2 : copy_connection(PGconn *conn)
207 : {
208 : PGconn *copyConn;
209 2 : PQconninfoOption *opts = PQconninfo(conn);
210 : const char **keywords;
211 : const char **vals;
212 2 : int nopts = 1;
213 2 : int i = 0;
214 :
215 84 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
216 82 : nopts++;
217 :
218 2 : keywords = pg_malloc(sizeof(char *) * nopts);
219 2 : vals = pg_malloc(sizeof(char *) * nopts);
220 :
221 84 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
222 : {
223 82 : if (opt->val)
224 : {
225 38 : keywords[i] = opt->keyword;
226 38 : vals[i] = opt->val;
227 38 : i++;
228 : }
229 : }
230 2 : keywords[i] = vals[i] = NULL;
231 :
232 2 : copyConn = PQconnectdbParams(keywords, vals, false);
233 :
234 2 : if (PQstatus(copyConn) != CONNECTION_OK)
235 0 : pg_fatal("Connection to database failed: %s",
236 : PQerrorMessage(copyConn));
237 :
238 2 : return copyConn;
239 : }
240 :
241 : /*
242 : * Test query cancellation routines
243 : */
244 : static void
245 2 : test_cancel(PGconn *conn)
246 : {
247 : PGcancel *cancel;
248 : PGcancelConn *cancelConn;
249 : PGconn *monitorConn;
250 : char errorbuf[256];
251 :
252 2 : fprintf(stderr, "test cancellations... ");
253 :
254 2 : if (PQsetnonblocking(conn, 1) != 0)
255 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
256 :
257 : /*
258 : * Make a separate connection to the database to monitor the query on the
259 : * main connection.
260 : */
261 2 : monitorConn = copy_connection(conn);
262 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
263 :
264 : /* test PQcancel */
265 2 : send_cancellable_query(conn, monitorConn);
266 2 : cancel = PQgetCancel(conn);
267 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
268 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
269 2 : confirm_query_canceled(conn);
270 :
271 : /* PGcancel object can be reused for the next query */
272 2 : send_cancellable_query(conn, monitorConn);
273 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
274 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
275 2 : confirm_query_canceled(conn);
276 :
277 2 : PQfreeCancel(cancel);
278 :
279 : /* test PQrequestCancel */
280 2 : send_cancellable_query(conn, monitorConn);
281 2 : if (!PQrequestCancel(conn))
282 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
283 2 : confirm_query_canceled(conn);
284 :
285 : /* test PQcancelBlocking */
286 2 : send_cancellable_query(conn, monitorConn);
287 2 : cancelConn = PQcancelCreate(conn);
288 2 : if (!PQcancelBlocking(cancelConn))
289 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
290 2 : confirm_query_canceled(conn);
291 2 : PQcancelFinish(cancelConn);
292 :
293 : /* test PQcancelCreate and then polling with PQcancelPoll */
294 2 : send_cancellable_query(conn, monitorConn);
295 2 : cancelConn = PQcancelCreate(conn);
296 2 : if (!PQcancelStart(cancelConn))
297 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
298 : while (true)
299 2 : {
300 : struct timeval tv;
301 : fd_set input_mask;
302 : fd_set output_mask;
303 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
304 4 : int sock = PQcancelSocket(cancelConn);
305 :
306 4 : if (pollres == PGRES_POLLING_OK)
307 2 : break;
308 :
309 2 : FD_ZERO(&input_mask);
310 2 : FD_ZERO(&output_mask);
311 2 : switch (pollres)
312 : {
313 2 : case PGRES_POLLING_READING:
314 : pg_debug("polling for reads\n");
315 2 : FD_SET(sock, &input_mask);
316 2 : break;
317 0 : case PGRES_POLLING_WRITING:
318 : pg_debug("polling for writes\n");
319 0 : FD_SET(sock, &output_mask);
320 0 : break;
321 0 : default:
322 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
323 : }
324 :
325 2 : if (sock < 0)
326 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
327 :
328 2 : tv.tv_sec = 3;
329 2 : tv.tv_usec = 0;
330 :
331 : while (true)
332 : {
333 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
334 : {
335 0 : if (errno == EINTR)
336 0 : continue;
337 0 : pg_fatal("select() failed: %m");
338 : }
339 2 : break;
340 : }
341 : }
342 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
343 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
344 2 : confirm_query_canceled(conn);
345 :
346 : /*
347 : * test PQcancelReset works on the cancel connection and it can be reused
348 : * afterwards
349 : */
350 2 : PQcancelReset(cancelConn);
351 :
352 2 : send_cancellable_query(conn, monitorConn);
353 2 : if (!PQcancelStart(cancelConn))
354 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
355 : while (true)
356 2 : {
357 : struct timeval tv;
358 : fd_set input_mask;
359 : fd_set output_mask;
360 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
361 4 : int sock = PQcancelSocket(cancelConn);
362 :
363 4 : if (pollres == PGRES_POLLING_OK)
364 2 : break;
365 :
366 2 : FD_ZERO(&input_mask);
367 2 : FD_ZERO(&output_mask);
368 2 : switch (pollres)
369 : {
370 2 : case PGRES_POLLING_READING:
371 : pg_debug("polling for reads\n");
372 2 : FD_SET(sock, &input_mask);
373 2 : break;
374 0 : case PGRES_POLLING_WRITING:
375 : pg_debug("polling for writes\n");
376 0 : FD_SET(sock, &output_mask);
377 0 : break;
378 0 : default:
379 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
380 : }
381 :
382 2 : if (sock < 0)
383 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
384 :
385 2 : tv.tv_sec = 3;
386 2 : tv.tv_usec = 0;
387 :
388 : while (true)
389 : {
390 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
391 : {
392 0 : if (errno == EINTR)
393 0 : continue;
394 0 : pg_fatal("select() failed: %m");
395 : }
396 2 : break;
397 : }
398 : }
399 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
400 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
401 2 : confirm_query_canceled(conn);
402 :
403 2 : PQcancelFinish(cancelConn);
404 :
405 2 : fprintf(stderr, "ok\n");
406 2 : }
407 :
408 : static void
409 2 : test_disallowed_in_pipeline(PGconn *conn)
410 : {
411 2 : PGresult *res = NULL;
412 :
413 2 : fprintf(stderr, "test error cases... ");
414 :
415 2 : if (PQisnonblocking(conn))
416 0 : pg_fatal("Expected blocking connection mode");
417 :
418 2 : if (PQenterPipelineMode(conn) != 1)
419 0 : pg_fatal("Unable to enter pipeline mode");
420 :
421 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
422 0 : pg_fatal("Pipeline mode not activated properly");
423 :
424 : /* PQexec should fail in pipeline mode */
425 2 : res = PQexec(conn, "SELECT 1");
426 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
427 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
428 2 : if (strcmp(PQerrorMessage(conn),
429 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
430 0 : pg_fatal("did not get expected error message; got: \"%s\"",
431 : PQerrorMessage(conn));
432 :
433 : /* PQsendQuery should fail in pipeline mode */
434 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
435 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
436 2 : if (strcmp(PQerrorMessage(conn),
437 : "PQsendQuery not allowed in pipeline mode\n") != 0)
438 0 : pg_fatal("did not get expected error message; got: \"%s\"",
439 : PQerrorMessage(conn));
440 :
441 : /* Entering pipeline mode when already in pipeline mode is OK */
442 2 : if (PQenterPipelineMode(conn) != 1)
443 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
444 :
445 2 : if (PQisBusy(conn) != 0)
446 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
447 :
448 : /* ok, back to normal command mode */
449 2 : if (PQexitPipelineMode(conn) != 1)
450 0 : pg_fatal("couldn't exit idle empty pipeline mode");
451 :
452 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
453 0 : pg_fatal("Pipeline mode not terminated properly");
454 :
455 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
456 2 : if (PQexitPipelineMode(conn) != 1)
457 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
458 :
459 : /* can now PQexec again */
460 2 : res = PQexec(conn, "SELECT 1");
461 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
462 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
463 : PQerrorMessage(conn));
464 :
465 2 : fprintf(stderr, "ok\n");
466 2 : }
467 :
468 : static void
469 2 : test_multi_pipelines(PGconn *conn)
470 : {
471 2 : PGresult *res = NULL;
472 2 : const char *dummy_params[1] = {"1"};
473 2 : Oid dummy_param_oids[1] = {INT4OID};
474 :
475 2 : fprintf(stderr, "multi pipeline... ");
476 :
477 : /*
478 : * Queue up a couple of small pipelines and process each without returning
479 : * to command mode first.
480 : */
481 2 : if (PQenterPipelineMode(conn) != 1)
482 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
483 :
484 : /* first pipeline */
485 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
486 : dummy_params, NULL, NULL, 0) != 1)
487 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
488 :
489 2 : if (PQpipelineSync(conn) != 1)
490 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
491 :
492 : /* second pipeline */
493 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
494 : dummy_params, NULL, NULL, 0) != 1)
495 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
496 :
497 : /* Skip flushing once. */
498 2 : if (PQsendPipelineSync(conn) != 1)
499 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
500 :
501 : /* third pipeline */
502 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
503 : dummy_params, NULL, NULL, 0) != 1)
504 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
505 :
506 2 : if (PQpipelineSync(conn) != 1)
507 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
508 :
509 : /* OK, start processing the results */
510 :
511 : /* first pipeline */
512 :
513 2 : res = PQgetResult(conn);
514 2 : if (res == NULL)
515 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
516 : PQerrorMessage(conn));
517 :
518 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
519 0 : pg_fatal("Unexpected result code %s from first pipeline item",
520 : PQresStatus(PQresultStatus(res)));
521 2 : PQclear(res);
522 2 : res = NULL;
523 :
524 2 : if (PQgetResult(conn) != NULL)
525 0 : pg_fatal("PQgetResult returned something extra after first result");
526 :
527 2 : if (PQexitPipelineMode(conn) != 0)
528 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
529 :
530 2 : res = PQgetResult(conn);
531 2 : if (res == NULL)
532 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
533 : PQerrorMessage(conn));
534 :
535 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
536 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
537 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
538 2 : PQclear(res);
539 :
540 : /* second pipeline */
541 :
542 2 : res = PQgetResult(conn);
543 2 : if (res == NULL)
544 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
545 : PQerrorMessage(conn));
546 :
547 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
548 0 : pg_fatal("Unexpected result code %s from second pipeline item",
549 : PQresStatus(PQresultStatus(res)));
550 2 : PQclear(res);
551 2 : res = NULL;
552 :
553 2 : if (PQgetResult(conn) != NULL)
554 0 : pg_fatal("PQgetResult returned something extra after first result");
555 :
556 2 : if (PQexitPipelineMode(conn) != 0)
557 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
558 :
559 2 : res = PQgetResult(conn);
560 2 : if (res == NULL)
561 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
562 : PQerrorMessage(conn));
563 :
564 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
565 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
566 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
567 2 : PQclear(res);
568 :
569 : /* third pipeline */
570 :
571 2 : res = PQgetResult(conn);
572 2 : if (res == NULL)
573 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
574 : PQerrorMessage(conn));
575 :
576 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
577 0 : pg_fatal("Unexpected result code %s from third pipeline item",
578 : PQresStatus(PQresultStatus(res)));
579 :
580 2 : res = PQgetResult(conn);
581 2 : if (res != NULL)
582 0 : pg_fatal("Expected null result, got %s",
583 : PQresStatus(PQresultStatus(res)));
584 :
585 2 : res = PQgetResult(conn);
586 2 : if (res == NULL)
587 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
588 : PQerrorMessage(conn));
589 :
590 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
591 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
592 : PQresStatus(PQresultStatus(res)));
593 :
594 : /* We're still in pipeline mode ... */
595 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
596 0 : pg_fatal("Fell out of pipeline mode somehow");
597 :
598 : /* until we end it, which we can safely do now */
599 2 : if (PQexitPipelineMode(conn) != 1)
600 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
601 : PQerrorMessage(conn));
602 :
603 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
604 0 : pg_fatal("exiting pipeline mode didn't seem to work");
605 :
606 2 : fprintf(stderr, "ok\n");
607 2 : }
608 :
609 : /*
610 : * Test behavior when a pipeline dispatches a number of commands that are
611 : * not flushed by a sync point.
612 : */
613 : static void
614 2 : test_nosync(PGconn *conn)
615 : {
616 2 : int numqueries = 10;
617 2 : int results = 0;
618 2 : int sock = PQsocket(conn);
619 :
620 2 : fprintf(stderr, "nosync... ");
621 :
622 2 : if (sock < 0)
623 0 : pg_fatal("invalid socket");
624 :
625 2 : if (PQenterPipelineMode(conn) != 1)
626 0 : pg_fatal("could not enter pipeline mode");
627 22 : for (int i = 0; i < numqueries; i++)
628 : {
629 : fd_set input_mask;
630 : struct timeval tv;
631 :
632 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
633 : 0, NULL, NULL, NULL, NULL, 0) != 1)
634 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
635 20 : PQflush(conn);
636 :
637 : /*
638 : * If the server has written anything to us, read (some of) it now.
639 : */
640 20 : FD_ZERO(&input_mask);
641 20 : FD_SET(sock, &input_mask);
642 20 : tv.tv_sec = 0;
643 20 : tv.tv_usec = 0;
644 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
645 : {
646 0 : fprintf(stderr, "select() failed: %m\n");
647 0 : exit_nicely(conn);
648 : }
649 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
650 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
651 : }
652 :
653 : /* tell server to flush its output buffer */
654 2 : if (PQsendFlushRequest(conn) != 1)
655 0 : pg_fatal("failed to send flush request");
656 2 : PQflush(conn);
657 :
658 : /* Now read all results */
659 : for (;;)
660 18 : {
661 : PGresult *res;
662 :
663 20 : res = PQgetResult(conn);
664 :
665 : /* NULL results are only expected after TUPLES_OK */
666 20 : if (res == NULL)
667 0 : pg_fatal("got unexpected NULL result after %d results", results);
668 :
669 : /* We expect exactly one TUPLES_OK result for each query we sent */
670 20 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
671 : {
672 : PGresult *res2;
673 :
674 : /* and one NULL result should follow each */
675 20 : res2 = PQgetResult(conn);
676 20 : if (res2 != NULL)
677 0 : pg_fatal("expected NULL, got %s",
678 : PQresStatus(PQresultStatus(res2)));
679 20 : PQclear(res);
680 20 : results++;
681 :
682 : /* if we're done, we're done */
683 20 : if (results == numqueries)
684 2 : break;
685 :
686 18 : continue;
687 : }
688 :
689 : /* anything else is unexpected */
690 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
691 : }
692 :
693 2 : fprintf(stderr, "ok\n");
694 2 : }
695 :
696 : /*
697 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
698 : * still have to get results for each pipeline item, but the item will just be
699 : * a PGRES_PIPELINE_ABORTED code.
700 : *
701 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
702 : * usually use an xact, but in this case we want to observe the effects of each
703 : * statement.
704 : */
705 : static void
706 2 : test_pipeline_abort(PGconn *conn)
707 : {
708 2 : PGresult *res = NULL;
709 2 : const char *dummy_params[1] = {"1"};
710 2 : Oid dummy_param_oids[1] = {INT4OID};
711 : int i;
712 : int gotrows;
713 : bool goterror;
714 :
715 2 : fprintf(stderr, "aborted pipeline... ");
716 :
717 2 : res = PQexec(conn, drop_table_sql);
718 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
719 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
720 :
721 2 : res = PQexec(conn, create_table_sql);
722 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
723 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
724 :
725 : /*
726 : * Queue up a couple of small pipelines and process each without returning
727 : * to command mode first. Make sure the second operation in the first
728 : * pipeline ERRORs.
729 : */
730 2 : if (PQenterPipelineMode(conn) != 1)
731 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
732 :
733 2 : dummy_params[0] = "1";
734 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
735 : dummy_params, NULL, NULL, 0) != 1)
736 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
737 :
738 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
739 : 1, dummy_param_oids, dummy_params,
740 : NULL, NULL, 0) != 1)
741 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
742 :
743 2 : dummy_params[0] = "2";
744 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
745 : dummy_params, NULL, NULL, 0) != 1)
746 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
747 :
748 2 : if (PQpipelineSync(conn) != 1)
749 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
750 :
751 2 : dummy_params[0] = "3";
752 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
753 : dummy_params, NULL, NULL, 0) != 1)
754 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
755 : PQerrorMessage(conn));
756 :
757 2 : if (PQpipelineSync(conn) != 1)
758 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
759 :
760 : /*
761 : * OK, start processing the pipeline results.
762 : *
763 : * We should get a command-ok for the first query, then a fatal error and
764 : * a pipeline aborted message for the second insert, a pipeline-end, then
765 : * a command-ok and a pipeline-ok for the second pipeline operation.
766 : */
767 2 : res = PQgetResult(conn);
768 2 : if (res == NULL)
769 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
770 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
771 0 : pg_fatal("Unexpected result status %s: %s",
772 : PQresStatus(PQresultStatus(res)),
773 : PQresultErrorMessage(res));
774 2 : PQclear(res);
775 :
776 : /* NULL result to signal end-of-results for this command */
777 2 : if ((res = PQgetResult(conn)) != NULL)
778 0 : pg_fatal("Expected null result, got %s",
779 : PQresStatus(PQresultStatus(res)));
780 :
781 : /* Second query caused error, so we expect an error next */
782 2 : res = PQgetResult(conn);
783 2 : if (res == NULL)
784 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
785 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
786 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
787 : PQresStatus(PQresultStatus(res)));
788 2 : PQclear(res);
789 :
790 : /* NULL result to signal end-of-results for this command */
791 2 : if ((res = PQgetResult(conn)) != NULL)
792 0 : pg_fatal("Expected null result, got %s",
793 : PQresStatus(PQresultStatus(res)));
794 :
795 : /*
796 : * pipeline should now be aborted.
797 : *
798 : * Note that we could still queue more queries at this point if we wanted;
799 : * they'd get added to a new third pipeline since we've already sent a
800 : * second. The aborted flag relates only to the pipeline being received.
801 : */
802 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
803 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
804 :
805 : /* third query in pipeline, the second insert */
806 2 : res = PQgetResult(conn);
807 2 : if (res == NULL)
808 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
809 2 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
810 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
811 : PQresStatus(PQresultStatus(res)));
812 2 : PQclear(res);
813 :
814 : /* NULL result to signal end-of-results for this command */
815 2 : if ((res = PQgetResult(conn)) != NULL)
816 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
817 :
818 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
819 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
820 :
821 : /* Ensure we're still in pipeline */
822 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
823 0 : pg_fatal("Fell out of pipeline mode somehow");
824 :
825 : /*
826 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
827 : *
828 : * (This is so clients know to start processing results normally again and
829 : * can tell the difference between skipped commands and the sync.)
830 : */
831 2 : res = PQgetResult(conn);
832 2 : if (res == NULL)
833 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
834 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
835 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
836 : "Expected PGRES_PIPELINE_SYNC, got %s",
837 : PQresStatus(PQresultStatus(res)));
838 2 : PQclear(res);
839 :
840 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
841 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
842 :
843 : /* We're still in pipeline mode... */
844 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
845 0 : pg_fatal("Fell out of pipeline mode somehow");
846 :
847 : /* the insert from the second pipeline */
848 2 : res = PQgetResult(conn);
849 2 : if (res == NULL)
850 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
851 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
852 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
853 : PQresStatus(PQresultStatus(res)));
854 2 : PQclear(res);
855 :
856 : /* Read the NULL result at the end of the command */
857 2 : if ((res = PQgetResult(conn)) != NULL)
858 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
859 :
860 : /* the second pipeline sync */
861 2 : if ((res = PQgetResult(conn)) == NULL)
862 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
863 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
864 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
865 : PQresStatus(PQresultStatus(res)));
866 2 : PQclear(res);
867 :
868 2 : if ((res = PQgetResult(conn)) != NULL)
869 0 : pg_fatal("Expected null result, got %s: %s",
870 : PQresStatus(PQresultStatus(res)),
871 : PQerrorMessage(conn));
872 :
873 : /* Try to send two queries in one command */
874 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
875 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
876 2 : if (PQpipelineSync(conn) != 1)
877 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
878 2 : goterror = false;
879 4 : while ((res = PQgetResult(conn)) != NULL)
880 : {
881 2 : switch (PQresultStatus(res))
882 : {
883 2 : case PGRES_FATAL_ERROR:
884 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
885 0 : pg_fatal("expected error about multiple commands, got %s",
886 : PQerrorMessage(conn));
887 2 : printf("got expected %s", PQerrorMessage(conn));
888 2 : goterror = true;
889 2 : break;
890 0 : default:
891 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
892 : break;
893 : }
894 : }
895 2 : if (!goterror)
896 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
897 2 : res = PQgetResult(conn);
898 2 : if (res == NULL)
899 0 : pg_fatal("got NULL result");
900 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
901 0 : pg_fatal("Unexpected result code %s from pipeline sync",
902 : PQresStatus(PQresultStatus(res)));
903 2 : fprintf(stderr, "ok\n");
904 :
905 : /* Test single-row mode with an error partways */
906 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
907 : 0, NULL, NULL, NULL, NULL, 0) != 1)
908 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
909 2 : if (PQpipelineSync(conn) != 1)
910 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
911 2 : PQsetSingleRowMode(conn);
912 2 : goterror = false;
913 2 : gotrows = 0;
914 10 : while ((res = PQgetResult(conn)) != NULL)
915 : {
916 8 : switch (PQresultStatus(res))
917 : {
918 6 : case PGRES_SINGLE_TUPLE:
919 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
920 6 : gotrows++;
921 6 : break;
922 2 : case PGRES_FATAL_ERROR:
923 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
924 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
925 : PQerrorMessage(conn),
926 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
927 2 : printf("got expected division-by-zero\n");
928 2 : goterror = true;
929 2 : break;
930 0 : default:
931 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
932 : }
933 8 : PQclear(res);
934 : }
935 2 : if (!goterror)
936 0 : pg_fatal("did not get division-by-zero error");
937 2 : if (gotrows != 3)
938 0 : pg_fatal("did not get three rows");
939 : /* the third pipeline sync */
940 2 : if ((res = PQgetResult(conn)) == NULL)
941 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
942 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
943 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
944 : PQresStatus(PQresultStatus(res)));
945 2 : PQclear(res);
946 :
947 : /* We're still in pipeline mode... */
948 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
949 0 : pg_fatal("Fell out of pipeline mode somehow");
950 :
951 : /* until we end it, which we can safely do now */
952 2 : if (PQexitPipelineMode(conn) != 1)
953 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
954 : PQerrorMessage(conn));
955 :
956 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
957 0 : pg_fatal("exiting pipeline mode didn't seem to work");
958 :
959 : /*-
960 : * Since we fired the pipelines off without a surrounding xact, the results
961 : * should be:
962 : *
963 : * - Implicit xact started by server around 1st pipeline
964 : * - First insert applied
965 : * - Second statement aborted xact
966 : * - Third insert skipped
967 : * - Sync rolled back first implicit xact
968 : * - Implicit xact created by server around 2nd pipeline
969 : * - insert applied from 2nd pipeline
970 : * - Sync commits 2nd xact
971 : *
972 : * So we should only have the value 3 that we inserted.
973 : */
974 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
975 :
976 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
977 0 : pg_fatal("Expected tuples, got %s: %s",
978 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
979 2 : if (PQntuples(res) != 1)
980 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
981 4 : for (i = 0; i < PQntuples(res); i++)
982 : {
983 2 : const char *val = PQgetvalue(res, i, 0);
984 :
985 2 : if (strcmp(val, "3") != 0)
986 0 : pg_fatal("expected only insert with value 3, got %s", val);
987 : }
988 :
989 2 : PQclear(res);
990 :
991 2 : fprintf(stderr, "ok\n");
992 2 : }
993 :
994 : /* State machine enum for test_pipelined_insert */
995 : enum PipelineInsertStep
996 : {
997 : BI_BEGIN_TX,
998 : BI_DROP_TABLE,
999 : BI_CREATE_TABLE,
1000 : BI_PREPARE,
1001 : BI_INSERT_ROWS,
1002 : BI_COMMIT_TX,
1003 : BI_SYNC,
1004 : BI_DONE,
1005 : };
1006 :
1007 : static void
1008 2 : test_pipelined_insert(PGconn *conn, int n_rows)
1009 : {
1010 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
1011 : const char *insert_params[2];
1012 : char insert_param_0[MAXINTLEN];
1013 : char insert_param_1[MAXINT8LEN];
1014 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
1015 2 : recv_step = BI_BEGIN_TX;
1016 : int rows_to_send,
1017 : rows_to_receive;
1018 :
1019 2 : insert_params[0] = insert_param_0;
1020 2 : insert_params[1] = insert_param_1;
1021 :
1022 2 : rows_to_send = rows_to_receive = n_rows;
1023 :
1024 : /*
1025 : * Do a pipelined insert into a table created at the start of the pipeline
1026 : */
1027 2 : if (PQenterPipelineMode(conn) != 1)
1028 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1029 :
1030 8 : while (send_step != BI_PREPARE)
1031 : {
1032 : const char *sql;
1033 :
1034 6 : switch (send_step)
1035 : {
1036 2 : case BI_BEGIN_TX:
1037 2 : sql = "BEGIN TRANSACTION";
1038 2 : send_step = BI_DROP_TABLE;
1039 2 : break;
1040 :
1041 2 : case BI_DROP_TABLE:
1042 2 : sql = drop_table_sql;
1043 2 : send_step = BI_CREATE_TABLE;
1044 2 : break;
1045 :
1046 2 : case BI_CREATE_TABLE:
1047 2 : sql = create_table_sql;
1048 2 : send_step = BI_PREPARE;
1049 2 : break;
1050 :
1051 0 : default:
1052 0 : pg_fatal("invalid state");
1053 : sql = NULL; /* keep compiler quiet */
1054 : }
1055 :
1056 : pg_debug("sending: %s\n", sql);
1057 6 : if (PQsendQueryParams(conn, sql,
1058 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1059 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1060 : }
1061 :
1062 : Assert(send_step == BI_PREPARE);
1063 : pg_debug("sending: %s\n", insert_sql2);
1064 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1065 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1066 2 : send_step = BI_INSERT_ROWS;
1067 :
1068 : /*
1069 : * Now we start inserting. We'll be sending enough data that we could fill
1070 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1071 : * mode and consume input while we send more output. As results of each
1072 : * query are processed we should pop them to allow processing of the next
1073 : * query. There's no need to finish the pipeline before processing
1074 : * results.
1075 : */
1076 2 : if (PQsetnonblocking(conn, 1) != 0)
1077 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1078 :
1079 33434 : while (recv_step != BI_DONE)
1080 : {
1081 : int sock;
1082 : fd_set input_mask;
1083 : fd_set output_mask;
1084 :
1085 33432 : sock = PQsocket(conn);
1086 :
1087 33432 : if (sock < 0)
1088 0 : break; /* shouldn't happen */
1089 :
1090 33432 : FD_ZERO(&input_mask);
1091 33432 : FD_SET(sock, &input_mask);
1092 33432 : FD_ZERO(&output_mask);
1093 33432 : FD_SET(sock, &output_mask);
1094 :
1095 33432 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1096 : {
1097 0 : fprintf(stderr, "select() failed: %m\n");
1098 0 : exit_nicely(conn);
1099 : }
1100 :
1101 : /*
1102 : * Process any results, so we keep the server's output buffer free
1103 : * flowing and it can continue to process input
1104 : */
1105 33432 : if (FD_ISSET(sock, &input_mask))
1106 : {
1107 6 : PQconsumeInput(conn);
1108 :
1109 : /* Read until we'd block if we tried to read */
1110 2828 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1111 : {
1112 : PGresult *res;
1113 2822 : const char *cmdtag = "";
1114 2822 : const char *description = "";
1115 : int status;
1116 :
1117 : /*
1118 : * Read next result. If no more results from this query,
1119 : * advance to the next query
1120 : */
1121 2822 : res = PQgetResult(conn);
1122 2822 : if (res == NULL)
1123 1410 : continue;
1124 :
1125 1412 : status = PGRES_COMMAND_OK;
1126 1412 : switch (recv_step)
1127 : {
1128 2 : case BI_BEGIN_TX:
1129 2 : cmdtag = "BEGIN";
1130 2 : recv_step++;
1131 2 : break;
1132 2 : case BI_DROP_TABLE:
1133 2 : cmdtag = "DROP TABLE";
1134 2 : recv_step++;
1135 2 : break;
1136 2 : case BI_CREATE_TABLE:
1137 2 : cmdtag = "CREATE TABLE";
1138 2 : recv_step++;
1139 2 : break;
1140 2 : case BI_PREPARE:
1141 2 : cmdtag = "";
1142 2 : description = "PREPARE";
1143 2 : recv_step++;
1144 2 : break;
1145 1400 : case BI_INSERT_ROWS:
1146 1400 : cmdtag = "INSERT";
1147 1400 : rows_to_receive--;
1148 1400 : if (rows_to_receive == 0)
1149 2 : recv_step++;
1150 1400 : break;
1151 2 : case BI_COMMIT_TX:
1152 2 : cmdtag = "COMMIT";
1153 2 : recv_step++;
1154 2 : break;
1155 2 : case BI_SYNC:
1156 2 : cmdtag = "";
1157 2 : description = "SYNC";
1158 2 : status = PGRES_PIPELINE_SYNC;
1159 2 : recv_step++;
1160 2 : break;
1161 0 : case BI_DONE:
1162 : /* unreachable */
1163 0 : pg_fatal("unreachable state");
1164 : }
1165 :
1166 1412 : if (PQresultStatus(res) != status)
1167 0 : pg_fatal("%s reported status %s, expected %s\n"
1168 : "Error message: \"%s\"",
1169 : description, PQresStatus(PQresultStatus(res)),
1170 : PQresStatus(status), PQerrorMessage(conn));
1171 :
1172 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1173 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1174 : description, cmdtag, PQcmdStatus(res));
1175 :
1176 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1177 :
1178 1412 : PQclear(res);
1179 : }
1180 : }
1181 :
1182 : /* Write more rows and/or the end pipeline message, if needed */
1183 33432 : if (FD_ISSET(sock, &output_mask))
1184 : {
1185 33428 : PQflush(conn);
1186 :
1187 33428 : if (send_step == BI_INSERT_ROWS)
1188 : {
1189 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1190 : /* use up some buffer space with a wide value */
1191 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1192 :
1193 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1194 : 2, insert_params, NULL, NULL, 0) == 1)
1195 : {
1196 : pg_debug("sent row %d\n", rows_to_send);
1197 :
1198 1400 : rows_to_send--;
1199 1400 : if (rows_to_send == 0)
1200 2 : send_step++;
1201 : }
1202 : else
1203 : {
1204 : /*
1205 : * in nonblocking mode, so it's OK for an insert to fail
1206 : * to send
1207 : */
1208 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1209 : rows_to_send, PQerrorMessage(conn));
1210 : }
1211 : }
1212 32028 : else if (send_step == BI_COMMIT_TX)
1213 : {
1214 2 : if (PQsendQueryParams(conn, "COMMIT",
1215 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1216 : {
1217 : pg_debug("sent COMMIT\n");
1218 2 : send_step++;
1219 : }
1220 : else
1221 : {
1222 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223 : PQerrorMessage(conn));
1224 : }
1225 : }
1226 32026 : else if (send_step == BI_SYNC)
1227 : {
1228 2 : if (PQpipelineSync(conn) == 1)
1229 : {
1230 2 : fprintf(stdout, "pipeline sync sent\n");
1231 2 : send_step++;
1232 : }
1233 : else
1234 : {
1235 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236 : PQerrorMessage(conn));
1237 : }
1238 : }
1239 : }
1240 : }
1241 :
1242 : /* We've got the sync message and the pipeline should be done */
1243 2 : if (PQexitPipelineMode(conn) != 1)
1244 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245 : PQerrorMessage(conn));
1246 :
1247 2 : if (PQsetnonblocking(conn, 0) != 0)
1248 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1249 :
1250 2 : fprintf(stderr, "ok\n");
1251 2 : }
1252 :
1253 : static void
1254 2 : test_prepared(PGconn *conn)
1255 : {
1256 2 : PGresult *res = NULL;
1257 2 : Oid param_oids[1] = {INT4OID};
1258 : Oid expected_oids[4];
1259 : Oid typ;
1260 :
1261 2 : fprintf(stderr, "prepared... ");
1262 :
1263 2 : if (PQenterPipelineMode(conn) != 1)
1264 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1265 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1266 : "interval '1 sec'",
1267 : 1, param_oids) != 1)
1268 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1269 2 : expected_oids[0] = INT4OID;
1270 2 : expected_oids[1] = TEXTOID;
1271 2 : expected_oids[2] = NUMERICOID;
1272 2 : expected_oids[3] = INTERVALOID;
1273 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1274 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1275 2 : if (PQpipelineSync(conn) != 1)
1276 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1277 :
1278 2 : res = PQgetResult(conn);
1279 2 : if (res == NULL)
1280 0 : pg_fatal("PQgetResult returned null");
1281 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1282 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1283 2 : PQclear(res);
1284 2 : res = PQgetResult(conn);
1285 2 : if (res != NULL)
1286 0 : pg_fatal("expected NULL result");
1287 :
1288 2 : res = PQgetResult(conn);
1289 2 : if (res == NULL)
1290 0 : pg_fatal("PQgetResult returned NULL");
1291 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1292 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1293 2 : if (PQnfields(res) != lengthof(expected_oids))
1294 0 : pg_fatal("expected %zu columns, got %d",
1295 : lengthof(expected_oids), PQnfields(res));
1296 10 : for (int i = 0; i < PQnfields(res); i++)
1297 : {
1298 8 : typ = PQftype(res, i);
1299 8 : if (typ != expected_oids[i])
1300 0 : pg_fatal("field %d: expected type %u, got %u",
1301 : i, expected_oids[i], typ);
1302 : }
1303 2 : PQclear(res);
1304 2 : res = PQgetResult(conn);
1305 2 : if (res != NULL)
1306 0 : pg_fatal("expected NULL result");
1307 :
1308 2 : res = PQgetResult(conn);
1309 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1310 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1311 :
1312 2 : fprintf(stderr, "closing statement..");
1313 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1314 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1315 2 : if (PQpipelineSync(conn) != 1)
1316 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1317 :
1318 2 : res = PQgetResult(conn);
1319 2 : if (res == NULL)
1320 0 : pg_fatal("expected non-NULL result");
1321 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1322 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1323 2 : PQclear(res);
1324 2 : res = PQgetResult(conn);
1325 2 : if (res != NULL)
1326 0 : pg_fatal("expected NULL result");
1327 2 : res = PQgetResult(conn);
1328 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1329 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1330 :
1331 2 : if (PQexitPipelineMode(conn) != 1)
1332 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1333 :
1334 : /* Now that it's closed we should get an error when describing */
1335 2 : res = PQdescribePrepared(conn, "select_one");
1336 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1337 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1338 :
1339 : /*
1340 : * Also test the blocking close, this should not fail since closing a
1341 : * non-existent prepared statement is a no-op
1342 : */
1343 2 : res = PQclosePrepared(conn, "select_one");
1344 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1345 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1346 :
1347 2 : fprintf(stderr, "creating portal... ");
1348 2 : PQexec(conn, "BEGIN");
1349 2 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1350 2 : PQenterPipelineMode(conn);
1351 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1352 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1353 2 : if (PQpipelineSync(conn) != 1)
1354 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1355 2 : res = PQgetResult(conn);
1356 2 : if (res == NULL)
1357 0 : pg_fatal("PQgetResult returned null");
1358 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1359 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1360 :
1361 2 : typ = PQftype(res, 0);
1362 2 : if (typ != INT4OID)
1363 0 : pg_fatal("portal: expected type %u, got %u",
1364 : INT4OID, typ);
1365 2 : PQclear(res);
1366 2 : res = PQgetResult(conn);
1367 2 : if (res != NULL)
1368 0 : pg_fatal("expected NULL result");
1369 2 : res = PQgetResult(conn);
1370 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1371 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1372 :
1373 2 : fprintf(stderr, "closing portal... ");
1374 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1375 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1376 2 : if (PQpipelineSync(conn) != 1)
1377 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1378 :
1379 2 : res = PQgetResult(conn);
1380 2 : if (res == NULL)
1381 0 : pg_fatal("expected non-NULL result");
1382 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1383 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1384 2 : PQclear(res);
1385 2 : res = PQgetResult(conn);
1386 2 : if (res != NULL)
1387 0 : pg_fatal("expected NULL result");
1388 2 : res = PQgetResult(conn);
1389 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1390 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1391 :
1392 2 : if (PQexitPipelineMode(conn) != 1)
1393 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1394 :
1395 : /* Now that it's closed we should get an error when describing */
1396 2 : res = PQdescribePortal(conn, "cursor_one");
1397 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1398 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1399 :
1400 : /*
1401 : * Also test the blocking close, this should not fail since closing a
1402 : * non-existent portal is a no-op
1403 : */
1404 2 : res = PQclosePortal(conn, "cursor_one");
1405 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1406 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1407 :
1408 2 : fprintf(stderr, "ok\n");
1409 2 : }
1410 :
1411 : /* Notice processor: print notices, and count how many we got */
1412 : static void
1413 2 : notice_processor(void *arg, const char *message)
1414 : {
1415 2 : int *n_notices = (int *) arg;
1416 :
1417 2 : (*n_notices)++;
1418 2 : fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1419 2 : }
1420 :
1421 : /* Verify behavior in "idle" state */
1422 : static void
1423 2 : test_pipeline_idle(PGconn *conn)
1424 : {
1425 : PGresult *res;
1426 2 : int n_notices = 0;
1427 :
1428 2 : fprintf(stderr, "\npipeline idle...\n");
1429 :
1430 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1431 :
1432 : /* Try to exit pipeline mode in pipeline-idle state */
1433 2 : if (PQenterPipelineMode(conn) != 1)
1434 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1435 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1436 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1437 2 : PQsendFlushRequest(conn);
1438 2 : res = PQgetResult(conn);
1439 2 : if (res == NULL)
1440 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441 : PQerrorMessage(conn));
1442 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1443 0 : pg_fatal("unexpected result code %s from first pipeline item",
1444 : PQresStatus(PQresultStatus(res)));
1445 2 : PQclear(res);
1446 2 : res = PQgetResult(conn);
1447 2 : if (res != NULL)
1448 0 : pg_fatal("did not receive terminating NULL");
1449 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1450 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1451 2 : if (PQexitPipelineMode(conn) == 1)
1452 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1453 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1454 : strlen("cannot exit pipeline mode")) != 0)
1455 0 : pg_fatal("did not get expected error; got: %s",
1456 : PQerrorMessage(conn));
1457 2 : PQsendFlushRequest(conn);
1458 2 : res = PQgetResult(conn);
1459 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1460 0 : pg_fatal("unexpected result code %s from second pipeline item",
1461 : PQresStatus(PQresultStatus(res)));
1462 2 : PQclear(res);
1463 2 : res = PQgetResult(conn);
1464 2 : if (res != NULL)
1465 0 : pg_fatal("did not receive terminating NULL");
1466 2 : if (PQexitPipelineMode(conn) != 1)
1467 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1468 :
1469 2 : if (n_notices > 0)
1470 0 : pg_fatal("got %d notice(s)", n_notices);
1471 2 : fprintf(stderr, "ok - 1\n");
1472 :
1473 : /* Have a WARNING in the middle of a resultset */
1474 2 : if (PQenterPipelineMode(conn) != 1)
1475 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1476 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1477 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1478 2 : PQsendFlushRequest(conn);
1479 2 : res = PQgetResult(conn);
1480 2 : if (res == NULL)
1481 0 : pg_fatal("unexpected NULL result received");
1482 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1483 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1484 2 : if (PQexitPipelineMode(conn) != 1)
1485 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1486 2 : fprintf(stderr, "ok - 2\n");
1487 2 : }
1488 :
1489 : static void
1490 2 : test_simple_pipeline(PGconn *conn)
1491 : {
1492 2 : PGresult *res = NULL;
1493 2 : const char *dummy_params[1] = {"1"};
1494 2 : Oid dummy_param_oids[1] = {INT4OID};
1495 :
1496 2 : fprintf(stderr, "simple pipeline... ");
1497 :
1498 : /*
1499 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1500 : * process the results of as they come in.
1501 : *
1502 : * For a simple case we should be able to do this without interim
1503 : * processing of results since our output buffer will give us enough slush
1504 : * to work with and we won't block on sending. So blocking mode is fine.
1505 : */
1506 2 : if (PQisnonblocking(conn))
1507 0 : pg_fatal("Expected blocking connection mode");
1508 :
1509 2 : if (PQenterPipelineMode(conn) != 1)
1510 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1511 :
1512 2 : if (PQsendQueryParams(conn, "SELECT $1",
1513 : 1, dummy_param_oids, dummy_params,
1514 : NULL, NULL, 0) != 1)
1515 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1516 :
1517 2 : if (PQexitPipelineMode(conn) != 0)
1518 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1519 :
1520 2 : if (PQpipelineSync(conn) != 1)
1521 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1522 :
1523 2 : res = PQgetResult(conn);
1524 2 : if (res == NULL)
1525 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1526 : PQerrorMessage(conn));
1527 :
1528 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1529 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1530 : PQresStatus(PQresultStatus(res)));
1531 :
1532 2 : PQclear(res);
1533 2 : res = NULL;
1534 :
1535 2 : if (PQgetResult(conn) != NULL)
1536 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1537 :
1538 : /*
1539 : * Even though we've processed the result there's still a sync to come and
1540 : * we can't exit pipeline mode yet
1541 : */
1542 2 : if (PQexitPipelineMode(conn) != 0)
1543 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1544 :
1545 2 : res = PQgetResult(conn);
1546 2 : if (res == NULL)
1547 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1548 : PQerrorMessage(conn));
1549 :
1550 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1551 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1552 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1553 :
1554 2 : PQclear(res);
1555 2 : res = NULL;
1556 :
1557 2 : if (PQgetResult(conn) != NULL)
1558 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1559 : PQresStatus(PQresultStatus(res)));
1560 :
1561 : /* We're still in pipeline mode... */
1562 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1563 0 : pg_fatal("Fell out of pipeline mode somehow");
1564 :
1565 : /* ... until we end it, which we can safely do now */
1566 2 : if (PQexitPipelineMode(conn) != 1)
1567 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1568 : PQerrorMessage(conn));
1569 :
1570 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1571 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1572 :
1573 2 : fprintf(stderr, "ok\n");
1574 2 : }
1575 :
1576 : static void
1577 2 : test_singlerowmode(PGconn *conn)
1578 : {
1579 : PGresult *res;
1580 : int i;
1581 2 : bool pipeline_ended = false;
1582 :
1583 2 : if (PQenterPipelineMode(conn) != 1)
1584 0 : pg_fatal("failed to enter pipeline mode: %s",
1585 : PQerrorMessage(conn));
1586 :
1587 : /* One series of three commands, using single-row mode for the first two. */
1588 8 : for (i = 0; i < 3; i++)
1589 : {
1590 : char *param[1];
1591 :
1592 6 : param[0] = psprintf("%d", 44 + i);
1593 :
1594 6 : if (PQsendQueryParams(conn,
1595 : "SELECT generate_series(42, $1)",
1596 : 1,
1597 : NULL,
1598 : (const char **) param,
1599 : NULL,
1600 : NULL,
1601 : 0) != 1)
1602 0 : pg_fatal("failed to send query: %s",
1603 : PQerrorMessage(conn));
1604 6 : pfree(param[0]);
1605 : }
1606 2 : if (PQpipelineSync(conn) != 1)
1607 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1608 :
1609 10 : for (i = 0; !pipeline_ended; i++)
1610 : {
1611 8 : bool first = true;
1612 : bool saw_ending_tuplesok;
1613 8 : bool isSingleTuple = false;
1614 :
1615 : /* Set single row mode for only first 2 SELECT queries */
1616 8 : if (i < 2)
1617 : {
1618 4 : if (PQsetSingleRowMode(conn) != 1)
1619 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1620 : }
1621 :
1622 : /* Consume rows for this query */
1623 8 : saw_ending_tuplesok = false;
1624 28 : while ((res = PQgetResult(conn)) != NULL)
1625 : {
1626 22 : ExecStatusType est = PQresultStatus(res);
1627 :
1628 22 : if (est == PGRES_PIPELINE_SYNC)
1629 : {
1630 2 : fprintf(stderr, "end of pipeline reached\n");
1631 2 : pipeline_ended = true;
1632 2 : PQclear(res);
1633 2 : if (i != 3)
1634 0 : pg_fatal("Expected three results, got %d", i);
1635 2 : break;
1636 : }
1637 :
1638 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1639 20 : if (first)
1640 : {
1641 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1642 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1643 : i, PQresStatus(est));
1644 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1645 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1646 : i, PQresStatus(est));
1647 6 : first = false;
1648 : }
1649 :
1650 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1651 20 : switch (est)
1652 : {
1653 6 : case PGRES_TUPLES_OK:
1654 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1655 6 : saw_ending_tuplesok = true;
1656 6 : if (isSingleTuple)
1657 : {
1658 4 : if (PQntuples(res) == 0)
1659 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1660 : else
1661 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1662 : }
1663 6 : break;
1664 :
1665 14 : case PGRES_SINGLE_TUPLE:
1666 14 : isSingleTuple = true;
1667 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1668 14 : break;
1669 :
1670 0 : default:
1671 0 : pg_fatal("unexpected");
1672 : }
1673 20 : PQclear(res);
1674 : }
1675 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1676 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1677 : }
1678 :
1679 : /*
1680 : * Now issue one command, get its results in with single-row mode, then
1681 : * issue another command, and get its results in normal mode; make sure
1682 : * the single-row mode flag is reset as expected.
1683 : */
1684 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1685 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1686 0 : pg_fatal("failed to send query: %s",
1687 : PQerrorMessage(conn));
1688 2 : if (PQsendFlushRequest(conn) != 1)
1689 0 : pg_fatal("failed to send flush request");
1690 2 : if (PQsetSingleRowMode(conn) != 1)
1691 0 : pg_fatal("PQsetSingleRowMode() failed");
1692 2 : res = PQgetResult(conn);
1693 2 : if (res == NULL)
1694 0 : pg_fatal("unexpected NULL");
1695 2 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1696 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1697 : PQresStatus(PQresultStatus(res)));
1698 2 : res = PQgetResult(conn);
1699 2 : if (res == NULL)
1700 0 : pg_fatal("unexpected NULL");
1701 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1702 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1703 : PQresStatus(PQresultStatus(res)));
1704 2 : if (PQgetResult(conn) != NULL)
1705 0 : pg_fatal("expected NULL result");
1706 :
1707 2 : if (PQsendQueryParams(conn, "SELECT 1",
1708 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1709 0 : pg_fatal("failed to send query: %s",
1710 : PQerrorMessage(conn));
1711 2 : if (PQsendFlushRequest(conn) != 1)
1712 0 : pg_fatal("failed to send flush request");
1713 2 : res = PQgetResult(conn);
1714 2 : if (res == NULL)
1715 0 : pg_fatal("unexpected NULL");
1716 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1717 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1718 : PQresStatus(PQresultStatus(res)));
1719 2 : if (PQgetResult(conn) != NULL)
1720 0 : pg_fatal("expected NULL result");
1721 :
1722 : /*
1723 : * Try chunked mode as well; make sure that it correctly delivers a
1724 : * partial final chunk.
1725 : */
1726 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1727 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1728 0 : pg_fatal("failed to send query: %s",
1729 : PQerrorMessage(conn));
1730 2 : if (PQsendFlushRequest(conn) != 1)
1731 0 : pg_fatal("failed to send flush request");
1732 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1733 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1734 2 : res = PQgetResult(conn);
1735 2 : if (res == NULL)
1736 0 : pg_fatal("unexpected NULL");
1737 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1738 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1739 : PQresStatus(PQresultStatus(res)),
1740 : PQerrorMessage(conn));
1741 2 : if (PQntuples(res) != 3)
1742 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1743 2 : res = PQgetResult(conn);
1744 2 : if (res == NULL)
1745 0 : pg_fatal("unexpected NULL");
1746 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1747 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1748 : PQresStatus(PQresultStatus(res)));
1749 2 : if (PQntuples(res) != 2)
1750 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1751 2 : res = PQgetResult(conn);
1752 2 : if (res == NULL)
1753 0 : pg_fatal("unexpected NULL");
1754 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1755 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1756 : PQresStatus(PQresultStatus(res)));
1757 2 : if (PQntuples(res) != 0)
1758 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1759 2 : if (PQgetResult(conn) != NULL)
1760 0 : pg_fatal("expected NULL result");
1761 :
1762 2 : if (PQexitPipelineMode(conn) != 1)
1763 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1764 :
1765 2 : fprintf(stderr, "ok\n");
1766 2 : }
1767 :
1768 : /*
1769 : * Simple test to verify that a pipeline is discarded as a whole when there's
1770 : * an error, ignoring transaction commands.
1771 : */
1772 : static void
1773 2 : test_transaction(PGconn *conn)
1774 : {
1775 : PGresult *res;
1776 : bool expect_null;
1777 2 : int num_syncs = 0;
1778 :
1779 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1780 : "CREATE TABLE pq_pipeline_tst (id int)");
1781 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1782 0 : pg_fatal("failed to create test table: %s",
1783 : PQerrorMessage(conn));
1784 2 : PQclear(res);
1785 :
1786 2 : if (PQenterPipelineMode(conn) != 1)
1787 0 : pg_fatal("failed to enter pipeline mode: %s",
1788 : PQerrorMessage(conn));
1789 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1790 0 : pg_fatal("could not send prepare on pipeline: %s",
1791 : PQerrorMessage(conn));
1792 :
1793 2 : if (PQsendQueryParams(conn,
1794 : "BEGIN",
1795 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1796 0 : pg_fatal("failed to send query: %s",
1797 : PQerrorMessage(conn));
1798 2 : if (PQsendQueryParams(conn,
1799 : "SELECT 0/0",
1800 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1801 0 : pg_fatal("failed to send query: %s",
1802 : PQerrorMessage(conn));
1803 :
1804 : /*
1805 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1806 : * get out of the pipeline-aborted state first.
1807 : */
1808 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1809 0 : pg_fatal("failed to execute prepared: %s",
1810 : PQerrorMessage(conn));
1811 :
1812 : /* This insert fails because we're in pipeline-aborted state */
1813 2 : if (PQsendQueryParams(conn,
1814 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1815 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1816 0 : pg_fatal("failed to send query: %s",
1817 : PQerrorMessage(conn));
1818 2 : if (PQpipelineSync(conn) != 1)
1819 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1820 2 : num_syncs++;
1821 :
1822 : /*
1823 : * This insert fails even though the pipeline got a SYNC, because we're in
1824 : * an aborted transaction
1825 : */
1826 2 : if (PQsendQueryParams(conn,
1827 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1828 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1829 0 : pg_fatal("failed to send query: %s",
1830 : PQerrorMessage(conn));
1831 2 : if (PQpipelineSync(conn) != 1)
1832 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1833 2 : num_syncs++;
1834 :
1835 : /*
1836 : * Send ROLLBACK using prepared stmt. This one works because we just did
1837 : * PQpipelineSync above.
1838 : */
1839 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1840 0 : pg_fatal("failed to execute prepared: %s",
1841 : PQerrorMessage(conn));
1842 :
1843 : /*
1844 : * Now that we're out of a transaction and in pipeline-good mode, this
1845 : * insert works
1846 : */
1847 2 : if (PQsendQueryParams(conn,
1848 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1849 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1850 0 : pg_fatal("failed to send query: %s",
1851 : PQerrorMessage(conn));
1852 : /* Send two syncs now -- match up to SYNC messages below */
1853 2 : if (PQpipelineSync(conn) != 1)
1854 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1855 2 : num_syncs++;
1856 2 : if (PQpipelineSync(conn) != 1)
1857 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1858 2 : num_syncs++;
1859 :
1860 2 : expect_null = false;
1861 2 : for (int i = 0;; i++)
1862 38 : {
1863 : ExecStatusType restype;
1864 :
1865 40 : res = PQgetResult(conn);
1866 40 : if (res == NULL)
1867 : {
1868 16 : printf("%d: got NULL result\n", i);
1869 16 : if (!expect_null)
1870 0 : pg_fatal("did not expect NULL here");
1871 16 : expect_null = false;
1872 16 : continue;
1873 : }
1874 24 : restype = PQresultStatus(res);
1875 24 : printf("%d: got status %s", i, PQresStatus(restype));
1876 24 : if (expect_null)
1877 0 : pg_fatal("expected NULL");
1878 24 : if (restype == PGRES_FATAL_ERROR)
1879 4 : printf("; error: %s", PQerrorMessage(conn));
1880 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1881 : {
1882 4 : printf(": command didn't run because pipeline aborted\n");
1883 : }
1884 : else
1885 16 : printf("\n");
1886 24 : PQclear(res);
1887 :
1888 24 : if (restype == PGRES_PIPELINE_SYNC)
1889 8 : num_syncs--;
1890 : else
1891 16 : expect_null = true;
1892 24 : if (num_syncs <= 0)
1893 2 : break;
1894 : }
1895 2 : if (PQgetResult(conn) != NULL)
1896 0 : pg_fatal("returned something extra after all the syncs: %s",
1897 : PQresStatus(PQresultStatus(res)));
1898 :
1899 2 : if (PQexitPipelineMode(conn) != 1)
1900 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1901 :
1902 : /* We expect to find one tuple containing the value "3" */
1903 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1904 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1905 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1906 2 : if (PQntuples(res) != 1)
1907 0 : pg_fatal("did not get 1 tuple");
1908 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1909 0 : pg_fatal("did not get expected tuple");
1910 2 : PQclear(res);
1911 :
1912 2 : fprintf(stderr, "ok\n");
1913 2 : }
1914 :
1915 : /*
1916 : * In this test mode we send a stream of queries, with one in the middle
1917 : * causing an error. Verify that we can still send some more after the
1918 : * error and have libpq work properly.
1919 : */
1920 : static void
1921 2 : test_uniqviol(PGconn *conn)
1922 : {
1923 2 : int sock = PQsocket(conn);
1924 : PGresult *res;
1925 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
1926 : const char *paramValues[2];
1927 : char paramValue0[MAXINT8LEN];
1928 : char paramValue1[MAXINT8LEN];
1929 2 : int ctr = 0;
1930 2 : int numsent = 0;
1931 2 : int results = 0;
1932 2 : bool read_done = false;
1933 2 : bool write_done = false;
1934 2 : bool error_sent = false;
1935 2 : bool got_error = false;
1936 2 : int switched = 0;
1937 2 : int socketful = 0;
1938 : fd_set in_fds;
1939 : fd_set out_fds;
1940 :
1941 2 : fprintf(stderr, "uniqviol ...");
1942 :
1943 2 : PQsetnonblocking(conn, 1);
1944 :
1945 2 : paramValues[0] = paramValue0;
1946 2 : paramValues[1] = paramValue1;
1947 2 : sprintf(paramValue1, "42");
1948 :
1949 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1950 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1951 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1952 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1953 :
1954 2 : res = PQexec(conn, "begin");
1955 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1956 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1957 :
1958 2 : res = PQprepare(conn, "insertion",
1959 : "insert into ppln_uniqviol values ($1, $2) returning id",
1960 : 2, paramTypes);
1961 2 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1962 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1963 :
1964 2 : if (PQenterPipelineMode(conn) != 1)
1965 0 : pg_fatal("failed to enter pipeline mode");
1966 :
1967 16 : while (!read_done)
1968 : {
1969 : /*
1970 : * Avoid deadlocks by reading everything the server has sent before
1971 : * sending anything. (Special precaution is needed here to process
1972 : * PQisBusy before testing the socket for read-readiness, because the
1973 : * socket does not turn read-ready after "sending" queries in aborted
1974 : * pipeline mode.)
1975 : */
1976 1212 : while (PQisBusy(conn) == 0)
1977 : {
1978 : bool new_error;
1979 :
1980 1202 : if (results >= numsent)
1981 : {
1982 2 : if (write_done)
1983 0 : read_done = true;
1984 2 : break;
1985 : }
1986 :
1987 1200 : res = PQgetResult(conn);
1988 1200 : new_error = process_result(conn, res, results, numsent);
1989 1200 : if (new_error && got_error)
1990 0 : pg_fatal("got two errors");
1991 1200 : got_error |= new_error;
1992 1200 : if (results++ >= numsent - 1)
1993 : {
1994 4 : if (write_done)
1995 2 : read_done = true;
1996 4 : break;
1997 : }
1998 : }
1999 :
2000 16 : if (read_done)
2001 2 : break;
2002 :
2003 14 : FD_ZERO(&out_fds);
2004 14 : FD_SET(sock, &out_fds);
2005 :
2006 14 : FD_ZERO(&in_fds);
2007 14 : FD_SET(sock, &in_fds);
2008 :
2009 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2010 : {
2011 0 : if (errno == EINTR)
2012 0 : continue;
2013 0 : pg_fatal("select() failed: %m");
2014 : }
2015 :
2016 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2017 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2018 :
2019 : /*
2020 : * If the socket is writable and we haven't finished sending queries,
2021 : * send some.
2022 : */
2023 14 : if (!write_done && FD_ISSET(sock, &out_fds))
2024 : {
2025 : for (;;)
2026 1194 : {
2027 : int flush;
2028 :
2029 : /*
2030 : * provoke uniqueness violation exactly once after having
2031 : * switched to read mode.
2032 : */
2033 1200 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2034 : {
2035 2 : sprintf(paramValue0, "%d", numsent / 2);
2036 2 : fprintf(stderr, "E");
2037 2 : error_sent = true;
2038 : }
2039 : else
2040 : {
2041 1198 : fprintf(stderr, ".");
2042 1198 : sprintf(paramValue0, "%d", ctr++);
2043 : }
2044 :
2045 1200 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2046 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2047 1200 : numsent++;
2048 :
2049 : /* Are we done writing? */
2050 1200 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2051 : {
2052 2 : if (PQsendFlushRequest(conn) != 1)
2053 0 : pg_fatal("failed to send flush request");
2054 2 : write_done = true;
2055 2 : fprintf(stderr, "\ndone writing\n");
2056 2 : PQflush(conn);
2057 2 : break;
2058 : }
2059 :
2060 : /* is the outgoing socket full? */
2061 1198 : flush = PQflush(conn);
2062 1198 : if (flush == -1)
2063 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2064 1198 : if (flush == 1)
2065 : {
2066 4 : if (socketful == 0)
2067 2 : socketful = numsent;
2068 4 : fprintf(stderr, "\nswitch to reading\n");
2069 4 : switched++;
2070 4 : break;
2071 : }
2072 : }
2073 : }
2074 : }
2075 :
2076 2 : if (!got_error)
2077 0 : pg_fatal("did not get expected error");
2078 :
2079 2 : fprintf(stderr, "ok\n");
2080 2 : }
2081 :
2082 : /*
2083 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2084 : * the expected NULL that should follow it.
2085 : *
2086 : * Returns true if we read a fatal error message, otherwise false.
2087 : */
2088 : static bool
2089 1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2090 : {
2091 : PGresult *res2;
2092 1200 : bool got_error = false;
2093 :
2094 1200 : if (res == NULL)
2095 0 : pg_fatal("got unexpected NULL");
2096 :
2097 1200 : switch (PQresultStatus(res))
2098 : {
2099 2 : case PGRES_FATAL_ERROR:
2100 2 : got_error = true;
2101 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2102 2 : PQclear(res);
2103 :
2104 2 : res2 = PQgetResult(conn);
2105 2 : if (res2 != NULL)
2106 0 : pg_fatal("expected NULL, got %s",
2107 : PQresStatus(PQresultStatus(res2)));
2108 2 : break;
2109 :
2110 836 : case PGRES_TUPLES_OK:
2111 836 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2112 836 : PQclear(res);
2113 :
2114 836 : res2 = PQgetResult(conn);
2115 836 : if (res2 != NULL)
2116 0 : pg_fatal("expected NULL, got %s",
2117 : PQresStatus(PQresultStatus(res2)));
2118 836 : break;
2119 :
2120 362 : case PGRES_PIPELINE_ABORTED:
2121 362 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2122 362 : res2 = PQgetResult(conn);
2123 362 : if (res2 != NULL)
2124 0 : pg_fatal("expected NULL, got %s",
2125 : PQresStatus(PQresultStatus(res2)));
2126 362 : break;
2127 :
2128 0 : default:
2129 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2130 : }
2131 :
2132 1200 : return got_error;
2133 : }
2134 :
2135 :
2136 : static void
2137 0 : usage(const char *progname)
2138 : {
2139 0 : fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2140 0 : fprintf(stderr, "Usage:\n");
2141 0 : fprintf(stderr, " %s [OPTION] tests\n", progname);
2142 0 : fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2143 0 : fprintf(stderr, "\nOptions:\n");
2144 0 : fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2145 0 : fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2146 0 : }
2147 :
2148 : static void
2149 2 : print_test_list(void)
2150 : {
2151 2 : printf("cancel\n");
2152 2 : printf("disallowed_in_pipeline\n");
2153 2 : printf("multi_pipelines\n");
2154 2 : printf("nosync\n");
2155 2 : printf("pipeline_abort\n");
2156 2 : printf("pipeline_idle\n");
2157 2 : printf("pipelined_insert\n");
2158 2 : printf("prepared\n");
2159 2 : printf("simple_pipeline\n");
2160 2 : printf("singlerow\n");
2161 2 : printf("transaction\n");
2162 2 : printf("uniqviol\n");
2163 2 : }
2164 :
2165 : int
2166 26 : main(int argc, char **argv)
2167 : {
2168 26 : const char *conninfo = "";
2169 : PGconn *conn;
2170 : FILE *trace;
2171 : char *testname;
2172 26 : int numrows = 10000;
2173 : PGresult *res;
2174 : int c;
2175 :
2176 94 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2177 : {
2178 42 : switch (c)
2179 : {
2180 24 : case 'r': /* numrows */
2181 24 : errno = 0;
2182 24 : numrows = strtol(optarg, NULL, 10);
2183 24 : if (errno != 0 || numrows <= 0)
2184 : {
2185 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2186 : optarg);
2187 0 : exit(1);
2188 : }
2189 24 : break;
2190 18 : case 't': /* trace file */
2191 18 : tracefile = pg_strdup(optarg);
2192 18 : break;
2193 : }
2194 68 : }
2195 :
2196 26 : if (optind < argc)
2197 : {
2198 26 : testname = pg_strdup(argv[optind]);
2199 26 : optind++;
2200 : }
2201 : else
2202 : {
2203 0 : usage(argv[0]);
2204 0 : exit(1);
2205 : }
2206 :
2207 26 : if (strcmp(testname, "tests") == 0)
2208 : {
2209 2 : print_test_list();
2210 2 : exit(0);
2211 : }
2212 :
2213 24 : if (optind < argc)
2214 : {
2215 24 : conninfo = pg_strdup(argv[optind]);
2216 24 : optind++;
2217 : }
2218 :
2219 : /* Make a connection to the database */
2220 24 : conn = PQconnectdb(conninfo);
2221 24 : if (PQstatus(conn) != CONNECTION_OK)
2222 : {
2223 0 : fprintf(stderr, "Connection to database failed: %s\n",
2224 : PQerrorMessage(conn));
2225 0 : exit_nicely(conn);
2226 : }
2227 :
2228 24 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2229 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2230 0 : pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
2231 24 : res = PQexec(conn, "SET debug_parallel_query = off");
2232 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2233 0 : pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
2234 :
2235 : /* Set the trace file, if requested */
2236 24 : if (tracefile != NULL)
2237 : {
2238 18 : if (strcmp(tracefile, "-") == 0)
2239 0 : trace = stdout;
2240 : else
2241 18 : trace = fopen(tracefile, "w");
2242 18 : if (trace == NULL)
2243 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2244 :
2245 : /* Make it line-buffered */
2246 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2247 :
2248 18 : PQtrace(conn, trace);
2249 18 : PQsetTraceFlags(conn,
2250 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2251 : }
2252 :
2253 24 : if (strcmp(testname, "cancel") == 0)
2254 2 : test_cancel(conn);
2255 22 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2256 2 : test_disallowed_in_pipeline(conn);
2257 20 : else if (strcmp(testname, "multi_pipelines") == 0)
2258 2 : test_multi_pipelines(conn);
2259 18 : else if (strcmp(testname, "nosync") == 0)
2260 2 : test_nosync(conn);
2261 16 : else if (strcmp(testname, "pipeline_abort") == 0)
2262 2 : test_pipeline_abort(conn);
2263 14 : else if (strcmp(testname, "pipeline_idle") == 0)
2264 2 : test_pipeline_idle(conn);
2265 12 : else if (strcmp(testname, "pipelined_insert") == 0)
2266 2 : test_pipelined_insert(conn, numrows);
2267 10 : else if (strcmp(testname, "prepared") == 0)
2268 2 : test_prepared(conn);
2269 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2270 2 : test_simple_pipeline(conn);
2271 6 : else if (strcmp(testname, "singlerow") == 0)
2272 2 : test_singlerowmode(conn);
2273 4 : else if (strcmp(testname, "transaction") == 0)
2274 2 : test_transaction(conn);
2275 2 : else if (strcmp(testname, "uniqviol") == 0)
2276 2 : test_uniqviol(conn);
2277 : else
2278 : {
2279 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2280 0 : exit(1);
2281 : }
2282 :
2283 : /* close the connection to the database and cleanup */
2284 24 : PQfinish(conn);
2285 24 : return 0;
2286 : }
|