Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
38 : #ifdef DEBUG_OUTPUT
39 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 : #else
41 : #define pg_debug(...)
42 : #endif
43 :
44 : static const char *const drop_table_sql =
45 : "DROP TABLE IF EXISTS pq_pipeline_demo";
46 : static const char *const create_table_sql =
47 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 : "int8filler int8);";
49 : static const char *const insert_sql =
50 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 : static const char *const insert_sql2 =
52 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53 :
54 : /* max char length of an int32/64, plus sign and null terminator */
55 : #define MAXINTLEN 12
56 : #define MAXINT8LEN 20
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : pg_noreturn static void
75 0 : pg_fatal_impl(int line, const char *fmt,...)
76 : {
77 : va_list args;
78 :
79 0 : fflush(stdout);
80 :
81 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
82 0 : va_start(args, fmt);
83 0 : vfprintf(stderr, fmt, args);
84 0 : va_end(args);
85 : Assert(fmt[strlen(fmt) - 1] != '\n');
86 0 : fprintf(stderr, "\n");
87 0 : exit(1);
88 : }
89 :
90 : /*
91 : * Check that the query on the given connection got canceled.
92 : */
93 : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
94 : static void
95 24 : confirm_query_canceled_impl(int line, PGconn *conn)
96 : {
97 24 : PGresult *res = NULL;
98 :
99 24 : res = PQgetResult(conn);
100 24 : if (res == NULL)
101 0 : pg_fatal_impl(line, "PQgetResult returned null: %s",
102 : PQerrorMessage(conn));
103 24 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
104 0 : pg_fatal_impl(line, "query did not fail when it was expected");
105 24 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
106 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
107 : PQerrorMessage(conn));
108 24 : PQclear(res);
109 :
110 24 : while (PQisBusy(conn))
111 0 : PQconsumeInput(conn);
112 24 : }
113 :
114 : /*
115 : * Using monitorConn, query pg_stat_activity to see that the connection with
116 : * the given PID is either in the given state, or waiting on the given event
117 : * (only one of them can be given).
118 : */
119 : static void
120 48 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
121 : char *state, char *event)
122 : {
123 48 : const Oid paramTypes[] = {INT4OID, TEXTOID};
124 : const char *paramValues[2];
125 48 : char *pidstr = psprintf("%d", procpid);
126 :
127 : Assert((state == NULL) ^ (event == NULL));
128 :
129 48 : paramValues[0] = pidstr;
130 48 : paramValues[1] = state ? state : event;
131 :
132 : while (true)
133 0 : {
134 : PGresult *res;
135 : char *value;
136 :
137 48 : if (state != NULL)
138 24 : res = PQexecParams(monitorConn,
139 : "SELECT count(*) FROM pg_stat_activity WHERE "
140 : "pid = $1 AND state = $2",
141 : 2, paramTypes, paramValues, NULL, NULL, 0);
142 : else
143 24 : res = PQexecParams(monitorConn,
144 : "SELECT count(*) FROM pg_stat_activity WHERE "
145 : "pid = $1 AND wait_event = $2",
146 : 2, paramTypes, paramValues, NULL, NULL, 0);
147 :
148 48 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
149 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
150 48 : if (PQntuples(res) != 1)
151 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
152 48 : if (PQnfields(res) != 1)
153 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
154 48 : value = PQgetvalue(res, 0, 0);
155 48 : if (strcmp(value, "0") != 0)
156 : {
157 48 : PQclear(res);
158 48 : break;
159 : }
160 0 : PQclear(res);
161 :
162 : /* wait 10ms before polling again */
163 0 : pg_usleep(10000);
164 : }
165 :
166 48 : pfree(pidstr);
167 48 : }
168 :
169 : #define send_cancellable_query(conn, monitorConn) \
170 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
171 : static void
172 24 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
173 : {
174 : const char *env_wait;
175 24 : const Oid paramTypes[1] = {INT4OID};
176 :
177 : /*
178 : * Wait for the connection to be idle, so that our check for an active
179 : * connection below is reliable, instead of possibly seeing an outdated
180 : * state.
181 : */
182 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
183 :
184 24 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
185 24 : if (env_wait == NULL)
186 24 : env_wait = "180";
187 :
188 24 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
189 : &env_wait, NULL, NULL, 0) != 1)
190 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
191 :
192 : /*
193 : * Wait for the sleep to be active, because if the query is not running
194 : * yet, the cancel request that we send won't have any effect.
195 : */
196 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
197 24 : }
198 :
199 : /*
200 : * Create a new connection with the same conninfo as the given one.
201 : */
202 : static PGconn *
203 4 : copy_connection(PGconn *conn)
204 : {
205 : PGconn *copyConn;
206 4 : PQconninfoOption *opts = PQconninfo(conn);
207 : const char **keywords;
208 : const char **vals;
209 4 : int nopts = 0;
210 : int i;
211 :
212 204 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
213 200 : nopts++;
214 4 : nopts++; /* for the NULL terminator */
215 :
216 4 : keywords = pg_malloc(sizeof(char *) * nopts);
217 4 : vals = pg_malloc(sizeof(char *) * nopts);
218 :
219 4 : i = 0;
220 204 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
221 : {
222 200 : if (opt->val)
223 : {
224 80 : keywords[i] = opt->keyword;
225 80 : vals[i] = opt->val;
226 80 : i++;
227 : }
228 : }
229 4 : keywords[i] = vals[i] = NULL;
230 :
231 4 : copyConn = PQconnectdbParams(keywords, vals, false);
232 :
233 4 : if (PQstatus(copyConn) != CONNECTION_OK)
234 0 : pg_fatal("Connection to database failed: %s",
235 : PQerrorMessage(copyConn));
236 :
237 4 : return copyConn;
238 : }
239 :
240 : /*
241 : * Test query cancellation routines
242 : */
243 : static void
244 4 : test_cancel(PGconn *conn)
245 : {
246 : PGcancel *cancel;
247 : PGcancelConn *cancelConn;
248 : PGconn *monitorConn;
249 : char errorbuf[256];
250 :
251 4 : fprintf(stderr, "test cancellations... ");
252 :
253 4 : if (PQsetnonblocking(conn, 1) != 0)
254 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
255 :
256 : /*
257 : * Make a separate connection to the database to monitor the query on the
258 : * main connection.
259 : */
260 4 : monitorConn = copy_connection(conn);
261 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
262 :
263 : /* test PQcancel */
264 4 : send_cancellable_query(conn, monitorConn);
265 4 : cancel = PQgetCancel(conn);
266 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
267 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
268 4 : confirm_query_canceled(conn);
269 :
270 : /* PGcancel object can be reused for the next query */
271 4 : send_cancellable_query(conn, monitorConn);
272 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
273 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
274 4 : confirm_query_canceled(conn);
275 :
276 4 : PQfreeCancel(cancel);
277 :
278 : /* test PQrequestCancel */
279 4 : send_cancellable_query(conn, monitorConn);
280 4 : if (!PQrequestCancel(conn))
281 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
282 4 : confirm_query_canceled(conn);
283 :
284 : /* test PQcancelBlocking */
285 4 : send_cancellable_query(conn, monitorConn);
286 4 : cancelConn = PQcancelCreate(conn);
287 4 : if (!PQcancelBlocking(cancelConn))
288 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
289 4 : confirm_query_canceled(conn);
290 4 : PQcancelFinish(cancelConn);
291 :
292 : /* test PQcancelCreate and then polling with PQcancelPoll */
293 4 : send_cancellable_query(conn, monitorConn);
294 4 : cancelConn = PQcancelCreate(conn);
295 4 : if (!PQcancelStart(cancelConn))
296 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
297 : while (true)
298 4 : {
299 : struct timeval tv;
300 : fd_set input_mask;
301 : fd_set output_mask;
302 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
303 8 : int sock = PQcancelSocket(cancelConn);
304 :
305 8 : if (pollres == PGRES_POLLING_OK)
306 4 : break;
307 :
308 4 : FD_ZERO(&input_mask);
309 4 : FD_ZERO(&output_mask);
310 4 : switch (pollres)
311 : {
312 4 : case PGRES_POLLING_READING:
313 : pg_debug("polling for reads\n");
314 4 : FD_SET(sock, &input_mask);
315 4 : break;
316 0 : case PGRES_POLLING_WRITING:
317 : pg_debug("polling for writes\n");
318 0 : FD_SET(sock, &output_mask);
319 0 : break;
320 0 : default:
321 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
322 : }
323 :
324 4 : if (sock < 0)
325 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
326 :
327 4 : tv.tv_sec = 3;
328 4 : tv.tv_usec = 0;
329 :
330 : while (true)
331 : {
332 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
333 : {
334 0 : if (errno == EINTR)
335 0 : continue;
336 0 : pg_fatal("select() failed: %m");
337 : }
338 4 : break;
339 : }
340 : }
341 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
342 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
343 4 : confirm_query_canceled(conn);
344 :
345 : /*
346 : * test PQcancelReset works on the cancel connection and it can be reused
347 : * afterwards
348 : */
349 4 : PQcancelReset(cancelConn);
350 :
351 4 : send_cancellable_query(conn, monitorConn);
352 4 : if (!PQcancelStart(cancelConn))
353 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
354 : while (true)
355 4 : {
356 : struct timeval tv;
357 : fd_set input_mask;
358 : fd_set output_mask;
359 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
360 8 : int sock = PQcancelSocket(cancelConn);
361 :
362 8 : if (pollres == PGRES_POLLING_OK)
363 4 : break;
364 :
365 4 : FD_ZERO(&input_mask);
366 4 : FD_ZERO(&output_mask);
367 4 : switch (pollres)
368 : {
369 4 : case PGRES_POLLING_READING:
370 : pg_debug("polling for reads\n");
371 4 : FD_SET(sock, &input_mask);
372 4 : break;
373 0 : case PGRES_POLLING_WRITING:
374 : pg_debug("polling for writes\n");
375 0 : FD_SET(sock, &output_mask);
376 0 : break;
377 0 : default:
378 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
379 : }
380 :
381 4 : if (sock < 0)
382 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
383 :
384 4 : tv.tv_sec = 3;
385 4 : tv.tv_usec = 0;
386 :
387 : while (true)
388 : {
389 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
390 : {
391 0 : if (errno == EINTR)
392 0 : continue;
393 0 : pg_fatal("select() failed: %m");
394 : }
395 4 : break;
396 : }
397 : }
398 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
399 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
400 4 : confirm_query_canceled(conn);
401 :
402 4 : PQcancelFinish(cancelConn);
403 :
404 4 : fprintf(stderr, "ok\n");
405 4 : }
406 :
407 : static void
408 2 : test_disallowed_in_pipeline(PGconn *conn)
409 : {
410 2 : PGresult *res = NULL;
411 :
412 2 : fprintf(stderr, "test error cases... ");
413 :
414 2 : if (PQisnonblocking(conn))
415 0 : pg_fatal("Expected blocking connection mode");
416 :
417 2 : if (PQenterPipelineMode(conn) != 1)
418 0 : pg_fatal("Unable to enter pipeline mode");
419 :
420 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
421 0 : pg_fatal("Pipeline mode not activated properly");
422 :
423 : /* PQexec should fail in pipeline mode */
424 2 : res = PQexec(conn, "SELECT 1");
425 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
426 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
427 2 : if (strcmp(PQerrorMessage(conn),
428 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
429 0 : pg_fatal("did not get expected error message; got: \"%s\"",
430 : PQerrorMessage(conn));
431 :
432 : /* PQsendQuery should fail in pipeline mode */
433 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
434 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
435 2 : if (strcmp(PQerrorMessage(conn),
436 : "PQsendQuery not allowed in pipeline mode\n") != 0)
437 0 : pg_fatal("did not get expected error message; got: \"%s\"",
438 : PQerrorMessage(conn));
439 :
440 : /* Entering pipeline mode when already in pipeline mode is OK */
441 2 : if (PQenterPipelineMode(conn) != 1)
442 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
443 :
444 2 : if (PQisBusy(conn) != 0)
445 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
446 :
447 : /* ok, back to normal command mode */
448 2 : if (PQexitPipelineMode(conn) != 1)
449 0 : pg_fatal("couldn't exit idle empty pipeline mode");
450 :
451 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
452 0 : pg_fatal("Pipeline mode not terminated properly");
453 :
454 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
455 2 : if (PQexitPipelineMode(conn) != 1)
456 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
457 :
458 : /* can now PQexec again */
459 2 : res = PQexec(conn, "SELECT 1");
460 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
461 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
462 : PQerrorMessage(conn));
463 :
464 2 : fprintf(stderr, "ok\n");
465 2 : }
466 :
467 : static void
468 2 : test_multi_pipelines(PGconn *conn)
469 : {
470 2 : PGresult *res = NULL;
471 2 : const char *dummy_params[1] = {"1"};
472 2 : Oid dummy_param_oids[1] = {INT4OID};
473 :
474 2 : fprintf(stderr, "multi pipeline... ");
475 :
476 : /*
477 : * Queue up a couple of small pipelines and process each without returning
478 : * to command mode first.
479 : */
480 2 : if (PQenterPipelineMode(conn) != 1)
481 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
482 :
483 : /* first pipeline */
484 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
485 : dummy_params, NULL, NULL, 0) != 1)
486 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
487 :
488 2 : if (PQpipelineSync(conn) != 1)
489 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
490 :
491 : /* second pipeline */
492 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
493 : dummy_params, NULL, NULL, 0) != 1)
494 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
495 :
496 : /* Skip flushing once. */
497 2 : if (PQsendPipelineSync(conn) != 1)
498 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
499 :
500 : /* third pipeline */
501 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
502 : dummy_params, NULL, NULL, 0) != 1)
503 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
504 :
505 2 : if (PQpipelineSync(conn) != 1)
506 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
507 :
508 : /* OK, start processing the results */
509 :
510 : /* first pipeline */
511 :
512 2 : res = PQgetResult(conn);
513 2 : if (res == NULL)
514 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
515 : PQerrorMessage(conn));
516 :
517 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
518 0 : pg_fatal("Unexpected result code %s from first pipeline item",
519 : PQresStatus(PQresultStatus(res)));
520 2 : PQclear(res);
521 2 : res = NULL;
522 :
523 2 : if (PQgetResult(conn) != NULL)
524 0 : pg_fatal("PQgetResult returned something extra after first result");
525 :
526 2 : if (PQexitPipelineMode(conn) != 0)
527 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
528 :
529 2 : res = PQgetResult(conn);
530 2 : if (res == NULL)
531 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
532 : PQerrorMessage(conn));
533 :
534 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
535 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
536 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
537 2 : PQclear(res);
538 :
539 : /* second pipeline */
540 :
541 2 : res = PQgetResult(conn);
542 2 : if (res == NULL)
543 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
544 : PQerrorMessage(conn));
545 :
546 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
547 0 : pg_fatal("Unexpected result code %s from second pipeline item",
548 : PQresStatus(PQresultStatus(res)));
549 2 : PQclear(res);
550 2 : res = NULL;
551 :
552 2 : if (PQgetResult(conn) != NULL)
553 0 : pg_fatal("PQgetResult returned something extra after first result");
554 :
555 2 : if (PQexitPipelineMode(conn) != 0)
556 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
557 :
558 2 : res = PQgetResult(conn);
559 2 : if (res == NULL)
560 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
561 : PQerrorMessage(conn));
562 :
563 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
564 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
565 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
566 2 : PQclear(res);
567 :
568 : /* third pipeline */
569 :
570 2 : res = PQgetResult(conn);
571 2 : if (res == NULL)
572 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
573 : PQerrorMessage(conn));
574 :
575 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
576 0 : pg_fatal("Unexpected result code %s from third pipeline item",
577 : PQresStatus(PQresultStatus(res)));
578 :
579 2 : res = PQgetResult(conn);
580 2 : if (res != NULL)
581 0 : pg_fatal("Expected null result, got %s",
582 : PQresStatus(PQresultStatus(res)));
583 :
584 2 : res = PQgetResult(conn);
585 2 : if (res == NULL)
586 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
587 : PQerrorMessage(conn));
588 :
589 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
590 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
591 : PQresStatus(PQresultStatus(res)));
592 :
593 : /* We're still in pipeline mode ... */
594 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
595 0 : pg_fatal("Fell out of pipeline mode somehow");
596 :
597 : /* until we end it, which we can safely do now */
598 2 : if (PQexitPipelineMode(conn) != 1)
599 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
600 : PQerrorMessage(conn));
601 :
602 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
603 0 : pg_fatal("exiting pipeline mode didn't seem to work");
604 :
605 2 : fprintf(stderr, "ok\n");
606 2 : }
607 :
608 : /*
609 : * Test behavior when a pipeline dispatches a number of commands that are
610 : * not flushed by a sync point.
611 : */
612 : static void
613 2 : test_nosync(PGconn *conn)
614 : {
615 2 : int numqueries = 10;
616 2 : int results = 0;
617 2 : int sock = PQsocket(conn);
618 :
619 2 : fprintf(stderr, "nosync... ");
620 :
621 2 : if (sock < 0)
622 0 : pg_fatal("invalid socket");
623 :
624 2 : if (PQenterPipelineMode(conn) != 1)
625 0 : pg_fatal("could not enter pipeline mode");
626 22 : for (int i = 0; i < numqueries; i++)
627 : {
628 : fd_set input_mask;
629 : struct timeval tv;
630 :
631 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
632 : 0, NULL, NULL, NULL, NULL, 0) != 1)
633 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
634 20 : PQflush(conn);
635 :
636 : /*
637 : * If the server has written anything to us, read (some of) it now.
638 : */
639 20 : FD_ZERO(&input_mask);
640 20 : FD_SET(sock, &input_mask);
641 20 : tv.tv_sec = 0;
642 20 : tv.tv_usec = 0;
643 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
644 : {
645 0 : fprintf(stderr, "select() failed: %m\n");
646 0 : exit_nicely(conn);
647 : }
648 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
649 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
650 : }
651 :
652 : /* tell server to flush its output buffer */
653 2 : if (PQsendFlushRequest(conn) != 1)
654 0 : pg_fatal("failed to send flush request");
655 2 : PQflush(conn);
656 :
657 : /* Now read all results */
658 : for (;;)
659 18 : {
660 : PGresult *res;
661 :
662 20 : res = PQgetResult(conn);
663 :
664 : /* NULL results are only expected after TUPLES_OK */
665 20 : if (res == NULL)
666 0 : pg_fatal("got unexpected NULL result after %d results", results);
667 :
668 : /* We expect exactly one TUPLES_OK result for each query we sent */
669 20 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
670 : {
671 : PGresult *res2;
672 :
673 : /* and one NULL result should follow each */
674 20 : res2 = PQgetResult(conn);
675 20 : if (res2 != NULL)
676 0 : pg_fatal("expected NULL, got %s",
677 : PQresStatus(PQresultStatus(res2)));
678 20 : PQclear(res);
679 20 : results++;
680 :
681 : /* if we're done, we're done */
682 20 : if (results == numqueries)
683 2 : break;
684 :
685 18 : continue;
686 : }
687 :
688 : /* anything else is unexpected */
689 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
690 : }
691 :
692 2 : fprintf(stderr, "ok\n");
693 2 : }
694 :
695 : /*
696 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
697 : * still have to get results for each pipeline item, but the item will just be
698 : * a PGRES_PIPELINE_ABORTED code.
699 : *
700 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
701 : * usually use an xact, but in this case we want to observe the effects of each
702 : * statement.
703 : */
704 : static void
705 2 : test_pipeline_abort(PGconn *conn)
706 : {
707 2 : PGresult *res = NULL;
708 2 : const char *dummy_params[1] = {"1"};
709 2 : Oid dummy_param_oids[1] = {INT4OID};
710 : int i;
711 : int gotrows;
712 : bool goterror;
713 :
714 2 : fprintf(stderr, "aborted pipeline... ");
715 :
716 2 : res = PQexec(conn, drop_table_sql);
717 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
718 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
719 :
720 2 : res = PQexec(conn, create_table_sql);
721 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
722 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
723 :
724 : /*
725 : * Queue up a couple of small pipelines and process each without returning
726 : * to command mode first. Make sure the second operation in the first
727 : * pipeline ERRORs.
728 : */
729 2 : if (PQenterPipelineMode(conn) != 1)
730 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
731 :
732 2 : dummy_params[0] = "1";
733 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
734 : dummy_params, NULL, NULL, 0) != 1)
735 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
736 :
737 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
738 : 1, dummy_param_oids, dummy_params,
739 : NULL, NULL, 0) != 1)
740 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
741 :
742 2 : dummy_params[0] = "2";
743 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
744 : dummy_params, NULL, NULL, 0) != 1)
745 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
746 :
747 2 : if (PQpipelineSync(conn) != 1)
748 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
749 :
750 2 : dummy_params[0] = "3";
751 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
752 : dummy_params, NULL, NULL, 0) != 1)
753 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
754 : PQerrorMessage(conn));
755 :
756 2 : if (PQpipelineSync(conn) != 1)
757 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
758 :
759 : /*
760 : * OK, start processing the pipeline results.
761 : *
762 : * We should get a command-ok for the first query, then a fatal error and
763 : * a pipeline aborted message for the second insert, a pipeline-end, then
764 : * a command-ok and a pipeline-ok for the second pipeline operation.
765 : */
766 2 : res = PQgetResult(conn);
767 2 : if (res == NULL)
768 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
769 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
770 0 : pg_fatal("Unexpected result status %s: %s",
771 : PQresStatus(PQresultStatus(res)),
772 : PQresultErrorMessage(res));
773 2 : PQclear(res);
774 :
775 : /* NULL result to signal end-of-results for this command */
776 2 : if ((res = PQgetResult(conn)) != NULL)
777 0 : pg_fatal("Expected null result, got %s",
778 : PQresStatus(PQresultStatus(res)));
779 :
780 : /* Second query caused error, so we expect an error next */
781 2 : res = PQgetResult(conn);
782 2 : if (res == NULL)
783 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
784 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
785 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
786 : PQresStatus(PQresultStatus(res)));
787 2 : PQclear(res);
788 :
789 : /* NULL result to signal end-of-results for this command */
790 2 : if ((res = PQgetResult(conn)) != NULL)
791 0 : pg_fatal("Expected null result, got %s",
792 : PQresStatus(PQresultStatus(res)));
793 :
794 : /*
795 : * pipeline should now be aborted.
796 : *
797 : * Note that we could still queue more queries at this point if we wanted;
798 : * they'd get added to a new third pipeline since we've already sent a
799 : * second. The aborted flag relates only to the pipeline being received.
800 : */
801 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
802 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
803 :
804 : /* third query in pipeline, the second insert */
805 2 : res = PQgetResult(conn);
806 2 : if (res == NULL)
807 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
808 2 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
809 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
810 : PQresStatus(PQresultStatus(res)));
811 2 : PQclear(res);
812 :
813 : /* NULL result to signal end-of-results for this command */
814 2 : if ((res = PQgetResult(conn)) != NULL)
815 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
816 :
817 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
818 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
819 :
820 : /* Ensure we're still in pipeline */
821 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
822 0 : pg_fatal("Fell out of pipeline mode somehow");
823 :
824 : /*
825 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
826 : *
827 : * (This is so clients know to start processing results normally again and
828 : * can tell the difference between skipped commands and the sync.)
829 : */
830 2 : res = PQgetResult(conn);
831 2 : if (res == NULL)
832 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
833 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
834 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
835 : "Expected PGRES_PIPELINE_SYNC, got %s",
836 : PQresStatus(PQresultStatus(res)));
837 2 : PQclear(res);
838 :
839 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
840 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
841 :
842 : /* We're still in pipeline mode... */
843 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
844 0 : pg_fatal("Fell out of pipeline mode somehow");
845 :
846 : /* the insert from the second pipeline */
847 2 : res = PQgetResult(conn);
848 2 : if (res == NULL)
849 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
850 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
851 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
852 : PQresStatus(PQresultStatus(res)));
853 2 : PQclear(res);
854 :
855 : /* Read the NULL result at the end of the command */
856 2 : if ((res = PQgetResult(conn)) != NULL)
857 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
858 :
859 : /* the second pipeline sync */
860 2 : if ((res = PQgetResult(conn)) == NULL)
861 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
862 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
863 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
864 : PQresStatus(PQresultStatus(res)));
865 2 : PQclear(res);
866 :
867 2 : if ((res = PQgetResult(conn)) != NULL)
868 0 : pg_fatal("Expected null result, got %s: %s",
869 : PQresStatus(PQresultStatus(res)),
870 : PQerrorMessage(conn));
871 :
872 : /* Try to send two queries in one command */
873 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
874 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
875 2 : if (PQpipelineSync(conn) != 1)
876 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
877 2 : goterror = false;
878 4 : while ((res = PQgetResult(conn)) != NULL)
879 : {
880 2 : switch (PQresultStatus(res))
881 : {
882 2 : case PGRES_FATAL_ERROR:
883 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
884 0 : pg_fatal("expected error about multiple commands, got %s",
885 : PQerrorMessage(conn));
886 2 : printf("got expected %s", PQerrorMessage(conn));
887 2 : goterror = true;
888 2 : break;
889 0 : default:
890 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
891 : break;
892 : }
893 : }
894 2 : if (!goterror)
895 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
896 2 : res = PQgetResult(conn);
897 2 : if (res == NULL)
898 0 : pg_fatal("got NULL result");
899 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
900 0 : pg_fatal("Unexpected result code %s from pipeline sync",
901 : PQresStatus(PQresultStatus(res)));
902 2 : fprintf(stderr, "ok\n");
903 :
904 : /* Test single-row mode with an error partways */
905 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
906 : 0, NULL, NULL, NULL, NULL, 0) != 1)
907 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
908 2 : if (PQpipelineSync(conn) != 1)
909 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
910 2 : PQsetSingleRowMode(conn);
911 2 : goterror = false;
912 2 : gotrows = 0;
913 10 : while ((res = PQgetResult(conn)) != NULL)
914 : {
915 8 : switch (PQresultStatus(res))
916 : {
917 6 : case PGRES_SINGLE_TUPLE:
918 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
919 6 : gotrows++;
920 6 : break;
921 2 : case PGRES_FATAL_ERROR:
922 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
923 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
924 : PQerrorMessage(conn),
925 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
926 2 : printf("got expected division-by-zero\n");
927 2 : goterror = true;
928 2 : break;
929 0 : default:
930 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
931 : }
932 8 : PQclear(res);
933 : }
934 2 : if (!goterror)
935 0 : pg_fatal("did not get division-by-zero error");
936 2 : if (gotrows != 3)
937 0 : pg_fatal("did not get three rows");
938 : /* the third pipeline sync */
939 2 : if ((res = PQgetResult(conn)) == NULL)
940 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
941 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
942 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
943 : PQresStatus(PQresultStatus(res)));
944 2 : PQclear(res);
945 :
946 : /* We're still in pipeline mode... */
947 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
948 0 : pg_fatal("Fell out of pipeline mode somehow");
949 :
950 : /* until we end it, which we can safely do now */
951 2 : if (PQexitPipelineMode(conn) != 1)
952 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
953 : PQerrorMessage(conn));
954 :
955 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
956 0 : pg_fatal("exiting pipeline mode didn't seem to work");
957 :
958 : /*-
959 : * Since we fired the pipelines off without a surrounding xact, the results
960 : * should be:
961 : *
962 : * - Implicit xact started by server around 1st pipeline
963 : * - First insert applied
964 : * - Second statement aborted xact
965 : * - Third insert skipped
966 : * - Sync rolled back first implicit xact
967 : * - Implicit xact created by server around 2nd pipeline
968 : * - insert applied from 2nd pipeline
969 : * - Sync commits 2nd xact
970 : *
971 : * So we should only have the value 3 that we inserted.
972 : */
973 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
974 :
975 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
976 0 : pg_fatal("Expected tuples, got %s: %s",
977 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
978 2 : if (PQntuples(res) != 1)
979 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
980 4 : for (i = 0; i < PQntuples(res); i++)
981 : {
982 2 : const char *val = PQgetvalue(res, i, 0);
983 :
984 2 : if (strcmp(val, "3") != 0)
985 0 : pg_fatal("expected only insert with value 3, got %s", val);
986 : }
987 :
988 2 : PQclear(res);
989 :
990 2 : fprintf(stderr, "ok\n");
991 2 : }
992 :
993 : /* State machine enum for test_pipelined_insert */
994 : enum PipelineInsertStep
995 : {
996 : BI_BEGIN_TX,
997 : BI_DROP_TABLE,
998 : BI_CREATE_TABLE,
999 : BI_PREPARE,
1000 : BI_INSERT_ROWS,
1001 : BI_COMMIT_TX,
1002 : BI_SYNC,
1003 : BI_DONE,
1004 : };
1005 :
1006 : static void
1007 2 : test_pipelined_insert(PGconn *conn, int n_rows)
1008 : {
1009 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
1010 : const char *insert_params[2];
1011 : char insert_param_0[MAXINTLEN];
1012 : char insert_param_1[MAXINT8LEN];
1013 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
1014 2 : recv_step = BI_BEGIN_TX;
1015 : int rows_to_send,
1016 : rows_to_receive;
1017 :
1018 2 : insert_params[0] = insert_param_0;
1019 2 : insert_params[1] = insert_param_1;
1020 :
1021 2 : rows_to_send = rows_to_receive = n_rows;
1022 :
1023 : /*
1024 : * Do a pipelined insert into a table created at the start of the pipeline
1025 : */
1026 2 : if (PQenterPipelineMode(conn) != 1)
1027 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1028 :
1029 8 : while (send_step != BI_PREPARE)
1030 : {
1031 : const char *sql;
1032 :
1033 6 : switch (send_step)
1034 : {
1035 2 : case BI_BEGIN_TX:
1036 2 : sql = "BEGIN TRANSACTION";
1037 2 : send_step = BI_DROP_TABLE;
1038 2 : break;
1039 :
1040 2 : case BI_DROP_TABLE:
1041 2 : sql = drop_table_sql;
1042 2 : send_step = BI_CREATE_TABLE;
1043 2 : break;
1044 :
1045 2 : case BI_CREATE_TABLE:
1046 2 : sql = create_table_sql;
1047 2 : send_step = BI_PREPARE;
1048 2 : break;
1049 :
1050 0 : default:
1051 0 : pg_fatal("invalid state");
1052 : sql = NULL; /* keep compiler quiet */
1053 : }
1054 :
1055 : pg_debug("sending: %s\n", sql);
1056 6 : if (PQsendQueryParams(conn, sql,
1057 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1058 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1059 : }
1060 :
1061 : Assert(send_step == BI_PREPARE);
1062 : pg_debug("sending: %s\n", insert_sql2);
1063 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1064 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1065 2 : send_step = BI_INSERT_ROWS;
1066 :
1067 : /*
1068 : * Now we start inserting. We'll be sending enough data that we could fill
1069 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1070 : * mode and consume input while we send more output. As results of each
1071 : * query are processed we should pop them to allow processing of the next
1072 : * query. There's no need to finish the pipeline before processing
1073 : * results.
1074 : */
1075 2 : if (PQsetnonblocking(conn, 1) != 0)
1076 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1077 :
1078 22368 : while (recv_step != BI_DONE)
1079 : {
1080 : int sock;
1081 : fd_set input_mask;
1082 : fd_set output_mask;
1083 :
1084 22366 : sock = PQsocket(conn);
1085 :
1086 22366 : if (sock < 0)
1087 0 : break; /* shouldn't happen */
1088 :
1089 22366 : FD_ZERO(&input_mask);
1090 22366 : FD_SET(sock, &input_mask);
1091 22366 : FD_ZERO(&output_mask);
1092 22366 : FD_SET(sock, &output_mask);
1093 :
1094 22366 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1095 : {
1096 0 : fprintf(stderr, "select() failed: %m\n");
1097 0 : exit_nicely(conn);
1098 : }
1099 :
1100 : /*
1101 : * Process any results, so we keep the server's output buffer free
1102 : * flowing and it can continue to process input
1103 : */
1104 22366 : if (FD_ISSET(sock, &input_mask))
1105 : {
1106 6 : PQconsumeInput(conn);
1107 :
1108 : /* Read until we'd block if we tried to read */
1109 2828 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1110 : {
1111 : PGresult *res;
1112 2822 : const char *cmdtag = "";
1113 2822 : const char *description = "";
1114 : int status;
1115 :
1116 : /*
1117 : * Read next result. If no more results from this query,
1118 : * advance to the next query
1119 : */
1120 2822 : res = PQgetResult(conn);
1121 2822 : if (res == NULL)
1122 1410 : continue;
1123 :
1124 1412 : status = PGRES_COMMAND_OK;
1125 1412 : switch (recv_step)
1126 : {
1127 2 : case BI_BEGIN_TX:
1128 2 : cmdtag = "BEGIN";
1129 2 : recv_step++;
1130 2 : break;
1131 2 : case BI_DROP_TABLE:
1132 2 : cmdtag = "DROP TABLE";
1133 2 : recv_step++;
1134 2 : break;
1135 2 : case BI_CREATE_TABLE:
1136 2 : cmdtag = "CREATE TABLE";
1137 2 : recv_step++;
1138 2 : break;
1139 2 : case BI_PREPARE:
1140 2 : cmdtag = "";
1141 2 : description = "PREPARE";
1142 2 : recv_step++;
1143 2 : break;
1144 1400 : case BI_INSERT_ROWS:
1145 1400 : cmdtag = "INSERT";
1146 1400 : rows_to_receive--;
1147 1400 : if (rows_to_receive == 0)
1148 2 : recv_step++;
1149 1400 : break;
1150 2 : case BI_COMMIT_TX:
1151 2 : cmdtag = "COMMIT";
1152 2 : recv_step++;
1153 2 : break;
1154 2 : case BI_SYNC:
1155 2 : cmdtag = "";
1156 2 : description = "SYNC";
1157 2 : status = PGRES_PIPELINE_SYNC;
1158 2 : recv_step++;
1159 2 : break;
1160 0 : case BI_DONE:
1161 : /* unreachable */
1162 0 : pg_fatal("unreachable state");
1163 : }
1164 :
1165 1412 : if (PQresultStatus(res) != status)
1166 0 : pg_fatal("%s reported status %s, expected %s\n"
1167 : "Error message: \"%s\"",
1168 : description, PQresStatus(PQresultStatus(res)),
1169 : PQresStatus(status), PQerrorMessage(conn));
1170 :
1171 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1172 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1173 : description, cmdtag, PQcmdStatus(res));
1174 :
1175 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1176 :
1177 1412 : PQclear(res);
1178 : }
1179 : }
1180 :
1181 : /* Write more rows and/or the end pipeline message, if needed */
1182 22366 : if (FD_ISSET(sock, &output_mask))
1183 : {
1184 22364 : PQflush(conn);
1185 :
1186 22364 : if (send_step == BI_INSERT_ROWS)
1187 : {
1188 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1189 : /* use up some buffer space with a wide value */
1190 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1191 :
1192 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1193 : 2, insert_params, NULL, NULL, 0) == 1)
1194 : {
1195 : pg_debug("sent row %d\n", rows_to_send);
1196 :
1197 1400 : rows_to_send--;
1198 1400 : if (rows_to_send == 0)
1199 2 : send_step++;
1200 : }
1201 : else
1202 : {
1203 : /*
1204 : * in nonblocking mode, so it's OK for an insert to fail
1205 : * to send
1206 : */
1207 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1208 : rows_to_send, PQerrorMessage(conn));
1209 : }
1210 : }
1211 20964 : else if (send_step == BI_COMMIT_TX)
1212 : {
1213 2 : if (PQsendQueryParams(conn, "COMMIT",
1214 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1215 : {
1216 : pg_debug("sent COMMIT\n");
1217 2 : send_step++;
1218 : }
1219 : else
1220 : {
1221 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1222 : PQerrorMessage(conn));
1223 : }
1224 : }
1225 20962 : else if (send_step == BI_SYNC)
1226 : {
1227 2 : if (PQpipelineSync(conn) == 1)
1228 : {
1229 2 : fprintf(stdout, "pipeline sync sent\n");
1230 2 : send_step++;
1231 : }
1232 : else
1233 : {
1234 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1235 : PQerrorMessage(conn));
1236 : }
1237 : }
1238 : }
1239 : }
1240 :
1241 : /* We've got the sync message and the pipeline should be done */
1242 2 : if (PQexitPipelineMode(conn) != 1)
1243 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1244 : PQerrorMessage(conn));
1245 :
1246 2 : if (PQsetnonblocking(conn, 0) != 0)
1247 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1248 :
1249 2 : fprintf(stderr, "ok\n");
1250 2 : }
1251 :
1252 : static void
1253 2 : test_prepared(PGconn *conn)
1254 : {
1255 2 : PGresult *res = NULL;
1256 2 : Oid param_oids[1] = {INT4OID};
1257 : Oid expected_oids[4];
1258 : Oid typ;
1259 :
1260 2 : fprintf(stderr, "prepared... ");
1261 :
1262 2 : if (PQenterPipelineMode(conn) != 1)
1263 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1264 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1265 : "interval '1 sec'",
1266 : 1, param_oids) != 1)
1267 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1268 2 : expected_oids[0] = INT4OID;
1269 2 : expected_oids[1] = TEXTOID;
1270 2 : expected_oids[2] = NUMERICOID;
1271 2 : expected_oids[3] = INTERVALOID;
1272 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1273 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1274 2 : if (PQpipelineSync(conn) != 1)
1275 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1276 :
1277 2 : res = PQgetResult(conn);
1278 2 : if (res == NULL)
1279 0 : pg_fatal("PQgetResult returned null");
1280 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1281 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1282 2 : PQclear(res);
1283 2 : res = PQgetResult(conn);
1284 2 : if (res != NULL)
1285 0 : pg_fatal("expected NULL result");
1286 :
1287 2 : res = PQgetResult(conn);
1288 2 : if (res == NULL)
1289 0 : pg_fatal("PQgetResult returned NULL");
1290 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1291 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1292 2 : if (PQnfields(res) != lengthof(expected_oids))
1293 0 : pg_fatal("expected %zu columns, got %d",
1294 : lengthof(expected_oids), PQnfields(res));
1295 10 : for (int i = 0; i < PQnfields(res); i++)
1296 : {
1297 8 : typ = PQftype(res, i);
1298 8 : if (typ != expected_oids[i])
1299 0 : pg_fatal("field %d: expected type %u, got %u",
1300 : i, expected_oids[i], typ);
1301 : }
1302 2 : PQclear(res);
1303 2 : res = PQgetResult(conn);
1304 2 : if (res != NULL)
1305 0 : pg_fatal("expected NULL result");
1306 :
1307 2 : res = PQgetResult(conn);
1308 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1309 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1310 :
1311 2 : fprintf(stderr, "closing statement..");
1312 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1313 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1314 2 : if (PQpipelineSync(conn) != 1)
1315 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1316 :
1317 2 : res = PQgetResult(conn);
1318 2 : if (res == NULL)
1319 0 : pg_fatal("expected non-NULL result");
1320 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1321 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1322 2 : PQclear(res);
1323 2 : res = PQgetResult(conn);
1324 2 : if (res != NULL)
1325 0 : pg_fatal("expected NULL result");
1326 2 : res = PQgetResult(conn);
1327 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1328 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1329 :
1330 2 : if (PQexitPipelineMode(conn) != 1)
1331 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1332 :
1333 : /* Now that it's closed we should get an error when describing */
1334 2 : res = PQdescribePrepared(conn, "select_one");
1335 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1336 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1337 :
1338 : /*
1339 : * Also test the blocking close, this should not fail since closing a
1340 : * non-existent prepared statement is a no-op
1341 : */
1342 2 : res = PQclosePrepared(conn, "select_one");
1343 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1344 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1345 :
1346 2 : fprintf(stderr, "creating portal... ");
1347 2 : PQexec(conn, "BEGIN");
1348 2 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1349 2 : PQenterPipelineMode(conn);
1350 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1351 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1352 2 : if (PQpipelineSync(conn) != 1)
1353 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1354 2 : res = PQgetResult(conn);
1355 2 : if (res == NULL)
1356 0 : pg_fatal("PQgetResult returned null");
1357 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1358 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1359 :
1360 2 : typ = PQftype(res, 0);
1361 2 : if (typ != INT4OID)
1362 0 : pg_fatal("portal: expected type %u, got %u",
1363 : INT4OID, typ);
1364 2 : PQclear(res);
1365 2 : res = PQgetResult(conn);
1366 2 : if (res != NULL)
1367 0 : pg_fatal("expected NULL result");
1368 2 : res = PQgetResult(conn);
1369 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1370 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1371 :
1372 2 : fprintf(stderr, "closing portal... ");
1373 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1374 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1375 2 : if (PQpipelineSync(conn) != 1)
1376 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1377 :
1378 2 : res = PQgetResult(conn);
1379 2 : if (res == NULL)
1380 0 : pg_fatal("expected non-NULL result");
1381 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1382 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1383 2 : PQclear(res);
1384 2 : res = PQgetResult(conn);
1385 2 : if (res != NULL)
1386 0 : pg_fatal("expected NULL result");
1387 2 : res = PQgetResult(conn);
1388 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1389 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1390 :
1391 2 : if (PQexitPipelineMode(conn) != 1)
1392 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1393 :
1394 : /* Now that it's closed we should get an error when describing */
1395 2 : res = PQdescribePortal(conn, "cursor_one");
1396 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1397 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1398 :
1399 : /*
1400 : * Also test the blocking close, this should not fail since closing a
1401 : * non-existent portal is a no-op
1402 : */
1403 2 : res = PQclosePortal(conn, "cursor_one");
1404 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1405 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1406 :
1407 2 : fprintf(stderr, "ok\n");
1408 2 : }
1409 :
1410 : /*
1411 : * Test max_protocol_version options.
1412 : */
1413 : static void
1414 2 : test_protocol_version(PGconn *conn)
1415 : {
1416 : const char **keywords;
1417 : const char **vals;
1418 : int nopts;
1419 2 : PQconninfoOption *opts = PQconninfo(conn);
1420 : int protocol_version;
1421 : int max_protocol_version_index;
1422 : int i;
1423 :
1424 : /*
1425 : * Prepare keywords/vals arrays, copied from the existing connection, with
1426 : * an extra slot for 'max_protocol_version'.
1427 : */
1428 2 : nopts = 0;
1429 102 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1430 100 : nopts++;
1431 2 : nopts++; /* max_protocol_version */
1432 2 : nopts++; /* NULL terminator */
1433 :
1434 2 : keywords = pg_malloc0(sizeof(char *) * nopts);
1435 2 : vals = pg_malloc0(sizeof(char *) * nopts);
1436 :
1437 2 : i = 0;
1438 102 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1439 : {
1440 100 : if (opt->val)
1441 : {
1442 40 : keywords[i] = opt->keyword;
1443 40 : vals[i] = opt->val;
1444 40 : i++;
1445 : }
1446 : }
1447 :
1448 2 : max_protocol_version_index = i;
1449 2 : keywords[i] = "max_protocol_version"; /* value is filled in below */
1450 2 : i++;
1451 2 : keywords[i] = vals[i] = NULL;
1452 :
1453 : /*
1454 : * Test max_protocol_version=3.0
1455 : */
1456 2 : vals[max_protocol_version_index] = "3.0";
1457 2 : conn = PQconnectdbParams(keywords, vals, false);
1458 :
1459 2 : if (PQstatus(conn) != CONNECTION_OK)
1460 0 : pg_fatal("Connection to database failed: %s",
1461 : PQerrorMessage(conn));
1462 :
1463 2 : protocol_version = PQfullProtocolVersion(conn);
1464 2 : if (protocol_version != 30000)
1465 0 : pg_fatal("expected 30000, got %d", protocol_version);
1466 :
1467 2 : PQfinish(conn);
1468 :
1469 : /*
1470 : * Test max_protocol_version=3.1. It's not valid, we went straight from
1471 : * 3.0 to 3.2.
1472 : */
1473 2 : vals[max_protocol_version_index] = "3.1";
1474 2 : conn = PQconnectdbParams(keywords, vals, false);
1475 :
1476 2 : if (PQstatus(conn) != CONNECTION_BAD)
1477 0 : pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1478 :
1479 2 : PQfinish(conn);
1480 :
1481 : /*
1482 : * Test max_protocol_version=3.2
1483 : */
1484 2 : vals[max_protocol_version_index] = "3.2";
1485 2 : conn = PQconnectdbParams(keywords, vals, false);
1486 :
1487 2 : if (PQstatus(conn) != CONNECTION_OK)
1488 0 : pg_fatal("Connection to database failed: %s",
1489 : PQerrorMessage(conn));
1490 :
1491 2 : protocol_version = PQfullProtocolVersion(conn);
1492 2 : if (protocol_version != 30002)
1493 0 : pg_fatal("expected 30002, got %d", protocol_version);
1494 :
1495 2 : PQfinish(conn);
1496 :
1497 : /*
1498 : * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1499 : */
1500 2 : vals[max_protocol_version_index] = "latest";
1501 2 : conn = PQconnectdbParams(keywords, vals, false);
1502 :
1503 2 : if (PQstatus(conn) != CONNECTION_OK)
1504 0 : pg_fatal("Connection to database failed: %s",
1505 : PQerrorMessage(conn));
1506 :
1507 2 : protocol_version = PQfullProtocolVersion(conn);
1508 2 : if (protocol_version != 30002)
1509 0 : pg_fatal("expected 30002, got %d", protocol_version);
1510 :
1511 2 : PQfinish(conn);
1512 2 : }
1513 :
1514 : /* Notice processor: print notices, and count how many we got */
1515 : static void
1516 2 : notice_processor(void *arg, const char *message)
1517 : {
1518 2 : int *n_notices = (int *) arg;
1519 :
1520 2 : (*n_notices)++;
1521 2 : fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1522 2 : }
1523 :
1524 : /* Verify behavior in "idle" state */
1525 : static void
1526 2 : test_pipeline_idle(PGconn *conn)
1527 : {
1528 : PGresult *res;
1529 2 : int n_notices = 0;
1530 :
1531 2 : fprintf(stderr, "\npipeline idle...\n");
1532 :
1533 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1534 :
1535 : /* Try to exit pipeline mode in pipeline-idle state */
1536 2 : if (PQenterPipelineMode(conn) != 1)
1537 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1538 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1539 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1540 2 : PQsendFlushRequest(conn);
1541 2 : res = PQgetResult(conn);
1542 2 : if (res == NULL)
1543 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1544 : PQerrorMessage(conn));
1545 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1546 0 : pg_fatal("unexpected result code %s from first pipeline item",
1547 : PQresStatus(PQresultStatus(res)));
1548 2 : PQclear(res);
1549 2 : res = PQgetResult(conn);
1550 2 : if (res != NULL)
1551 0 : pg_fatal("did not receive terminating NULL");
1552 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1553 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1554 2 : if (PQexitPipelineMode(conn) == 1)
1555 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1556 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1557 : strlen("cannot exit pipeline mode")) != 0)
1558 0 : pg_fatal("did not get expected error; got: %s",
1559 : PQerrorMessage(conn));
1560 2 : PQsendFlushRequest(conn);
1561 2 : res = PQgetResult(conn);
1562 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1563 0 : pg_fatal("unexpected result code %s from second pipeline item",
1564 : PQresStatus(PQresultStatus(res)));
1565 2 : PQclear(res);
1566 2 : res = PQgetResult(conn);
1567 2 : if (res != NULL)
1568 0 : pg_fatal("did not receive terminating NULL");
1569 2 : if (PQexitPipelineMode(conn) != 1)
1570 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1571 :
1572 2 : if (n_notices > 0)
1573 0 : pg_fatal("got %d notice(s)", n_notices);
1574 2 : fprintf(stderr, "ok - 1\n");
1575 :
1576 : /* Have a WARNING in the middle of a resultset */
1577 2 : if (PQenterPipelineMode(conn) != 1)
1578 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1579 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1580 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1581 2 : PQsendFlushRequest(conn);
1582 2 : res = PQgetResult(conn);
1583 2 : if (res == NULL)
1584 0 : pg_fatal("unexpected NULL result received");
1585 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1586 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1587 2 : if (PQexitPipelineMode(conn) != 1)
1588 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1589 2 : fprintf(stderr, "ok - 2\n");
1590 2 : }
1591 :
1592 : static void
1593 2 : test_simple_pipeline(PGconn *conn)
1594 : {
1595 2 : PGresult *res = NULL;
1596 2 : const char *dummy_params[1] = {"1"};
1597 2 : Oid dummy_param_oids[1] = {INT4OID};
1598 :
1599 2 : fprintf(stderr, "simple pipeline... ");
1600 :
1601 : /*
1602 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1603 : * process the results of as they come in.
1604 : *
1605 : * For a simple case we should be able to do this without interim
1606 : * processing of results since our output buffer will give us enough slush
1607 : * to work with and we won't block on sending. So blocking mode is fine.
1608 : */
1609 2 : if (PQisnonblocking(conn))
1610 0 : pg_fatal("Expected blocking connection mode");
1611 :
1612 2 : if (PQenterPipelineMode(conn) != 1)
1613 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1614 :
1615 2 : if (PQsendQueryParams(conn, "SELECT $1",
1616 : 1, dummy_param_oids, dummy_params,
1617 : NULL, NULL, 0) != 1)
1618 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1619 :
1620 2 : if (PQexitPipelineMode(conn) != 0)
1621 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1622 :
1623 2 : if (PQpipelineSync(conn) != 1)
1624 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1625 :
1626 2 : res = PQgetResult(conn);
1627 2 : if (res == NULL)
1628 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1629 : PQerrorMessage(conn));
1630 :
1631 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1632 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1633 : PQresStatus(PQresultStatus(res)));
1634 :
1635 2 : PQclear(res);
1636 2 : res = NULL;
1637 :
1638 2 : if (PQgetResult(conn) != NULL)
1639 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1640 :
1641 : /*
1642 : * Even though we've processed the result there's still a sync to come and
1643 : * we can't exit pipeline mode yet
1644 : */
1645 2 : if (PQexitPipelineMode(conn) != 0)
1646 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1647 :
1648 2 : res = PQgetResult(conn);
1649 2 : if (res == NULL)
1650 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1651 : PQerrorMessage(conn));
1652 :
1653 2 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1654 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1655 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1656 :
1657 2 : PQclear(res);
1658 2 : res = NULL;
1659 :
1660 2 : if (PQgetResult(conn) != NULL)
1661 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1662 : PQresStatus(PQresultStatus(res)));
1663 :
1664 : /* We're still in pipeline mode... */
1665 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1666 0 : pg_fatal("Fell out of pipeline mode somehow");
1667 :
1668 : /* ... until we end it, which we can safely do now */
1669 2 : if (PQexitPipelineMode(conn) != 1)
1670 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1671 : PQerrorMessage(conn));
1672 :
1673 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1674 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1675 :
1676 2 : fprintf(stderr, "ok\n");
1677 2 : }
1678 :
1679 : static void
1680 2 : test_singlerowmode(PGconn *conn)
1681 : {
1682 : PGresult *res;
1683 : int i;
1684 2 : bool pipeline_ended = false;
1685 :
1686 2 : if (PQenterPipelineMode(conn) != 1)
1687 0 : pg_fatal("failed to enter pipeline mode: %s",
1688 : PQerrorMessage(conn));
1689 :
1690 : /* One series of three commands, using single-row mode for the first two. */
1691 8 : for (i = 0; i < 3; i++)
1692 : {
1693 : char *param[1];
1694 :
1695 6 : param[0] = psprintf("%d", 44 + i);
1696 :
1697 6 : if (PQsendQueryParams(conn,
1698 : "SELECT generate_series(42, $1)",
1699 : 1,
1700 : NULL,
1701 : (const char **) param,
1702 : NULL,
1703 : NULL,
1704 : 0) != 1)
1705 0 : pg_fatal("failed to send query: %s",
1706 : PQerrorMessage(conn));
1707 6 : pfree(param[0]);
1708 : }
1709 2 : if (PQpipelineSync(conn) != 1)
1710 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1711 :
1712 10 : for (i = 0; !pipeline_ended; i++)
1713 : {
1714 8 : bool first = true;
1715 : bool saw_ending_tuplesok;
1716 8 : bool isSingleTuple = false;
1717 :
1718 : /* Set single row mode for only first 2 SELECT queries */
1719 8 : if (i < 2)
1720 : {
1721 4 : if (PQsetSingleRowMode(conn) != 1)
1722 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1723 : }
1724 :
1725 : /* Consume rows for this query */
1726 8 : saw_ending_tuplesok = false;
1727 28 : while ((res = PQgetResult(conn)) != NULL)
1728 : {
1729 22 : ExecStatusType est = PQresultStatus(res);
1730 :
1731 22 : if (est == PGRES_PIPELINE_SYNC)
1732 : {
1733 2 : fprintf(stderr, "end of pipeline reached\n");
1734 2 : pipeline_ended = true;
1735 2 : PQclear(res);
1736 2 : if (i != 3)
1737 0 : pg_fatal("Expected three results, got %d", i);
1738 2 : break;
1739 : }
1740 :
1741 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1742 20 : if (first)
1743 : {
1744 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1745 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1746 : i, PQresStatus(est));
1747 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1748 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1749 : i, PQresStatus(est));
1750 6 : first = false;
1751 : }
1752 :
1753 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1754 20 : switch (est)
1755 : {
1756 6 : case PGRES_TUPLES_OK:
1757 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1758 6 : saw_ending_tuplesok = true;
1759 6 : if (isSingleTuple)
1760 : {
1761 4 : if (PQntuples(res) == 0)
1762 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1763 : else
1764 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1765 : }
1766 6 : break;
1767 :
1768 14 : case PGRES_SINGLE_TUPLE:
1769 14 : isSingleTuple = true;
1770 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1771 14 : break;
1772 :
1773 0 : default:
1774 0 : pg_fatal("unexpected");
1775 : }
1776 20 : PQclear(res);
1777 : }
1778 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1779 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1780 : }
1781 :
1782 : /*
1783 : * Now issue one command, get its results in with single-row mode, then
1784 : * issue another command, and get its results in normal mode; make sure
1785 : * the single-row mode flag is reset as expected.
1786 : */
1787 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1788 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1789 0 : pg_fatal("failed to send query: %s",
1790 : PQerrorMessage(conn));
1791 2 : if (PQsendFlushRequest(conn) != 1)
1792 0 : pg_fatal("failed to send flush request");
1793 2 : if (PQsetSingleRowMode(conn) != 1)
1794 0 : pg_fatal("PQsetSingleRowMode() failed");
1795 2 : res = PQgetResult(conn);
1796 2 : if (res == NULL)
1797 0 : pg_fatal("unexpected NULL");
1798 2 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1799 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1800 : PQresStatus(PQresultStatus(res)));
1801 2 : res = PQgetResult(conn);
1802 2 : if (res == NULL)
1803 0 : pg_fatal("unexpected NULL");
1804 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1805 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1806 : PQresStatus(PQresultStatus(res)));
1807 2 : if (PQgetResult(conn) != NULL)
1808 0 : pg_fatal("expected NULL result");
1809 :
1810 2 : if (PQsendQueryParams(conn, "SELECT 1",
1811 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1812 0 : pg_fatal("failed to send query: %s",
1813 : PQerrorMessage(conn));
1814 2 : if (PQsendFlushRequest(conn) != 1)
1815 0 : pg_fatal("failed to send flush request");
1816 2 : res = PQgetResult(conn);
1817 2 : if (res == NULL)
1818 0 : pg_fatal("unexpected NULL");
1819 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1820 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1821 : PQresStatus(PQresultStatus(res)));
1822 2 : if (PQgetResult(conn) != NULL)
1823 0 : pg_fatal("expected NULL result");
1824 :
1825 : /*
1826 : * Try chunked mode as well; make sure that it correctly delivers a
1827 : * partial final chunk.
1828 : */
1829 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1830 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1831 0 : pg_fatal("failed to send query: %s",
1832 : PQerrorMessage(conn));
1833 2 : if (PQsendFlushRequest(conn) != 1)
1834 0 : pg_fatal("failed to send flush request");
1835 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1836 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1837 2 : res = PQgetResult(conn);
1838 2 : if (res == NULL)
1839 0 : pg_fatal("unexpected NULL");
1840 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1841 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1842 : PQresStatus(PQresultStatus(res)),
1843 : PQerrorMessage(conn));
1844 2 : if (PQntuples(res) != 3)
1845 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1846 2 : res = PQgetResult(conn);
1847 2 : if (res == NULL)
1848 0 : pg_fatal("unexpected NULL");
1849 2 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1850 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1851 : PQresStatus(PQresultStatus(res)));
1852 2 : if (PQntuples(res) != 2)
1853 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1854 2 : res = PQgetResult(conn);
1855 2 : if (res == NULL)
1856 0 : pg_fatal("unexpected NULL");
1857 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1858 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1859 : PQresStatus(PQresultStatus(res)));
1860 2 : if (PQntuples(res) != 0)
1861 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1862 2 : if (PQgetResult(conn) != NULL)
1863 0 : pg_fatal("expected NULL result");
1864 :
1865 2 : if (PQexitPipelineMode(conn) != 1)
1866 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1867 :
1868 2 : fprintf(stderr, "ok\n");
1869 2 : }
1870 :
1871 : /*
1872 : * Simple test to verify that a pipeline is discarded as a whole when there's
1873 : * an error, ignoring transaction commands.
1874 : */
1875 : static void
1876 2 : test_transaction(PGconn *conn)
1877 : {
1878 : PGresult *res;
1879 : bool expect_null;
1880 2 : int num_syncs = 0;
1881 :
1882 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1883 : "CREATE TABLE pq_pipeline_tst (id int)");
1884 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1885 0 : pg_fatal("failed to create test table: %s",
1886 : PQerrorMessage(conn));
1887 2 : PQclear(res);
1888 :
1889 2 : if (PQenterPipelineMode(conn) != 1)
1890 0 : pg_fatal("failed to enter pipeline mode: %s",
1891 : PQerrorMessage(conn));
1892 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1893 0 : pg_fatal("could not send prepare on pipeline: %s",
1894 : PQerrorMessage(conn));
1895 :
1896 2 : if (PQsendQueryParams(conn,
1897 : "BEGIN",
1898 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1899 0 : pg_fatal("failed to send query: %s",
1900 : PQerrorMessage(conn));
1901 2 : if (PQsendQueryParams(conn,
1902 : "SELECT 0/0",
1903 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1904 0 : pg_fatal("failed to send query: %s",
1905 : PQerrorMessage(conn));
1906 :
1907 : /*
1908 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1909 : * get out of the pipeline-aborted state first.
1910 : */
1911 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1912 0 : pg_fatal("failed to execute prepared: %s",
1913 : PQerrorMessage(conn));
1914 :
1915 : /* This insert fails because we're in pipeline-aborted state */
1916 2 : if (PQsendQueryParams(conn,
1917 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1918 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1919 0 : pg_fatal("failed to send query: %s",
1920 : PQerrorMessage(conn));
1921 2 : if (PQpipelineSync(conn) != 1)
1922 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1923 2 : num_syncs++;
1924 :
1925 : /*
1926 : * This insert fails even though the pipeline got a SYNC, because we're in
1927 : * an aborted transaction
1928 : */
1929 2 : if (PQsendQueryParams(conn,
1930 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1931 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1932 0 : pg_fatal("failed to send query: %s",
1933 : PQerrorMessage(conn));
1934 2 : if (PQpipelineSync(conn) != 1)
1935 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1936 2 : num_syncs++;
1937 :
1938 : /*
1939 : * Send ROLLBACK using prepared stmt. This one works because we just did
1940 : * PQpipelineSync above.
1941 : */
1942 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1943 0 : pg_fatal("failed to execute prepared: %s",
1944 : PQerrorMessage(conn));
1945 :
1946 : /*
1947 : * Now that we're out of a transaction and in pipeline-good mode, this
1948 : * insert works
1949 : */
1950 2 : if (PQsendQueryParams(conn,
1951 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1952 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1953 0 : pg_fatal("failed to send query: %s",
1954 : PQerrorMessage(conn));
1955 : /* Send two syncs now -- match up to SYNC messages below */
1956 2 : if (PQpipelineSync(conn) != 1)
1957 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1958 2 : num_syncs++;
1959 2 : if (PQpipelineSync(conn) != 1)
1960 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1961 2 : num_syncs++;
1962 :
1963 2 : expect_null = false;
1964 2 : for (int i = 0;; i++)
1965 38 : {
1966 : ExecStatusType restype;
1967 :
1968 40 : res = PQgetResult(conn);
1969 40 : if (res == NULL)
1970 : {
1971 16 : printf("%d: got NULL result\n", i);
1972 16 : if (!expect_null)
1973 0 : pg_fatal("did not expect NULL here");
1974 16 : expect_null = false;
1975 16 : continue;
1976 : }
1977 24 : restype = PQresultStatus(res);
1978 24 : printf("%d: got status %s", i, PQresStatus(restype));
1979 24 : if (expect_null)
1980 0 : pg_fatal("expected NULL");
1981 24 : if (restype == PGRES_FATAL_ERROR)
1982 4 : printf("; error: %s", PQerrorMessage(conn));
1983 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1984 : {
1985 4 : printf(": command didn't run because pipeline aborted\n");
1986 : }
1987 : else
1988 16 : printf("\n");
1989 24 : PQclear(res);
1990 :
1991 24 : if (restype == PGRES_PIPELINE_SYNC)
1992 8 : num_syncs--;
1993 : else
1994 16 : expect_null = true;
1995 24 : if (num_syncs <= 0)
1996 2 : break;
1997 : }
1998 2 : if (PQgetResult(conn) != NULL)
1999 0 : pg_fatal("returned something extra after all the syncs: %s",
2000 : PQresStatus(PQresultStatus(res)));
2001 :
2002 2 : if (PQexitPipelineMode(conn) != 1)
2003 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
2004 :
2005 : /* We expect to find one tuple containing the value "3" */
2006 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
2007 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2008 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
2009 2 : if (PQntuples(res) != 1)
2010 0 : pg_fatal("did not get 1 tuple");
2011 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
2012 0 : pg_fatal("did not get expected tuple");
2013 2 : PQclear(res);
2014 :
2015 2 : fprintf(stderr, "ok\n");
2016 2 : }
2017 :
2018 : /*
2019 : * In this test mode we send a stream of queries, with one in the middle
2020 : * causing an error. Verify that we can still send some more after the
2021 : * error and have libpq work properly.
2022 : */
2023 : static void
2024 2 : test_uniqviol(PGconn *conn)
2025 : {
2026 2 : int sock = PQsocket(conn);
2027 : PGresult *res;
2028 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
2029 : const char *paramValues[2];
2030 : char paramValue0[MAXINT8LEN];
2031 : char paramValue1[MAXINT8LEN];
2032 2 : int ctr = 0;
2033 2 : int numsent = 0;
2034 2 : int results = 0;
2035 2 : bool read_done = false;
2036 2 : bool write_done = false;
2037 2 : bool error_sent = false;
2038 2 : bool got_error = false;
2039 2 : int switched = 0;
2040 2 : int socketful = 0;
2041 : fd_set in_fds;
2042 : fd_set out_fds;
2043 :
2044 2 : fprintf(stderr, "uniqviol ...");
2045 :
2046 2 : PQsetnonblocking(conn, 1);
2047 :
2048 2 : paramValues[0] = paramValue0;
2049 2 : paramValues[1] = paramValue1;
2050 2 : sprintf(paramValue1, "42");
2051 :
2052 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
2053 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
2054 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2055 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
2056 :
2057 2 : res = PQexec(conn, "begin");
2058 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2059 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
2060 :
2061 2 : res = PQprepare(conn, "insertion",
2062 : "insert into ppln_uniqviol values ($1, $2) returning id",
2063 : 2, paramTypes);
2064 2 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
2065 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
2066 :
2067 2 : if (PQenterPipelineMode(conn) != 1)
2068 0 : pg_fatal("failed to enter pipeline mode");
2069 :
2070 16 : while (!read_done)
2071 : {
2072 : /*
2073 : * Avoid deadlocks by reading everything the server has sent before
2074 : * sending anything. (Special precaution is needed here to process
2075 : * PQisBusy before testing the socket for read-readiness, because the
2076 : * socket does not turn read-ready after "sending" queries in aborted
2077 : * pipeline mode.)
2078 : */
2079 1212 : while (PQisBusy(conn) == 0)
2080 : {
2081 : bool new_error;
2082 :
2083 1202 : if (results >= numsent)
2084 : {
2085 2 : if (write_done)
2086 0 : read_done = true;
2087 2 : break;
2088 : }
2089 :
2090 1200 : res = PQgetResult(conn);
2091 1200 : new_error = process_result(conn, res, results, numsent);
2092 1200 : if (new_error && got_error)
2093 0 : pg_fatal("got two errors");
2094 1200 : got_error |= new_error;
2095 1200 : if (results++ >= numsent - 1)
2096 : {
2097 4 : if (write_done)
2098 2 : read_done = true;
2099 4 : break;
2100 : }
2101 : }
2102 :
2103 16 : if (read_done)
2104 2 : break;
2105 :
2106 14 : FD_ZERO(&out_fds);
2107 14 : FD_SET(sock, &out_fds);
2108 :
2109 14 : FD_ZERO(&in_fds);
2110 14 : FD_SET(sock, &in_fds);
2111 :
2112 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2113 : {
2114 0 : if (errno == EINTR)
2115 0 : continue;
2116 0 : pg_fatal("select() failed: %m");
2117 : }
2118 :
2119 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2120 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2121 :
2122 : /*
2123 : * If the socket is writable and we haven't finished sending queries,
2124 : * send some.
2125 : */
2126 14 : if (!write_done && FD_ISSET(sock, &out_fds))
2127 : {
2128 : for (;;)
2129 1194 : {
2130 : int flush;
2131 :
2132 : /*
2133 : * provoke uniqueness violation exactly once after having
2134 : * switched to read mode.
2135 : */
2136 1200 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2137 : {
2138 2 : sprintf(paramValue0, "%d", numsent / 2);
2139 2 : fprintf(stderr, "E");
2140 2 : error_sent = true;
2141 : }
2142 : else
2143 : {
2144 1198 : fprintf(stderr, ".");
2145 1198 : sprintf(paramValue0, "%d", ctr++);
2146 : }
2147 :
2148 1200 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2149 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2150 1200 : numsent++;
2151 :
2152 : /* Are we done writing? */
2153 1200 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2154 : {
2155 2 : if (PQsendFlushRequest(conn) != 1)
2156 0 : pg_fatal("failed to send flush request");
2157 2 : write_done = true;
2158 2 : fprintf(stderr, "\ndone writing\n");
2159 2 : PQflush(conn);
2160 2 : break;
2161 : }
2162 :
2163 : /* is the outgoing socket full? */
2164 1198 : flush = PQflush(conn);
2165 1198 : if (flush == -1)
2166 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2167 1198 : if (flush == 1)
2168 : {
2169 4 : if (socketful == 0)
2170 2 : socketful = numsent;
2171 4 : fprintf(stderr, "\nswitch to reading\n");
2172 4 : switched++;
2173 4 : break;
2174 : }
2175 : }
2176 : }
2177 : }
2178 :
2179 2 : if (!got_error)
2180 0 : pg_fatal("did not get expected error");
2181 :
2182 2 : fprintf(stderr, "ok\n");
2183 2 : }
2184 :
2185 : /*
2186 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2187 : * the expected NULL that should follow it.
2188 : *
2189 : * Returns true if we read a fatal error message, otherwise false.
2190 : */
2191 : static bool
2192 1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2193 : {
2194 : PGresult *res2;
2195 1200 : bool got_error = false;
2196 :
2197 1200 : if (res == NULL)
2198 0 : pg_fatal("got unexpected NULL");
2199 :
2200 1200 : switch (PQresultStatus(res))
2201 : {
2202 2 : case PGRES_FATAL_ERROR:
2203 2 : got_error = true;
2204 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2205 2 : PQclear(res);
2206 :
2207 2 : res2 = PQgetResult(conn);
2208 2 : if (res2 != NULL)
2209 0 : pg_fatal("expected NULL, got %s",
2210 : PQresStatus(PQresultStatus(res2)));
2211 2 : break;
2212 :
2213 836 : case PGRES_TUPLES_OK:
2214 836 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2215 836 : PQclear(res);
2216 :
2217 836 : res2 = PQgetResult(conn);
2218 836 : if (res2 != NULL)
2219 0 : pg_fatal("expected NULL, got %s",
2220 : PQresStatus(PQresultStatus(res2)));
2221 836 : break;
2222 :
2223 362 : case PGRES_PIPELINE_ABORTED:
2224 362 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2225 362 : res2 = PQgetResult(conn);
2226 362 : if (res2 != NULL)
2227 0 : pg_fatal("expected NULL, got %s",
2228 : PQresStatus(PQresultStatus(res2)));
2229 362 : break;
2230 :
2231 0 : default:
2232 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2233 : }
2234 :
2235 1200 : return got_error;
2236 : }
2237 :
2238 :
2239 : static void
2240 0 : usage(const char *progname)
2241 : {
2242 0 : fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2243 0 : fprintf(stderr, "Usage:\n");
2244 0 : fprintf(stderr, " %s [OPTION] tests\n", progname);
2245 0 : fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2246 0 : fprintf(stderr, "\nOptions:\n");
2247 0 : fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2248 0 : fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2249 0 : }
2250 :
2251 : static void
2252 2 : print_test_list(void)
2253 : {
2254 2 : printf("cancel\n");
2255 2 : printf("disallowed_in_pipeline\n");
2256 2 : printf("multi_pipelines\n");
2257 2 : printf("nosync\n");
2258 2 : printf("pipeline_abort\n");
2259 2 : printf("pipeline_idle\n");
2260 2 : printf("pipelined_insert\n");
2261 2 : printf("prepared\n");
2262 2 : printf("protocol_version\n");
2263 2 : printf("simple_pipeline\n");
2264 2 : printf("singlerow\n");
2265 2 : printf("transaction\n");
2266 2 : printf("uniqviol\n");
2267 2 : }
2268 :
2269 : int
2270 30 : main(int argc, char **argv)
2271 : {
2272 30 : const char *conninfo = "";
2273 : PGconn *conn;
2274 : FILE *trace;
2275 : char *testname;
2276 30 : int numrows = 10000;
2277 : PGresult *res;
2278 : int c;
2279 :
2280 104 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2281 : {
2282 44 : switch (c)
2283 : {
2284 26 : case 'r': /* numrows */
2285 26 : errno = 0;
2286 26 : numrows = strtol(optarg, NULL, 10);
2287 26 : if (errno != 0 || numrows <= 0)
2288 : {
2289 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2290 : optarg);
2291 0 : exit(1);
2292 : }
2293 26 : break;
2294 18 : case 't': /* trace file */
2295 18 : tracefile = pg_strdup(optarg);
2296 18 : break;
2297 : }
2298 74 : }
2299 :
2300 30 : if (optind < argc)
2301 : {
2302 30 : testname = pg_strdup(argv[optind]);
2303 30 : optind++;
2304 : }
2305 : else
2306 : {
2307 0 : usage(argv[0]);
2308 0 : exit(1);
2309 : }
2310 :
2311 30 : if (strcmp(testname, "tests") == 0)
2312 : {
2313 2 : print_test_list();
2314 2 : exit(0);
2315 : }
2316 :
2317 28 : if (optind < argc)
2318 : {
2319 28 : conninfo = pg_strdup(argv[optind]);
2320 28 : optind++;
2321 : }
2322 :
2323 : /* Make a connection to the database */
2324 28 : conn = PQconnectdb(conninfo);
2325 28 : if (PQstatus(conn) != CONNECTION_OK)
2326 : {
2327 0 : fprintf(stderr, "Connection to database failed: %s\n",
2328 : PQerrorMessage(conn));
2329 0 : exit_nicely(conn);
2330 : }
2331 :
2332 28 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2333 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2334 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2335 28 : res = PQexec(conn, "SET debug_parallel_query = off");
2336 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2337 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2338 :
2339 : /* Set the trace file, if requested */
2340 28 : if (tracefile != NULL)
2341 : {
2342 18 : if (strcmp(tracefile, "-") == 0)
2343 0 : trace = stdout;
2344 : else
2345 18 : trace = fopen(tracefile, "w");
2346 18 : if (trace == NULL)
2347 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2348 :
2349 : /* Make it line-buffered */
2350 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2351 :
2352 18 : PQtrace(conn, trace);
2353 18 : PQsetTraceFlags(conn,
2354 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2355 : }
2356 :
2357 28 : if (strcmp(testname, "cancel") == 0)
2358 4 : test_cancel(conn);
2359 24 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2360 2 : test_disallowed_in_pipeline(conn);
2361 22 : else if (strcmp(testname, "multi_pipelines") == 0)
2362 2 : test_multi_pipelines(conn);
2363 20 : else if (strcmp(testname, "nosync") == 0)
2364 2 : test_nosync(conn);
2365 18 : else if (strcmp(testname, "pipeline_abort") == 0)
2366 2 : test_pipeline_abort(conn);
2367 16 : else if (strcmp(testname, "pipeline_idle") == 0)
2368 2 : test_pipeline_idle(conn);
2369 14 : else if (strcmp(testname, "pipelined_insert") == 0)
2370 2 : test_pipelined_insert(conn, numrows);
2371 12 : else if (strcmp(testname, "prepared") == 0)
2372 2 : test_prepared(conn);
2373 10 : else if (strcmp(testname, "protocol_version") == 0)
2374 2 : test_protocol_version(conn);
2375 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2376 2 : test_simple_pipeline(conn);
2377 6 : else if (strcmp(testname, "singlerow") == 0)
2378 2 : test_singlerowmode(conn);
2379 4 : else if (strcmp(testname, "transaction") == 0)
2380 2 : test_transaction(conn);
2381 2 : else if (strcmp(testname, "uniqviol") == 0)
2382 2 : test_uniqviol(conn);
2383 : else
2384 : {
2385 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2386 0 : exit(1);
2387 : }
2388 :
2389 : /* close the connection to the database and cleanup */
2390 28 : PQfinish(conn);
2391 28 : return 0;
2392 : }
|