LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 864 1206 71.6 %
Date: 2024-04-25 21:11:15 Functions: 20 23 87.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/select.h>
      19             : #include <sys/time.h>
      20             : 
      21             : #include "catalog/pg_type_d.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "libpq-fe.h"
      24             : #include "pg_getopt.h"
      25             : #include "portability/instr_time.h"
      26             : 
      27             : 
      28             : static void exit_nicely(PGconn *conn);
      29             : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
      30             :             pg_attribute_printf(2, 3);
      31             : static bool process_result(PGconn *conn, PGresult *res, int results,
      32             :                            int numsent);
      33             : 
      34             : const char *const progname = "libpq_pipeline";
      35             : 
      36             : /* Options and defaults */
      37             : char       *tracefile = NULL;   /* path to PQtrace() file */
      38             : 
      39             : 
      40             : #ifdef DEBUG_OUTPUT
      41             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      42             : #else
      43             : #define pg_debug(...)
      44             : #endif
      45             : 
      46             : static const char *const drop_table_sql =
      47             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      48             : static const char *const create_table_sql =
      49             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      50             : "int8filler int8);";
      51             : static const char *const insert_sql =
      52             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      53             : static const char *const insert_sql2 =
      54             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      55             : 
      56             : /* max char length of an int32/64, plus sign and null terminator */
      57             : #define MAXINTLEN 12
      58             : #define MAXINT8LEN 20
      59             : 
      60             : static void
      61           0 : exit_nicely(PGconn *conn)
      62             : {
      63           0 :     PQfinish(conn);
      64           0 :     exit(1);
      65             : }
      66             : 
      67             : /*
      68             :  * The following few functions are wrapped in macros to make the reported line
      69             :  * number in an error match the line number of the invocation.
      70             :  */
      71             : 
      72             : /*
      73             :  * Print an error to stderr and terminate the program.
      74             :  */
      75             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      76             : static void
      77             : pg_attribute_noreturn()
      78           0 : pg_fatal_impl(int line, const char *fmt,...)
      79             : {
      80             :     va_list     args;
      81             : 
      82           0 :     fflush(stdout);
      83             : 
      84           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      85           0 :     va_start(args, fmt);
      86           0 :     vfprintf(stderr, fmt, args);
      87           0 :     va_end(args);
      88             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      89           0 :     fprintf(stderr, "\n");
      90           0 :     exit(1);
      91             : }
      92             : 
      93             : /*
      94             :  * Check that the query on the given connection got canceled.
      95             :  */
      96             : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
      97             : static void
      98          12 : confirm_query_canceled_impl(int line, PGconn *conn)
      99             : {
     100          12 :     PGresult   *res = NULL;
     101             : 
     102          12 :     res = PQgetResult(conn);
     103          12 :     if (res == NULL)
     104           0 :         pg_fatal_impl(line, "PQgetResult returned null: %s",
     105             :                       PQerrorMessage(conn));
     106          12 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     107           0 :         pg_fatal_impl(line, "query did not fail when it was expected");
     108          12 :     if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
     109           0 :         pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
     110             :                       PQerrorMessage(conn));
     111          12 :     PQclear(res);
     112             : 
     113          12 :     while (PQisBusy(conn))
     114           0 :         PQconsumeInput(conn);
     115          12 : }
     116             : 
     117             : /*
     118             :  * Using monitorConn, query pg_stat_activity to see that the connection with
     119             :  * the given PID is either in the given state, or waiting on the given event
     120             :  * (only one of them can be given).
     121             :  */
     122             : static void
     123          24 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
     124             :                           char *state, char *event)
     125             : {
     126          24 :     const Oid   paramTypes[] = {INT4OID, TEXTOID};
     127             :     const char *paramValues[2];
     128          24 :     char       *pidstr = psprintf("%d", procpid);
     129             : 
     130             :     Assert((state == NULL) ^ (event == NULL));
     131             : 
     132          24 :     paramValues[0] = pidstr;
     133          24 :     paramValues[1] = state ? state : event;
     134             : 
     135             :     while (true)
     136           0 :     {
     137             :         PGresult   *res;
     138             :         char       *value;
     139             : 
     140          24 :         if (state != NULL)
     141          12 :             res = PQexecParams(monitorConn,
     142             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     143             :                                "pid = $1 AND state = $2",
     144             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     145             :         else
     146          12 :             res = PQexecParams(monitorConn,
     147             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     148             :                                "pid = $1 AND wait_event = $2",
     149             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     150             : 
     151          24 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     152           0 :             pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
     153          24 :         if (PQntuples(res) != 1)
     154           0 :             pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
     155          24 :         if (PQnfields(res) != 1)
     156           0 :             pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
     157          24 :         value = PQgetvalue(res, 0, 0);
     158          24 :         if (strcmp(value, "0") != 0)
     159             :         {
     160          24 :             PQclear(res);
     161          24 :             break;
     162             :         }
     163           0 :         PQclear(res);
     164             : 
     165             :         /* wait 10ms before polling again */
     166           0 :         pg_usleep(10000);
     167             :     }
     168             : 
     169          24 :     pfree(pidstr);
     170          24 : }
     171             : 
     172             : #define send_cancellable_query(conn, monitorConn) \
     173             :     send_cancellable_query_impl(__LINE__, conn, monitorConn)
     174             : static void
     175          12 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
     176             : {
     177             :     const char *env_wait;
     178          12 :     const Oid   paramTypes[1] = {INT4OID};
     179             : 
     180             :     /*
     181             :      * Wait for the connection to be idle, so that our check for an active
     182             :      * connection below is reliable, instead of possibly seeing an outdated
     183             :      * state.
     184             :      */
     185          12 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
     186             : 
     187          12 :     env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
     188          12 :     if (env_wait == NULL)
     189          12 :         env_wait = "180";
     190             : 
     191          12 :     if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
     192             :                           &env_wait, NULL, NULL, 0) != 1)
     193           0 :         pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
     194             : 
     195             :     /*
     196             :      * Wait for the sleep to be active, because if the query is not running
     197             :      * yet, the cancel request that we send won't have any effect.
     198             :      */
     199          12 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
     200          12 : }
     201             : 
     202             : /*
     203             :  * Create a new connection with the same conninfo as the given one.
     204             :  */
     205             : static PGconn *
     206           2 : copy_connection(PGconn *conn)
     207             : {
     208             :     PGconn     *copyConn;
     209           2 :     PQconninfoOption *opts = PQconninfo(conn);
     210             :     const char **keywords;
     211             :     const char **vals;
     212           2 :     int         nopts = 1;
     213           2 :     int         i = 0;
     214             : 
     215          84 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     216          82 :         nopts++;
     217             : 
     218           2 :     keywords = pg_malloc(sizeof(char *) * nopts);
     219           2 :     vals = pg_malloc(sizeof(char *) * nopts);
     220             : 
     221          84 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     222             :     {
     223          82 :         if (opt->val)
     224             :         {
     225          38 :             keywords[i] = opt->keyword;
     226          38 :             vals[i] = opt->val;
     227          38 :             i++;
     228             :         }
     229             :     }
     230           2 :     keywords[i] = vals[i] = NULL;
     231             : 
     232           2 :     copyConn = PQconnectdbParams(keywords, vals, false);
     233             : 
     234           2 :     if (PQstatus(copyConn) != CONNECTION_OK)
     235           0 :         pg_fatal("Connection to database failed: %s",
     236             :                  PQerrorMessage(copyConn));
     237             : 
     238           2 :     return copyConn;
     239             : }
     240             : 
     241             : /*
     242             :  * Test query cancellation routines
     243             :  */
     244             : static void
     245           2 : test_cancel(PGconn *conn)
     246             : {
     247             :     PGcancel   *cancel;
     248             :     PGcancelConn *cancelConn;
     249             :     PGconn     *monitorConn;
     250             :     char        errorbuf[256];
     251             : 
     252           2 :     fprintf(stderr, "test cancellations... ");
     253             : 
     254           2 :     if (PQsetnonblocking(conn, 1) != 0)
     255           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     256             : 
     257             :     /*
     258             :      * Make a separate connection to the database to monitor the query on the
     259             :      * main connection.
     260             :      */
     261           2 :     monitorConn = copy_connection(conn);
     262             :     Assert(PQstatus(monitorConn) == CONNECTION_OK);
     263             : 
     264             :     /* test PQcancel */
     265           2 :     send_cancellable_query(conn, monitorConn);
     266           2 :     cancel = PQgetCancel(conn);
     267           2 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     268           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     269           2 :     confirm_query_canceled(conn);
     270             : 
     271             :     /* PGcancel object can be reused for the next query */
     272           2 :     send_cancellable_query(conn, monitorConn);
     273           2 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     274           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     275           2 :     confirm_query_canceled(conn);
     276             : 
     277           2 :     PQfreeCancel(cancel);
     278             : 
     279             :     /* test PQrequestCancel */
     280           2 :     send_cancellable_query(conn, monitorConn);
     281           2 :     if (!PQrequestCancel(conn))
     282           0 :         pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
     283           2 :     confirm_query_canceled(conn);
     284             : 
     285             :     /* test PQcancelBlocking */
     286           2 :     send_cancellable_query(conn, monitorConn);
     287           2 :     cancelConn = PQcancelCreate(conn);
     288           2 :     if (!PQcancelBlocking(cancelConn))
     289           0 :         pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
     290           2 :     confirm_query_canceled(conn);
     291           2 :     PQcancelFinish(cancelConn);
     292             : 
     293             :     /* test PQcancelCreate and then polling with PQcancelPoll */
     294           2 :     send_cancellable_query(conn, monitorConn);
     295           2 :     cancelConn = PQcancelCreate(conn);
     296           2 :     if (!PQcancelStart(cancelConn))
     297           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     298             :     while (true)
     299           2 :     {
     300             :         struct timeval tv;
     301             :         fd_set      input_mask;
     302             :         fd_set      output_mask;
     303           4 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     304           4 :         int         sock = PQcancelSocket(cancelConn);
     305             : 
     306           4 :         if (pollres == PGRES_POLLING_OK)
     307           2 :             break;
     308             : 
     309           2 :         FD_ZERO(&input_mask);
     310           2 :         FD_ZERO(&output_mask);
     311           2 :         switch (pollres)
     312             :         {
     313           2 :             case PGRES_POLLING_READING:
     314             :                 pg_debug("polling for reads\n");
     315           2 :                 FD_SET(sock, &input_mask);
     316           2 :                 break;
     317           0 :             case PGRES_POLLING_WRITING:
     318             :                 pg_debug("polling for writes\n");
     319           0 :                 FD_SET(sock, &output_mask);
     320           0 :                 break;
     321           0 :             default:
     322           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     323             :         }
     324             : 
     325           2 :         if (sock < 0)
     326           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     327             : 
     328           2 :         tv.tv_sec = 3;
     329           2 :         tv.tv_usec = 0;
     330             : 
     331             :         while (true)
     332             :         {
     333           2 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     334             :             {
     335           0 :                 if (errno == EINTR)
     336           0 :                     continue;
     337           0 :                 pg_fatal("select() failed: %m");
     338             :             }
     339           2 :             break;
     340             :         }
     341             :     }
     342           2 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     343           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     344           2 :     confirm_query_canceled(conn);
     345             : 
     346             :     /*
     347             :      * test PQcancelReset works on the cancel connection and it can be reused
     348             :      * afterwards
     349             :      */
     350           2 :     PQcancelReset(cancelConn);
     351             : 
     352           2 :     send_cancellable_query(conn, monitorConn);
     353           2 :     if (!PQcancelStart(cancelConn))
     354           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     355             :     while (true)
     356           2 :     {
     357             :         struct timeval tv;
     358             :         fd_set      input_mask;
     359             :         fd_set      output_mask;
     360           4 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     361           4 :         int         sock = PQcancelSocket(cancelConn);
     362             : 
     363           4 :         if (pollres == PGRES_POLLING_OK)
     364           2 :             break;
     365             : 
     366           2 :         FD_ZERO(&input_mask);
     367           2 :         FD_ZERO(&output_mask);
     368           2 :         switch (pollres)
     369             :         {
     370           2 :             case PGRES_POLLING_READING:
     371             :                 pg_debug("polling for reads\n");
     372           2 :                 FD_SET(sock, &input_mask);
     373           2 :                 break;
     374           0 :             case PGRES_POLLING_WRITING:
     375             :                 pg_debug("polling for writes\n");
     376           0 :                 FD_SET(sock, &output_mask);
     377           0 :                 break;
     378           0 :             default:
     379           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     380             :         }
     381             : 
     382           2 :         if (sock < 0)
     383           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     384             : 
     385           2 :         tv.tv_sec = 3;
     386           2 :         tv.tv_usec = 0;
     387             : 
     388             :         while (true)
     389             :         {
     390           2 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     391             :             {
     392           0 :                 if (errno == EINTR)
     393           0 :                     continue;
     394           0 :                 pg_fatal("select() failed: %m");
     395             :             }
     396           2 :             break;
     397             :         }
     398             :     }
     399           2 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     400           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     401           2 :     confirm_query_canceled(conn);
     402             : 
     403           2 :     PQcancelFinish(cancelConn);
     404             : 
     405           2 :     fprintf(stderr, "ok\n");
     406           2 : }
     407             : 
     408             : static void
     409           2 : test_disallowed_in_pipeline(PGconn *conn)
     410             : {
     411           2 :     PGresult   *res = NULL;
     412             : 
     413           2 :     fprintf(stderr, "test error cases... ");
     414             : 
     415           2 :     if (PQisnonblocking(conn))
     416           0 :         pg_fatal("Expected blocking connection mode");
     417             : 
     418           2 :     if (PQenterPipelineMode(conn) != 1)
     419           0 :         pg_fatal("Unable to enter pipeline mode");
     420             : 
     421           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     422           0 :         pg_fatal("Pipeline mode not activated properly");
     423             : 
     424             :     /* PQexec should fail in pipeline mode */
     425           2 :     res = PQexec(conn, "SELECT 1");
     426           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     427           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
     428           2 :     if (strcmp(PQerrorMessage(conn),
     429             :                "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     430           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     431             :                  PQerrorMessage(conn));
     432             : 
     433             :     /* PQsendQuery should fail in pipeline mode */
     434           2 :     if (PQsendQuery(conn, "SELECT 1") != 0)
     435           0 :         pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     436           2 :     if (strcmp(PQerrorMessage(conn),
     437             :                "PQsendQuery not allowed in pipeline mode\n") != 0)
     438           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     439             :                  PQerrorMessage(conn));
     440             : 
     441             :     /* Entering pipeline mode when already in pipeline mode is OK */
     442           2 :     if (PQenterPipelineMode(conn) != 1)
     443           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     444             : 
     445           2 :     if (PQisBusy(conn) != 0)
     446           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     447             : 
     448             :     /* ok, back to normal command mode */
     449           2 :     if (PQexitPipelineMode(conn) != 1)
     450           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     451             : 
     452           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     453           0 :         pg_fatal("Pipeline mode not terminated properly");
     454             : 
     455             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     456           2 :     if (PQexitPipelineMode(conn) != 1)
     457           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     458             : 
     459             :     /* can now PQexec again */
     460           2 :     res = PQexec(conn, "SELECT 1");
     461           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     462           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     463             :                  PQerrorMessage(conn));
     464             : 
     465           2 :     fprintf(stderr, "ok\n");
     466           2 : }
     467             : 
     468             : static void
     469           2 : test_multi_pipelines(PGconn *conn)
     470             : {
     471           2 :     PGresult   *res = NULL;
     472           2 :     const char *dummy_params[1] = {"1"};
     473           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     474             : 
     475           2 :     fprintf(stderr, "multi pipeline... ");
     476             : 
     477             :     /*
     478             :      * Queue up a couple of small pipelines and process each without returning
     479             :      * to command mode first.
     480             :      */
     481           2 :     if (PQenterPipelineMode(conn) != 1)
     482           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     483             : 
     484             :     /* first pipeline */
     485           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     486             :                           dummy_params, NULL, NULL, 0) != 1)
     487           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     488             : 
     489           2 :     if (PQpipelineSync(conn) != 1)
     490           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     491             : 
     492             :     /* second pipeline */
     493           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     494             :                           dummy_params, NULL, NULL, 0) != 1)
     495           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     496             : 
     497             :     /* Skip flushing once. */
     498           2 :     if (PQsendPipelineSync(conn) != 1)
     499           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     500             : 
     501             :     /* third pipeline */
     502           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     503             :                           dummy_params, NULL, NULL, 0) != 1)
     504           0 :         pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     505             : 
     506           2 :     if (PQpipelineSync(conn) != 1)
     507           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     508             : 
     509             :     /* OK, start processing the results */
     510             : 
     511             :     /* first pipeline */
     512             : 
     513           2 :     res = PQgetResult(conn);
     514           2 :     if (res == NULL)
     515           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     516             :                  PQerrorMessage(conn));
     517             : 
     518           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     519           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
     520             :                  PQresStatus(PQresultStatus(res)));
     521           2 :     PQclear(res);
     522           2 :     res = NULL;
     523             : 
     524           2 :     if (PQgetResult(conn) != NULL)
     525           0 :         pg_fatal("PQgetResult returned something extra after first result");
     526             : 
     527           2 :     if (PQexitPipelineMode(conn) != 0)
     528           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     529             : 
     530           2 :     res = PQgetResult(conn);
     531           2 :     if (res == NULL)
     532           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     533             :                  PQerrorMessage(conn));
     534             : 
     535           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     536           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     537             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     538           2 :     PQclear(res);
     539             : 
     540             :     /* second pipeline */
     541             : 
     542           2 :     res = PQgetResult(conn);
     543           2 :     if (res == NULL)
     544           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     545             :                  PQerrorMessage(conn));
     546             : 
     547           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     548           0 :         pg_fatal("Unexpected result code %s from second pipeline item",
     549             :                  PQresStatus(PQresultStatus(res)));
     550           2 :     PQclear(res);
     551           2 :     res = NULL;
     552             : 
     553           2 :     if (PQgetResult(conn) != NULL)
     554           0 :         pg_fatal("PQgetResult returned something extra after first result");
     555             : 
     556           2 :     if (PQexitPipelineMode(conn) != 0)
     557           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     558             : 
     559           2 :     res = PQgetResult(conn);
     560           2 :     if (res == NULL)
     561           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     562             :                  PQerrorMessage(conn));
     563             : 
     564           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     565           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     566             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     567           2 :     PQclear(res);
     568             : 
     569             :     /* third pipeline */
     570             : 
     571           2 :     res = PQgetResult(conn);
     572           2 :     if (res == NULL)
     573           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     574             :                  PQerrorMessage(conn));
     575             : 
     576           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     577           0 :         pg_fatal("Unexpected result code %s from third pipeline item",
     578             :                  PQresStatus(PQresultStatus(res)));
     579             : 
     580           2 :     res = PQgetResult(conn);
     581           2 :     if (res != NULL)
     582           0 :         pg_fatal("Expected null result, got %s",
     583             :                  PQresStatus(PQresultStatus(res)));
     584             : 
     585           2 :     res = PQgetResult(conn);
     586           2 :     if (res == NULL)
     587           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     588             :                  PQerrorMessage(conn));
     589             : 
     590           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     591           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     592             :                  PQresStatus(PQresultStatus(res)));
     593             : 
     594             :     /* We're still in pipeline mode ... */
     595           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     596           0 :         pg_fatal("Fell out of pipeline mode somehow");
     597             : 
     598             :     /* until we end it, which we can safely do now */
     599           2 :     if (PQexitPipelineMode(conn) != 1)
     600           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     601             :                  PQerrorMessage(conn));
     602             : 
     603           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     604           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     605             : 
     606           2 :     fprintf(stderr, "ok\n");
     607           2 : }
     608             : 
     609             : /*
     610             :  * Test behavior when a pipeline dispatches a number of commands that are
     611             :  * not flushed by a sync point.
     612             :  */
     613             : static void
     614           2 : test_nosync(PGconn *conn)
     615             : {
     616           2 :     int         numqueries = 10;
     617           2 :     int         results = 0;
     618           2 :     int         sock = PQsocket(conn);
     619             : 
     620           2 :     fprintf(stderr, "nosync... ");
     621             : 
     622           2 :     if (sock < 0)
     623           0 :         pg_fatal("invalid socket");
     624             : 
     625           2 :     if (PQenterPipelineMode(conn) != 1)
     626           0 :         pg_fatal("could not enter pipeline mode");
     627          22 :     for (int i = 0; i < numqueries; i++)
     628             :     {
     629             :         fd_set      input_mask;
     630             :         struct timeval tv;
     631             : 
     632          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     633             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     634           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     635          20 :         PQflush(conn);
     636             : 
     637             :         /*
     638             :          * If the server has written anything to us, read (some of) it now.
     639             :          */
     640          20 :         FD_ZERO(&input_mask);
     641          20 :         FD_SET(sock, &input_mask);
     642          20 :         tv.tv_sec = 0;
     643          20 :         tv.tv_usec = 0;
     644          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     645             :         {
     646           0 :             fprintf(stderr, "select() failed: %m\n");
     647           0 :             exit_nicely(conn);
     648             :         }
     649          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     650           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     651             :     }
     652             : 
     653             :     /* tell server to flush its output buffer */
     654           2 :     if (PQsendFlushRequest(conn) != 1)
     655           0 :         pg_fatal("failed to send flush request");
     656           2 :     PQflush(conn);
     657             : 
     658             :     /* Now read all results */
     659             :     for (;;)
     660          18 :     {
     661             :         PGresult   *res;
     662             : 
     663          20 :         res = PQgetResult(conn);
     664             : 
     665             :         /* NULL results are only expected after TUPLES_OK */
     666          20 :         if (res == NULL)
     667           0 :             pg_fatal("got unexpected NULL result after %d results", results);
     668             : 
     669             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     670          20 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     671             :         {
     672             :             PGresult   *res2;
     673             : 
     674             :             /* and one NULL result should follow each */
     675          20 :             res2 = PQgetResult(conn);
     676          20 :             if (res2 != NULL)
     677           0 :                 pg_fatal("expected NULL, got %s",
     678             :                          PQresStatus(PQresultStatus(res2)));
     679          20 :             PQclear(res);
     680          20 :             results++;
     681             : 
     682             :             /* if we're done, we're done */
     683          20 :             if (results == numqueries)
     684           2 :                 break;
     685             : 
     686          18 :             continue;
     687             :         }
     688             : 
     689             :         /* anything else is unexpected */
     690           0 :         pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
     691             :     }
     692             : 
     693           2 :     fprintf(stderr, "ok\n");
     694           2 : }
     695             : 
     696             : /*
     697             :  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
     698             :  * still have to get results for each pipeline item, but the item will just be
     699             :  * a PGRES_PIPELINE_ABORTED code.
     700             :  *
     701             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     702             :  * usually use an xact, but in this case we want to observe the effects of each
     703             :  * statement.
     704             :  */
     705             : static void
     706           2 : test_pipeline_abort(PGconn *conn)
     707             : {
     708           2 :     PGresult   *res = NULL;
     709           2 :     const char *dummy_params[1] = {"1"};
     710           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     711             :     int         i;
     712             :     int         gotrows;
     713             :     bool        goterror;
     714             : 
     715           2 :     fprintf(stderr, "aborted pipeline... ");
     716             : 
     717           2 :     res = PQexec(conn, drop_table_sql);
     718           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     719           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     720             : 
     721           2 :     res = PQexec(conn, create_table_sql);
     722           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     723           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     724             : 
     725             :     /*
     726             :      * Queue up a couple of small pipelines and process each without returning
     727             :      * to command mode first. Make sure the second operation in the first
     728             :      * pipeline ERRORs.
     729             :      */
     730           2 :     if (PQenterPipelineMode(conn) != 1)
     731           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     732             : 
     733           2 :     dummy_params[0] = "1";
     734           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     735             :                           dummy_params, NULL, NULL, 0) != 1)
     736           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     737             : 
     738           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     739             :                           1, dummy_param_oids, dummy_params,
     740             :                           NULL, NULL, 0) != 1)
     741           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     742             : 
     743           2 :     dummy_params[0] = "2";
     744           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     745             :                           dummy_params, NULL, NULL, 0) != 1)
     746           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     747             : 
     748           2 :     if (PQpipelineSync(conn) != 1)
     749           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     750             : 
     751           2 :     dummy_params[0] = "3";
     752           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     753             :                           dummy_params, NULL, NULL, 0) != 1)
     754           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     755             :                  PQerrorMessage(conn));
     756             : 
     757           2 :     if (PQpipelineSync(conn) != 1)
     758           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     759             : 
     760             :     /*
     761             :      * OK, start processing the pipeline results.
     762             :      *
     763             :      * We should get a command-ok for the first query, then a fatal error and
     764             :      * a pipeline aborted message for the second insert, a pipeline-end, then
     765             :      * a command-ok and a pipeline-ok for the second pipeline operation.
     766             :      */
     767           2 :     res = PQgetResult(conn);
     768           2 :     if (res == NULL)
     769           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     770           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     771           0 :         pg_fatal("Unexpected result status %s: %s",
     772             :                  PQresStatus(PQresultStatus(res)),
     773             :                  PQresultErrorMessage(res));
     774           2 :     PQclear(res);
     775             : 
     776             :     /* NULL result to signal end-of-results for this command */
     777           2 :     if ((res = PQgetResult(conn)) != NULL)
     778           0 :         pg_fatal("Expected null result, got %s",
     779             :                  PQresStatus(PQresultStatus(res)));
     780             : 
     781             :     /* Second query caused error, so we expect an error next */
     782           2 :     res = PQgetResult(conn);
     783           2 :     if (res == NULL)
     784           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     785           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     786           0 :         pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
     787             :                  PQresStatus(PQresultStatus(res)));
     788           2 :     PQclear(res);
     789             : 
     790             :     /* NULL result to signal end-of-results for this command */
     791           2 :     if ((res = PQgetResult(conn)) != NULL)
     792           0 :         pg_fatal("Expected null result, got %s",
     793             :                  PQresStatus(PQresultStatus(res)));
     794             : 
     795             :     /*
     796             :      * pipeline should now be aborted.
     797             :      *
     798             :      * Note that we could still queue more queries at this point if we wanted;
     799             :      * they'd get added to a new third pipeline since we've already sent a
     800             :      * second. The aborted flag relates only to the pipeline being received.
     801             :      */
     802           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     803           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     804             : 
     805             :     /* third query in pipeline, the second insert */
     806           2 :     res = PQgetResult(conn);
     807           2 :     if (res == NULL)
     808           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     809           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
     810           0 :         pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
     811             :                  PQresStatus(PQresultStatus(res)));
     812           2 :     PQclear(res);
     813             : 
     814             :     /* NULL result to signal end-of-results for this command */
     815           2 :     if ((res = PQgetResult(conn)) != NULL)
     816           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     817             : 
     818           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     819           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     820             : 
     821             :     /* Ensure we're still in pipeline */
     822           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     823           0 :         pg_fatal("Fell out of pipeline mode somehow");
     824             : 
     825             :     /*
     826             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     827             :      *
     828             :      * (This is so clients know to start processing results normally again and
     829             :      * can tell the difference between skipped commands and the sync.)
     830             :      */
     831           2 :     res = PQgetResult(conn);
     832           2 :     if (res == NULL)
     833           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     834           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     835           0 :         pg_fatal("Unexpected result code from first pipeline sync\n"
     836             :                  "Expected PGRES_PIPELINE_SYNC, got %s",
     837             :                  PQresStatus(PQresultStatus(res)));
     838           2 :     PQclear(res);
     839             : 
     840           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     841           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     842             : 
     843             :     /* We're still in pipeline mode... */
     844           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     845           0 :         pg_fatal("Fell out of pipeline mode somehow");
     846             : 
     847             :     /* the insert from the second pipeline */
     848           2 :     res = PQgetResult(conn);
     849           2 :     if (res == NULL)
     850           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     851           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     852           0 :         pg_fatal("Unexpected result code %s from first item in second pipeline",
     853             :                  PQresStatus(PQresultStatus(res)));
     854           2 :     PQclear(res);
     855             : 
     856             :     /* Read the NULL result at the end of the command */
     857           2 :     if ((res = PQgetResult(conn)) != NULL)
     858           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     859             : 
     860             :     /* the second pipeline sync */
     861           2 :     if ((res = PQgetResult(conn)) == NULL)
     862           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     863           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     864           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     865             :                  PQresStatus(PQresultStatus(res)));
     866           2 :     PQclear(res);
     867             : 
     868           2 :     if ((res = PQgetResult(conn)) != NULL)
     869           0 :         pg_fatal("Expected null result, got %s: %s",
     870             :                  PQresStatus(PQresultStatus(res)),
     871             :                  PQerrorMessage(conn));
     872             : 
     873             :     /* Try to send two queries in one command */
     874           2 :     if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     875           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     876           2 :     if (PQpipelineSync(conn) != 1)
     877           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     878           2 :     goterror = false;
     879           4 :     while ((res = PQgetResult(conn)) != NULL)
     880             :     {
     881           2 :         switch (PQresultStatus(res))
     882             :         {
     883           2 :             case PGRES_FATAL_ERROR:
     884           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     885           0 :                     pg_fatal("expected error about multiple commands, got %s",
     886             :                              PQerrorMessage(conn));
     887           2 :                 printf("got expected %s", PQerrorMessage(conn));
     888           2 :                 goterror = true;
     889           2 :                 break;
     890           0 :             default:
     891           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     892             :                 break;
     893             :         }
     894             :     }
     895           2 :     if (!goterror)
     896           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     897           2 :     res = PQgetResult(conn);
     898           2 :     if (res == NULL)
     899           0 :         pg_fatal("got NULL result");
     900           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     901           0 :         pg_fatal("Unexpected result code %s from pipeline sync",
     902             :                  PQresStatus(PQresultStatus(res)));
     903           2 :     fprintf(stderr, "ok\n");
     904             : 
     905             :     /* Test single-row mode with an error partways */
     906           2 :     if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     907             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
     908           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     909           2 :     if (PQpipelineSync(conn) != 1)
     910           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     911           2 :     PQsetSingleRowMode(conn);
     912           2 :     goterror = false;
     913           2 :     gotrows = 0;
     914          10 :     while ((res = PQgetResult(conn)) != NULL)
     915             :     {
     916           8 :         switch (PQresultStatus(res))
     917             :         {
     918           6 :             case PGRES_SINGLE_TUPLE:
     919           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     920           6 :                 gotrows++;
     921           6 :                 break;
     922           2 :             case PGRES_FATAL_ERROR:
     923           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     924           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     925             :                              PQerrorMessage(conn),
     926             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     927           2 :                 printf("got expected division-by-zero\n");
     928           2 :                 goterror = true;
     929           2 :                 break;
     930           0 :             default:
     931           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     932             :         }
     933           8 :         PQclear(res);
     934             :     }
     935           2 :     if (!goterror)
     936           0 :         pg_fatal("did not get division-by-zero error");
     937           2 :     if (gotrows != 3)
     938           0 :         pg_fatal("did not get three rows");
     939             :     /* the third pipeline sync */
     940           2 :     if ((res = PQgetResult(conn)) == NULL)
     941           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     942           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     943           0 :         pg_fatal("Unexpected result code %s from third pipeline sync",
     944             :                  PQresStatus(PQresultStatus(res)));
     945           2 :     PQclear(res);
     946             : 
     947             :     /* We're still in pipeline mode... */
     948           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     949           0 :         pg_fatal("Fell out of pipeline mode somehow");
     950             : 
     951             :     /* until we end it, which we can safely do now */
     952           2 :     if (PQexitPipelineMode(conn) != 1)
     953           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     954             :                  PQerrorMessage(conn));
     955             : 
     956           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     957           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     958             : 
     959             :     /*-
     960             :      * Since we fired the pipelines off without a surrounding xact, the results
     961             :      * should be:
     962             :      *
     963             :      * - Implicit xact started by server around 1st pipeline
     964             :      * - First insert applied
     965             :      * - Second statement aborted xact
     966             :      * - Third insert skipped
     967             :      * - Sync rolled back first implicit xact
     968             :      * - Implicit xact created by server around 2nd pipeline
     969             :      * - insert applied from 2nd pipeline
     970             :      * - Sync commits 2nd xact
     971             :      *
     972             :      * So we should only have the value 3 that we inserted.
     973             :      */
     974           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     975             : 
     976           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     977           0 :         pg_fatal("Expected tuples, got %s: %s",
     978             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     979           2 :     if (PQntuples(res) != 1)
     980           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     981           4 :     for (i = 0; i < PQntuples(res); i++)
     982             :     {
     983           2 :         const char *val = PQgetvalue(res, i, 0);
     984             : 
     985           2 :         if (strcmp(val, "3") != 0)
     986           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     987             :     }
     988             : 
     989           2 :     PQclear(res);
     990             : 
     991           2 :     fprintf(stderr, "ok\n");
     992           2 : }
     993             : 
     994             : /* State machine enum for test_pipelined_insert */
     995             : enum PipelineInsertStep
     996             : {
     997             :     BI_BEGIN_TX,
     998             :     BI_DROP_TABLE,
     999             :     BI_CREATE_TABLE,
    1000             :     BI_PREPARE,
    1001             :     BI_INSERT_ROWS,
    1002             :     BI_COMMIT_TX,
    1003             :     BI_SYNC,
    1004             :     BI_DONE,
    1005             : };
    1006             : 
    1007             : static void
    1008           2 : test_pipelined_insert(PGconn *conn, int n_rows)
    1009             : {
    1010           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
    1011             :     const char *insert_params[2];
    1012             :     char        insert_param_0[MAXINTLEN];
    1013             :     char        insert_param_1[MAXINT8LEN];
    1014           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
    1015           2 :                 recv_step = BI_BEGIN_TX;
    1016             :     int         rows_to_send,
    1017             :                 rows_to_receive;
    1018             : 
    1019           2 :     insert_params[0] = insert_param_0;
    1020           2 :     insert_params[1] = insert_param_1;
    1021             : 
    1022           2 :     rows_to_send = rows_to_receive = n_rows;
    1023             : 
    1024             :     /*
    1025             :      * Do a pipelined insert into a table created at the start of the pipeline
    1026             :      */
    1027           2 :     if (PQenterPipelineMode(conn) != 1)
    1028           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1029             : 
    1030           8 :     while (send_step != BI_PREPARE)
    1031             :     {
    1032             :         const char *sql;
    1033             : 
    1034           6 :         switch (send_step)
    1035             :         {
    1036           2 :             case BI_BEGIN_TX:
    1037           2 :                 sql = "BEGIN TRANSACTION";
    1038           2 :                 send_step = BI_DROP_TABLE;
    1039           2 :                 break;
    1040             : 
    1041           2 :             case BI_DROP_TABLE:
    1042           2 :                 sql = drop_table_sql;
    1043           2 :                 send_step = BI_CREATE_TABLE;
    1044           2 :                 break;
    1045             : 
    1046           2 :             case BI_CREATE_TABLE:
    1047           2 :                 sql = create_table_sql;
    1048           2 :                 send_step = BI_PREPARE;
    1049           2 :                 break;
    1050             : 
    1051           0 :             default:
    1052           0 :                 pg_fatal("invalid state");
    1053             :                 sql = NULL;     /* keep compiler quiet */
    1054             :         }
    1055             : 
    1056             :         pg_debug("sending: %s\n", sql);
    1057           6 :         if (PQsendQueryParams(conn, sql,
    1058             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
    1059           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
    1060             :     }
    1061             : 
    1062             :     Assert(send_step == BI_PREPARE);
    1063             :     pg_debug("sending: %s\n", insert_sql2);
    1064           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
    1065           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
    1066           2 :     send_step = BI_INSERT_ROWS;
    1067             : 
    1068             :     /*
    1069             :      * Now we start inserting. We'll be sending enough data that we could fill
    1070             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
    1071             :      * mode and consume input while we send more output. As results of each
    1072             :      * query are processed we should pop them to allow processing of the next
    1073             :      * query. There's no need to finish the pipeline before processing
    1074             :      * results.
    1075             :      */
    1076           2 :     if (PQsetnonblocking(conn, 1) != 0)
    1077           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
    1078             : 
    1079       33434 :     while (recv_step != BI_DONE)
    1080             :     {
    1081             :         int         sock;
    1082             :         fd_set      input_mask;
    1083             :         fd_set      output_mask;
    1084             : 
    1085       33432 :         sock = PQsocket(conn);
    1086             : 
    1087       33432 :         if (sock < 0)
    1088           0 :             break;              /* shouldn't happen */
    1089             : 
    1090       33432 :         FD_ZERO(&input_mask);
    1091       33432 :         FD_SET(sock, &input_mask);
    1092       33432 :         FD_ZERO(&output_mask);
    1093       33432 :         FD_SET(sock, &output_mask);
    1094             : 
    1095       33432 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
    1096             :         {
    1097           0 :             fprintf(stderr, "select() failed: %m\n");
    1098           0 :             exit_nicely(conn);
    1099             :         }
    1100             : 
    1101             :         /*
    1102             :          * Process any results, so we keep the server's output buffer free
    1103             :          * flowing and it can continue to process input
    1104             :          */
    1105       33432 :         if (FD_ISSET(sock, &input_mask))
    1106             :         {
    1107           6 :             PQconsumeInput(conn);
    1108             : 
    1109             :             /* Read until we'd block if we tried to read */
    1110        2828 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
    1111             :             {
    1112             :                 PGresult   *res;
    1113        2822 :                 const char *cmdtag = "";
    1114        2822 :                 const char *description = "";
    1115             :                 int         status;
    1116             : 
    1117             :                 /*
    1118             :                  * Read next result.  If no more results from this query,
    1119             :                  * advance to the next query
    1120             :                  */
    1121        2822 :                 res = PQgetResult(conn);
    1122        2822 :                 if (res == NULL)
    1123        1410 :                     continue;
    1124             : 
    1125        1412 :                 status = PGRES_COMMAND_OK;
    1126        1412 :                 switch (recv_step)
    1127             :                 {
    1128           2 :                     case BI_BEGIN_TX:
    1129           2 :                         cmdtag = "BEGIN";
    1130           2 :                         recv_step++;
    1131           2 :                         break;
    1132           2 :                     case BI_DROP_TABLE:
    1133           2 :                         cmdtag = "DROP TABLE";
    1134           2 :                         recv_step++;
    1135           2 :                         break;
    1136           2 :                     case BI_CREATE_TABLE:
    1137           2 :                         cmdtag = "CREATE TABLE";
    1138           2 :                         recv_step++;
    1139           2 :                         break;
    1140           2 :                     case BI_PREPARE:
    1141           2 :                         cmdtag = "";
    1142           2 :                         description = "PREPARE";
    1143           2 :                         recv_step++;
    1144           2 :                         break;
    1145        1400 :                     case BI_INSERT_ROWS:
    1146        1400 :                         cmdtag = "INSERT";
    1147        1400 :                         rows_to_receive--;
    1148        1400 :                         if (rows_to_receive == 0)
    1149           2 :                             recv_step++;
    1150        1400 :                         break;
    1151           2 :                     case BI_COMMIT_TX:
    1152           2 :                         cmdtag = "COMMIT";
    1153           2 :                         recv_step++;
    1154           2 :                         break;
    1155           2 :                     case BI_SYNC:
    1156           2 :                         cmdtag = "";
    1157           2 :                         description = "SYNC";
    1158           2 :                         status = PGRES_PIPELINE_SYNC;
    1159           2 :                         recv_step++;
    1160           2 :                         break;
    1161           0 :                     case BI_DONE:
    1162             :                         /* unreachable */
    1163           0 :                         pg_fatal("unreachable state");
    1164             :                 }
    1165             : 
    1166        1412 :                 if (PQresultStatus(res) != status)
    1167           0 :                     pg_fatal("%s reported status %s, expected %s\n"
    1168             :                              "Error message: \"%s\"",
    1169             :                              description, PQresStatus(PQresultStatus(res)),
    1170             :                              PQresStatus(status), PQerrorMessage(conn));
    1171             : 
    1172        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
    1173           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
    1174             :                              description, cmdtag, PQcmdStatus(res));
    1175             : 
    1176             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
    1177             : 
    1178        1412 :                 PQclear(res);
    1179             :             }
    1180             :         }
    1181             : 
    1182             :         /* Write more rows and/or the end pipeline message, if needed */
    1183       33432 :         if (FD_ISSET(sock, &output_mask))
    1184             :         {
    1185       33428 :             PQflush(conn);
    1186             : 
    1187       33428 :             if (send_step == BI_INSERT_ROWS)
    1188             :             {
    1189        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
    1190             :                 /* use up some buffer space with a wide value */
    1191        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
    1192             : 
    1193        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
    1194             :                                         2, insert_params, NULL, NULL, 0) == 1)
    1195             :                 {
    1196             :                     pg_debug("sent row %d\n", rows_to_send);
    1197             : 
    1198        1400 :                     rows_to_send--;
    1199        1400 :                     if (rows_to_send == 0)
    1200           2 :                         send_step++;
    1201             :                 }
    1202             :                 else
    1203             :                 {
    1204             :                     /*
    1205             :                      * in nonblocking mode, so it's OK for an insert to fail
    1206             :                      * to send
    1207             :                      */
    1208           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
    1209             :                             rows_to_send, PQerrorMessage(conn));
    1210             :                 }
    1211             :             }
    1212       32028 :             else if (send_step == BI_COMMIT_TX)
    1213             :             {
    1214           2 :                 if (PQsendQueryParams(conn, "COMMIT",
    1215             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
    1216             :                 {
    1217             :                     pg_debug("sent COMMIT\n");
    1218           2 :                     send_step++;
    1219             :                 }
    1220             :                 else
    1221             :                 {
    1222           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
    1223             :                             PQerrorMessage(conn));
    1224             :                 }
    1225             :             }
    1226       32026 :             else if (send_step == BI_SYNC)
    1227             :             {
    1228           2 :                 if (PQpipelineSync(conn) == 1)
    1229             :                 {
    1230           2 :                     fprintf(stdout, "pipeline sync sent\n");
    1231           2 :                     send_step++;
    1232             :                 }
    1233             :                 else
    1234             :                 {
    1235           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
    1236             :                             PQerrorMessage(conn));
    1237             :                 }
    1238             :             }
    1239             :         }
    1240             :     }
    1241             : 
    1242             :     /* We've got the sync message and the pipeline should be done */
    1243           2 :     if (PQexitPipelineMode(conn) != 1)
    1244           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1245             :                  PQerrorMessage(conn));
    1246             : 
    1247           2 :     if (PQsetnonblocking(conn, 0) != 0)
    1248           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
    1249             : 
    1250           2 :     fprintf(stderr, "ok\n");
    1251           2 : }
    1252             : 
    1253             : static void
    1254           2 : test_prepared(PGconn *conn)
    1255             : {
    1256           2 :     PGresult   *res = NULL;
    1257           2 :     Oid         param_oids[1] = {INT4OID};
    1258             :     Oid         expected_oids[4];
    1259             :     Oid         typ;
    1260             : 
    1261           2 :     fprintf(stderr, "prepared... ");
    1262             : 
    1263           2 :     if (PQenterPipelineMode(conn) != 1)
    1264           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1265           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
    1266             :                       "interval '1 sec'",
    1267             :                       1, param_oids) != 1)
    1268           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
    1269           2 :     expected_oids[0] = INT4OID;
    1270           2 :     expected_oids[1] = TEXTOID;
    1271           2 :     expected_oids[2] = NUMERICOID;
    1272           2 :     expected_oids[3] = INTERVALOID;
    1273           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
    1274           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
    1275           2 :     if (PQpipelineSync(conn) != 1)
    1276           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1277             : 
    1278           2 :     res = PQgetResult(conn);
    1279           2 :     if (res == NULL)
    1280           0 :         pg_fatal("PQgetResult returned null");
    1281           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1282           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1283           2 :     PQclear(res);
    1284           2 :     res = PQgetResult(conn);
    1285           2 :     if (res != NULL)
    1286           0 :         pg_fatal("expected NULL result");
    1287             : 
    1288           2 :     res = PQgetResult(conn);
    1289           2 :     if (res == NULL)
    1290           0 :         pg_fatal("PQgetResult returned NULL");
    1291           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1292           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1293           2 :     if (PQnfields(res) != lengthof(expected_oids))
    1294           0 :         pg_fatal("expected %zu columns, got %d",
    1295             :                  lengthof(expected_oids), PQnfields(res));
    1296          10 :     for (int i = 0; i < PQnfields(res); i++)
    1297             :     {
    1298           8 :         typ = PQftype(res, i);
    1299           8 :         if (typ != expected_oids[i])
    1300           0 :             pg_fatal("field %d: expected type %u, got %u",
    1301             :                      i, expected_oids[i], typ);
    1302             :     }
    1303           2 :     PQclear(res);
    1304           2 :     res = PQgetResult(conn);
    1305           2 :     if (res != NULL)
    1306           0 :         pg_fatal("expected NULL result");
    1307             : 
    1308           2 :     res = PQgetResult(conn);
    1309           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1310           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1311             : 
    1312           2 :     fprintf(stderr, "closing statement..");
    1313           2 :     if (PQsendClosePrepared(conn, "select_one") != 1)
    1314           0 :         pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
    1315           2 :     if (PQpipelineSync(conn) != 1)
    1316           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1317             : 
    1318           2 :     res = PQgetResult(conn);
    1319           2 :     if (res == NULL)
    1320           0 :         pg_fatal("expected non-NULL result");
    1321           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1322           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1323           2 :     PQclear(res);
    1324           2 :     res = PQgetResult(conn);
    1325           2 :     if (res != NULL)
    1326           0 :         pg_fatal("expected NULL result");
    1327           2 :     res = PQgetResult(conn);
    1328           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1329           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1330             : 
    1331           2 :     if (PQexitPipelineMode(conn) != 1)
    1332           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1333             : 
    1334             :     /* Now that it's closed we should get an error when describing */
    1335           2 :     res = PQdescribePrepared(conn, "select_one");
    1336           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1337           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1338             : 
    1339             :     /*
    1340             :      * Also test the blocking close, this should not fail since closing a
    1341             :      * non-existent prepared statement is a no-op
    1342             :      */
    1343           2 :     res = PQclosePrepared(conn, "select_one");
    1344           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1345           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1346             : 
    1347           2 :     fprintf(stderr, "creating portal... ");
    1348           2 :     PQexec(conn, "BEGIN");
    1349           2 :     PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1350           2 :     PQenterPipelineMode(conn);
    1351           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1352           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1353           2 :     if (PQpipelineSync(conn) != 1)
    1354           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1355           2 :     res = PQgetResult(conn);
    1356           2 :     if (res == NULL)
    1357           0 :         pg_fatal("PQgetResult returned null");
    1358           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1359           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1360             : 
    1361           2 :     typ = PQftype(res, 0);
    1362           2 :     if (typ != INT4OID)
    1363           0 :         pg_fatal("portal: expected type %u, got %u",
    1364             :                  INT4OID, typ);
    1365           2 :     PQclear(res);
    1366           2 :     res = PQgetResult(conn);
    1367           2 :     if (res != NULL)
    1368           0 :         pg_fatal("expected NULL result");
    1369           2 :     res = PQgetResult(conn);
    1370           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1371           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1372             : 
    1373           2 :     fprintf(stderr, "closing portal... ");
    1374           2 :     if (PQsendClosePortal(conn, "cursor_one") != 1)
    1375           0 :         pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1376           2 :     if (PQpipelineSync(conn) != 1)
    1377           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1378             : 
    1379           2 :     res = PQgetResult(conn);
    1380           2 :     if (res == NULL)
    1381           0 :         pg_fatal("expected non-NULL result");
    1382           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1383           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1384           2 :     PQclear(res);
    1385           2 :     res = PQgetResult(conn);
    1386           2 :     if (res != NULL)
    1387           0 :         pg_fatal("expected NULL result");
    1388           2 :     res = PQgetResult(conn);
    1389           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1390           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1391             : 
    1392           2 :     if (PQexitPipelineMode(conn) != 1)
    1393           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1394             : 
    1395             :     /* Now that it's closed we should get an error when describing */
    1396           2 :     res = PQdescribePortal(conn, "cursor_one");
    1397           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1398           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1399             : 
    1400             :     /*
    1401             :      * Also test the blocking close, this should not fail since closing a
    1402             :      * non-existent portal is a no-op
    1403             :      */
    1404           2 :     res = PQclosePortal(conn, "cursor_one");
    1405           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1406           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1407             : 
    1408           2 :     fprintf(stderr, "ok\n");
    1409           2 : }
    1410             : 
    1411             : /* Notice processor: print notices, and count how many we got */
    1412             : static void
    1413           2 : notice_processor(void *arg, const char *message)
    1414             : {
    1415           2 :     int        *n_notices = (int *) arg;
    1416             : 
    1417           2 :     (*n_notices)++;
    1418           2 :     fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
    1419           2 : }
    1420             : 
    1421             : /* Verify behavior in "idle" state */
    1422             : static void
    1423           2 : test_pipeline_idle(PGconn *conn)
    1424             : {
    1425             :     PGresult   *res;
    1426           2 :     int         n_notices = 0;
    1427             : 
    1428           2 :     fprintf(stderr, "\npipeline idle...\n");
    1429             : 
    1430           2 :     PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1431             : 
    1432             :     /* Try to exit pipeline mode in pipeline-idle state */
    1433           2 :     if (PQenterPipelineMode(conn) != 1)
    1434           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1435           2 :     if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1436           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1437           2 :     PQsendFlushRequest(conn);
    1438           2 :     res = PQgetResult(conn);
    1439           2 :     if (res == NULL)
    1440           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1441             :                  PQerrorMessage(conn));
    1442           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1443           0 :         pg_fatal("unexpected result code %s from first pipeline item",
    1444             :                  PQresStatus(PQresultStatus(res)));
    1445           2 :     PQclear(res);
    1446           2 :     res = PQgetResult(conn);
    1447           2 :     if (res != NULL)
    1448           0 :         pg_fatal("did not receive terminating NULL");
    1449           2 :     if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1450           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1451           2 :     if (PQexitPipelineMode(conn) == 1)
    1452           0 :         pg_fatal("exiting pipeline succeeded when it shouldn't");
    1453           2 :     if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1454             :                 strlen("cannot exit pipeline mode")) != 0)
    1455           0 :         pg_fatal("did not get expected error; got: %s",
    1456             :                  PQerrorMessage(conn));
    1457           2 :     PQsendFlushRequest(conn);
    1458           2 :     res = PQgetResult(conn);
    1459           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1460           0 :         pg_fatal("unexpected result code %s from second pipeline item",
    1461             :                  PQresStatus(PQresultStatus(res)));
    1462           2 :     PQclear(res);
    1463           2 :     res = PQgetResult(conn);
    1464           2 :     if (res != NULL)
    1465           0 :         pg_fatal("did not receive terminating NULL");
    1466           2 :     if (PQexitPipelineMode(conn) != 1)
    1467           0 :         pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1468             : 
    1469           2 :     if (n_notices > 0)
    1470           0 :         pg_fatal("got %d notice(s)", n_notices);
    1471           2 :     fprintf(stderr, "ok - 1\n");
    1472             : 
    1473             :     /* Have a WARNING in the middle of a resultset */
    1474           2 :     if (PQenterPipelineMode(conn) != 1)
    1475           0 :         pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1476           2 :     if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1477           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1478           2 :     PQsendFlushRequest(conn);
    1479           2 :     res = PQgetResult(conn);
    1480           2 :     if (res == NULL)
    1481           0 :         pg_fatal("unexpected NULL result received");
    1482           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1483           0 :         pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
    1484           2 :     if (PQexitPipelineMode(conn) != 1)
    1485           0 :         pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1486           2 :     fprintf(stderr, "ok - 2\n");
    1487           2 : }
    1488             : 
    1489             : static void
    1490           2 : test_simple_pipeline(PGconn *conn)
    1491             : {
    1492           2 :     PGresult   *res = NULL;
    1493           2 :     const char *dummy_params[1] = {"1"};
    1494           2 :     Oid         dummy_param_oids[1] = {INT4OID};
    1495             : 
    1496           2 :     fprintf(stderr, "simple pipeline... ");
    1497             : 
    1498             :     /*
    1499             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
    1500             :      * process the results of as they come in.
    1501             :      *
    1502             :      * For a simple case we should be able to do this without interim
    1503             :      * processing of results since our output buffer will give us enough slush
    1504             :      * to work with and we won't block on sending. So blocking mode is fine.
    1505             :      */
    1506           2 :     if (PQisnonblocking(conn))
    1507           0 :         pg_fatal("Expected blocking connection mode");
    1508             : 
    1509           2 :     if (PQenterPipelineMode(conn) != 1)
    1510           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1511             : 
    1512           2 :     if (PQsendQueryParams(conn, "SELECT $1",
    1513             :                           1, dummy_param_oids, dummy_params,
    1514             :                           NULL, NULL, 0) != 1)
    1515           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1516             : 
    1517           2 :     if (PQexitPipelineMode(conn) != 0)
    1518           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1519             : 
    1520           2 :     if (PQpipelineSync(conn) != 1)
    1521           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1522             : 
    1523           2 :     res = PQgetResult(conn);
    1524           2 :     if (res == NULL)
    1525           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1526             :                  PQerrorMessage(conn));
    1527             : 
    1528           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1529           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
    1530             :                  PQresStatus(PQresultStatus(res)));
    1531             : 
    1532           2 :     PQclear(res);
    1533           2 :     res = NULL;
    1534             : 
    1535           2 :     if (PQgetResult(conn) != NULL)
    1536           0 :         pg_fatal("PQgetResult returned something extra after first query result.");
    1537             : 
    1538             :     /*
    1539             :      * Even though we've processed the result there's still a sync to come and
    1540             :      * we can't exit pipeline mode yet
    1541             :      */
    1542           2 :     if (PQexitPipelineMode(conn) != 0)
    1543           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1544             : 
    1545           2 :     res = PQgetResult(conn);
    1546           2 :     if (res == NULL)
    1547           0 :         pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
    1548             :                  PQerrorMessage(conn));
    1549             : 
    1550           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1551           0 :         pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
    1552             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
    1553             : 
    1554           2 :     PQclear(res);
    1555           2 :     res = NULL;
    1556             : 
    1557           2 :     if (PQgetResult(conn) != NULL)
    1558           0 :         pg_fatal("PQgetResult returned something extra after pipeline end: %s",
    1559             :                  PQresStatus(PQresultStatus(res)));
    1560             : 
    1561             :     /* We're still in pipeline mode... */
    1562           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1563           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1564             : 
    1565             :     /* ... until we end it, which we can safely do now */
    1566           2 :     if (PQexitPipelineMode(conn) != 1)
    1567           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1568             :                  PQerrorMessage(conn));
    1569             : 
    1570           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1571           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1572             : 
    1573           2 :     fprintf(stderr, "ok\n");
    1574           2 : }
    1575             : 
    1576             : static void
    1577           2 : test_singlerowmode(PGconn *conn)
    1578             : {
    1579             :     PGresult   *res;
    1580             :     int         i;
    1581           2 :     bool        pipeline_ended = false;
    1582             : 
    1583           2 :     if (PQenterPipelineMode(conn) != 1)
    1584           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1585             :                  PQerrorMessage(conn));
    1586             : 
    1587             :     /* One series of three commands, using single-row mode for the first two. */
    1588           8 :     for (i = 0; i < 3; i++)
    1589             :     {
    1590             :         char       *param[1];
    1591             : 
    1592           6 :         param[0] = psprintf("%d", 44 + i);
    1593             : 
    1594           6 :         if (PQsendQueryParams(conn,
    1595             :                               "SELECT generate_series(42, $1)",
    1596             :                               1,
    1597             :                               NULL,
    1598             :                               (const char **) param,
    1599             :                               NULL,
    1600             :                               NULL,
    1601             :                               0) != 1)
    1602           0 :             pg_fatal("failed to send query: %s",
    1603             :                      PQerrorMessage(conn));
    1604           6 :         pfree(param[0]);
    1605             :     }
    1606           2 :     if (PQpipelineSync(conn) != 1)
    1607           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1608             : 
    1609          10 :     for (i = 0; !pipeline_ended; i++)
    1610             :     {
    1611           8 :         bool        first = true;
    1612             :         bool        saw_ending_tuplesok;
    1613           8 :         bool        isSingleTuple = false;
    1614             : 
    1615             :         /* Set single row mode for only first 2 SELECT queries */
    1616           8 :         if (i < 2)
    1617             :         {
    1618           4 :             if (PQsetSingleRowMode(conn) != 1)
    1619           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1620             :         }
    1621             : 
    1622             :         /* Consume rows for this query */
    1623           8 :         saw_ending_tuplesok = false;
    1624          28 :         while ((res = PQgetResult(conn)) != NULL)
    1625             :         {
    1626          22 :             ExecStatusType est = PQresultStatus(res);
    1627             : 
    1628          22 :             if (est == PGRES_PIPELINE_SYNC)
    1629             :             {
    1630           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1631           2 :                 pipeline_ended = true;
    1632           2 :                 PQclear(res);
    1633           2 :                 if (i != 3)
    1634           0 :                     pg_fatal("Expected three results, got %d", i);
    1635           2 :                 break;
    1636             :             }
    1637             : 
    1638             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1639          20 :             if (first)
    1640             :             {
    1641           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1642           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1643             :                              i, PQresStatus(est));
    1644           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1645           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1646             :                              i, PQresStatus(est));
    1647           6 :                 first = false;
    1648             :             }
    1649             : 
    1650          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1651          20 :             switch (est)
    1652             :             {
    1653           6 :                 case PGRES_TUPLES_OK:
    1654           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1655           6 :                     saw_ending_tuplesok = true;
    1656           6 :                     if (isSingleTuple)
    1657             :                     {
    1658           4 :                         if (PQntuples(res) == 0)
    1659           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1660             :                         else
    1661           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1662             :                     }
    1663           6 :                     break;
    1664             : 
    1665          14 :                 case PGRES_SINGLE_TUPLE:
    1666          14 :                     isSingleTuple = true;
    1667          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1668          14 :                     break;
    1669             : 
    1670           0 :                 default:
    1671           0 :                     pg_fatal("unexpected");
    1672             :             }
    1673          20 :             PQclear(res);
    1674             :         }
    1675           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1676           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1677             :     }
    1678             : 
    1679             :     /*
    1680             :      * Now issue one command, get its results in with single-row mode, then
    1681             :      * issue another command, and get its results in normal mode; make sure
    1682             :      * the single-row mode flag is reset as expected.
    1683             :      */
    1684           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1685             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1686           0 :         pg_fatal("failed to send query: %s",
    1687             :                  PQerrorMessage(conn));
    1688           2 :     if (PQsendFlushRequest(conn) != 1)
    1689           0 :         pg_fatal("failed to send flush request");
    1690           2 :     if (PQsetSingleRowMode(conn) != 1)
    1691           0 :         pg_fatal("PQsetSingleRowMode() failed");
    1692           2 :     res = PQgetResult(conn);
    1693           2 :     if (res == NULL)
    1694           0 :         pg_fatal("unexpected NULL");
    1695           2 :     if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
    1696           0 :         pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
    1697             :                  PQresStatus(PQresultStatus(res)));
    1698           2 :     res = PQgetResult(conn);
    1699           2 :     if (res == NULL)
    1700           0 :         pg_fatal("unexpected NULL");
    1701           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1702           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1703             :                  PQresStatus(PQresultStatus(res)));
    1704           2 :     if (PQgetResult(conn) != NULL)
    1705           0 :         pg_fatal("expected NULL result");
    1706             : 
    1707           2 :     if (PQsendQueryParams(conn, "SELECT 1",
    1708             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1709           0 :         pg_fatal("failed to send query: %s",
    1710             :                  PQerrorMessage(conn));
    1711           2 :     if (PQsendFlushRequest(conn) != 1)
    1712           0 :         pg_fatal("failed to send flush request");
    1713           2 :     res = PQgetResult(conn);
    1714           2 :     if (res == NULL)
    1715           0 :         pg_fatal("unexpected NULL");
    1716           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1717           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1718             :                  PQresStatus(PQresultStatus(res)));
    1719           2 :     if (PQgetResult(conn) != NULL)
    1720           0 :         pg_fatal("expected NULL result");
    1721             : 
    1722             :     /*
    1723             :      * Try chunked mode as well; make sure that it correctly delivers a
    1724             :      * partial final chunk.
    1725             :      */
    1726           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
    1727             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1728           0 :         pg_fatal("failed to send query: %s",
    1729             :                  PQerrorMessage(conn));
    1730           2 :     if (PQsendFlushRequest(conn) != 1)
    1731           0 :         pg_fatal("failed to send flush request");
    1732           2 :     if (PQsetChunkedRowsMode(conn, 3) != 1)
    1733           0 :         pg_fatal("PQsetChunkedRowsMode() failed");
    1734           2 :     res = PQgetResult(conn);
    1735           2 :     if (res == NULL)
    1736           0 :         pg_fatal("unexpected NULL");
    1737           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1738           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
    1739             :                  PQresStatus(PQresultStatus(res)),
    1740             :                  PQerrorMessage(conn));
    1741           2 :     if (PQntuples(res) != 3)
    1742           0 :         pg_fatal("Expected 3 rows, got %d", PQntuples(res));
    1743           2 :     res = PQgetResult(conn);
    1744           2 :     if (res == NULL)
    1745           0 :         pg_fatal("unexpected NULL");
    1746           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1747           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
    1748             :                  PQresStatus(PQresultStatus(res)));
    1749           2 :     if (PQntuples(res) != 2)
    1750           0 :         pg_fatal("Expected 2 rows, got %d", PQntuples(res));
    1751           2 :     res = PQgetResult(conn);
    1752           2 :     if (res == NULL)
    1753           0 :         pg_fatal("unexpected NULL");
    1754           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1755           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1756             :                  PQresStatus(PQresultStatus(res)));
    1757           2 :     if (PQntuples(res) != 0)
    1758           0 :         pg_fatal("Expected 0 rows, got %d", PQntuples(res));
    1759           2 :     if (PQgetResult(conn) != NULL)
    1760           0 :         pg_fatal("expected NULL result");
    1761             : 
    1762           2 :     if (PQexitPipelineMode(conn) != 1)
    1763           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1764             : 
    1765           2 :     fprintf(stderr, "ok\n");
    1766           2 : }
    1767             : 
    1768             : /*
    1769             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1770             :  * an error, ignoring transaction commands.
    1771             :  */
    1772             : static void
    1773           2 : test_transaction(PGconn *conn)
    1774             : {
    1775             :     PGresult   *res;
    1776             :     bool        expect_null;
    1777           2 :     int         num_syncs = 0;
    1778             : 
    1779           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1780             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1781           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1782           0 :         pg_fatal("failed to create test table: %s",
    1783             :                  PQerrorMessage(conn));
    1784           2 :     PQclear(res);
    1785             : 
    1786           2 :     if (PQenterPipelineMode(conn) != 1)
    1787           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1788             :                  PQerrorMessage(conn));
    1789           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1790           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1791             :                  PQerrorMessage(conn));
    1792             : 
    1793           2 :     if (PQsendQueryParams(conn,
    1794             :                           "BEGIN",
    1795             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1796           0 :         pg_fatal("failed to send query: %s",
    1797             :                  PQerrorMessage(conn));
    1798           2 :     if (PQsendQueryParams(conn,
    1799             :                           "SELECT 0/0",
    1800             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1801           0 :         pg_fatal("failed to send query: %s",
    1802             :                  PQerrorMessage(conn));
    1803             : 
    1804             :     /*
    1805             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1806             :      * get out of the pipeline-aborted state first.
    1807             :      */
    1808           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1809           0 :         pg_fatal("failed to execute prepared: %s",
    1810             :                  PQerrorMessage(conn));
    1811             : 
    1812             :     /* This insert fails because we're in pipeline-aborted state */
    1813           2 :     if (PQsendQueryParams(conn,
    1814             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1815             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1816           0 :         pg_fatal("failed to send query: %s",
    1817             :                  PQerrorMessage(conn));
    1818           2 :     if (PQpipelineSync(conn) != 1)
    1819           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1820           2 :     num_syncs++;
    1821             : 
    1822             :     /*
    1823             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1824             :      * an aborted transaction
    1825             :      */
    1826           2 :     if (PQsendQueryParams(conn,
    1827             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1828             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1829           0 :         pg_fatal("failed to send query: %s",
    1830             :                  PQerrorMessage(conn));
    1831           2 :     if (PQpipelineSync(conn) != 1)
    1832           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1833           2 :     num_syncs++;
    1834             : 
    1835             :     /*
    1836             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1837             :      * PQpipelineSync above.
    1838             :      */
    1839           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1840           0 :         pg_fatal("failed to execute prepared: %s",
    1841             :                  PQerrorMessage(conn));
    1842             : 
    1843             :     /*
    1844             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1845             :      * insert works
    1846             :      */
    1847           2 :     if (PQsendQueryParams(conn,
    1848             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1849             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1850           0 :         pg_fatal("failed to send query: %s",
    1851             :                  PQerrorMessage(conn));
    1852             :     /* Send two syncs now -- match up to SYNC messages below */
    1853           2 :     if (PQpipelineSync(conn) != 1)
    1854           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1855           2 :     num_syncs++;
    1856           2 :     if (PQpipelineSync(conn) != 1)
    1857           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1858           2 :     num_syncs++;
    1859             : 
    1860           2 :     expect_null = false;
    1861           2 :     for (int i = 0;; i++)
    1862          38 :     {
    1863             :         ExecStatusType restype;
    1864             : 
    1865          40 :         res = PQgetResult(conn);
    1866          40 :         if (res == NULL)
    1867             :         {
    1868          16 :             printf("%d: got NULL result\n", i);
    1869          16 :             if (!expect_null)
    1870           0 :                 pg_fatal("did not expect NULL here");
    1871          16 :             expect_null = false;
    1872          16 :             continue;
    1873             :         }
    1874          24 :         restype = PQresultStatus(res);
    1875          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1876          24 :         if (expect_null)
    1877           0 :             pg_fatal("expected NULL");
    1878          24 :         if (restype == PGRES_FATAL_ERROR)
    1879           4 :             printf("; error: %s", PQerrorMessage(conn));
    1880          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1881             :         {
    1882           4 :             printf(": command didn't run because pipeline aborted\n");
    1883             :         }
    1884             :         else
    1885          16 :             printf("\n");
    1886          24 :         PQclear(res);
    1887             : 
    1888          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1889           8 :             num_syncs--;
    1890             :         else
    1891          16 :             expect_null = true;
    1892          24 :         if (num_syncs <= 0)
    1893           2 :             break;
    1894             :     }
    1895           2 :     if (PQgetResult(conn) != NULL)
    1896           0 :         pg_fatal("returned something extra after all the syncs: %s",
    1897             :                  PQresStatus(PQresultStatus(res)));
    1898             : 
    1899           2 :     if (PQexitPipelineMode(conn) != 1)
    1900           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1901             : 
    1902             :     /* We expect to find one tuple containing the value "3" */
    1903           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1904           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1905           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1906           2 :     if (PQntuples(res) != 1)
    1907           0 :         pg_fatal("did not get 1 tuple");
    1908           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1909           0 :         pg_fatal("did not get expected tuple");
    1910           2 :     PQclear(res);
    1911             : 
    1912           2 :     fprintf(stderr, "ok\n");
    1913           2 : }
    1914             : 
    1915             : /*
    1916             :  * In this test mode we send a stream of queries, with one in the middle
    1917             :  * causing an error.  Verify that we can still send some more after the
    1918             :  * error and have libpq work properly.
    1919             :  */
    1920             : static void
    1921           2 : test_uniqviol(PGconn *conn)
    1922             : {
    1923           2 :     int         sock = PQsocket(conn);
    1924             :     PGresult   *res;
    1925           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    1926             :     const char *paramValues[2];
    1927             :     char        paramValue0[MAXINT8LEN];
    1928             :     char        paramValue1[MAXINT8LEN];
    1929           2 :     int         ctr = 0;
    1930           2 :     int         numsent = 0;
    1931           2 :     int         results = 0;
    1932           2 :     bool        read_done = false;
    1933           2 :     bool        write_done = false;
    1934           2 :     bool        error_sent = false;
    1935           2 :     bool        got_error = false;
    1936           2 :     int         switched = 0;
    1937           2 :     int         socketful = 0;
    1938             :     fd_set      in_fds;
    1939             :     fd_set      out_fds;
    1940             : 
    1941           2 :     fprintf(stderr, "uniqviol ...");
    1942             : 
    1943           2 :     PQsetnonblocking(conn, 1);
    1944             : 
    1945           2 :     paramValues[0] = paramValue0;
    1946           2 :     paramValues[1] = paramValue1;
    1947           2 :     sprintf(paramValue1, "42");
    1948             : 
    1949           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1950             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1951           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1952           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1953             : 
    1954           2 :     res = PQexec(conn, "begin");
    1955           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1956           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1957             : 
    1958           2 :     res = PQprepare(conn, "insertion",
    1959             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    1960             :                     2, paramTypes);
    1961           2 :     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
    1962           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1963             : 
    1964           2 :     if (PQenterPipelineMode(conn) != 1)
    1965           0 :         pg_fatal("failed to enter pipeline mode");
    1966             : 
    1967          16 :     while (!read_done)
    1968             :     {
    1969             :         /*
    1970             :          * Avoid deadlocks by reading everything the server has sent before
    1971             :          * sending anything.  (Special precaution is needed here to process
    1972             :          * PQisBusy before testing the socket for read-readiness, because the
    1973             :          * socket does not turn read-ready after "sending" queries in aborted
    1974             :          * pipeline mode.)
    1975             :          */
    1976        1212 :         while (PQisBusy(conn) == 0)
    1977             :         {
    1978             :             bool        new_error;
    1979             : 
    1980        1202 :             if (results >= numsent)
    1981             :             {
    1982           2 :                 if (write_done)
    1983           0 :                     read_done = true;
    1984           2 :                 break;
    1985             :             }
    1986             : 
    1987        1200 :             res = PQgetResult(conn);
    1988        1200 :             new_error = process_result(conn, res, results, numsent);
    1989        1200 :             if (new_error && got_error)
    1990           0 :                 pg_fatal("got two errors");
    1991        1200 :             got_error |= new_error;
    1992        1200 :             if (results++ >= numsent - 1)
    1993             :             {
    1994           4 :                 if (write_done)
    1995           2 :                     read_done = true;
    1996           4 :                 break;
    1997             :             }
    1998             :         }
    1999             : 
    2000          16 :         if (read_done)
    2001           2 :             break;
    2002             : 
    2003          14 :         FD_ZERO(&out_fds);
    2004          14 :         FD_SET(sock, &out_fds);
    2005             : 
    2006          14 :         FD_ZERO(&in_fds);
    2007          14 :         FD_SET(sock, &in_fds);
    2008             : 
    2009          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    2010             :         {
    2011           0 :             if (errno == EINTR)
    2012           0 :                 continue;
    2013           0 :             pg_fatal("select() failed: %m");
    2014             :         }
    2015             : 
    2016          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    2017           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    2018             : 
    2019             :         /*
    2020             :          * If the socket is writable and we haven't finished sending queries,
    2021             :          * send some.
    2022             :          */
    2023          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    2024             :         {
    2025             :             for (;;)
    2026        1194 :             {
    2027             :                 int         flush;
    2028             : 
    2029             :                 /*
    2030             :                  * provoke uniqueness violation exactly once after having
    2031             :                  * switched to read mode.
    2032             :                  */
    2033        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    2034             :                 {
    2035           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    2036           2 :                     fprintf(stderr, "E");
    2037           2 :                     error_sent = true;
    2038             :                 }
    2039             :                 else
    2040             :                 {
    2041        1198 :                     fprintf(stderr, ".");
    2042        1198 :                     sprintf(paramValue0, "%d", ctr++);
    2043             :                 }
    2044             : 
    2045        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    2046           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    2047        1200 :                 numsent++;
    2048             : 
    2049             :                 /* Are we done writing? */
    2050        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    2051             :                 {
    2052           2 :                     if (PQsendFlushRequest(conn) != 1)
    2053           0 :                         pg_fatal("failed to send flush request");
    2054           2 :                     write_done = true;
    2055           2 :                     fprintf(stderr, "\ndone writing\n");
    2056           2 :                     PQflush(conn);
    2057           2 :                     break;
    2058             :                 }
    2059             : 
    2060             :                 /* is the outgoing socket full? */
    2061        1198 :                 flush = PQflush(conn);
    2062        1198 :                 if (flush == -1)
    2063           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    2064        1198 :                 if (flush == 1)
    2065             :                 {
    2066           4 :                     if (socketful == 0)
    2067           2 :                         socketful = numsent;
    2068           4 :                     fprintf(stderr, "\nswitch to reading\n");
    2069           4 :                     switched++;
    2070           4 :                     break;
    2071             :                 }
    2072             :             }
    2073             :         }
    2074             :     }
    2075             : 
    2076           2 :     if (!got_error)
    2077           0 :         pg_fatal("did not get expected error");
    2078             : 
    2079           2 :     fprintf(stderr, "ok\n");
    2080           2 : }
    2081             : 
    2082             : /*
    2083             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    2084             :  * the expected NULL that should follow it.
    2085             :  *
    2086             :  * Returns true if we read a fatal error message, otherwise false.
    2087             :  */
    2088             : static bool
    2089        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    2090             : {
    2091             :     PGresult   *res2;
    2092        1200 :     bool        got_error = false;
    2093             : 
    2094        1200 :     if (res == NULL)
    2095           0 :         pg_fatal("got unexpected NULL");
    2096             : 
    2097        1200 :     switch (PQresultStatus(res))
    2098             :     {
    2099           2 :         case PGRES_FATAL_ERROR:
    2100           2 :             got_error = true;
    2101           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    2102           2 :             PQclear(res);
    2103             : 
    2104           2 :             res2 = PQgetResult(conn);
    2105           2 :             if (res2 != NULL)
    2106           0 :                 pg_fatal("expected NULL, got %s",
    2107             :                          PQresStatus(PQresultStatus(res2)));
    2108           2 :             break;
    2109             : 
    2110         836 :         case PGRES_TUPLES_OK:
    2111         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    2112         836 :             PQclear(res);
    2113             : 
    2114         836 :             res2 = PQgetResult(conn);
    2115         836 :             if (res2 != NULL)
    2116           0 :                 pg_fatal("expected NULL, got %s",
    2117             :                          PQresStatus(PQresultStatus(res2)));
    2118         836 :             break;
    2119             : 
    2120         362 :         case PGRES_PIPELINE_ABORTED:
    2121         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    2122         362 :             res2 = PQgetResult(conn);
    2123         362 :             if (res2 != NULL)
    2124           0 :                 pg_fatal("expected NULL, got %s",
    2125             :                          PQresStatus(PQresultStatus(res2)));
    2126         362 :             break;
    2127             : 
    2128           0 :         default:
    2129           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    2130             :     }
    2131             : 
    2132        1200 :     return got_error;
    2133             : }
    2134             : 
    2135             : 
    2136             : static void
    2137           0 : usage(const char *progname)
    2138             : {
    2139           0 :     fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    2140           0 :     fprintf(stderr, "Usage:\n");
    2141           0 :     fprintf(stderr, "  %s [OPTION] tests\n", progname);
    2142           0 :     fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    2143           0 :     fprintf(stderr, "\nOptions:\n");
    2144           0 :     fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    2145           0 :     fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    2146           0 : }
    2147             : 
    2148             : static void
    2149           2 : print_test_list(void)
    2150             : {
    2151           2 :     printf("cancel\n");
    2152           2 :     printf("disallowed_in_pipeline\n");
    2153           2 :     printf("multi_pipelines\n");
    2154           2 :     printf("nosync\n");
    2155           2 :     printf("pipeline_abort\n");
    2156           2 :     printf("pipeline_idle\n");
    2157           2 :     printf("pipelined_insert\n");
    2158           2 :     printf("prepared\n");
    2159           2 :     printf("simple_pipeline\n");
    2160           2 :     printf("singlerow\n");
    2161           2 :     printf("transaction\n");
    2162           2 :     printf("uniqviol\n");
    2163           2 : }
    2164             : 
    2165             : int
    2166          26 : main(int argc, char **argv)
    2167             : {
    2168          26 :     const char *conninfo = "";
    2169             :     PGconn     *conn;
    2170             :     FILE       *trace;
    2171             :     char       *testname;
    2172          26 :     int         numrows = 10000;
    2173             :     PGresult   *res;
    2174             :     int         c;
    2175             : 
    2176          94 :     while ((c = getopt(argc, argv, "r:t:")) != -1)
    2177             :     {
    2178          42 :         switch (c)
    2179             :         {
    2180          24 :             case 'r':           /* numrows */
    2181          24 :                 errno = 0;
    2182          24 :                 numrows = strtol(optarg, NULL, 10);
    2183          24 :                 if (errno != 0 || numrows <= 0)
    2184             :                 {
    2185           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    2186             :                             optarg);
    2187           0 :                     exit(1);
    2188             :                 }
    2189          24 :                 break;
    2190          18 :             case 't':           /* trace file */
    2191          18 :                 tracefile = pg_strdup(optarg);
    2192          18 :                 break;
    2193             :         }
    2194          68 :     }
    2195             : 
    2196          26 :     if (optind < argc)
    2197             :     {
    2198          26 :         testname = pg_strdup(argv[optind]);
    2199          26 :         optind++;
    2200             :     }
    2201             :     else
    2202             :     {
    2203           0 :         usage(argv[0]);
    2204           0 :         exit(1);
    2205             :     }
    2206             : 
    2207          26 :     if (strcmp(testname, "tests") == 0)
    2208             :     {
    2209           2 :         print_test_list();
    2210           2 :         exit(0);
    2211             :     }
    2212             : 
    2213          24 :     if (optind < argc)
    2214             :     {
    2215          24 :         conninfo = pg_strdup(argv[optind]);
    2216          24 :         optind++;
    2217             :     }
    2218             : 
    2219             :     /* Make a connection to the database */
    2220          24 :     conn = PQconnectdb(conninfo);
    2221          24 :     if (PQstatus(conn) != CONNECTION_OK)
    2222             :     {
    2223           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    2224             :                 PQerrorMessage(conn));
    2225           0 :         exit_nicely(conn);
    2226             :     }
    2227             : 
    2228          24 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    2229          24 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2230           0 :         pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
    2231          24 :     res = PQexec(conn, "SET debug_parallel_query = off");
    2232          24 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2233           0 :         pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
    2234             : 
    2235             :     /* Set the trace file, if requested */
    2236          24 :     if (tracefile != NULL)
    2237             :     {
    2238          18 :         if (strcmp(tracefile, "-") == 0)
    2239           0 :             trace = stdout;
    2240             :         else
    2241          18 :             trace = fopen(tracefile, "w");
    2242          18 :         if (trace == NULL)
    2243           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    2244             : 
    2245             :         /* Make it line-buffered */
    2246          18 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    2247             : 
    2248          18 :         PQtrace(conn, trace);
    2249          18 :         PQsetTraceFlags(conn,
    2250             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    2251             :     }
    2252             : 
    2253          24 :     if (strcmp(testname, "cancel") == 0)
    2254           2 :         test_cancel(conn);
    2255          22 :     else if (strcmp(testname, "disallowed_in_pipeline") == 0)
    2256           2 :         test_disallowed_in_pipeline(conn);
    2257          20 :     else if (strcmp(testname, "multi_pipelines") == 0)
    2258           2 :         test_multi_pipelines(conn);
    2259          18 :     else if (strcmp(testname, "nosync") == 0)
    2260           2 :         test_nosync(conn);
    2261          16 :     else if (strcmp(testname, "pipeline_abort") == 0)
    2262           2 :         test_pipeline_abort(conn);
    2263          14 :     else if (strcmp(testname, "pipeline_idle") == 0)
    2264           2 :         test_pipeline_idle(conn);
    2265          12 :     else if (strcmp(testname, "pipelined_insert") == 0)
    2266           2 :         test_pipelined_insert(conn, numrows);
    2267          10 :     else if (strcmp(testname, "prepared") == 0)
    2268           2 :         test_prepared(conn);
    2269           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    2270           2 :         test_simple_pipeline(conn);
    2271           6 :     else if (strcmp(testname, "singlerow") == 0)
    2272           2 :         test_singlerowmode(conn);
    2273           4 :     else if (strcmp(testname, "transaction") == 0)
    2274           2 :         test_transaction(conn);
    2275           2 :     else if (strcmp(testname, "uniqviol") == 0)
    2276           2 :         test_uniqviol(conn);
    2277             :     else
    2278             :     {
    2279           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    2280           0 :         exit(1);
    2281             :     }
    2282             : 
    2283             :     /* close the connection to the database and cleanup */
    2284          24 :     PQfinish(conn);
    2285          24 :     return 0;
    2286             : }

Generated by: LCOV version 1.14