Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "libpq-fe.h"
23 : #include "pg_getopt.h"
24 :
25 :
26 : static void exit_nicely(PGconn *conn);
27 : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
28 : pg_attribute_printf(2, 3);
29 : static bool process_result(PGconn *conn, PGresult *res, int results,
30 : int numsent);
31 :
32 : static const char *const progname = "libpq_pipeline";
33 :
34 : /* Options and defaults */
35 : static char *tracefile = NULL; /* path to PQtrace() file */
36 :
37 :
/*
 * pg_debug(fmt, ...) writes a printf-style message to stderr when the
 * program is built with DEBUG_OUTPUT defined; otherwise it expands to a
 * no-op and its arguments are not evaluated.
 */
#ifndef DEBUG_OUTPUT
#define pg_debug(...) ((void) 0)
#else
#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
#endif
43 :
/* SQL text shared by the tests that operate on the pq_pipeline_demo table. */
static const char *const drop_table_sql = "DROP TABLE IF EXISTS pq_pipeline_demo";

static const char *const create_table_sql =
	"CREATE UNLOGGED TABLE pq_pipeline_demo("
	"id serial primary key, "
	"itemno integer,"
	"int8filler int8);";

/* Single-parameter insert: $1 is itemno. */
static const char *const insert_sql = "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";

/* Two-parameter insert: $1 is itemno, $2 is int8filler. */
static const char *const insert_sql2 =
	"INSERT INTO pq_pipeline_demo(itemno,int8filler) "
	"VALUES ($1, $2)";
