Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
38 : #ifdef DEBUG_OUTPUT
39 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 : #else
41 : #define pg_debug(...)
42 : #endif
43 :
/* SQL statements operating on the pq_pipeline_demo table. */
static const char *const drop_table_sql =
"DROP TABLE IF EXISTS pq_pipeline_demo";
static const char *const create_table_sql =
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
"int8filler int8);";
static const char *const insert_sql =
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
static const char *const insert_sql2 =
"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";

/*
 * Buffer sizes big enough for the decimal text of any int32 / int64 value,
 * including a leading minus sign and the terminating NUL:
 *
 *   int32: "-2147483648" is 11 characters, plus NUL = 12
 *   int64: "-9223372036854775808" is 20 characters, plus NUL = 21
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : static void
75 : pg_attribute_noreturn()
76 0 : pg_fatal_impl(int line, const char *fmt,...)
77 : {
78 : va_list args;
79 :
80 0 : fflush(stdout);
81 :
82 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
83 0 : va_start(args, fmt);
84 0 : vfprintf(stderr, fmt, args);
85 0 : va_end(args);
86 : Assert(fmt[strlen(fmt) - 1] != '\n');
87 0 : fprintf(stderr, "\n");
88 0 : exit(1);
89 : }
90 :
91 : /*
92 : * Check that the query on the given connection got canceled.
93 : */
94 : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
95 : static void
96 12 : confirm_query_canceled_impl(int line, PGconn *conn)
97 : {
98 12 : PGresult *res = NULL;
99 :
100 12 : res = PQgetResult(conn);
101 12 : if (res == NULL)
102 0 : pg_fatal_impl(line, "PQgetResult returned null: %s",
103 : PQerrorMessage(conn));
104 12 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
105 0 : pg_fatal_impl(line, "query did not fail when it was expected");
106 12 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
107 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
108 : PQerrorMessage(conn));
109 12 : PQclear(res);
110 :
111 12 : while (PQisBusy(conn))
112 0 : PQconsumeInput(conn);
113 12 : }
114 :
115 : /*
116 : * Using monitorConn, query pg_stat_activity to see that the connection with
117 : * the given PID is either in the given state, or waiting on the given event
118 : * (only one of them can be given).
119 : */
120 : static void
121 24 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
122 : char *state, char *event)
123 : {
124 24 : const Oid paramTypes[] = {INT4OID, TEXTOID};
125 : const char *paramValues[2];
126 24 : char *pidstr = psprintf("%d", procpid);
127 :
128 : Assert((state == NULL) ^ (event == NULL));
129 :
130 24 : paramValues[0] = pidstr;
131 24 : paramValues[1] = state ? state : event;
132 :
133 : while (true)
134 0 : {
135 : PGresult *res;
136 : char *value;
137 :
138 24 : if (state != NULL)
139 12 : res = PQexecParams(monitorConn,
140 : "SELECT count(*) FROM pg_stat_activity WHERE "
141 : "pid = $1 AND state = $2",
142 : 2, paramTypes, paramValues, NULL, NULL, 0);
143 : else
144 12 : res = PQexecParams(monitorConn,
145 : "SELECT count(*) FROM pg_stat_activity WHERE "
146 : "pid = $1 AND wait_event = $2",
147 : 2, paramTypes, paramValues, NULL, NULL, 0);
148 :
149 24 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
150 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
151 24 : if (PQntuples(res) != 1)
152 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
153 24 : if (PQnfields(res) != 1)
154 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
155 24 : value = PQgetvalue(res, 0, 0);
156 24 : if (strcmp(value, "0") != 0)
157 : {
158 24 : PQclear(res);
159 24 : break;
160 : }
161 0 : PQclear(res);
162 :
163 : /* wait 10ms before polling again */
164 0 : pg_usleep(10000);
165 : }
166 :
167 24 : pfree(pidstr);
168 24 : }
169 :
170 : #define send_cancellable_query(conn, monitorConn) \
171 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
172 : static void
173 12 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
174 : {
175 : const char *env_wait;
176 12 : const Oid paramTypes[1] = {INT4OID};
177 :
178 : /*
179 : * Wait for the connection to be idle, so that our check for an active
180 : * connection below is reliable, instead of possibly seeing an outdated
181 : * state.
182 : */
183 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
184 :
185 12 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
186 12 : if (env_wait == NULL)
187 12 : env_wait = "180";
188 :
189 12 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
190 : &env_wait, NULL, NULL, 0) != 1)
191 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
192 :
193 : /*
194 : * Wait for the sleep to be active, because if the query is not running
195 : * yet, the cancel request that we send won't have any effect.
196 : */
197 12 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
198 12 : }
199 :
200 : /*
201 : * Create a new connection with the same conninfo as the given one.
202 : */
203 : static PGconn *
204 2 : copy_connection(PGconn *conn)
205 : {
206 : PGconn *copyConn;
207 2 : PQconninfoOption *opts = PQconninfo(conn);
208 : const char **keywords;
209 : const char **vals;
210 2 : int nopts = 1;
211 2 : int i = 0;
212 :
213 88 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
214 86 : nopts++;
215 :
216 2 : keywords = pg_malloc(sizeof(char *) * nopts);
217 2 : vals = pg_malloc(sizeof(char *) * nopts);
218 :
219 88 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
220 : {
221 86 : if (opt->val)
222 : {
223 38 : keywords[i] = opt->keyword;
224 38 : vals[i] = opt->val;
225 38 : i++;
226 : }
227 : }
228 2 : keywords[i] = vals[i] = NULL;
229 :
230 2 : copyConn = PQconnectdbParams(keywords, vals, false);
231 :
232 2 : if (PQstatus(copyConn) != CONNECTION_OK)
233 0 : pg_fatal("Connection to database failed: %s",
234 : PQerrorMessage(copyConn));
235 :
236 2 : return copyConn;
237 : }
238 :
239 : /*
240 : * Test query cancellation routines
241 : */
242 : static void
243 2 : test_cancel(PGconn *conn)
244 : {
245 : PGcancel *cancel;
246 : PGcancelConn *cancelConn;
247 : PGconn *monitorConn;
248 : char errorbuf[256];
249 :
250 2 : fprintf(stderr, "test cancellations... ");
251 :
252 2 : if (PQsetnonblocking(conn, 1) != 0)
253 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
254 :
255 : /*
256 : * Make a separate connection to the database to monitor the query on the
257 : * main connection.
258 : */
259 2 : monitorConn = copy_connection(conn);
260 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
261 :
262 : /* test PQcancel */
263 2 : send_cancellable_query(conn, monitorConn);
264 2 : cancel = PQgetCancel(conn);
265 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
266 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
267 2 : confirm_query_canceled(conn);
268 :
269 : /* PGcancel object can be reused for the next query */
270 2 : send_cancellable_query(conn, monitorConn);
271 2 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
272 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
273 2 : confirm_query_canceled(conn);
274 :
275 2 : PQfreeCancel(cancel);
276 :
277 : /* test PQrequestCancel */
278 2 : send_cancellable_query(conn, monitorConn);
279 2 : if (!PQrequestCancel(conn))
280 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
281 2 : confirm_query_canceled(conn);
282 :
283 : /* test PQcancelBlocking */
284 2 : send_cancellable_query(conn, monitorConn);
285 2 : cancelConn = PQcancelCreate(conn);
286 2 : if (!PQcancelBlocking(cancelConn))
287 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
288 2 : confirm_query_canceled(conn);
289 2 : PQcancelFinish(cancelConn);
290 :
291 : /* test PQcancelCreate and then polling with PQcancelPoll */
292 2 : send_cancellable_query(conn, monitorConn);
293 2 : cancelConn = PQcancelCreate(conn);
294 2 : if (!PQcancelStart(cancelConn))
295 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
296 : while (true)
297 2 : {
298 : struct timeval tv;
299 : fd_set input_mask;
300 : fd_set output_mask;
301 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
302 4 : int sock = PQcancelSocket(cancelConn);
303 :
304 4 : if (pollres == PGRES_POLLING_OK)
305 2 : break;
306 :
307 2 : FD_ZERO(&input_mask);
308 2 : FD_ZERO(&output_mask);
309 2 : switch (pollres)
310 : {
311 2 : case PGRES_POLLING_READING:
312 : pg_debug("polling for reads\n");
313 2 : FD_SET(sock, &input_mask);
314 2 : break;
315 0 : case PGRES_POLLING_WRITING:
316 : pg_debug("polling for writes\n");
317 0 : FD_SET(sock, &output_mask);
318 0 : break;
319 0 : default:
320 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
321 : }
322 :
323 2 : if (sock < 0)
324 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
325 :
326 2 : tv.tv_sec = 3;
327 2 : tv.tv_usec = 0;
328 :
329 : while (true)
330 : {
331 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
332 : {
333 0 : if (errno == EINTR)
334 0 : continue;
335 0 : pg_fatal("select() failed: %m");
336 : }
337 2 : break;
338 : }
339 : }
340 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
341 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
342 2 : confirm_query_canceled(conn);
343 :
344 : /*
345 : * test PQcancelReset works on the cancel connection and it can be reused
346 : * afterwards
347 : */
348 2 : PQcancelReset(cancelConn);
349 :
350 2 : send_cancellable_query(conn, monitorConn);
351 2 : if (!PQcancelStart(cancelConn))
352 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
353 : while (true)
354 2 : {
355 : struct timeval tv;
356 : fd_set input_mask;
357 : fd_set output_mask;
358 4 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
359 4 : int sock = PQcancelSocket(cancelConn);
360 :
361 4 : if (pollres == PGRES_POLLING_OK)
362 2 : break;
363 :
364 2 : FD_ZERO(&input_mask);
365 2 : FD_ZERO(&output_mask);
366 2 : switch (pollres)
367 : {
368 2 : case PGRES_POLLING_READING:
369 : pg_debug("polling for reads\n");
370 2 : FD_SET(sock, &input_mask);
371 2 : break;
372 0 : case PGRES_POLLING_WRITING:
373 : pg_debug("polling for writes\n");
374 0 : FD_SET(sock, &output_mask);
375 0 : break;
376 0 : default:
377 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
378 : }
379 :
380 2 : if (sock < 0)
381 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
382 :
383 2 : tv.tv_sec = 3;
384 2 : tv.tv_usec = 0;
385 :
386 : while (true)
387 : {
388 2 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
389 : {
390 0 : if (errno == EINTR)
391 0 : continue;
392 0 : pg_fatal("select() failed: %m");
393 : }
394 2 : break;
395 : }
396 : }
397 2 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
398 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
399 2 : confirm_query_canceled(conn);
400 :
401 2 : PQcancelFinish(cancelConn);
402 :
403 2 : fprintf(stderr, "ok\n");
404 2 : }
405 :
406 : static void
407 2 : test_disallowed_in_pipeline(PGconn *conn)
408 : {
409 2 : PGresult *res = NULL;
410 :
411 2 : fprintf(stderr, "test error cases... ");
412 :
413 2 : if (PQisnonblocking(conn))
414 0 : pg_fatal("Expected blocking connection mode");
415 :
416 2 : if (PQenterPipelineMode(conn) != 1)
417 0 : pg_fatal("Unable to enter pipeline mode");
418 :
419 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
420 0 : pg_fatal("Pipeline mode not activated properly");
421 :
422 : /* PQexec should fail in pipeline mode */
423 2 : res = PQexec(conn, "SELECT 1");
424 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
425 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
426 2 : if (strcmp(PQerrorMessage(conn),
427 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
428 0 : pg_fatal("did not get expected error message; got: \"%s\"",
429 : PQerrorMessage(conn));
430 :
431 : /* PQsendQuery should fail in pipeline mode */
432 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
433 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
434 2 : if (strcmp(PQerrorMessage(conn),
435 : "PQsendQuery not allowed in pipeline mode\n") != 0)
436 0 : pg_fatal("did not get expected error message; got: \"%s\"",
437 : PQerrorMessage(conn));
438 :
439 : /* Entering pipeline mode when already in pipeline mode is OK */
440 2 : if (PQenterPipelineMode(conn) != 1)
441 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
442 :
443 2 : if (PQisBusy(conn) != 0)
444 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
445 :
446 : /* ok, back to normal command mode */
447 2 : if (PQexitPipelineMode(conn) != 1)
448 0 : pg_fatal("couldn't exit idle empty pipeline mode");
449 :
450 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
451 0 : pg_fatal("Pipeline mode not terminated properly");
452 :
453 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
454 2 : if (PQexitPipelineMode(conn) != 1)
455 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
456 :
457 : /* can now PQexec again */
458 2 : res = PQexec(conn, "SELECT 1");
459 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
460 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
461 : PQerrorMessage(conn));
462 :
463 2 : fprintf(stderr, "ok\n");
464 2 : }
465 :
466 : static void
467 2 : test_multi_pipelines(PGconn *conn)
468 : {
469 2 : PGresult *res = NULL;
470 2 : const char *dummy_params[1] = {"1"};
471 2 : Oid dummy_param_oids[1] = {INT4OID};
472 :
473 2 : fprintf(stderr, "multi pipeline... ");
474 :
475 : /*
476 : * Queue up a couple of small pipelines and process each without returning
477 : * to command mode first.
478 : */
479 2 : if (PQenterPipelineMode(conn) != 1)
480 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
481 :
482 : /* first pipeline */
483 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
484 : dummy_params, NULL, NULL, 0) != 1)
485 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
486 :
487 2 : if (PQpipelineSync(conn) != 1)
488 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
489 :
490 : /* second pipeline */
491 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
492 : dummy_params, NULL, NULL, 0) != 1)
493 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
494 :
495 : /* Skip flushing once. */
496 2 : if (PQsendPipelineSync(conn) != 1)
497 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
498 :
499 : /* third pipeline */
500 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
501 : dummy_params, NULL, NULL, 0) != 1)
502 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
503 :
504 2 : if (PQpipelineSync(conn) != 1)
505 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
506 :
507 : /* OK, start processing the results */
508 :
509 : /* first pipeline */
510 :
511 2 : res = PQgetResult(conn);
512 2 : if (res == NULL)
513 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
514 : PQerrorMessage(conn));
515 :
516 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
517 0 : pg_fatal("Unexpected result code %s from first pipeline item",
518 : PQresStatus(PQresultStatus(res)));
519 2 : PQclear(res);
520 2 : res = NULL;
521 :
522 2 : if (PQgetResult(conn) != NULL)
523 0 : pg_fatal("PQgetResult returned something extra after first result");
524 :
525 2 : if (PQexitPipelineMode(conn) != 0)
526 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
527 :
528 2 : res = PQgetResult(conn);
529 2 : if (res == NULL)
530 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
531 : PQerrorMessage(conn));
532 :
533 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
534 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
535 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
536 2 : PQclear(res);
537 :
538 : /* second pipeline */
539 :
540 2 : res = PQgetResult(conn);
541 2 : if (res == NULL)
542 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
543 : PQerrorMessage(conn));
544 :
545 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
546 0 : pg_fatal("Unexpected result code %s from second pipeline item",
547 : PQresStatus(PQresultStatus(res)));
548 2 : PQclear(res);
549 2 : res = NULL;
550 :
551 2 : if (PQgetResult(conn) != NULL)
552 0 : pg_fatal("PQgetResult returned something extra after first result");
553 :
554 2 : if (PQexitPipelineMode(conn) != 0)
555 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
556 :
557 2 : res = PQgetResult(conn);
558 2 : if (res == NULL)
559 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
560 : PQerrorMessage(conn));
561 :
562 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
563 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
564 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
565 2 : PQclear(res);
566 :
567 : /* third pipeline */
568 :
569 2 : res = PQgetResult(conn);
570 2 : if (res == NULL)
571 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
572 : PQerrorMessage(conn));
573 :
574 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
575 0 : pg_fatal("Unexpected result code %s from third pipeline item",
576 : PQresStatus(PQresultStatus(res)));
577 :
578 2 : res = PQgetResult(conn);
579 2 : if (res != NULL)
580 0 : pg_fatal("Expected null result, got %s",
581 : PQresStatus(PQresultStatus(res)));
582 :
583 2 : res = PQgetResult(conn);
584 2 : if (res == NULL)
585 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
586 : PQerrorMessage(conn));
587 :
588 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
589 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
590 : PQresStatus(PQresultStatus(res)));
591 :
592 : /* We're still in pipeline mode ... */
593 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
594 0 : pg_fatal("Fell out of pipeline mode somehow");
595 :
596 : /* until we end it, which we can safely do now */
597 2 : if (PQexitPipelineMode(conn) != 1)
598 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
599 : PQerrorMessage(conn));
600 :
601 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
602 0 : pg_fatal("exiting pipeline mode didn't seem to work");
603 :
604 2 : fprintf(stderr, "ok\n");
605 2 : }
606 :
607 : /*
608 : * Test behavior when a pipeline dispatches a number of commands that are
609 : * not flushed by a sync point.
610 : */
611 : static void
612 2 : test_nosync(PGconn *conn)
613 : {
614 2 : int numqueries = 10;
615 2 : int results = 0;
616 2 : int sock = PQsocket(conn);
617 :
618 2 : fprintf(stderr, "nosync... ");
619 :
620 2 : if (sock < 0)
621 0 : pg_fatal("invalid socket");
622 :
623 2 : if (PQenterPipelineMode(conn) != 1)
624 0 : pg_fatal("could not enter pipeline mode");
625 22 : for (int i = 0; i < numqueries; i++)
626 : {
627 : fd_set input_mask;
628 : struct timeval tv;
629 :
630 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
631 : 0, NULL, NULL, NULL, NULL, 0) != 1)
632 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
633 20 : PQflush(conn);
634 :
635 : /*
636 : * If the server has written anything to us, read (some of) it now.
637 : */
638 20 : FD_ZERO(&input_mask);
639 20 : FD_SET(sock, &input_mask);
640 20 : tv.tv_sec = 0;
641 20 : tv.tv_usec = 0;
642 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
643 : {
644 0 : fprintf(stderr, "select() failed: %m\n");
645 0 : exit_nicely(conn);
646 : }
647 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
648 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
649 : }
650 :
651 : /* tell server to flush its output buffer */
652 2 : if (PQsendFlushRequest(conn) != 1)
653 0 : pg_fatal("failed to send flush request");
654 2 : PQflush(conn);
655 :
656 : /* Now read all results */
657 : for (;;)
658 18 : {
659 : PGresult *res;
660 :
661 20 : res = PQgetResult(conn);
662 :
663 : /* NULL results are only expected after TUPLES_OK */
664 20 : if (res == NULL)
665 0 : pg_fatal("got unexpected NULL result after %d results", results);
666 :
667 : /* We expect exactly one TUPLES_OK result for each query we sent */
668 20 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
669 : {
670 : PGresult *res2;
671 :
672 : /* and one NULL result should follow each */
673 20 : res2 = PQgetResult(conn);
674 20 : if (res2 != NULL)
675 0 : pg_fatal("expected NULL, got %s",
676 : PQresStatus(PQresultStatus(res2)));
677 20 : PQclear(res);
678 20 : results++;
679 :
680 : /* if we're done, we're done */
681 20 : if (results == numqueries)
682 2 : break;
683 :
684 18 : continue;
685 : }
686 :
687 : /* anything else is unexpected */
688 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
689 : }
690 :
691 2 : fprintf(stderr, "ok\n");
692 2 : }
693 :
694 : /*
695 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
696 : * still have to get results for each pipeline item, but the item will just be
697 : * a PGRES_PIPELINE_ABORTED code.
698 : *
699 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
700 : * usually use an xact, but in this case we want to observe the effects of each
701 : * statement.
702 : */
703 : static void
704 2 : test_pipeline_abort(PGconn *conn)
705 : {
706 2 : PGresult *res = NULL;
707 2 : const char *dummy_params[1] = {"1"};
708 2 : Oid dummy_param_oids[1] = {INT4OID};
709 : int i;
710 : int gotrows;
711 : bool goterror;
712 :
713 2 : fprintf(stderr, "aborted pipeline... ");
714 :
715 2 : res = PQexec(conn, drop_table_sql);
716 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
717 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
718 :
719 2 : res = PQexec(conn, create_table_sql);
720 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
721 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
722 :
723 : /*
724 : * Queue up a couple of small pipelines and process each without returning
725 : * to command mode first. Make sure the second operation in the first
726 : * pipeline ERRORs.
727 : */
728 2 : if (PQenterPipelineMode(conn) != 1)
729 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
730 :
731 2 : dummy_params[0] = "1";
732 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
733 : dummy_params, NULL, NULL, 0) != 1)
734 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
735 :
736 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
737 : 1, dummy_param_oids, dummy_params,
738 : NULL, NULL, 0) != 1)
739 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
740 :
741 2 : dummy_params[0] = "2";
742 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
743 : dummy_params, NULL, NULL, 0) != 1)
744 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
745 :
746 2 : if (PQpipelineSync(conn) != 1)
747 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
748 :
749 2 : dummy_params[0] = "3";
750 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
751 : dummy_params, NULL, NULL, 0) != 1)
752 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
753 : PQerrorMessage(conn));
754 :
755 2 : if (PQpipelineSync(conn) != 1)
756 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
757 :
758 : /*
759 : * OK, start processing the pipeline results.
760 : *
761 : * We should get a command-ok for the first query, then a fatal error and
762 : * a pipeline aborted message for the second insert, a pipeline-end, then
763 : * a command-ok and a pipeline-ok for the second pipeline operation.
764 : */
765 2 : res = PQgetResult(conn);
766 2 : if (res == NULL)
767 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
768 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
769 0 : pg_fatal("Unexpected result status %s: %s",
770 : PQresStatus(PQresultStatus(res)),
771 : PQresultErrorMessage(res));
772 2 : PQclear(res);
773 :
774 : /* NULL result to signal end-of-results for this command */
775 2 : if ((res = PQgetResult(conn)) != NULL)
776 0 : pg_fatal("Expected null result, got %s",
777 : PQresStatus(PQresultStatus(res)));
778 :
779 : /* Second query caused error, so we expect an error next */
780 2 : res = PQgetResult(conn);
781 2 : if (res == NULL)
782 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
783 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
784 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
785 : PQresStatus(PQresultStatus(res)));
786 2 : PQclear(res);
787 :
788 : /* NULL result to signal end-of-results for this command */
789 2 : if ((res = PQgetResult(conn)) != NULL)
790 0 : pg_fatal("Expected null result, got %s",
791 : PQresStatus(PQresultStatus(res)));
792 :
793 : /*
794 : * pipeline should now be aborted.
795 : *
796 : * Note that we could still queue more queries at this point if we wanted;
797 : * they'd get added to a new third pipeline since we've already sent a
798 : * second. The aborted flag relates only to the pipeline being received.
799 : */
800 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
801 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
802 :
803 : /* third query in pipeline, the second insert */
804 2 : res = PQgetResult(conn);
805 2 : if (res == NULL)
806 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
807 2 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
808 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
809 : PQresStatus(PQresultStatus(res)));
810 2 : PQclear(res);
811 :
812 : /* NULL result to signal end-of-results for this command */
813 2 : if ((res = PQgetResult(conn)) != NULL)
814 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
815 :
816 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
817 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
818 :
819 : /* Ensure we're still in pipeline */
820 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
821 0 : pg_fatal("Fell out of pipeline mode somehow");
822 :
823 : /*
824 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
825 : *
826 : * (This is so clients know to start processing results normally again and
827 : * can tell the difference between skipped commands and the sync.)
828 : */
829 2 : res = PQgetResult(conn);
830 2 : if (res == NULL)
831 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
832 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
833 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
834 : "Expected PGRES_PIPELINE_SYNC, got %s",
835 : PQresStatus(PQresultStatus(res)));
836 2 : PQclear(res);
837 :
838 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
839 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
840 :
841 : /* We're still in pipeline mode... */
842 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
843 0 : pg_fatal("Fell out of pipeline mode somehow");
844 :
845 : /* the insert from the second pipeline */
846 2 : res = PQgetResult(conn);
847 2 : if (res == NULL)
848 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
849 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
850 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
851 : PQresStatus(PQresultStatus(res)));
852 2 : PQclear(res);
853 :
854 : /* Read the NULL result at the end of the command */
855 2 : if ((res = PQgetResult(conn)) != NULL)
856 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
857 :
858 : /* the second pipeline sync */
859 2 : if ((res = PQgetResult(conn)) == NULL)
860 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
861 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
862 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
863 : PQresStatus(PQresultStatus(res)));
864 2 : PQclear(res);
865 :
866 2 : if ((res = PQgetResult(conn)) != NULL)
867 0 : pg_fatal("Expected null result, got %s: %s",
868 : PQresStatus(PQresultStatus(res)),
869 : PQerrorMessage(conn));
870 :
871 : /* Try to send two queries in one command */
872 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
873 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
874 2 : if (PQpipelineSync(conn) != 1)
875 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
876 2 : goterror = false;
877 4 : while ((res = PQgetResult(conn)) != NULL)
878 : {
879 2 : switch (PQresultStatus(res))
880 : {
881 2 : case PGRES_FATAL_ERROR:
882 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
883 0 : pg_fatal("expected error about multiple commands, got %s",
884 : PQerrorMessage(conn));
885 2 : printf("got expected %s", PQerrorMessage(conn));
886 2 : goterror = true;
887 2 : break;
888 0 : default:
889 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
890 : break;
891 : }
892 : }
893 2 : if (!goterror)
894 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
895 2 : res = PQgetResult(conn);
896 2 : if (res == NULL)
897 0 : pg_fatal("got NULL result");
898 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
899 0 : pg_fatal("Unexpected result code %s from pipeline sync",
900 : PQresStatus(PQresultStatus(res)));
901 2 : fprintf(stderr, "ok\n");
902 :
903 : /* Test single-row mode with an error partways */
904 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
905 : 0, NULL, NULL, NULL, NULL, 0) != 1)
906 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
907 2 : if (PQpipelineSync(conn) != 1)
908 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
909 2 : PQsetSingleRowMode(conn);
910 2 : goterror = false;
911 2 : gotrows = 0;
912 10 : while ((res = PQgetResult(conn)) != NULL)
913 : {
914 8 : switch (PQresultStatus(res))
915 : {
916 6 : case PGRES_SINGLE_TUPLE:
917 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
918 6 : gotrows++;
919 6 : break;
920 2 : case PGRES_FATAL_ERROR:
921 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
922 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
923 : PQerrorMessage(conn),
924 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
925 2 : printf("got expected division-by-zero\n");
926 2 : goterror = true;
927 2 : break;
928 0 : default:
929 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
930 : }
931 8 : PQclear(res);
932 : }
933 2 : if (!goterror)
934 0 : pg_fatal("did not get division-by-zero error");
935 2 : if (gotrows != 3)
936 0 : pg_fatal("did not get three rows");
937 : /* the third pipeline sync */
938 2 : if ((res = PQgetResult(conn)) == NULL)
939 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
940 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
941 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
942 : PQresStatus(PQresultStatus(res)));
943 2 : PQclear(res);
944 :
945 : /* We're still in pipeline mode... */
946 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
947 0 : pg_fatal("Fell out of pipeline mode somehow");
948 :
949 : /* until we end it, which we can safely do now */
950 2 : if (PQexitPipelineMode(conn) != 1)
951 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
952 : PQerrorMessage(conn));
953 :
954 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
955 0 : pg_fatal("exiting pipeline mode didn't seem to work");
956 :
957 : /*-
958 : * Since we fired the pipelines off without a surrounding xact, the results
959 : * should be:
960 : *
961 : * - Implicit xact started by server around 1st pipeline
962 : * - First insert applied
963 : * - Second statement aborted xact
964 : * - Third insert skipped
965 : * - Sync rolled back first implicit xact
966 : * - Implicit xact created by server around 2nd pipeline
967 : * - insert applied from 2nd pipeline
968 : * - Sync commits 2nd xact
969 : *
970 : * So we should only have the value 3 that we inserted.
971 : */
972 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
973 :
974 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
975 0 : pg_fatal("Expected tuples, got %s: %s",
976 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
977 2 : if (PQntuples(res) != 1)
978 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
979 4 : for (i = 0; i < PQntuples(res); i++)
980 : {
981 2 : const char *val = PQgetvalue(res, i, 0);
982 :
983 2 : if (strcmp(val, "3") != 0)
984 0 : pg_fatal("expected only insert with value 3, got %s", val);
985 : }
986 :
987 2 : PQclear(res);
988 :
989 2 : fprintf(stderr, "ok\n");
990 2 : }
991 :
/*
 * State machine enum for test_pipelined_insert.
 *
 * test_pipelined_insert keeps two variables of this type: send_step (what
 * still has to be queued) and recv_step (which result is expected next).
 * The declaration order matters: that function advances through the states
 * with "++" and compares against BI_DONE with "<", so the enumerators must
 * stay in the order in which the pipeline is executed.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX,				/* BEGIN TRANSACTION */
	BI_DROP_TABLE,				/* drop_table_sql */
	BI_CREATE_TABLE,			/* create_table_sql */
	BI_PREPARE,					/* prepare the "my_insert" statement */
	BI_INSERT_ROWS,				/* one execution of "my_insert" per row */
	BI_COMMIT_TX,				/* COMMIT */
	BI_SYNC,					/* pipeline sync (PGRES_PIPELINE_SYNC result) */
	BI_DONE,					/* terminal state: nothing left to do */
};
1004 :
1005 : static void
1006 2 : test_pipelined_insert(PGconn *conn, int n_rows)
1007 : {
1008 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
1009 : const char *insert_params[2];
1010 : char insert_param_0[MAXINTLEN];
1011 : char insert_param_1[MAXINT8LEN];
1012 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
1013 2 : recv_step = BI_BEGIN_TX;
1014 : int rows_to_send,
1015 : rows_to_receive;
1016 :
1017 2 : insert_params[0] = insert_param_0;
1018 2 : insert_params[1] = insert_param_1;
1019 :
1020 2 : rows_to_send = rows_to_receive = n_rows;
1021 :
1022 : /*
1023 : * Do a pipelined insert into a table created at the start of the pipeline
1024 : */
1025 2 : if (PQenterPipelineMode(conn) != 1)
1026 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1027 :
1028 8 : while (send_step != BI_PREPARE)
1029 : {
1030 : const char *sql;
1031 :
1032 6 : switch (send_step)
1033 : {
1034 2 : case BI_BEGIN_TX:
1035 2 : sql = "BEGIN TRANSACTION";
1036 2 : send_step = BI_DROP_TABLE;
1037 2 : break;
1038 :
1039 2 : case BI_DROP_TABLE:
1040 2 : sql = drop_table_sql;
1041 2 : send_step = BI_CREATE_TABLE;
1042 2 : break;
1043 :
1044 2 : case BI_CREATE_TABLE:
1045 2 : sql = create_table_sql;
1046 2 : send_step = BI_PREPARE;
1047 2 : break;
1048 :
1049 0 : default:
1050 0 : pg_fatal("invalid state");
1051 : sql = NULL; /* keep compiler quiet */
1052 : }
1053 :
1054 : pg_debug("sending: %s\n", sql);
1055 6 : if (PQsendQueryParams(conn, sql,
1056 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1057 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1058 : }
1059 :
1060 : Assert(send_step == BI_PREPARE);
1061 : pg_debug("sending: %s\n", insert_sql2);
1062 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1063 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1064 2 : send_step = BI_INSERT_ROWS;
1065 :
1066 : /*
1067 : * Now we start inserting. We'll be sending enough data that we could fill
1068 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1069 : * mode and consume input while we send more output. As results of each
1070 : * query are processed we should pop them to allow processing of the next
1071 : * query. There's no need to finish the pipeline before processing
1072 : * results.
1073 : */
1074 2 : if (PQsetnonblocking(conn, 1) != 0)
1075 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1076 :
1077 41332 : while (recv_step != BI_DONE)
1078 : {
1079 : int sock;
1080 : fd_set input_mask;
1081 : fd_set output_mask;
1082 :
1083 41330 : sock = PQsocket(conn);
1084 :
1085 41330 : if (sock < 0)
1086 0 : break; /* shouldn't happen */
1087 :
1088 41330 : FD_ZERO(&input_mask);
1089 41330 : FD_SET(sock, &input_mask);
1090 41330 : FD_ZERO(&output_mask);
1091 41330 : FD_SET(sock, &output_mask);
1092 :
1093 41330 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1094 : {
1095 0 : fprintf(stderr, "select() failed: %m\n");
1096 0 : exit_nicely(conn);
1097 : }
1098 :
1099 : /*
1100 : * Process any results, so we keep the server's output buffer free
1101 : * flowing and it can continue to process input
1102 : */
1103 41330 : if (FD_ISSET(sock, &input_mask))
1104 : {
1105 4 : PQconsumeInput(conn);
1106 :
1107 : /* Read until we'd block if we tried to read */
1108 2826 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1109 : {
1110 : PGresult *res;
1111 2822 : const char *cmdtag = "";
1112 2822 : const char *description = "";
1113 : int status;
1114 :
1115 : /*
1116 : * Read next result. If no more results from this query,
1117 : * advance to the next query
1118 : */
1119 2822 : res = PQgetResult(conn);
1120 2822 : if (res == NULL)
1121 1410 : continue;
1122 :
1123 1412 : status = PGRES_COMMAND_OK;
1124 1412 : switch (recv_step)
1125 : {
1126 2 : case BI_BEGIN_TX:
1127 2 : cmdtag = "BEGIN";
1128 2 : recv_step++;
1129 2 : break;
1130 2 : case BI_DROP_TABLE:
1131 2 : cmdtag = "DROP TABLE";
1132 2 : recv_step++;
1133 2 : break;
1134 2 : case BI_CREATE_TABLE:
1135 2 : cmdtag = "CREATE TABLE";
1136 2 : recv_step++;
1137 2 : break;
1138 2 : case BI_PREPARE:
1139 2 : cmdtag = "";
1140 2 : description = "PREPARE";
1141 2 : recv_step++;
1142 2 : break;
1143 1400 : case BI_INSERT_ROWS:
1144 1400 : cmdtag = "INSERT";
1145 1400 : rows_to_receive--;
1146 1400 : if (rows_to_receive == 0)
1147 2 : recv_step++;
1148 1400 : break;
1149 2 : case BI_COMMIT_TX:
1150 2 : cmdtag = "COMMIT";
1151 2 : recv_step++;
1152 2 : break;
1153 2 : case BI_SYNC:
1154 2 : cmdtag = "";
1155 2 : description = "SYNC";
1156 2 : status = PGRES_PIPELINE_SYNC;
1157 2 : recv_step++;
1158 2 : break;
1159 0 : case BI_DONE:
1160 : /* unreachable */
1161 0 : pg_fatal("unreachable state");
1162 : }
1163 :
1164 1412 : if (PQresultStatus(res) != status)
1165 0 : pg_fatal("%s reported status %s, expected %s\n"
1166 : "Error message: \"%s\"",
1167 : description, PQresStatus(PQresultStatus(res)),
1168 : PQresStatus(status), PQerrorMessage(conn));
1169 :
1170 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1171 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1172 : description, cmdtag, PQcmdStatus(res));
1173 :
1174 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1175 :
1176 1412 : PQclear(res);
1177 : }
1178 : }
1179 :
1180 : /* Write more rows and/or the end pipeline message, if needed */
1181 41330 : if (FD_ISSET(sock, &output_mask))
1182 : {
1183 41328 : PQflush(conn);
1184 :
1185 41328 : if (send_step == BI_INSERT_ROWS)
1186 : {
1187 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1188 : /* use up some buffer space with a wide value */
1189 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1190 :
1191 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1192 : 2, insert_params, NULL, NULL, 0) == 1)
1193 : {
1194 : pg_debug("sent row %d\n", rows_to_send);
1195 :
1196 1400 : rows_to_send--;
1197 1400 : if (rows_to_send == 0)
1198 2 : send_step++;
1199 : }
1200 : else
1201 : {
1202 : /*
1203 : * in nonblocking mode, so it's OK for an insert to fail
1204 : * to send
1205 : */
1206 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1207 : rows_to_send, PQerrorMessage(conn));
1208 : }
1209 : }
1210 39928 : else if (send_step == BI_COMMIT_TX)
1211 : {
1212 2 : if (PQsendQueryParams(conn, "COMMIT",
1213 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1214 : {
1215 : pg_debug("sent COMMIT\n");
1216 2 : send_step++;
1217 : }
1218 : else
1219 : {
1220 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1221 : PQerrorMessage(conn));
1222 : }
1223 : }
1224 39926 : else if (send_step == BI_SYNC)
1225 : {
1226 2 : if (PQpipelineSync(conn) == 1)
1227 : {
1228 2 : fprintf(stdout, "pipeline sync sent\n");
1229 2 : send_step++;
1230 : }
1231 : else
1232 : {
1233 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1234 : PQerrorMessage(conn));
1235 : }
1236 : }
1237 : }
1238 : }
1239 :
1240 : /* We've got the sync message and the pipeline should be done */
1241 2 : if (PQexitPipelineMode(conn) != 1)
1242 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1243 : PQerrorMessage(conn));
1244 :
1245 2 : if (PQsetnonblocking(conn, 0) != 0)
1246 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1247 :
1248 2 : fprintf(stderr, "ok\n");
1249 2 : }
1250 :
1251 : static void
1252 2 : test_prepared(PGconn *conn)
1253 : {
1254 2 : PGresult *res = NULL;
1255 2 : Oid param_oids[1] = {INT4OID};
1256 : Oid expected_oids[4];
1257 : Oid typ;
1258 :
1259 2 : fprintf(stderr, "prepared... ");
1260 :
1261 2 : if (PQenterPipelineMode(conn) != 1)
1262 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1263 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1264 : "interval '1 sec'",
1265 : 1, param_oids) != 1)
1266 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1267 2 : expected_oids[0] = INT4OID;
1268 2 : expected_oids[1] = TEXTOID;
1269 2 : expected_oids[2] = NUMERICOID;
1270 2 : expected_oids[3] = INTERVALOID;
1271 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1272 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1273 2 : if (PQpipelineSync(conn) != 1)
1274 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275 :
1276 2 : res = PQgetResult(conn);
1277 2 : if (res == NULL)
1278 0 : pg_fatal("PQgetResult returned null");
1279 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1280 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1281 2 : PQclear(res);
1282 2 : res = PQgetResult(conn);
1283 2 : if (res != NULL)
1284 0 : pg_fatal("expected NULL result");
1285 :
1286 2 : res = PQgetResult(conn);
1287 2 : if (res == NULL)
1288 0 : pg_fatal("PQgetResult returned NULL");
1289 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1290 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1291 2 : if (PQnfields(res) != lengthof(expected_oids))
1292 0 : pg_fatal("expected %zu columns, got %d",
1293 : lengthof(expected_oids), PQnfields(res));
1294 10 : for (int i = 0; i < PQnfields(res); i++)
1295 : {
1296 8 : typ = PQftype(res, i);
1297 8 : if (typ != expected_oids[i])
1298 0 : pg_fatal("field %d: expected type %u, got %u",
1299 : i, expected_oids[i], typ);
1300 : }
1301 2 : PQclear(res);
1302 2 : res = PQgetResult(conn);
1303 2 : if (res != NULL)
1304 0 : pg_fatal("expected NULL result");
1305 :
1306 2 : res = PQgetResult(conn);
1307 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1308 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1309 :
1310 2 : fprintf(stderr, "closing statement..");
1311 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1312 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1313 2 : if (PQpipelineSync(conn) != 1)
1314 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1315 :
1316 2 : res = PQgetResult(conn);
1317 2 : if (res == NULL)
1318 0 : pg_fatal("expected non-NULL result");
1319 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1320 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1321 2 : PQclear(res);
1322 2 : res = PQgetResult(conn);
1323 2 : if (res != NULL)
1324 0 : pg_fatal("expected NULL result");
1325 2 : res = PQgetResult(conn);
1326 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1327 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1328 :
1329 2 : if (PQexitPipelineMode(conn) != 1)
1330 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1331 :
1332 : /* Now that it's closed we should get an error when describing */
1333 2 : res = PQdescribePrepared(conn, "select_one");
1334 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1335 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1336 :
1337 : /*
1338 : * Also test the blocking close, this should not fail since closing a
1339 : * non-existent prepared statement is a no-op
1340 : */
1341 2 : res = PQclosePrepared(conn, "select_one");
1342 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1343 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1344 :
1345 2 : fprintf(stderr, "creating portal... ");
1346 2 : PQexec(conn, "BEGIN");
1347 2 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1348 2 : PQenterPipelineMode(conn);
1349 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1350 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1351 2 : if (PQpipelineSync(conn) != 1)
1352 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353 2 : res = PQgetResult(conn);
1354 2 : if (res == NULL)
1355 0 : pg_fatal("PQgetResult returned null");
1356 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1357 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1358 :
1359 2 : typ = PQftype(res, 0);
1360 2 : if (typ != INT4OID)
1361 0 : pg_fatal("portal: expected type %u, got %u",
1362 : INT4OID, typ);
1363 2 : PQclear(res);
1364 2 : res = PQgetResult(conn);
1365 2 : if (res != NULL)
1366 0 : pg_fatal("expected NULL result");
1367 2 : res = PQgetResult(conn);
1368 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1369 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1370 :
1371 2 : fprintf(stderr, "closing portal... ");
1372 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1373 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1374 2 : if (PQpipelineSync(conn) != 1)
1375 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1376 :
1377 2 : res = PQgetResult(conn);
1378 2 : if (res == NULL)
1379 0 : pg_fatal("expected non-NULL result");
1380 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1381 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1382 2 : PQclear(res);
1383 2 : res = PQgetResult(conn);
1384 2 : if (res != NULL)
1385 0 : pg_fatal("expected NULL result");
1386 2 : res = PQgetResult(conn);
1387 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1388 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1389 :
1390 2 : if (PQexitPipelineMode(conn) != 1)
1391 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1392 :
1393 : /* Now that it's closed we should get an error when describing */
1394 2 : res = PQdescribePortal(conn, "cursor_one");
1395 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1396 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1397 :
1398 : /*
1399 : * Also test the blocking close, this should not fail since closing a
1400 : * non-existent portal is a no-op
1401 : */
1402 2 : res = PQclosePortal(conn, "cursor_one");
1403 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1404 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1405 :
1406 2 : fprintf(stderr, "ok\n");
1407 2 : }
1408 :
/*
 * Notice processor callback that counts and echoes notices.
 *
 * "arg" must point to an int counter; it is incremented for every notice, and
 * the new count is printed to stderr ahead of the message text.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;
	int			seen = ++(*counter);

	fprintf(stderr, "NOTICE %d: %s", seen, message);
}
1418 :
1419 : /* Verify behavior in "idle" state */
1420 : static void
1421 2 : test_pipeline_idle(PGconn *conn)
1422 : {
1423 : PGresult *res;
1424 2 : int n_notices = 0;
1425 :
1426 2 : fprintf(stderr, "\npipeline idle...\n");
1427 :
1428 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1429 :
1430 : /* Try to exit pipeline mode in pipeline-idle state */
1431 2 : if (PQenterPipelineMode(conn) != 1)
1432 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1433 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1434 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1435 2 : PQsendFlushRequest(conn);
1436 2 : res = PQgetResult(conn);
1437 2 : if (res == NULL)
1438 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1439 : PQerrorMessage(conn));
1440 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1441 0 : pg_fatal("unexpected result code %s from first pipeline item",
1442 : PQresStatus(PQresultStatus(res)));
1443 2 : PQclear(res);
1444 2 : res = PQgetResult(conn);
1445 2 : if (res != NULL)
1446 0 : pg_fatal("did not receive terminating NULL");
1447 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1448 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1449 2 : if (PQexitPipelineMode(conn) == 1)
1450 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1451 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1452 : strlen("cannot exit pipeline mode")) != 0)
1453 0 : pg_fatal("did not get expected error; got: %s",
1454 : PQerrorMessage(conn));
1455 2 : PQsendFlushRequest(conn);
1456 2 : res = PQgetResult(conn);
1457 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1458 0 : pg_fatal("unexpected result code %s from second pipeline item",
1459 : PQresStatus(PQresultStatus(res)));
1460 2 : PQclear(res);
1461 2 : res = PQgetResult(conn);
1462 2 : if (res != NULL)
1463 0 : pg_fatal("did not receive terminating NULL");
1464 2 : if (PQexitPipelineMode(conn) != 1)
1465 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1466 :
1467 2 : if (n_notices > 0)
1468 0 : pg_fatal("got %d notice(s)", n_notices);
1469 2 : fprintf(stderr, "ok - 1\n");
1470 :
1471 : /* Have a WARNING in the middle of a resultset */
1472 2 : if (PQenterPipelineMode(conn) != 1)
1473 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1474 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1475 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1476 2 : PQsendFlushRequest(conn);
1477 2 : res = PQgetResult(conn);
1478 2 : if (res == NULL)
1479 0 : pg_fatal("unexpected NULL result received");
1480 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1481 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1482 2 : if (PQexitPipelineMode(conn) != 1)
1483 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1484 2 : fprintf(stderr, "ok - 2\n");
1485 2 : }
1486 :
1487 : static void
1488 2 : test_simple_pipeline(PGconn *conn)
1489 : {
1490 2 : PGresult *res = NULL;
1491 2 : const char *dummy_params[1] = {"1"};
1492 2 : Oid dummy_param_oids[1] = {INT4OID};
1493 :
1494 2 : fprintf(stderr, "simple pipeline... ");
1495 :
1496 : /*
1497 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1498 : * process the results of as they come in.
1499 : *
1500 : * For a simple case we should be able to do this without interim
1501 : * processing of results since our output buffer will give us enough slush
1502 : * to work with and we won't block on sending. So blocking mode is fine.
1503 : */
1504 2 : if (PQisnonblocking(conn))
1505 0 : pg_fatal("Expected blocking connection mode");
1506 :
1507 2 : if (PQenterPipelineMode(conn) != 1)
1508 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1509 :
1510 2 : if (PQsendQueryParams(conn, "SELECT $1",
1511 : 1, dummy_param_oids, dummy_params,
1512 : NULL, NULL, 0) != 1)
1513 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1514 :
1515 2 : if (PQexitPipelineMode(conn) != 0)
1516 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1517 :
1518 2 : if (PQpipelineSync(conn) != 1)
1519 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1520 :
1521 2 : res = PQgetResult(conn);
1522 2 : if (res == NULL)
1523 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1524 : PQerrorMessage(conn));
1525 :
1526 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1527 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1528 : PQresStatus(PQresultStatus(res)));
1529 :
1530 2 : PQclear(res);
1531 2 : res = NULL;
1532 :
1533 2 : if (PQgetResult(conn) != NULL)
1534 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1535 :
1536 : /*
1537 : * Even though we've processed the result there's still a sync to come and
1538 : * we can't exit pipeline mode yet
1539 : */
1540 2 : if (PQexitPipelineMode(conn) != 0)
1541 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1542 :
1543 2 : res = PQgetResult(conn);
1544 2 : if (res == NULL)
1545 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1546 : PQerrorMessage(conn));
1547 :
1548 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1549 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1550 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1551 :
1552 2 : PQclear(res);
1553 2 : res = NULL;
1554 :
1555 2 : if (PQgetResult(conn) != NULL)
1556 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1557 : PQresStatus(PQresultStatus(res)));
1558 :
1559 : /* We're still in pipeline mode... */
1560 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1561 0 : pg_fatal("Fell out of pipeline mode somehow");
1562 :
1563 : /* ... until we end it, which we can safely do now */
1564 2 : if (PQexitPipelineMode(conn) != 1)
1565 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1566 : PQerrorMessage(conn));
1567 :
1568 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1569 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1570 :
1571 2 : fprintf(stderr, "ok\n");
1572 2 : }
1573 :
1574 : static void
1575 2 : test_singlerowmode(PGconn *conn)
1576 : {
1577 : PGresult *res;
1578 : int i;
1579 2 : bool pipeline_ended = false;
1580 :
1581 2 : if (PQenterPipelineMode(conn) != 1)
1582 0 : pg_fatal("failed to enter pipeline mode: %s",
1583 : PQerrorMessage(conn));
1584 :
1585 : /* One series of three commands, using single-row mode for the first two. */
1586 8 : for (i = 0; i < 3; i++)
1587 : {
1588 : char *param[1];
1589 :
1590 6 : param[0] = psprintf("%d", 44 + i);
1591 :
1592 6 : if (PQsendQueryParams(conn,
1593 : "SELECT generate_series(42, $1)",
1594 : 1,
1595 : NULL,
1596 : (const char **) param,
1597 : NULL,
1598 : NULL,
1599 : 0) != 1)
1600 0 : pg_fatal("failed to send query: %s",
1601 : PQerrorMessage(conn));
1602 6 : pfree(param[0]);
1603 : }
1604 2 : if (PQpipelineSync(conn) != 1)
1605 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1606 :
1607 10 : for (i = 0; !pipeline_ended; i++)
1608 : {
1609 8 : bool first = true;
1610 : bool saw_ending_tuplesok;
1611 8 : bool isSingleTuple = false;
1612 :
1613 : /* Set single row mode for only first 2 SELECT queries */
1614 8 : if (i < 2)
1615 : {
1616 4 : if (PQsetSingleRowMode(conn) != 1)
1617 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1618 : }
1619 :
1620 : /* Consume rows for this query */
1621 8 : saw_ending_tuplesok = false;
1622 28 : while ((res = PQgetResult(conn)) != NULL)
1623 : {
1624 22 : ExecStatusType est = PQresultStatus(res);
1625 :
1626 22 : if (est == PGRES_PIPELINE_SYNC)
1627 : {
1628 2 : fprintf(stderr, "end of pipeline reached\n");
1629 2 : pipeline_ended = true;
1630 2 : PQclear(res);
1631 2 : if (i != 3)
1632 0 : pg_fatal("Expected three results, got %d", i);
1633 2 : break;
1634 : }
1635 :
1636 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1637 20 : if (first)
1638 : {
1639 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1640 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1641 : i, PQresStatus(est));
1642 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1643 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1644 : i, PQresStatus(est));
1645 6 : first = false;
1646 : }
1647 :
1648 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1649 20 : switch (est)
1650 : {
1651 6 : case PGRES_TUPLES_OK:
1652 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1653 6 : saw_ending_tuplesok = true;
1654 6 : if (isSingleTuple)
1655 : {
1656 4 : if (PQntuples(res) == 0)
1657 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1658 : else
1659 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1660 : }
1661 6 : break;
1662 :
1663 14 : case PGRES_SINGLE_TUPLE:
1664 14 : isSingleTuple = true;
1665 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1666 14 : break;
1667 :
1668 0 : default:
1669 0 : pg_fatal("unexpected");
1670 : }
1671 20 : PQclear(res);
1672 : }
1673 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1674 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1675 : }
1676 :
1677 : /*
1678 : * Now issue one command, get its results in with single-row mode, then
1679 : * issue another command, and get its results in normal mode; make sure
1680 : * the single-row mode flag is reset as expected.
1681 : */
1682 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1683 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1684 0 : pg_fatal("failed to send query: %s",
1685 : PQerrorMessage(conn));
1686 2 : if (PQsendFlushRequest(conn) != 1)
1687 0 : pg_fatal("failed to send flush request");
1688 2 : if (PQsetSingleRowMode(conn) != 1)
1689 0 : pg_fatal("PQsetSingleRowMode() failed");
1690 2 : res = PQgetResult(conn);
1691 2 : if (res == NULL)
1692 0 : pg_fatal("unexpected NULL");
1693 2 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1694 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1695 : PQresStatus(PQresultStatus(res)));
1696 2 : res = PQgetResult(conn);
1697 2 : if (res == NULL)
1698 0 : pg_fatal("unexpected NULL");
1699 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1700 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1701 : PQresStatus(PQresultStatus(res)));
1702 2 : if (PQgetResult(conn) != NULL)
1703 0 : pg_fatal("expected NULL result");
1704 :
1705 2 : if (PQsendQueryParams(conn, "SELECT 1",
1706 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1707 0 : pg_fatal("failed to send query: %s",
1708 : PQerrorMessage(conn));
1709 2 : if (PQsendFlushRequest(conn) != 1)
1710 0 : pg_fatal("failed to send flush request");
1711 2 : res = PQgetResult(conn);
1712 2 : if (res == NULL)
1713 0 : pg_fatal("unexpected NULL");
1714 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1715 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1716 : PQresStatus(PQresultStatus(res)));
1717 2 : if (PQgetResult(conn) != NULL)
1718 0 : pg_fatal("expected NULL result");
1719 :
1720 : /*
1721 : * Try chunked mode as well; make sure that it correctly delivers a
1722 : * partial final chunk.
1723 : */
1724 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1725 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1726 0 : pg_fatal("failed to send query: %s",
1727 : PQerrorMessage(conn));
1728 2 : if (PQsendFlushRequest(conn) != 1)
1729 0 : pg_fatal("failed to send flush request");
1730 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1731 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1732 2 : res = PQgetResult(conn);
1733 2 : if (res == NULL)
1734 0 : pg_fatal("unexpected NULL");
1735 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1736 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1737 : PQresStatus(PQresultStatus(res)),
1738 : PQerrorMessage(conn));
1739 2 : if (PQntuples(res) != 3)
1740 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1741 2 : res = PQgetResult(conn);
1742 2 : if (res == NULL)
1743 0 : pg_fatal("unexpected NULL");
1744 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1745 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1746 : PQresStatus(PQresultStatus(res)));
1747 2 : if (PQntuples(res) != 2)
1748 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1749 2 : res = PQgetResult(conn);
1750 2 : if (res == NULL)
1751 0 : pg_fatal("unexpected NULL");
1752 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1753 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1754 : PQresStatus(PQresultStatus(res)));
1755 2 : if (PQntuples(res) != 0)
1756 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1757 2 : if (PQgetResult(conn) != NULL)
1758 0 : pg_fatal("expected NULL result");
1759 :
1760 2 : if (PQexitPipelineMode(conn) != 1)
1761 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1762 :
1763 2 : fprintf(stderr, "ok\n");
1764 2 : }
1765 :
1766 : /*
1767 : * Simple test to verify that a pipeline is discarded as a whole when there's
1768 : * an error, ignoring transaction commands.
1769 : */
1770 : static void
1771 2 : test_transaction(PGconn *conn)
1772 : {
1773 : PGresult *res;
1774 : bool expect_null;
1775 2 : int num_syncs = 0;
1776 :
1777 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1778 : "CREATE TABLE pq_pipeline_tst (id int)");
1779 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1780 0 : pg_fatal("failed to create test table: %s",
1781 : PQerrorMessage(conn));
1782 2 : PQclear(res);
1783 :
1784 2 : if (PQenterPipelineMode(conn) != 1)
1785 0 : pg_fatal("failed to enter pipeline mode: %s",
1786 : PQerrorMessage(conn));
1787 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1788 0 : pg_fatal("could not send prepare on pipeline: %s",
1789 : PQerrorMessage(conn));
1790 :
1791 2 : if (PQsendQueryParams(conn,
1792 : "BEGIN",
1793 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1794 0 : pg_fatal("failed to send query: %s",
1795 : PQerrorMessage(conn));
1796 2 : if (PQsendQueryParams(conn,
1797 : "SELECT 0/0",
1798 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1799 0 : pg_fatal("failed to send query: %s",
1800 : PQerrorMessage(conn));
1801 :
1802 : /*
1803 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1804 : * get out of the pipeline-aborted state first.
1805 : */
1806 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1807 0 : pg_fatal("failed to execute prepared: %s",
1808 : PQerrorMessage(conn));
1809 :
1810 : /* This insert fails because we're in pipeline-aborted state */
1811 2 : if (PQsendQueryParams(conn,
1812 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1813 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1814 0 : pg_fatal("failed to send query: %s",
1815 : PQerrorMessage(conn));
1816 2 : if (PQpipelineSync(conn) != 1)
1817 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1818 2 : num_syncs++;
1819 :
1820 : /*
1821 : * This insert fails even though the pipeline got a SYNC, because we're in
1822 : * an aborted transaction
1823 : */
1824 2 : if (PQsendQueryParams(conn,
1825 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1826 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 0 : pg_fatal("failed to send query: %s",
1828 : PQerrorMessage(conn));
1829 2 : if (PQpipelineSync(conn) != 1)
1830 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1831 2 : num_syncs++;
1832 :
1833 : /*
1834 : * Send ROLLBACK using prepared stmt. This one works because we just did
1835 : * PQpipelineSync above.
1836 : */
1837 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1838 0 : pg_fatal("failed to execute prepared: %s",
1839 : PQerrorMessage(conn));
1840 :
1841 : /*
1842 : * Now that we're out of a transaction and in pipeline-good mode, this
1843 : * insert works
1844 : */
1845 2 : if (PQsendQueryParams(conn,
1846 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1847 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1848 0 : pg_fatal("failed to send query: %s",
1849 : PQerrorMessage(conn));
1850 : /* Send two syncs now -- match up to SYNC messages below */
1851 2 : if (PQpipelineSync(conn) != 1)
1852 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1853 2 : num_syncs++;
1854 2 : if (PQpipelineSync(conn) != 1)
1855 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1856 2 : num_syncs++;
1857 :
1858 2 : expect_null = false;
1859 2 : for (int i = 0;; i++)
1860 38 : {
1861 : ExecStatusType restype;
1862 :
1863 40 : res = PQgetResult(conn);
1864 40 : if (res == NULL)
1865 : {
1866 16 : printf("%d: got NULL result\n", i);
1867 16 : if (!expect_null)
1868 0 : pg_fatal("did not expect NULL here");
1869 16 : expect_null = false;
1870 16 : continue;
1871 : }
1872 24 : restype = PQresultStatus(res);
1873 24 : printf("%d: got status %s", i, PQresStatus(restype));
1874 24 : if (expect_null)
1875 0 : pg_fatal("expected NULL");
1876 24 : if (restype == PGRES_FATAL_ERROR)
1877 4 : printf("; error: %s", PQerrorMessage(conn));
1878 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1879 : {
1880 4 : printf(": command didn't run because pipeline aborted\n");
1881 : }
1882 : else
1883 16 : printf("\n");
1884 24 : PQclear(res);
1885 :
1886 24 : if (restype == PGRES_PIPELINE_SYNC)
1887 8 : num_syncs--;
1888 : else
1889 16 : expect_null = true;
1890 24 : if (num_syncs <= 0)
1891 2 : break;
1892 : }
1893 2 : if (PQgetResult(conn) != NULL)
1894 0 : pg_fatal("returned something extra after all the syncs: %s",
1895 : PQresStatus(PQresultStatus(res)));
1896 :
1897 2 : if (PQexitPipelineMode(conn) != 1)
1898 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1899 :
1900 : /* We expect to find one tuple containing the value "3" */
1901 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1902 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1903 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1904 2 : if (PQntuples(res) != 1)
1905 0 : pg_fatal("did not get 1 tuple");
1906 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1907 0 : pg_fatal("did not get expected tuple");
1908 2 : PQclear(res);
1909 :
1910 2 : fprintf(stderr, "ok\n");
1911 2 : }
1912 :
1913 : /*
1914 : * In this test mode we send a stream of queries, with one in the middle
1915 : * causing an error. Verify that we can still send some more after the
1916 : * error and have libpq work properly.
1917 : */
1918 : static void
1919 2 : test_uniqviol(PGconn *conn)
1920 : {
1921 2 : int sock = PQsocket(conn);
1922 : PGresult *res;
1923 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
1924 : const char *paramValues[2];
1925 : char paramValue0[MAXINT8LEN];
1926 : char paramValue1[MAXINT8LEN];
1927 2 : int ctr = 0;
1928 2 : int numsent = 0;
1929 2 : int results = 0;
1930 2 : bool read_done = false;
1931 2 : bool write_done = false;
1932 2 : bool error_sent = false;
1933 2 : bool got_error = false;
1934 2 : int switched = 0;
1935 2 : int socketful = 0;
1936 : fd_set in_fds;
1937 : fd_set out_fds;
1938 :
1939 2 : fprintf(stderr, "uniqviol ...");
1940 :
1941 2 : PQsetnonblocking(conn, 1);
1942 :
1943 2 : paramValues[0] = paramValue0;
1944 2 : paramValues[1] = paramValue1;
1945 2 : sprintf(paramValue1, "42");
1946 :
1947 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1948 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1949 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1950 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1951 :
1952 2 : res = PQexec(conn, "begin");
1953 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1954 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1955 :
1956 2 : res = PQprepare(conn, "insertion",
1957 : "insert into ppln_uniqviol values ($1, $2) returning id",
1958 : 2, paramTypes);
1959 2 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1960 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1961 :
1962 2 : if (PQenterPipelineMode(conn) != 1)
1963 0 : pg_fatal("failed to enter pipeline mode");
1964 :
1965 16 : while (!read_done)
1966 : {
1967 : /*
1968 : * Avoid deadlocks by reading everything the server has sent before
1969 : * sending anything. (Special precaution is needed here to process
1970 : * PQisBusy before testing the socket for read-readiness, because the
1971 : * socket does not turn read-ready after "sending" queries in aborted
1972 : * pipeline mode.)
1973 : */
1974 1212 : while (PQisBusy(conn) == 0)
1975 : {
1976 : bool new_error;
1977 :
1978 1202 : if (results >= numsent)
1979 : {
1980 2 : if (write_done)
1981 0 : read_done = true;
1982 2 : break;
1983 : }
1984 :
1985 1200 : res = PQgetResult(conn);
1986 1200 : new_error = process_result(conn, res, results, numsent);
1987 1200 : if (new_error && got_error)
1988 0 : pg_fatal("got two errors");
1989 1200 : got_error |= new_error;
1990 1200 : if (results++ >= numsent - 1)
1991 : {
1992 4 : if (write_done)
1993 2 : read_done = true;
1994 4 : break;
1995 : }
1996 : }
1997 :
1998 16 : if (read_done)
1999 2 : break;
2000 :
2001 14 : FD_ZERO(&out_fds);
2002 14 : FD_SET(sock, &out_fds);
2003 :
2004 14 : FD_ZERO(&in_fds);
2005 14 : FD_SET(sock, &in_fds);
2006 :
2007 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2008 : {
2009 0 : if (errno == EINTR)
2010 0 : continue;
2011 0 : pg_fatal("select() failed: %m");
2012 : }
2013 :
2014 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2015 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2016 :
2017 : /*
2018 : * If the socket is writable and we haven't finished sending queries,
2019 : * send some.
2020 : */
2021 14 : if (!write_done && FD_ISSET(sock, &out_fds))
2022 : {
2023 : for (;;)
2024 1194 : {
2025 : int flush;
2026 :
2027 : /*
2028 : * provoke uniqueness violation exactly once after having
2029 : * switched to read mode.
2030 : */
2031 1200 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2032 : {
2033 2 : sprintf(paramValue0, "%d", numsent / 2);
2034 2 : fprintf(stderr, "E");
2035 2 : error_sent = true;
2036 : }
2037 : else
2038 : {
2039 1198 : fprintf(stderr, ".");
2040 1198 : sprintf(paramValue0, "%d", ctr++);
2041 : }
2042 :
2043 1200 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2044 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2045 1200 : numsent++;
2046 :
2047 : /* Are we done writing? */
2048 1200 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2049 : {
2050 2 : if (PQsendFlushRequest(conn) != 1)
2051 0 : pg_fatal("failed to send flush request");
2052 2 : write_done = true;
2053 2 : fprintf(stderr, "\ndone writing\n");
2054 2 : PQflush(conn);
2055 2 : break;
2056 : }
2057 :
2058 : /* is the outgoing socket full? */
2059 1198 : flush = PQflush(conn);
2060 1198 : if (flush == -1)
2061 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2062 1198 : if (flush == 1)
2063 : {
2064 4 : if (socketful == 0)
2065 2 : socketful = numsent;
2066 4 : fprintf(stderr, "\nswitch to reading\n");
2067 4 : switched++;
2068 4 : break;
2069 : }
2070 : }
2071 : }
2072 : }
2073 :
2074 2 : if (!got_error)
2075 0 : pg_fatal("did not get expected error");
2076 :
2077 2 : fprintf(stderr, "ok\n");
2078 2 : }
2079 :
2080 : /*
2081 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2082 : * the expected NULL that should follow it.
2083 : *
2084 : * Returns true if we read a fatal error message, otherwise false.
2085 : */
2086 : static bool
2087 1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2088 : {
2089 : PGresult *res2;
2090 1200 : bool got_error = false;
2091 :
2092 1200 : if (res == NULL)
2093 0 : pg_fatal("got unexpected NULL");
2094 :
2095 1200 : switch (PQresultStatus(res))
2096 : {
2097 2 : case PGRES_FATAL_ERROR:
2098 2 : got_error = true;
2099 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2100 2 : PQclear(res);
2101 :
2102 2 : res2 = PQgetResult(conn);
2103 2 : if (res2 != NULL)
2104 0 : pg_fatal("expected NULL, got %s",
2105 : PQresStatus(PQresultStatus(res2)));
2106 2 : break;
2107 :
2108 836 : case PGRES_TUPLES_OK:
2109 836 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2110 836 : PQclear(res);
2111 :
2112 836 : res2 = PQgetResult(conn);
2113 836 : if (res2 != NULL)
2114 0 : pg_fatal("expected NULL, got %s",
2115 : PQresStatus(PQresultStatus(res2)));
2116 836 : break;
2117 :
2118 362 : case PGRES_PIPELINE_ABORTED:
2119 362 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2120 362 : res2 = PQgetResult(conn);
2121 362 : if (res2 != NULL)
2122 0 : pg_fatal("expected NULL, got %s",
2123 : PQresStatus(PQresultStatus(res2)));
2124 362 : break;
2125 :
2126 0 : default:
2127 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2128 : }
2129 :
2130 1200 : return got_error;
2131 : }
2132 :
2133 :
/*
 * Print a short help text to stderr, using "progname" as the program name
 * shown in the synopsis lines.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);

	/* invocation forms */
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* supported switches */
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
2145 :
/*
 * Write the names of all available tests to stdout, one per line.
 */
static void
print_test_list(void)
{
	/* one output line per test, in the order they are printed */
	static const char *const test_lines[] = {
		"cancel\n",
		"disallowed_in_pipeline\n",
		"multi_pipelines\n",
		"nosync\n",
		"pipeline_abort\n",
		"pipeline_idle\n",
		"pipelined_insert\n",
		"prepared\n",
		"simple_pipeline\n",
		"singlerow\n",
		"transaction\n",
		"uniqviol\n",
	};
	const size_t ntests = sizeof(test_lines) / sizeof(test_lines[0]);

	for (size_t idx = 0; idx < ntests; idx++)
		fputs(test_lines[idx], stdout);
}
2162 :
2163 : int
2164 26 : main(int argc, char **argv)
2165 : {
2166 26 : const char *conninfo = "";
2167 : PGconn *conn;
2168 : FILE *trace;
2169 : char *testname;
2170 26 : int numrows = 10000;
2171 : PGresult *res;
2172 : int c;
2173 :
2174 94 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2175 : {
2176 42 : switch (c)
2177 : {
2178 24 : case 'r': /* numrows */
2179 24 : errno = 0;
2180 24 : numrows = strtol(optarg, NULL, 10);
2181 24 : if (errno != 0 || numrows <= 0)
2182 : {
2183 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2184 : optarg);
2185 0 : exit(1);
2186 : }
2187 24 : break;
2188 18 : case 't': /* trace file */
2189 18 : tracefile = pg_strdup(optarg);
2190 18 : break;
2191 : }
2192 68 : }
2193 :
2194 26 : if (optind < argc)
2195 : {
2196 26 : testname = pg_strdup(argv[optind]);
2197 26 : optind++;
2198 : }
2199 : else
2200 : {
2201 0 : usage(argv[0]);
2202 0 : exit(1);
2203 : }
2204 :
2205 26 : if (strcmp(testname, "tests") == 0)
2206 : {
2207 2 : print_test_list();
2208 2 : exit(0);
2209 : }
2210 :
2211 24 : if (optind < argc)
2212 : {
2213 24 : conninfo = pg_strdup(argv[optind]);
2214 24 : optind++;
2215 : }
2216 :
2217 : /* Make a connection to the database */
2218 24 : conn = PQconnectdb(conninfo);
2219 24 : if (PQstatus(conn) != CONNECTION_OK)
2220 : {
2221 0 : fprintf(stderr, "Connection to database failed: %s\n",
2222 : PQerrorMessage(conn));
2223 0 : exit_nicely(conn);
2224 : }
2225 :
2226 24 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2227 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2228 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2229 24 : res = PQexec(conn, "SET debug_parallel_query = off");
2230 24 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2231 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2232 :
2233 : /* Set the trace file, if requested */
2234 24 : if (tracefile != NULL)
2235 : {
2236 18 : if (strcmp(tracefile, "-") == 0)
2237 0 : trace = stdout;
2238 : else
2239 18 : trace = fopen(tracefile, "w");
2240 18 : if (trace == NULL)
2241 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2242 :
2243 : /* Make it line-buffered */
2244 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2245 :
2246 18 : PQtrace(conn, trace);
2247 18 : PQsetTraceFlags(conn,
2248 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2249 : }
2250 :
2251 24 : if (strcmp(testname, "cancel") == 0)
2252 2 : test_cancel(conn);
2253 22 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2254 2 : test_disallowed_in_pipeline(conn);
2255 20 : else if (strcmp(testname, "multi_pipelines") == 0)
2256 2 : test_multi_pipelines(conn);
2257 18 : else if (strcmp(testname, "nosync") == 0)
2258 2 : test_nosync(conn);
2259 16 : else if (strcmp(testname, "pipeline_abort") == 0)
2260 2 : test_pipeline_abort(conn);
2261 14 : else if (strcmp(testname, "pipeline_idle") == 0)
2262 2 : test_pipeline_idle(conn);
2263 12 : else if (strcmp(testname, "pipelined_insert") == 0)
2264 2 : test_pipelined_insert(conn, numrows);
2265 10 : else if (strcmp(testname, "prepared") == 0)
2266 2 : test_prepared(conn);
2267 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2268 2 : test_simple_pipeline(conn);
2269 6 : else if (strcmp(testname, "singlerow") == 0)
2270 2 : test_singlerowmode(conn);
2271 4 : else if (strcmp(testname, "transaction") == 0)
2272 2 : test_transaction(conn);
2273 2 : else if (strcmp(testname, "uniqviol") == 0)
2274 2 : test_uniqviol(conn);
2275 : else
2276 : {
2277 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2278 0 : exit(1);
2279 : }
2280 :
2281 : /* close the connection to the database and cleanup */
2282 24 : PQfinish(conn);
2283 24 : return 0;
2284 : }
|