53 :
/*
 * Buffer sizes large enough to hold the decimal text of any int32 / int64
 * value, including a leading sign and the terminating null byte.
 *
 * int32: "-2147483648" is 11 characters, plus the terminator = 12.
 * int64: "-9223372036854775808" is 20 characters, plus the terminator = 21.
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
57 :
58 : static void
59 0 : exit_nicely(PGconn *conn)
60 : {
61 0 : PQfinish(conn);
62 0 : exit(1);
63 : }
64 :
65 : /*
66 : * The following few functions are wrapped in macros to make the reported line
67 : * number in an error match the line number of the invocation.
68 : */
69 :
70 : /*
71 : * Print an error to stderr and terminate the program.
72 : */
73 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 : pg_noreturn static void
75 0 : pg_fatal_impl(int line, const char *fmt,...)
76 : {
77 : va_list args;
78 :
79 0 : fflush(stdout);
80 :
81 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
82 0 : va_start(args, fmt);
83 0 : vfprintf(stderr, fmt, args);
84 0 : va_end(args);
85 : Assert(fmt[strlen(fmt) - 1] != '\n');
86 0 : fprintf(stderr, "\n");
87 0 : exit(1);
88 : }
89 :
90 : /*
91 : * Check that libpq next returns a PGresult with the specified status,
92 : * returning the PGresult so that caller can perform additional checks.
93 : */
94 : #define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
95 : static PGresult *
96 112 : confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
97 : {
98 : PGresult *res;
99 :
100 112 : res = PQgetResult(conn);
101 112 : if (res == NULL)
102 0 : pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
103 : PQerrorMessage(conn));
104 112 : if (PQresultStatus(res) != status)
105 0 : pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
106 : PQresStatus(PQresultStatus(res)),
107 : PQresStatus(status),
108 : PQerrorMessage(conn));
109 112 : return res;
110 : }
111 :
112 : /*
113 : * Check that libpq next returns a PGresult with the specified status,
114 : * then free the PGresult.
115 : */
116 : #define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
117 : static void
118 78 : consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
119 : {
120 : PGresult *res;
121 :
122 78 : res = confirm_result_status_impl(line, conn, status);
123 78 : PQclear(res);
124 78 : }
125 :
126 : /*
127 : * Check that libpq next returns a null PGresult.
128 : */
129 : #define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
130 : static void
131 1262 : consume_null_result_impl(int line, PGconn *conn)
132 : {
133 : PGresult *res;
134 :
135 1262 : res = PQgetResult(conn);
136 1262 : if (res != NULL)
137 0 : pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
138 : PQresStatus(PQresultStatus(res)),
139 : PQerrorMessage(conn));
140 1262 : }
141 :
142 : /*
143 : * Check that the query on the given connection got canceled.
144 : */
145 : #define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
146 : static void
147 24 : consume_query_cancel_impl(int line, PGconn *conn)
148 : {
149 : PGresult *res;
150 :
151 24 : res = confirm_result_status_impl(line, conn, PGRES_FATAL_ERROR);
152 24 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
153 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
154 : PQerrorMessage(conn));
155 24 : PQclear(res);
156 :
157 24 : while (PQisBusy(conn))
158 0 : PQconsumeInput(conn);
159 24 : }
160 :
161 : /*
162 : * Using monitorConn, query pg_stat_activity to see that the connection with
163 : * the given PID is either in the given state, or waiting on the given event
164 : * (only one of them can be given).
165 : */
166 : static void
167 48 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
168 : char *state, char *event)
169 : {
170 48 : const Oid paramTypes[] = {INT4OID, TEXTOID};
171 : const char *paramValues[2];
172 48 : char *pidstr = psprintf("%d", procpid);
173 :
174 : Assert((state == NULL) ^ (event == NULL));
175 :
176 48 : paramValues[0] = pidstr;
177 48 : paramValues[1] = state ? state : event;
178 :
179 : while (true)
180 0 : {
181 : PGresult *res;
182 : char *value;
183 :
184 48 : if (state != NULL)
185 24 : res = PQexecParams(monitorConn,
186 : "SELECT count(*) FROM pg_stat_activity WHERE "
187 : "pid = $1 AND state = $2",
188 : 2, paramTypes, paramValues, NULL, NULL, 0);
189 : else
190 24 : res = PQexecParams(monitorConn,
191 : "SELECT count(*) FROM pg_stat_activity WHERE "
192 : "pid = $1 AND wait_event = $2",
193 : 2, paramTypes, paramValues, NULL, NULL, 0);
194 :
195 48 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
196 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
197 48 : if (PQntuples(res) != 1)
198 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
199 48 : if (PQnfields(res) != 1)
200 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
201 48 : value = PQgetvalue(res, 0, 0);
202 48 : if (strcmp(value, "0") != 0)
203 : {
204 48 : PQclear(res);
205 48 : break;
206 : }
207 0 : PQclear(res);
208 :
209 : /* wait 10ms before polling again */
210 0 : pg_usleep(10000);
211 : }
212 :
213 48 : pfree(pidstr);
214 48 : }
215 :
216 : #define send_cancellable_query(conn, monitorConn) \
217 : send_cancellable_query_impl(__LINE__, conn, monitorConn)
218 : static void
219 24 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
220 : {
221 : const char *env_wait;
222 24 : const Oid paramTypes[1] = {INT4OID};
223 :
224 : /*
225 : * Wait for the connection to be idle, so that our check for an active
226 : * connection below is reliable, instead of possibly seeing an outdated
227 : * state.
228 : */
229 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
230 :
231 24 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
232 24 : if (env_wait == NULL)
233 24 : env_wait = "180";
234 :
235 24 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
236 : &env_wait, NULL, NULL, 0) != 1)
237 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
238 :
239 : /*
240 : * Wait for the sleep to be active, because if the query is not running
241 : * yet, the cancel request that we send won't have any effect.
242 : */
243 24 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
244 24 : }
245 :
246 : /*
247 : * Create a new connection with the same conninfo as the given one.
248 : */
249 : static PGconn *
250 4 : copy_connection(PGconn *conn)
251 : {
252 : PGconn *copyConn;
253 4 : PQconninfoOption *opts = PQconninfo(conn);
254 : const char **keywords;
255 : const char **vals;
256 4 : int nopts = 0;
257 : int i;
258 :
259 208 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
260 204 : nopts++;
261 4 : nopts++; /* for the NULL terminator */
262 :
263 4 : keywords = pg_malloc(sizeof(char *) * nopts);
264 4 : vals = pg_malloc(sizeof(char *) * nopts);
265 :
266 4 : i = 0;
267 208 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
268 : {
269 204 : if (opt->val)
270 : {
271 80 : keywords[i] = opt->keyword;
272 80 : vals[i] = opt->val;
273 80 : i++;
274 : }
275 : }
276 4 : keywords[i] = vals[i] = NULL;
277 :
278 4 : copyConn = PQconnectdbParams(keywords, vals, false);
279 :
280 4 : if (PQstatus(copyConn) != CONNECTION_OK)
281 0 : pg_fatal("Connection to database failed: %s",
282 : PQerrorMessage(copyConn));
283 :
284 4 : pfree(keywords);
285 4 : pfree(vals);
286 4 : PQconninfoFree(opts);
287 :
288 4 : return copyConn;
289 : }
290 :
291 : /*
292 : * Test query cancellation routines
293 : */
294 : static void
295 4 : test_cancel(PGconn *conn)
296 : {
297 : PGcancel *cancel;
298 : PGcancelConn *cancelConn;
299 : PGconn *monitorConn;
300 : char errorbuf[256];
301 :
302 4 : fprintf(stderr, "test cancellations... ");
303 :
304 4 : if (PQsetnonblocking(conn, 1) != 0)
305 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
306 :
307 : /*
308 : * Make a separate connection to the database to monitor the query on the
309 : * main connection.
310 : */
311 4 : monitorConn = copy_connection(conn);
312 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
313 :
314 : /* test PQcancel */
315 4 : send_cancellable_query(conn, monitorConn);
316 4 : cancel = PQgetCancel(conn);
317 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
318 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
319 4 : consume_query_cancel(conn);
320 :
321 : /* PGcancel object can be reused for the next query */
322 4 : send_cancellable_query(conn, monitorConn);
323 4 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
324 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
325 4 : consume_query_cancel(conn);
326 :
327 4 : PQfreeCancel(cancel);
328 :
329 : /* test PQrequestCancel */
330 4 : send_cancellable_query(conn, monitorConn);
331 4 : if (!PQrequestCancel(conn))
332 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
333 4 : consume_query_cancel(conn);
334 :
335 : /* test PQcancelBlocking */
336 4 : send_cancellable_query(conn, monitorConn);
337 4 : cancelConn = PQcancelCreate(conn);
338 4 : if (!PQcancelBlocking(cancelConn))
339 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
340 4 : consume_query_cancel(conn);
341 4 : PQcancelFinish(cancelConn);
342 :
343 : /* test PQcancelCreate and then polling with PQcancelPoll */
344 4 : send_cancellable_query(conn, monitorConn);
345 4 : cancelConn = PQcancelCreate(conn);
346 4 : if (!PQcancelStart(cancelConn))
347 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
348 : while (true)
349 4 : {
350 : struct timeval tv;
351 : fd_set input_mask;
352 : fd_set output_mask;
353 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
354 8 : int sock = PQcancelSocket(cancelConn);
355 :
356 8 : if (pollres == PGRES_POLLING_OK)
357 4 : break;
358 :
359 68 : FD_ZERO(&input_mask);
360 68 : FD_ZERO(&output_mask);
361 4 : switch (pollres)
362 : {
363 4 : case PGRES_POLLING_READING:
364 : pg_debug("polling for reads\n");
365 4 : FD_SET(sock, &input_mask);
366 4 : break;
367 0 : case PGRES_POLLING_WRITING:
368 : pg_debug("polling for writes\n");
369 0 : FD_SET(sock, &output_mask);
370 0 : break;
371 0 : default:
372 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
373 : }
374 :
375 4 : if (sock < 0)
376 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
377 :
378 4 : tv.tv_sec = 3;
379 4 : tv.tv_usec = 0;
380 :
381 : while (true)
382 : {
383 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
384 : {
385 0 : if (errno == EINTR)
386 0 : continue;
387 0 : pg_fatal("select() failed: %m");
388 : }
389 4 : break;
390 : }
391 : }
392 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
393 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
394 4 : consume_query_cancel(conn);
395 :
396 : /*
397 : * test PQcancelReset works on the cancel connection and it can be reused
398 : * afterwards
399 : */
400 4 : PQcancelReset(cancelConn);
401 :
402 4 : send_cancellable_query(conn, monitorConn);
403 4 : if (!PQcancelStart(cancelConn))
404 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
405 : while (true)
406 4 : {
407 : struct timeval tv;
408 : fd_set input_mask;
409 : fd_set output_mask;
410 8 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
411 8 : int sock = PQcancelSocket(cancelConn);
412 :
413 8 : if (pollres == PGRES_POLLING_OK)
414 4 : break;
415 :
416 68 : FD_ZERO(&input_mask);
417 68 : FD_ZERO(&output_mask);
418 4 : switch (pollres)
419 : {
420 4 : case PGRES_POLLING_READING:
421 : pg_debug("polling for reads\n");
422 4 : FD_SET(sock, &input_mask);
423 4 : break;
424 0 : case PGRES_POLLING_WRITING:
425 : pg_debug("polling for writes\n");
426 0 : FD_SET(sock, &output_mask);
427 0 : break;
428 0 : default:
429 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
430 : }
431 :
432 4 : if (sock < 0)
433 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
434 :
435 4 : tv.tv_sec = 3;
436 4 : tv.tv_usec = 0;
437 :
438 : while (true)
439 : {
440 4 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
441 : {
442 0 : if (errno == EINTR)
443 0 : continue;
444 0 : pg_fatal("select() failed: %m");
445 : }
446 4 : break;
447 : }
448 : }
449 4 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
450 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
451 4 : consume_query_cancel(conn);
452 :
453 4 : PQcancelFinish(cancelConn);
454 4 : PQfinish(monitorConn);
455 :
456 4 : fprintf(stderr, "ok\n");
457 4 : }
458 :
459 : static void
460 2 : test_disallowed_in_pipeline(PGconn *conn)
461 : {
462 2 : PGresult *res = NULL;
463 :
464 2 : fprintf(stderr, "test error cases... ");
465 :
466 2 : if (PQisnonblocking(conn))
467 0 : pg_fatal("Expected blocking connection mode");
468 :
469 2 : if (PQenterPipelineMode(conn) != 1)
470 0 : pg_fatal("Unable to enter pipeline mode");
471 :
472 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
473 0 : pg_fatal("Pipeline mode not activated properly");
474 :
475 : /* PQexec should fail in pipeline mode */
476 2 : res = PQexec(conn, "SELECT 1");
477 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
478 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
479 2 : if (strcmp(PQerrorMessage(conn),
480 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
481 0 : pg_fatal("did not get expected error message; got: \"%s\"",
482 : PQerrorMessage(conn));
483 2 : PQclear(res);
484 :
485 : /* PQsendQuery should fail in pipeline mode */
486 2 : if (PQsendQuery(conn, "SELECT 1") != 0)
487 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
488 2 : if (strcmp(PQerrorMessage(conn),
489 : "PQsendQuery not allowed in pipeline mode\n") != 0)
490 0 : pg_fatal("did not get expected error message; got: \"%s\"",
491 : PQerrorMessage(conn));
492 :
493 : /* Entering pipeline mode when already in pipeline mode is OK */
494 2 : if (PQenterPipelineMode(conn) != 1)
495 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
496 :
497 2 : if (PQisBusy(conn) != 0)
498 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
499 :
500 : /* ok, back to normal command mode */
501 2 : if (PQexitPipelineMode(conn) != 1)
502 0 : pg_fatal("couldn't exit idle empty pipeline mode");
503 :
504 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
505 0 : pg_fatal("Pipeline mode not terminated properly");
506 :
507 : /* exiting pipeline mode when not in pipeline mode should be a no-op */
508 2 : if (PQexitPipelineMode(conn) != 1)
509 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
510 :
511 : /* can now PQexec again */
512 2 : res = PQexec(conn, "SELECT 1");
513 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
514 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
515 : PQerrorMessage(conn));
516 2 : PQclear(res);
517 :
518 2 : fprintf(stderr, "ok\n");
519 2 : }
520 :
521 : static void
522 2 : test_multi_pipelines(PGconn *conn)
523 : {
524 2 : const char *dummy_params[1] = {"1"};
525 2 : Oid dummy_param_oids[1] = {INT4OID};
526 :
527 2 : fprintf(stderr, "multi pipeline... ");
528 :
529 : /*
530 : * Queue up a couple of small pipelines and process each without returning
531 : * to command mode first.
532 : */
533 2 : if (PQenterPipelineMode(conn) != 1)
534 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
535 :
536 : /* first pipeline */
537 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
538 : dummy_params, NULL, NULL, 0) != 1)
539 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
540 :
541 2 : if (PQpipelineSync(conn) != 1)
542 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
543 :
544 : /* second pipeline */
545 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
546 : dummy_params, NULL, NULL, 0) != 1)
547 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
548 :
549 : /* Skip flushing once. */
550 2 : if (PQsendPipelineSync(conn) != 1)
551 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
552 :
553 : /* third pipeline */
554 2 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
555 : dummy_params, NULL, NULL, 0) != 1)
556 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
557 :
558 2 : if (PQpipelineSync(conn) != 1)
559 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
560 :
561 : /* OK, start processing the results */
562 :
563 : /* first pipeline */
564 2 : consume_result_status(conn, PGRES_TUPLES_OK);
565 :
566 2 : consume_null_result(conn);
567 :
568 2 : if (PQexitPipelineMode(conn) != 0)
569 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
570 :
571 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
572 :
573 : /* second pipeline */
574 2 : consume_result_status(conn, PGRES_TUPLES_OK);
575 :
576 2 : consume_null_result(conn);
577 :
578 2 : if (PQexitPipelineMode(conn) != 0)
579 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
580 :
581 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
582 :
583 : /* third pipeline */
584 2 : consume_result_status(conn, PGRES_TUPLES_OK);
585 :
586 2 : consume_null_result(conn);
587 :
588 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
589 :
590 : /* We're still in pipeline mode ... */
591 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
592 0 : pg_fatal("Fell out of pipeline mode somehow");
593 :
594 : /* until we end it, which we can safely do now */
595 2 : if (PQexitPipelineMode(conn) != 1)
596 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
597 : PQerrorMessage(conn));
598 :
599 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
600 0 : pg_fatal("exiting pipeline mode didn't seem to work");
601 :
602 2 : fprintf(stderr, "ok\n");
603 2 : }
604 :
605 : /*
606 : * Test behavior when a pipeline dispatches a number of commands that are
607 : * not flushed by a sync point.
608 : */
609 : static void
610 2 : test_nosync(PGconn *conn)
611 : {
612 2 : int numqueries = 10;
613 2 : int results = 0;
614 2 : int sock = PQsocket(conn);
615 :
616 2 : fprintf(stderr, "nosync... ");
617 :
618 2 : if (sock < 0)
619 0 : pg_fatal("invalid socket");
620 :
621 2 : if (PQenterPipelineMode(conn) != 1)
622 0 : pg_fatal("could not enter pipeline mode");
623 22 : for (int i = 0; i < numqueries; i++)
624 : {
625 : fd_set input_mask;
626 : struct timeval tv;
627 :
628 20 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
629 : 0, NULL, NULL, NULL, NULL, 0) != 1)
630 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
631 20 : PQflush(conn);
632 :
633 : /*
634 : * If the server has written anything to us, read (some of) it now.
635 : */
636 340 : FD_ZERO(&input_mask);
637 20 : FD_SET(sock, &input_mask);
638 20 : tv.tv_sec = 0;
639 20 : tv.tv_usec = 0;
640 20 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
641 : {
642 0 : fprintf(stderr, "select() failed: %m\n");
643 0 : exit_nicely(conn);
644 : }
645 20 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
646 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
647 : }
648 :
649 : /* tell server to flush its output buffer */
650 2 : if (PQsendFlushRequest(conn) != 1)
651 0 : pg_fatal("failed to send flush request");
652 2 : PQflush(conn);
653 :
654 : /* Now read all results */
655 : for (;;)
656 : {
657 : /* We expect exactly one TUPLES_OK result for each query we sent */
658 20 : consume_result_status(conn, PGRES_TUPLES_OK);
659 :
660 : /* and one NULL result should follow each */
661 20 : consume_null_result(conn);
662 :
663 20 : results++;
664 :
665 : /* if we're done, we're done */
666 20 : if (results == numqueries)
667 2 : break;
668 : }
669 :
670 2 : fprintf(stderr, "ok\n");
671 2 : }
672 :
673 : /*
674 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
675 : * still have to get results for each pipeline item, but the item will just be
676 : * a PGRES_PIPELINE_ABORTED code.
677 : *
678 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
679 : * usually use an xact, but in this case we want to observe the effects of each
680 : * statement.
681 : */
682 : static void
683 2 : test_pipeline_abort(PGconn *conn)
684 : {
685 2 : PGresult *res = NULL;
686 2 : const char *dummy_params[1] = {"1"};
687 2 : Oid dummy_param_oids[1] = {INT4OID};
688 : int i;
689 : int gotrows;
690 : bool goterror;
691 :
692 2 : fprintf(stderr, "aborted pipeline... ");
693 :
694 2 : res = PQexec(conn, drop_table_sql);
695 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
696 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
697 2 : PQclear(res);
698 :
699 2 : res = PQexec(conn, create_table_sql);
700 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
701 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
702 2 : PQclear(res);
703 :
704 : /*
705 : * Queue up a couple of small pipelines and process each without returning
706 : * to command mode first. Make sure the second operation in the first
707 : * pipeline ERRORs.
708 : */
709 2 : if (PQenterPipelineMode(conn) != 1)
710 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
711 :
712 2 : dummy_params[0] = "1";
713 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
714 : dummy_params, NULL, NULL, 0) != 1)
715 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
716 :
717 2 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
718 : 1, dummy_param_oids, dummy_params,
719 : NULL, NULL, 0) != 1)
720 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
721 :
722 2 : dummy_params[0] = "2";
723 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
724 : dummy_params, NULL, NULL, 0) != 1)
725 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
726 :
727 2 : if (PQpipelineSync(conn) != 1)
728 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
729 :
730 2 : dummy_params[0] = "3";
731 2 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
732 : dummy_params, NULL, NULL, 0) != 1)
733 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
734 : PQerrorMessage(conn));
735 :
736 2 : if (PQpipelineSync(conn) != 1)
737 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
738 :
739 : /*
740 : * OK, start processing the pipeline results.
741 : *
742 : * We should get a command-ok for the first query, then a fatal error and
743 : * a pipeline aborted message for the second insert, a pipeline-end, then
744 : * a command-ok and a pipeline-ok for the second pipeline operation.
745 : */
746 2 : consume_result_status(conn, PGRES_COMMAND_OK);
747 :
748 : /* NULL result to signal end-of-results for this command */
749 2 : consume_null_result(conn);
750 :
751 : /* Second query caused error, so we expect an error next */
752 2 : consume_result_status(conn, PGRES_FATAL_ERROR);
753 :
754 : /* NULL result to signal end-of-results for this command */
755 2 : consume_null_result(conn);
756 :
757 : /*
758 : * pipeline should now be aborted.
759 : *
760 : * Note that we could still queue more queries at this point if we wanted;
761 : * they'd get added to a new third pipeline since we've already sent a
762 : * second. The aborted flag relates only to the pipeline being received.
763 : */
764 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
765 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
766 :
767 : /* third query in pipeline, the second insert */
768 2 : consume_result_status(conn, PGRES_PIPELINE_ABORTED);
769 :
770 : /* NULL result to signal end-of-results for this command */
771 2 : consume_null_result(conn);
772 :
773 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
774 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
775 :
776 : /* Ensure we're still in pipeline */
777 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
778 0 : pg_fatal("Fell out of pipeline mode somehow");
779 :
780 : /*
781 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
782 : *
783 : * (This is so clients know to start processing results normally again and
784 : * can tell the difference between skipped commands and the sync.)
785 : */
786 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
787 :
788 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
789 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
790 :
791 : /* We're still in pipeline mode... */
792 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
793 0 : pg_fatal("Fell out of pipeline mode somehow");
794 :
795 : /* the insert from the second pipeline */
796 2 : consume_result_status(conn, PGRES_COMMAND_OK);
797 :
798 : /* Read the NULL result at the end of the command */
799 2 : consume_null_result(conn);
800 :
801 : /* the second pipeline sync */
802 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
803 :
804 : /* Read the NULL result at the end of the command */
805 2 : consume_null_result(conn);
806 :
807 : /* Try to send two queries in one command */
808 2 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
809 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
810 2 : if (PQpipelineSync(conn) != 1)
811 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
812 2 : goterror = false;
813 4 : while ((res = PQgetResult(conn)) != NULL)
814 : {
815 2 : switch (PQresultStatus(res))
816 : {
817 2 : case PGRES_FATAL_ERROR:
818 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
819 0 : pg_fatal("expected error about multiple commands, got %s",
820 : PQerrorMessage(conn));
821 2 : printf("got expected %s", PQerrorMessage(conn));
822 2 : goterror = true;
823 2 : break;
824 0 : default:
825 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
826 : break;
827 : }
828 2 : PQclear(res);
829 : }
830 2 : if (!goterror)
831 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
832 :
833 : /* the second pipeline sync */
834 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
835 :
836 2 : fprintf(stderr, "ok\n");
837 :
838 : /* Test single-row mode with an error partways */
839 2 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
840 : 0, NULL, NULL, NULL, NULL, 0) != 1)
841 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
842 2 : if (PQpipelineSync(conn) != 1)
843 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
844 2 : PQsetSingleRowMode(conn);
845 2 : goterror = false;
846 2 : gotrows = 0;
847 10 : while ((res = PQgetResult(conn)) != NULL)
848 : {
849 8 : switch (PQresultStatus(res))
850 : {
851 6 : case PGRES_SINGLE_TUPLE:
852 6 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
853 6 : gotrows++;
854 6 : break;
855 2 : case PGRES_FATAL_ERROR:
856 2 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
857 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
858 : PQerrorMessage(conn),
859 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
860 2 : printf("got expected division-by-zero\n");
861 2 : goterror = true;
862 2 : break;
863 0 : default:
864 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
865 : }
866 8 : PQclear(res);
867 : }
868 2 : if (!goterror)
869 0 : pg_fatal("did not get division-by-zero error");
870 2 : if (gotrows != 3)
871 0 : pg_fatal("did not get three rows");
872 :
873 : /* the third pipeline sync */
874 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
875 :
876 : /* We're still in pipeline mode... */
877 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
878 0 : pg_fatal("Fell out of pipeline mode somehow");
879 :
880 : /* until we end it, which we can safely do now */
881 2 : if (PQexitPipelineMode(conn) != 1)
882 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
883 : PQerrorMessage(conn));
884 :
885 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
886 0 : pg_fatal("exiting pipeline mode didn't seem to work");
887 :
888 : /*-
889 : * Since we fired the pipelines off without a surrounding xact, the results
890 : * should be:
891 : *
892 : * - Implicit xact started by server around 1st pipeline
893 : * - First insert applied
894 : * - Second statement aborted xact
895 : * - Third insert skipped
896 : * - Sync rolled back first implicit xact
897 : * - Implicit xact created by server around 2nd pipeline
898 : * - insert applied from 2nd pipeline
899 : * - Sync commits 2nd xact
900 : *
901 : * So we should only have the value 3 that we inserted.
902 : */
903 2 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
904 :
905 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
906 0 : pg_fatal("Expected tuples, got %s: %s",
907 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
908 2 : if (PQntuples(res) != 1)
909 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
910 4 : for (i = 0; i < PQntuples(res); i++)
911 : {
912 2 : const char *val = PQgetvalue(res, i, 0);
913 :
914 2 : if (strcmp(val, "3") != 0)
915 0 : pg_fatal("expected only insert with value 3, got %s", val);
916 : }
917 :
918 2 : PQclear(res);
919 :
920 2 : fprintf(stderr, "ok\n");
921 2 : }
922 :
/*
 * Steps of the state machine used by test_pipelined_insert, listed in the
 * order in which that function advances through them.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX = 0,			/* send "BEGIN TRANSACTION" */
	BI_DROP_TABLE,				/* send drop_table_sql */
	BI_CREATE_TABLE,			/* send create_table_sql */
	BI_PREPARE,					/* prepare the "my_insert" statement */
	BI_INSERT_ROWS,				/* presumably: execute the prepared insert
								 * once per row */
	BI_COMMIT_TX,				/* presumably: commit the transaction */
	BI_SYNC,					/* presumably: issue the pipeline sync */
	BI_DONE,					/* presumably: nothing left to do */
};
935 :
936 : static void
937 2 : test_pipelined_insert(PGconn *conn, int n_rows)
938 : {
939 2 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
940 : const char *insert_params[2];
941 : char insert_param_0[MAXINTLEN];
942 : char insert_param_1[MAXINT8LEN];
943 2 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
944 2 : recv_step = BI_BEGIN_TX;
945 : int rows_to_send,
946 : rows_to_receive;
947 :
948 2 : insert_params[0] = insert_param_0;
949 2 : insert_params[1] = insert_param_1;
950 :
951 2 : rows_to_send = rows_to_receive = n_rows;
952 :
953 : /*
954 : * Do a pipelined insert into a table created at the start of the pipeline
955 : */
956 2 : if (PQenterPipelineMode(conn) != 1)
957 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
958 :
959 8 : while (send_step != BI_PREPARE)
960 : {
961 : const char *sql;
962 :
963 6 : switch (send_step)
964 : {
965 2 : case BI_BEGIN_TX:
966 2 : sql = "BEGIN TRANSACTION";
967 2 : send_step = BI_DROP_TABLE;
968 2 : break;
969 :
970 2 : case BI_DROP_TABLE:
971 2 : sql = drop_table_sql;
972 2 : send_step = BI_CREATE_TABLE;
973 2 : break;
974 :
975 2 : case BI_CREATE_TABLE:
976 2 : sql = create_table_sql;
977 2 : send_step = BI_PREPARE;
978 2 : break;
979 :
980 0 : default:
981 0 : pg_fatal("invalid state");
982 : sql = NULL; /* keep compiler quiet */
983 : }
984 :
985 : pg_debug("sending: %s\n", sql);
986 6 : if (PQsendQueryParams(conn, sql,
987 : 0, NULL, NULL, NULL, NULL, 0) != 1)
988 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
989 : }
990 :
991 : Assert(send_step == BI_PREPARE);
992 : pg_debug("sending: %s\n", insert_sql2);
993 2 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
994 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
995 2 : send_step = BI_INSERT_ROWS;
996 :
997 : /*
998 : * Now we start inserting. We'll be sending enough data that we could fill
999 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1000 : * mode and consume input while we send more output. As results of each
1001 : * query are processed we should pop them to allow processing of the next
1002 : * query. There's no need to finish the pipeline before processing
1003 : * results.
1004 : */
1005 2 : if (PQsetnonblocking(conn, 1) != 0)
1006 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1007 :
1008 39852 : while (recv_step != BI_DONE)
1009 : {
1010 : int sock;
1011 : fd_set input_mask;
1012 : fd_set output_mask;
1013 :
1014 39850 : sock = PQsocket(conn);
1015 :
1016 39850 : if (sock < 0)
1017 0 : break; /* shouldn't happen */
1018 :
1019 677450 : FD_ZERO(&input_mask);
1020 39850 : FD_SET(sock, &input_mask);
1021 677450 : FD_ZERO(&output_mask);
1022 39850 : FD_SET(sock, &output_mask);
1023 :
1024 39850 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1025 : {
1026 0 : fprintf(stderr, "select() failed: %m\n");
1027 0 : exit_nicely(conn);
1028 : }
1029 :
1030 : /*
1031 : * Process any results, so we keep the server's output buffer free
1032 : * flowing and it can continue to process input
1033 : */
1034 39850 : if (FD_ISSET(sock, &input_mask))
1035 : {
1036 4 : PQconsumeInput(conn);
1037 :
1038 : /* Read until we'd block if we tried to read */
1039 2826 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1040 : {
1041 : PGresult *res;
1042 2822 : const char *cmdtag = "";
1043 2822 : const char *description = "";
1044 : int status;
1045 :
1046 : /*
1047 : * Read next result. If no more results from this query,
1048 : * advance to the next query
1049 : */
1050 2822 : res = PQgetResult(conn);
1051 2822 : if (res == NULL)
1052 1410 : continue;
1053 :
1054 1412 : status = PGRES_COMMAND_OK;
1055 1412 : switch (recv_step)
1056 : {
1057 2 : case BI_BEGIN_TX:
1058 2 : cmdtag = "BEGIN";
1059 2 : recv_step++;
1060 2 : break;
1061 2 : case BI_DROP_TABLE:
1062 2 : cmdtag = "DROP TABLE";
1063 2 : recv_step++;
1064 2 : break;
1065 2 : case BI_CREATE_TABLE:
1066 2 : cmdtag = "CREATE TABLE";
1067 2 : recv_step++;
1068 2 : break;
1069 2 : case BI_PREPARE:
1070 2 : cmdtag = "";
1071 2 : description = "PREPARE";
1072 2 : recv_step++;
1073 2 : break;
1074 1400 : case BI_INSERT_ROWS:
1075 1400 : cmdtag = "INSERT";
1076 1400 : rows_to_receive--;
1077 1400 : if (rows_to_receive == 0)
1078 2 : recv_step++;
1079 1400 : break;
1080 2 : case BI_COMMIT_TX:
1081 2 : cmdtag = "COMMIT";
1082 2 : recv_step++;
1083 2 : break;
1084 2 : case BI_SYNC:
1085 2 : cmdtag = "";
1086 2 : description = "SYNC";
1087 2 : status = PGRES_PIPELINE_SYNC;
1088 2 : recv_step++;
1089 2 : break;
1090 0 : case BI_DONE:
1091 : /* unreachable */
1092 0 : pg_fatal("unreachable state");
1093 : }
1094 :
1095 1412 : if (PQresultStatus(res) != status)
1096 0 : pg_fatal("%s reported status %s, expected %s\n"
1097 : "Error message: \"%s\"",
1098 : description, PQresStatus(PQresultStatus(res)),
1099 : PQresStatus(status), PQerrorMessage(conn));
1100 :
1101 1412 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1102 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1103 : description, cmdtag, PQcmdStatus(res));
1104 :
1105 : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1106 :
1107 1412 : PQclear(res);
1108 : }
1109 : }
1110 :
1111 : /* Write more rows and/or the end pipeline message, if needed */
1112 39850 : if (FD_ISSET(sock, &output_mask))
1113 : {
1114 39848 : PQflush(conn);
1115 :
1116 39848 : if (send_step == BI_INSERT_ROWS)
1117 : {
1118 1400 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1119 : /* use up some buffer space with a wide value */
1120 1400 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1121 :
1122 1400 : if (PQsendQueryPrepared(conn, "my_insert",
1123 : 2, insert_params, NULL, NULL, 0) == 1)
1124 : {
1125 : pg_debug("sent row %d\n", rows_to_send);
1126 :
1127 1400 : rows_to_send--;
1128 1400 : if (rows_to_send == 0)
1129 2 : send_step++;
1130 : }
1131 : else
1132 : {
1133 : /*
1134 : * in nonblocking mode, so it's OK for an insert to fail
1135 : * to send
1136 : */
1137 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1138 : rows_to_send, PQerrorMessage(conn));
1139 : }
1140 : }
1141 38448 : else if (send_step == BI_COMMIT_TX)
1142 : {
1143 2 : if (PQsendQueryParams(conn, "COMMIT",
1144 : 0, NULL, NULL, NULL, NULL, 0) == 1)
1145 : {
1146 : pg_debug("sent COMMIT\n");
1147 2 : send_step++;
1148 : }
1149 : else
1150 : {
1151 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1152 : PQerrorMessage(conn));
1153 : }
1154 : }
1155 38446 : else if (send_step == BI_SYNC)
1156 : {
1157 2 : if (PQpipelineSync(conn) == 1)
1158 : {
1159 2 : fprintf(stdout, "pipeline sync sent\n");
1160 2 : send_step++;
1161 : }
1162 : else
1163 : {
1164 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1165 : PQerrorMessage(conn));
1166 : }
1167 : }
1168 : }
1169 : }
1170 :
1171 : /* We've got the sync message and the pipeline should be done */
1172 2 : if (PQexitPipelineMode(conn) != 1)
1173 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1174 : PQerrorMessage(conn));
1175 :
1176 2 : if (PQsetnonblocking(conn, 0) != 0)
1177 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1178 :
1179 2 : fprintf(stderr, "ok\n");
1180 2 : }
1181 :
1182 : static void
1183 2 : test_prepared(PGconn *conn)
1184 : {
1185 2 : PGresult *res = NULL;
1186 2 : Oid param_oids[1] = {INT4OID};
1187 : Oid expected_oids[4];
1188 : Oid typ;
1189 :
1190 2 : fprintf(stderr, "prepared... ");
1191 :
1192 2 : if (PQenterPipelineMode(conn) != 1)
1193 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1194 2 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1195 : "interval '1 sec'",
1196 : 1, param_oids) != 1)
1197 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1198 2 : expected_oids[0] = INT4OID;
1199 2 : expected_oids[1] = TEXTOID;
1200 2 : expected_oids[2] = NUMERICOID;
1201 2 : expected_oids[3] = INTERVALOID;
1202 2 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1203 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1204 2 : if (PQpipelineSync(conn) != 1)
1205 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1206 :
1207 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1208 :
1209 2 : consume_null_result(conn);
1210 :
1211 2 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1212 2 : if (PQnfields(res) != lengthof(expected_oids))
1213 0 : pg_fatal("expected %zu columns, got %d",
1214 : lengthof(expected_oids), PQnfields(res));
1215 10 : for (int i = 0; i < PQnfields(res); i++)
1216 : {
1217 8 : typ = PQftype(res, i);
1218 8 : if (typ != expected_oids[i])
1219 0 : pg_fatal("field %d: expected type %u, got %u",
1220 : i, expected_oids[i], typ);
1221 : }
1222 2 : PQclear(res);
1223 :
1224 2 : consume_null_result(conn);
1225 :
1226 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1227 :
1228 2 : fprintf(stderr, "closing statement..");
1229 2 : if (PQsendClosePrepared(conn, "select_one") != 1)
1230 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1231 2 : if (PQpipelineSync(conn) != 1)
1232 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1233 :
1234 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1235 :
1236 2 : consume_null_result(conn);
1237 :
1238 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1239 :
1240 2 : if (PQexitPipelineMode(conn) != 1)
1241 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1242 :
1243 : /* Now that it's closed we should get an error when describing */
1244 2 : res = PQdescribePrepared(conn, "select_one");
1245 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1246 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1247 2 : PQclear(res);
1248 :
1249 : /*
1250 : * Also test the blocking close, this should not fail since closing a
1251 : * non-existent prepared statement is a no-op
1252 : */
1253 2 : res = PQclosePrepared(conn, "select_one");
1254 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1256 2 : PQclear(res);
1257 :
1258 2 : fprintf(stderr, "creating portal... ");
1259 :
1260 2 : res = PQexec(conn, "BEGIN");
1261 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1262 0 : pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
1263 2 : PQclear(res);
1264 :
1265 2 : res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1266 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1267 0 : pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
1268 2 : PQclear(res);
1269 :
1270 2 : PQenterPipelineMode(conn);
1271 2 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1272 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1273 2 : if (PQpipelineSync(conn) != 1)
1274 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275 :
1276 2 : res = confirm_result_status(conn, PGRES_COMMAND_OK);
1277 2 : typ = PQftype(res, 0);
1278 2 : if (typ != INT4OID)
1279 0 : pg_fatal("portal: expected type %u, got %u",
1280 : INT4OID, typ);
1281 2 : PQclear(res);
1282 :
1283 2 : consume_null_result(conn);
1284 :
1285 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1286 :
1287 2 : fprintf(stderr, "closing portal... ");
1288 2 : if (PQsendClosePortal(conn, "cursor_one") != 1)
1289 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1290 2 : if (PQpipelineSync(conn) != 1)
1291 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1292 :
1293 2 : consume_result_status(conn, PGRES_COMMAND_OK);
1294 :
1295 2 : consume_null_result(conn);
1296 :
1297 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1298 :
1299 2 : if (PQexitPipelineMode(conn) != 1)
1300 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1301 :
1302 : /* Now that it's closed we should get an error when describing */
1303 2 : res = PQdescribePortal(conn, "cursor_one");
1304 2 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1305 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1306 2 : PQclear(res);
1307 :
1308 : /*
1309 : * Also test the blocking close, this should not fail since closing a
1310 : * non-existent portal is a no-op
1311 : */
1312 2 : res = PQclosePortal(conn, "cursor_one");
1313 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1314 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1315 2 : PQclear(res);
1316 :
1317 2 : fprintf(stderr, "ok\n");
1318 2 : }
1319 :
1320 : /*
1321 : * Test max_protocol_version options.
1322 : */
1323 : static void
1324 2 : test_protocol_version(PGconn *conn)
1325 : {
1326 : const char **keywords;
1327 : const char **vals;
1328 : int nopts;
1329 2 : PQconninfoOption *opts = PQconninfo(conn);
1330 : int protocol_version;
1331 2 : int max_protocol_version_index = -1;
1332 : int i;
1333 :
1334 : /* Prepare keywords/vals arrays, copied from the existing connection. */
1335 2 : nopts = 0;
1336 104 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1337 102 : nopts++;
1338 2 : nopts++; /* NULL terminator */
1339 :
1340 2 : keywords = pg_malloc0(sizeof(char *) * nopts);
1341 2 : vals = pg_malloc0(sizeof(char *) * nopts);
1342 :
1343 2 : i = 0;
1344 104 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1345 : {
1346 : /*
1347 : * If the test already specified max_protocol_version, we want to
1348 : * replace it rather than attempting to override it. This matters when
1349 : * testing defaults, because empty option values at the end of the
1350 : * connection string won't replace earlier settings.
1351 : */
1352 102 : if (strcmp(opt->keyword, "max_protocol_version") == 0)
1353 2 : max_protocol_version_index = i;
1354 100 : else if (!opt->val)
1355 62 : continue;
1356 :
1357 40 : keywords[i] = opt->keyword;
1358 40 : vals[i] = opt->val;
1359 :
1360 40 : i++;
1361 : }
1362 :
1363 : Assert(max_protocol_version_index >= 0);
1364 :
1365 : /*
1366 : * Test default protocol_version
1367 : */
1368 2 : vals[max_protocol_version_index] = "";
1369 2 : conn = PQconnectdbParams(keywords, vals, false);
1370 :
1371 2 : if (PQstatus(conn) != CONNECTION_OK)
1372 0 : pg_fatal("Connection to database failed: %s",
1373 : PQerrorMessage(conn));
1374 :
1375 2 : protocol_version = PQfullProtocolVersion(conn);
1376 2 : if (protocol_version != 30000)
1377 0 : pg_fatal("expected 30000, got %d", protocol_version);
1378 :
1379 2 : PQfinish(conn);
1380 :
1381 : /*
1382 : * Test max_protocol_version=3.0
1383 : */
1384 2 : vals[max_protocol_version_index] = "3.0";
1385 2 : conn = PQconnectdbParams(keywords, vals, false);
1386 :
1387 2 : if (PQstatus(conn) != CONNECTION_OK)
1388 0 : pg_fatal("Connection to database failed: %s",
1389 : PQerrorMessage(conn));
1390 :
1391 2 : protocol_version = PQfullProtocolVersion(conn);
1392 2 : if (protocol_version != 30000)
1393 0 : pg_fatal("expected 30000, got %d", protocol_version);
1394 :
1395 2 : PQfinish(conn);
1396 :
1397 : /*
1398 : * Test max_protocol_version=3.1. It's not valid, we went straight from
1399 : * 3.0 to 3.2.
1400 : */
1401 2 : vals[max_protocol_version_index] = "3.1";
1402 2 : conn = PQconnectdbParams(keywords, vals, false);
1403 :
1404 2 : if (PQstatus(conn) != CONNECTION_BAD)
1405 0 : pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1406 :
1407 2 : PQfinish(conn);
1408 :
1409 : /*
1410 : * Test max_protocol_version=3.2
1411 : */
1412 2 : vals[max_protocol_version_index] = "3.2";
1413 2 : conn = PQconnectdbParams(keywords, vals, false);
1414 :
1415 2 : if (PQstatus(conn) != CONNECTION_OK)
1416 0 : pg_fatal("Connection to database failed: %s",
1417 : PQerrorMessage(conn));
1418 :
1419 2 : protocol_version = PQfullProtocolVersion(conn);
1420 2 : if (protocol_version != 30002)
1421 0 : pg_fatal("expected 30002, got %d", protocol_version);
1422 :
1423 2 : PQfinish(conn);
1424 :
1425 : /*
1426 : * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1427 : */
1428 2 : vals[max_protocol_version_index] = "latest";
1429 2 : conn = PQconnectdbParams(keywords, vals, false);
1430 :
1431 2 : if (PQstatus(conn) != CONNECTION_OK)
1432 0 : pg_fatal("Connection to database failed: %s",
1433 : PQerrorMessage(conn));
1434 :
1435 2 : protocol_version = PQfullProtocolVersion(conn);
1436 2 : if (protocol_version != 30002)
1437 0 : pg_fatal("expected 30002, got %d", protocol_version);
1438 :
1439 2 : PQfinish(conn);
1440 :
1441 2 : pfree(keywords);
1442 2 : pfree(vals);
1443 2 : PQconninfoFree(opts);
1444 2 : }
1445 :
/*
 * Notice processor that counts and prints notices.
 *
 * "arg" must point to an int counter; it is incremented for every call and
 * its new value is printed to stderr together with the message text.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;
	int			seen = ++(*counter);

	fprintf(stderr, "NOTICE %d: %s", seen, message);
}
1455 :
1456 : /* Verify behavior in "idle" state */
1457 : static void
1458 2 : test_pipeline_idle(PGconn *conn)
1459 : {
1460 2 : int n_notices = 0;
1461 :
1462 2 : fprintf(stderr, "\npipeline idle...\n");
1463 :
1464 2 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1465 :
1466 : /* Try to exit pipeline mode in pipeline-idle state */
1467 2 : if (PQenterPipelineMode(conn) != 1)
1468 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1469 2 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1470 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1471 2 : PQsendFlushRequest(conn);
1472 :
1473 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1474 :
1475 2 : consume_null_result(conn);
1476 :
1477 2 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1478 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1479 2 : if (PQexitPipelineMode(conn) == 1)
1480 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1481 2 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1482 : strlen("cannot exit pipeline mode")) != 0)
1483 0 : pg_fatal("did not get expected error; got: %s",
1484 : PQerrorMessage(conn));
1485 2 : PQsendFlushRequest(conn);
1486 :
1487 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1488 :
1489 2 : consume_null_result(conn);
1490 :
1491 2 : if (PQexitPipelineMode(conn) != 1)
1492 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1493 :
1494 2 : if (n_notices > 0)
1495 0 : pg_fatal("got %d notice(s)", n_notices);
1496 2 : fprintf(stderr, "ok - 1\n");
1497 :
1498 : /* Have a WARNING in the middle of a resultset */
1499 2 : if (PQenterPipelineMode(conn) != 1)
1500 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1501 2 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1502 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1503 2 : PQsendFlushRequest(conn);
1504 :
1505 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1506 :
1507 2 : if (PQexitPipelineMode(conn) != 1)
1508 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1509 2 : fprintf(stderr, "ok - 2\n");
1510 2 : }
1511 :
1512 : static void
1513 2 : test_simple_pipeline(PGconn *conn)
1514 : {
1515 2 : const char *dummy_params[1] = {"1"};
1516 2 : Oid dummy_param_oids[1] = {INT4OID};
1517 :
1518 2 : fprintf(stderr, "simple pipeline... ");
1519 :
1520 : /*
1521 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1522 : * process the results of as they come in.
1523 : *
1524 : * For a simple case we should be able to do this without interim
1525 : * processing of results since our output buffer will give us enough slush
1526 : * to work with and we won't block on sending. So blocking mode is fine.
1527 : */
1528 2 : if (PQisnonblocking(conn))
1529 0 : pg_fatal("Expected blocking connection mode");
1530 :
1531 2 : if (PQenterPipelineMode(conn) != 1)
1532 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1533 :
1534 2 : if (PQsendQueryParams(conn, "SELECT $1",
1535 : 1, dummy_param_oids, dummy_params,
1536 : NULL, NULL, 0) != 1)
1537 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1538 :
1539 2 : if (PQexitPipelineMode(conn) != 0)
1540 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1541 :
1542 2 : if (PQpipelineSync(conn) != 1)
1543 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1544 :
1545 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1546 :
1547 2 : consume_null_result(conn);
1548 :
1549 : /*
1550 : * Even though we've processed the result there's still a sync to come and
1551 : * we can't exit pipeline mode yet
1552 : */
1553 2 : if (PQexitPipelineMode(conn) != 0)
1554 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1555 :
1556 2 : consume_result_status(conn, PGRES_PIPELINE_SYNC);
1557 :
1558 2 : consume_null_result(conn);
1559 :
1560 : /* We're still in pipeline mode... */
1561 2 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1562 0 : pg_fatal("Fell out of pipeline mode somehow");
1563 :
1564 : /* ... until we end it, which we can safely do now */
1565 2 : if (PQexitPipelineMode(conn) != 1)
1566 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1567 : PQerrorMessage(conn));
1568 :
1569 2 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1570 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1571 :
1572 2 : fprintf(stderr, "ok\n");
1573 2 : }
1574 :
1575 : static void
1576 2 : test_singlerowmode(PGconn *conn)
1577 : {
1578 : PGresult *res;
1579 : int i;
1580 2 : bool pipeline_ended = false;
1581 :
1582 2 : if (PQenterPipelineMode(conn) != 1)
1583 0 : pg_fatal("failed to enter pipeline mode: %s",
1584 : PQerrorMessage(conn));
1585 :
1586 : /* One series of three commands, using single-row mode for the first two. */
1587 8 : for (i = 0; i < 3; i++)
1588 : {
1589 : char *param[1];
1590 :
1591 6 : param[0] = psprintf("%d", 44 + i);
1592 :
1593 6 : if (PQsendQueryParams(conn,
1594 : "SELECT generate_series(42, $1)",
1595 : 1,
1596 : NULL,
1597 : (const char *const *) param,
1598 : NULL,
1599 : NULL,
1600 : 0) != 1)
1601 0 : pg_fatal("failed to send query: %s",
1602 : PQerrorMessage(conn));
1603 6 : pfree(param[0]);
1604 : }
1605 2 : if (PQpipelineSync(conn) != 1)
1606 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1607 :
1608 10 : for (i = 0; !pipeline_ended; i++)
1609 : {
1610 8 : bool first = true;
1611 : bool saw_ending_tuplesok;
1612 8 : bool isSingleTuple = false;
1613 :
1614 : /* Set single row mode for only first 2 SELECT queries */
1615 8 : if (i < 2)
1616 : {
1617 4 : if (PQsetSingleRowMode(conn) != 1)
1618 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1619 : }
1620 :
1621 : /* Consume rows for this query */
1622 8 : saw_ending_tuplesok = false;
1623 28 : while ((res = PQgetResult(conn)) != NULL)
1624 : {
1625 22 : ExecStatusType est = PQresultStatus(res);
1626 :
1627 22 : if (est == PGRES_PIPELINE_SYNC)
1628 : {
1629 2 : fprintf(stderr, "end of pipeline reached\n");
1630 2 : pipeline_ended = true;
1631 2 : PQclear(res);
1632 2 : if (i != 3)
1633 0 : pg_fatal("Expected three results, got %d", i);
1634 2 : break;
1635 : }
1636 :
1637 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1638 20 : if (first)
1639 : {
1640 6 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1641 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1642 : i, PQresStatus(est));
1643 6 : if (i >= 2 && est != PGRES_TUPLES_OK)
1644 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1645 : i, PQresStatus(est));
1646 6 : first = false;
1647 : }
1648 :
1649 20 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1650 20 : switch (est)
1651 : {
1652 6 : case PGRES_TUPLES_OK:
1653 6 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1654 6 : saw_ending_tuplesok = true;
1655 6 : if (isSingleTuple)
1656 : {
1657 4 : if (PQntuples(res) == 0)
1658 4 : fprintf(stderr, "all tuples received in query %d\n", i);
1659 : else
1660 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1661 : }
1662 6 : break;
1663 :
1664 14 : case PGRES_SINGLE_TUPLE:
1665 14 : isSingleTuple = true;
1666 14 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1667 14 : break;
1668 :
1669 0 : default:
1670 0 : pg_fatal("unexpected");
1671 : }
1672 20 : PQclear(res);
1673 : }
1674 8 : if (!pipeline_ended && !saw_ending_tuplesok)
1675 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1676 : }
1677 :
1678 : /*
1679 : * Now issue one command, get its results in with single-row mode, then
1680 : * issue another command, and get its results in normal mode; make sure
1681 : * the single-row mode flag is reset as expected.
1682 : */
1683 2 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1684 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1685 0 : pg_fatal("failed to send query: %s",
1686 : PQerrorMessage(conn));
1687 2 : if (PQsendFlushRequest(conn) != 1)
1688 0 : pg_fatal("failed to send flush request");
1689 2 : if (PQsetSingleRowMode(conn) != 1)
1690 0 : pg_fatal("PQsetSingleRowMode() failed");
1691 :
1692 2 : consume_result_status(conn, PGRES_SINGLE_TUPLE);
1693 :
1694 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1695 :
1696 2 : consume_null_result(conn);
1697 :
1698 2 : if (PQsendQueryParams(conn, "SELECT 1",
1699 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1700 0 : pg_fatal("failed to send query: %s",
1701 : PQerrorMessage(conn));
1702 2 : if (PQsendFlushRequest(conn) != 1)
1703 0 : pg_fatal("failed to send flush request");
1704 :
1705 2 : consume_result_status(conn, PGRES_TUPLES_OK);
1706 :
1707 2 : consume_null_result(conn);
1708 :
1709 : /*
1710 : * Try chunked mode as well; make sure that it correctly delivers a
1711 : * partial final chunk.
1712 : */
1713 2 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1714 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1715 0 : pg_fatal("failed to send query: %s",
1716 : PQerrorMessage(conn));
1717 2 : if (PQsendFlushRequest(conn) != 1)
1718 0 : pg_fatal("failed to send flush request");
1719 2 : if (PQsetChunkedRowsMode(conn, 3) != 1)
1720 0 : pg_fatal("PQsetChunkedRowsMode() failed");
1721 :
1722 2 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1723 2 : if (PQntuples(res) != 3)
1724 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1725 2 : PQclear(res);
1726 :
1727 2 : res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
1728 2 : if (PQntuples(res) != 2)
1729 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1730 2 : PQclear(res);
1731 :
1732 2 : res = confirm_result_status(conn, PGRES_TUPLES_OK);
1733 2 : if (PQntuples(res) != 0)
1734 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1735 2 : PQclear(res);
1736 :
1737 2 : consume_null_result(conn);
1738 :
1739 2 : if (PQexitPipelineMode(conn) != 1)
1740 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1741 :
1742 2 : fprintf(stderr, "ok\n");
1743 2 : }
1744 :
1745 : /*
1746 : * Simple test to verify that a pipeline is discarded as a whole when there's
1747 : * an error, ignoring transaction commands.
1748 : */
1749 : static void
1750 2 : test_transaction(PGconn *conn)
1751 : {
1752 : PGresult *res;
1753 : bool expect_null;
1754 2 : int num_syncs = 0;
1755 :
1756 2 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1757 : "CREATE TABLE pq_pipeline_tst (id int)");
1758 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1759 0 : pg_fatal("failed to create test table: %s",
1760 : PQerrorMessage(conn));
1761 2 : PQclear(res);
1762 :
1763 2 : if (PQenterPipelineMode(conn) != 1)
1764 0 : pg_fatal("failed to enter pipeline mode: %s",
1765 : PQerrorMessage(conn));
1766 2 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1767 0 : pg_fatal("could not send prepare on pipeline: %s",
1768 : PQerrorMessage(conn));
1769 :
1770 2 : if (PQsendQueryParams(conn,
1771 : "BEGIN",
1772 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1773 0 : pg_fatal("failed to send query: %s",
1774 : PQerrorMessage(conn));
1775 2 : if (PQsendQueryParams(conn,
1776 : "SELECT 0/0",
1777 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1778 0 : pg_fatal("failed to send query: %s",
1779 : PQerrorMessage(conn));
1780 :
1781 : /*
1782 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1783 : * get out of the pipeline-aborted state first.
1784 : */
1785 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1786 0 : pg_fatal("failed to execute prepared: %s",
1787 : PQerrorMessage(conn));
1788 :
1789 : /* This insert fails because we're in pipeline-aborted state */
1790 2 : if (PQsendQueryParams(conn,
1791 : "INSERT INTO pq_pipeline_tst VALUES (1)",
1792 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1793 0 : pg_fatal("failed to send query: %s",
1794 : PQerrorMessage(conn));
1795 2 : if (PQpipelineSync(conn) != 1)
1796 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1797 2 : num_syncs++;
1798 :
1799 : /*
1800 : * This insert fails even though the pipeline got a SYNC, because we're in
1801 : * an aborted transaction
1802 : */
1803 2 : if (PQsendQueryParams(conn,
1804 : "INSERT INTO pq_pipeline_tst VALUES (2)",
1805 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1806 0 : pg_fatal("failed to send query: %s",
1807 : PQerrorMessage(conn));
1808 2 : if (PQpipelineSync(conn) != 1)
1809 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1810 2 : num_syncs++;
1811 :
1812 : /*
1813 : * Send ROLLBACK using prepared stmt. This one works because we just did
1814 : * PQpipelineSync above.
1815 : */
1816 2 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1817 0 : pg_fatal("failed to execute prepared: %s",
1818 : PQerrorMessage(conn));
1819 :
1820 : /*
1821 : * Now that we're out of a transaction and in pipeline-good mode, this
1822 : * insert works
1823 : */
1824 2 : if (PQsendQueryParams(conn,
1825 : "INSERT INTO pq_pipeline_tst VALUES (3)",
1826 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 0 : pg_fatal("failed to send query: %s",
1828 : PQerrorMessage(conn));
1829 : /* Send two syncs now -- match up to SYNC messages below */
1830 2 : if (PQpipelineSync(conn) != 1)
1831 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1832 2 : num_syncs++;
1833 2 : if (PQpipelineSync(conn) != 1)
1834 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1835 2 : num_syncs++;
1836 :
1837 2 : expect_null = false;
1838 2 : for (int i = 0;; i++)
1839 38 : {
1840 : ExecStatusType restype;
1841 :
1842 40 : res = PQgetResult(conn);
1843 40 : if (res == NULL)
1844 : {
1845 16 : printf("%d: got NULL result\n", i);
1846 16 : if (!expect_null)
1847 0 : pg_fatal("did not expect NULL here");
1848 16 : expect_null = false;
1849 16 : continue;
1850 : }
1851 24 : restype = PQresultStatus(res);
1852 24 : printf("%d: got status %s", i, PQresStatus(restype));
1853 24 : if (expect_null)
1854 0 : pg_fatal("expected NULL");
1855 24 : if (restype == PGRES_FATAL_ERROR)
1856 4 : printf("; error: %s", PQerrorMessage(conn));
1857 20 : else if (restype == PGRES_PIPELINE_ABORTED)
1858 : {
1859 4 : printf(": command didn't run because pipeline aborted\n");
1860 : }
1861 : else
1862 16 : printf("\n");
1863 24 : PQclear(res);
1864 :
1865 24 : if (restype == PGRES_PIPELINE_SYNC)
1866 8 : num_syncs--;
1867 : else
1868 16 : expect_null = true;
1869 24 : if (num_syncs <= 0)
1870 2 : break;
1871 : }
1872 :
1873 2 : consume_null_result(conn);
1874 :
1875 2 : if (PQexitPipelineMode(conn) != 1)
1876 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1877 :
1878 : /* We expect to find one tuple containing the value "3" */
1879 2 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1880 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1881 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1882 2 : if (PQntuples(res) != 1)
1883 0 : pg_fatal("did not get 1 tuple");
1884 2 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1885 0 : pg_fatal("did not get expected tuple");
1886 2 : PQclear(res);
1887 :
1888 2 : fprintf(stderr, "ok\n");
1889 2 : }
1890 :
1891 : /*
1892 : * In this test mode we send a stream of queries, with one in the middle
1893 : * causing an error. Verify that we can still send some more after the
1894 : * error and have libpq work properly.
1895 : */
1896 : static void
1897 2 : test_uniqviol(PGconn *conn)
1898 : {
1899 2 : int sock = PQsocket(conn);
1900 : PGresult *res;
1901 2 : Oid paramTypes[2] = {INT8OID, INT8OID};
1902 : const char *paramValues[2];
1903 : char paramValue0[MAXINT8LEN];
1904 : char paramValue1[MAXINT8LEN];
1905 2 : int ctr = 0;
1906 2 : int numsent = 0;
1907 2 : int results = 0;
1908 2 : bool read_done = false;
1909 2 : bool write_done = false;
1910 2 : bool error_sent = false;
1911 2 : bool got_error = false;
1912 2 : int switched = 0;
1913 2 : int socketful = 0;
1914 : fd_set in_fds;
1915 : fd_set out_fds;
1916 :
1917 2 : fprintf(stderr, "uniqviol ...");
1918 :
1919 2 : PQsetnonblocking(conn, 1);
1920 :
1921 2 : paramValues[0] = paramValue0;
1922 2 : paramValues[1] = paramValue1;
1923 2 : sprintf(paramValue1, "42");
1924 :
1925 2 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1926 : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1927 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1928 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1929 2 : PQclear(res);
1930 :
1931 2 : res = PQexec(conn, "begin");
1932 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1933 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1934 2 : PQclear(res);
1935 :
1936 2 : res = PQprepare(conn, "insertion",
1937 : "insert into ppln_uniqviol values ($1, $2) returning id",
1938 : 2, paramTypes);
1939 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1940 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1941 2 : PQclear(res);
1942 :
1943 2 : if (PQenterPipelineMode(conn) != 1)
1944 0 : pg_fatal("failed to enter pipeline mode");
1945 :
1946 16 : while (!read_done)
1947 : {
1948 : /*
1949 : * Avoid deadlocks by reading everything the server has sent before
1950 : * sending anything. (Special precaution is needed here to process
1951 : * PQisBusy before testing the socket for read-readiness, because the
1952 : * socket does not turn read-ready after "sending" queries in aborted
1953 : * pipeline mode.)
1954 : */
1955 1212 : while (PQisBusy(conn) == 0)
1956 : {
1957 : bool new_error;
1958 :
1959 1202 : if (results >= numsent)
1960 : {
1961 2 : if (write_done)
1962 0 : read_done = true;
1963 2 : break;
1964 : }
1965 :
1966 1200 : res = PQgetResult(conn);
1967 1200 : new_error = process_result(conn, res, results, numsent);
1968 1200 : if (new_error && got_error)
1969 0 : pg_fatal("got two errors");
1970 1200 : got_error |= new_error;
1971 1200 : if (results++ >= numsent - 1)
1972 : {
1973 4 : if (write_done)
1974 2 : read_done = true;
1975 4 : break;
1976 : }
1977 : }
1978 :
1979 16 : if (read_done)
1980 2 : break;
1981 :
1982 238 : FD_ZERO(&out_fds);
1983 14 : FD_SET(sock, &out_fds);
1984 :
1985 238 : FD_ZERO(&in_fds);
1986 14 : FD_SET(sock, &in_fds);
1987 :
1988 14 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1989 : {
1990 0 : if (errno == EINTR)
1991 0 : continue;
1992 0 : pg_fatal("select() failed: %m");
1993 : }
1994 :
1995 14 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1996 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1997 :
1998 : /*
1999 : * If the socket is writable and we haven't finished sending queries,
2000 : * send some.
2001 : */
2002 14 : if (!write_done && FD_ISSET(sock, &out_fds))
2003 : {
2004 : for (;;)
2005 1194 : {
2006 : int flush;
2007 :
2008 : /*
2009 : * provoke uniqueness violation exactly once after having
2010 : * switched to read mode.
2011 : */
2012 1200 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2013 : {
2014 2 : sprintf(paramValue0, "%d", numsent / 2);
2015 2 : fprintf(stderr, "E");
2016 2 : error_sent = true;
2017 : }
2018 : else
2019 : {
2020 1198 : fprintf(stderr, ".");
2021 1198 : sprintf(paramValue0, "%d", ctr++);
2022 : }
2023 :
2024 1200 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2025 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2026 1200 : numsent++;
2027 :
2028 : /* Are we done writing? */
2029 1200 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
2030 : {
2031 2 : if (PQsendFlushRequest(conn) != 1)
2032 0 : pg_fatal("failed to send flush request");
2033 2 : write_done = true;
2034 2 : fprintf(stderr, "\ndone writing\n");
2035 2 : PQflush(conn);
2036 2 : break;
2037 : }
2038 :
2039 : /* is the outgoing socket full? */
2040 1198 : flush = PQflush(conn);
2041 1198 : if (flush == -1)
2042 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2043 1198 : if (flush == 1)
2044 : {
2045 4 : if (socketful == 0)
2046 2 : socketful = numsent;
2047 4 : fprintf(stderr, "\nswitch to reading\n");
2048 4 : switched++;
2049 4 : break;
2050 : }
2051 : }
2052 : }
2053 : }
2054 :
2055 2 : if (!got_error)
2056 0 : pg_fatal("did not get expected error");
2057 :
2058 2 : fprintf(stderr, "ok\n");
2059 2 : }
2060 :
2061 : /*
2062 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2063 : * the expected NULL that should follow it.
2064 : *
2065 : * Returns true if we read a fatal error message, otherwise false.
2066 : */
2067 : static bool
2068 1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2069 : {
2070 1200 : bool got_error = false;
2071 :
2072 1200 : if (res == NULL)
2073 0 : pg_fatal("got unexpected NULL");
2074 :
2075 1200 : switch (PQresultStatus(res))
2076 : {
2077 2 : case PGRES_FATAL_ERROR:
2078 2 : got_error = true;
2079 2 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2080 2 : PQclear(res);
2081 2 : consume_null_result(conn);
2082 2 : break;
2083 :
2084 836 : case PGRES_TUPLES_OK:
2085 836 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2086 836 : PQclear(res);
2087 836 : consume_null_result(conn);
2088 836 : break;
2089 :
2090 362 : case PGRES_PIPELINE_ABORTED:
2091 362 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2092 362 : PQclear(res);
2093 362 : consume_null_result(conn);
2094 362 : break;
2095 :
2096 0 : default:
2097 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2098 : }
2099 :
2100 1200 : return got_error;
2101 : }
2102 :
2103 :
/*
 * Print a short help message to stderr, using "progname" as the name of
 * the program in the synopsis lines.
 */
static void
usage(const char *progname)
{
	/* Header and synopsis */
	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
	fputs("Usage:\n", stderr);
	fprintf(stderr, " %s [OPTION] tests\n", progname);
	fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* Supported command-line switches */
	fputs("\nOptions:\n", stderr);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", stderr);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", stderr);
}
2115 :
/*
 * Print the name of every available test to stdout, one per line.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"protocol_version",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};
	size_t		ntests = sizeof(test_names) / sizeof(test_names[0]);

	for (size_t i = 0; i < ntests; i++)
		printf("%s\n", test_names[i]);
}
2133 :
2134 : int
2135 30 : main(int argc, char **argv)
2136 : {
2137 30 : const char *conninfo = "";
2138 : PGconn *conn;
2139 30 : FILE *trace = NULL;
2140 : char *testname;
2141 30 : int numrows = 10000;
2142 : PGresult *res;
2143 : int c;
2144 :
2145 104 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2146 : {
2147 44 : switch (c)
2148 : {
2149 26 : case 'r': /* numrows */
2150 26 : errno = 0;
2151 26 : numrows = strtol(optarg, NULL, 10);
2152 26 : if (errno != 0 || numrows <= 0)
2153 : {
2154 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2155 : optarg);
2156 0 : exit(1);
2157 : }
2158 26 : break;
2159 18 : case 't': /* trace file */
2160 18 : tracefile = pg_strdup(optarg);
2161 18 : break;
2162 : }
2163 : }
2164 :
2165 30 : if (optind < argc)
2166 : {
2167 30 : testname = pg_strdup(argv[optind]);
2168 30 : optind++;
2169 : }
2170 : else
2171 : {
2172 0 : usage(argv[0]);
2173 0 : exit(1);
2174 : }
2175 :
2176 30 : if (strcmp(testname, "tests") == 0)
2177 : {
2178 2 : print_test_list();
2179 2 : exit(0);
2180 : }
2181 :
2182 28 : if (optind < argc)
2183 : {
2184 28 : conninfo = pg_strdup(argv[optind]);
2185 28 : optind++;
2186 : }
2187 :
2188 : /* Make a connection to the database */
2189 28 : conn = PQconnectdb(conninfo);
2190 28 : if (PQstatus(conn) != CONNECTION_OK)
2191 : {
2192 0 : fprintf(stderr, "Connection to database failed: %s\n",
2193 : PQerrorMessage(conn));
2194 0 : exit_nicely(conn);
2195 : }
2196 :
2197 28 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2198 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2199 0 : pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2200 28 : PQclear(res);
2201 28 : res = PQexec(conn, "SET debug_parallel_query = off");
2202 28 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2203 0 : pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2204 28 : PQclear(res);
2205 :
2206 : /* Set the trace file, if requested */
2207 28 : if (tracefile != NULL)
2208 : {
2209 18 : if (strcmp(tracefile, "-") == 0)
2210 0 : trace = stdout;
2211 : else
2212 18 : trace = fopen(tracefile, "w");
2213 18 : if (trace == NULL)
2214 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2215 :
2216 : /* Make it line-buffered */
2217 18 : setvbuf(trace, NULL, PG_IOLBF, 0);
2218 :
2219 18 : PQtrace(conn, trace);
2220 18 : PQsetTraceFlags(conn,
2221 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2222 : }
2223 :
2224 28 : if (strcmp(testname, "cancel") == 0)
2225 4 : test_cancel(conn);
2226 24 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2227 2 : test_disallowed_in_pipeline(conn);
2228 22 : else if (strcmp(testname, "multi_pipelines") == 0)
2229 2 : test_multi_pipelines(conn);
2230 20 : else if (strcmp(testname, "nosync") == 0)
2231 2 : test_nosync(conn);
2232 18 : else if (strcmp(testname, "pipeline_abort") == 0)
2233 2 : test_pipeline_abort(conn);
2234 16 : else if (strcmp(testname, "pipeline_idle") == 0)
2235 2 : test_pipeline_idle(conn);
2236 14 : else if (strcmp(testname, "pipelined_insert") == 0)
2237 2 : test_pipelined_insert(conn, numrows);
2238 12 : else if (strcmp(testname, "prepared") == 0)
2239 2 : test_prepared(conn);
2240 10 : else if (strcmp(testname, "protocol_version") == 0)
2241 2 : test_protocol_version(conn);
2242 8 : else if (strcmp(testname, "simple_pipeline") == 0)
2243 2 : test_simple_pipeline(conn);
2244 6 : else if (strcmp(testname, "singlerow") == 0)
2245 2 : test_singlerowmode(conn);
2246 4 : else if (strcmp(testname, "transaction") == 0)
2247 2 : test_transaction(conn);
2248 2 : else if (strcmp(testname, "uniqviol") == 0)
2249 2 : test_uniqviol(conn);
2250 : else
2251 : {
2252 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2253 0 : exit(1);
2254 : }
2255 :
2256 : /* close the connection to the database and cleanup */
2257 28 : PQfinish(conn);
2258 :
2259 28 : if (trace && trace != stdout)
2260 18 : fclose(trace);
2261 :
2262 28 : return 0;
2263 : }
|